DataStream programs in Flink are regular programs that implement transformations on data streams
(e.g., filtering, updating state, defining windows, aggregating). The data streams are initially
created from various sources (e.g., message queues, socket streams, files). Results are returned via
sinks, which may for example write the data to files, or to standard output (for example the command
line terminal).
The Python DataStream API is a Python version of the DataStream API, which allows Python users to write Python DataStream API jobs.
The StreamExecutionEnvironment is a central concept of the DataStream API program.
The following code example shows how to create a StreamExecutionEnvironment:
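The snippet below is a minimal sketch using the standard PyFlink entry point:

```python
from pyflink.datastream import StreamExecutionEnvironment

# create the execution environment for a DataStream API job
env = StreamExecutionEnvironment.get_execution_environment()
```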
The DataStream API gets its name from the special DataStream class that is
used to represent a collection of data in a Flink program. You can think of
them as immutable collections of data that can contain duplicates. This data
can either be finite or unbounded; the API that you use to work on them is the same.
A DataStream is similar to a regular Python Collection in terms of usage but
is quite different in some key ways. They are immutable, meaning that once they
are created you cannot add or remove elements. You also cannot simply inspect the elements inside; you can only work on them using the DataStream API operations, which are also called transformations.
You can create an initial DataStream by adding a source in a Flink program.
Then you can derive new streams from this and combine them by using API methods
such as map, filter, and so on.
Create from a list object
You can create a DataStream from a list object:
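A minimal sketch (the element values below are only illustrative):

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# create a DataStream from a Python list; type_info declares the element type
ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
```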
The parameter type_info is optional; if it is not specified, the output type of the returned DataStream will be Types.PICKLED_BYTE_ARRAY().
Create using DataStream connectors
You can also create a DataStream using DataStream connectors with the add_source method as follows:
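A sketch using the Kafka consumer; the jar path, topic name, broker address and group id below are placeholders for your own setup:

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# the Kafka connector jar must be available on the classpath; the path below is a placeholder
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

deserialization_schema = SimpleStringSchema()
kafka_consumer = FlinkKafkaConsumer(
    topics='source_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'my_group'})

ds = env.add_source(kafka_consumer)
```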
Note Currently, only FlinkKafkaConsumer is supported as a DataStream source connector.
Create using Table & SQL connectors
Table & SQL connectors could also be used to create a DataStream. You could firstly create a
Table using Table & SQL connectors and then convert it to a DataStream.
Note The StreamExecutionEnvironment env should be specified
when creating the TableEnvironment t_env.
Note As all the Java Table & SQL connectors can be used in the PyFlink Table API, they can also be used in the PyFlink DataStream API.
Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple
transformations into sophisticated dataflow topologies.
The following example shows how to convert a DataStream into another DataStream using the map transformation:
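A sketch, assuming a DataStream of (INT, STRING) rows as in the earlier example:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

# map each row to its second field; output_type declares the result type
mapped_stream = ds.map(lambda value: value[1], output_type=Types.STRING())
```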
Please see operators for an overview of the
available DataStream transformations.
Conversion between DataStream and Table
Converting a DataStream to a Table and vice versa is also supported.
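A sketch of both directions, assuming the env and the DataStream ds of (INT, STRING) rows created above:

```python
from pyflink.common.typeinfo import Types
from pyflink.table import StreamTableEnvironment

t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# DataStream -> Table
table = t_env.from_data_stream(ds)

# Table -> DataStream
ds_from_table = t_env.to_append_stream(
    table, Types.ROW([Types.INT(), Types.STRING()]))
```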
You can call the print method to print the data of a DataStream to the standard output:
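For example:

```python
# print the elements of the DataStream to standard output
ds.print()
```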
Emit results to a DataStream sink connector
You can call the add_sink method to emit the data of a DataStream to a DataStream sink connector:
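A sketch using FlinkKafkaProducer; the topic name, broker address and group id are placeholders, and the Kafka connector jar must be on the classpath as in the source example above:

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors import FlinkKafkaProducer

# ds is assumed to be a DataStream of STRING elements so that SimpleStringSchema can serialize them
serialization_schema = SimpleStringSchema()
kafka_producer = FlinkKafkaProducer(
    topic='sink_topic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'my_group'})

ds.add_sink(kafka_producer)
```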
Note Currently, only FlinkKafkaProducer, JdbcSink and StreamingFileSink are supported as DataStream sink connectors.
Emit results to a Table & SQL sink connector
Table & SQL connectors could also be used to write out a DataStream. You need firstly convert a
DataStream to a Table and then write it to a Table & SQL sink connector.
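A sketch using the print connector as the sink table; the table name and schema are only illustrative, and env and ds are assumed from the earlier examples:

```python
from pyflink.table import StreamTableEnvironment

t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# register a sink table using a Table & SQL connector
t_env.execute_sql("""
    CREATE TABLE my_sink (
        a INT,
        b VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")

# ds must have a composite output type, e.g. Types.ROW([Types.INT(), Types.STRING()])
table = t_env.from_data_stream(ds)
table.execute_insert("my_sink")
```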
Note The output type of DataStream ds must be a composite type.
Submit Job
Finally, you should call the StreamExecutionEnvironment.execute method to submit the DataStream
API job for execution:
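For example (the job name is optional and only illustrative):

```python
# submit the job for execution
env.execute("datastream_api_demo")
```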
If you convert the DataStream to a Table and then write it to a Table API & SQL sink connector, you may need to submit the job using the TableEnvironment.execute method instead.