
Python API Tutorial

In this guide we will start from scratch and go from setting up a Flink Python project to running a Python Table API program.

Setting up a Python Project

You can begin by creating a Python project and installing the PyFlink package. PyFlink is available via PyPI and can be easily installed using pip.

# install the latest 1.9 version of PyFlink
$ python -m pip install apache-flink==1.9.*

You can also build PyFlink from source by following the development guide.

The first step in a Flink Python Table API program is to create a BatchTableEnvironment (or StreamTableEnvironment if you are writing a streaming job). It is the main entry point for Python Table API jobs.

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
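
If you are writing a streaming job instead, the setup looks very similar. The following is only a minimal sketch using the streaming entry points from pyflink.datastream and pyflink.table; it is not used in the rest of this tutorial:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig

# streaming counterpart of the batch setup above
stream_exec_env = StreamExecutionEnvironment.get_execution_environment()
stream_exec_env.set_parallelism(1)
stream_t_env = StreamTableEnvironment.create(stream_exec_env, TableConfig())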

The ExecutionEnvironment (or StreamExecutionEnvironment if you are writing a streaming job) can be used to set execution parameters, such as the restart strategy, default parallelism, etc.
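
For example, you could configure a fixed-delay restart strategy on the environment. This is only a sketch, assuming the RestartStrategies helper from pyflink.common and the set_restart_strategy() method are available in your PyFlink version:

from pyflink.common import RestartStrategies

# restart the job at most 3 times, waiting 10000 ms between attempts
exec_env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))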

The TableConfig can be used to set configuration parameters such as the name of the built-in catalog, the threshold at which generated code is split into multiple methods, etc.
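
For example, a sketch of adjusting a couple of these settings; the setter names below are assumptions and may differ depending on your PyFlink version:

# assumed TableConfig setters for the parameters mentioned above
t_config.set_built_in_catalog_name('my_catalog')   # name of the built-in catalog
t_config.set_max_generated_code_length(64000)      # threshold for splitting generated code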

Next we will create a source table and a sink table.

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')

You can also use the TableEnvironment.sql_update() method to register a source/sink table defined in DDL:

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/output'
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

This registers a table named mySource and a table named mySink in the table environment. The table mySource has only one column, word, and represents the words read from the file /tmp/input. The table mySink has two columns, word and count, and writes data to the file /tmp/output, with \t as the field delimiter.
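
If you want to verify the registration, a quick sanity check is to list the registered tables (a small sketch, assuming list_tables() is available on the table environment):

# should print the registered tables, e.g. ['mySource', 'mySink']
print(t_env.list_tables())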

Then we need to create a job which reads input from the table mySource, performs some operations, and writes the results to the table mySink.

t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')
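
Equivalently, because both tables are registered in the table environment, the same aggregation can be expressed in SQL instead of the Table API calls above. A sketch using TableEnvironment.sql_query(); note that count is a reserved keyword and has to be escaped with backticks:

t_env.sql_query("SELECT word, COUNT(1) AS `count` FROM mySource GROUP BY word") \
    .insert_into('mySink')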

The last step is to start the actual Flink Python Table API job. All operations, such as creating sources, transformations and sinks, only build up a graph of internal operations. Only when t_env.execute(job_name) is called will this graph of operations be submitted to a cluster or executed on your local machine.

t_env.execute("tutorial_job")

The complete code so far is as follows:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')

t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute("tutorial_job")

First, you need to prepare the input data in the “/tmp/input” file. For example, you can use the following command to do so:

$ echo -e "flink\npyflink\nflink" > /tmp/input

Next, you can run this example on the command line (Note: if the result file “/tmp/output” already exists, you need to remove it before running the example):

$ python WordCount.py

The command builds and runs the Python Table API program in a local mini cluster. You can also submit the Python Table API program to a remote cluster; refer to the Job Submission Examples for more details.
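
For example, with the Flink command-line client a Python Table API job can typically be submitted as follows (a sketch; the exact invocation depends on your Flink setup):

$ ./bin/flink run -py WordCount.py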

Finally, you can see the execution result on the command line:

$ cat /tmp/output
flink	2
pyflink	1

This should get you started with writing your own Flink Python Table API programs. To learn more about the Python Table API, refer to the Flink Python Table API Docs for more details.