Streaming analysis programs in Flink are regular programs that implement transformations on streaming data sets (e.g., filtering, mapping, joining, grouping). The streaming data sets are initially created from sources (e.g., by reading files or from collections). Results are returned via sinks, which may, for example, write the data to (distributed) files or to standard output (for example, the command-line terminal). Flink streaming programs run in a variety of contexts: standalone or embedded in other programs. The execution can happen in a local JVM or on clusters of many machines.
In order to create your own Flink streaming program, we encourage you to start with the program skeleton and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.
The Flink Python streaming API uses the Jython framework (see http://www.jython.org/archive/21/docs/whatis.html) to drive the execution of a given script. The Python streaming layer is a thin wrapper around the existing Java streaming APIs.
There are two main constraints for using Jython:
The following streaming program is a complete, working example of WordCount. You can copy & paste the code to run it locally (see notes later in this section). It counts the occurrences of each word (case-insensitive) in a stream of sentences over a window of 50 milliseconds and prints the results to standard output.
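The sketch below is not the Flink program. It is a plain-Python approximation of what WordCount computes, meant only to make the counting logic concrete. It assumes that the input is a list of (timestamp in milliseconds, sentence) pairs and that the 50 millisecond window is a tumbling window; both are assumptions made for illustration, and the function name is hypothetical.

```python
from collections import Counter

WINDOW_MS = 50  # window size taken from the description above


def count_words_per_window(events, window_ms=WINDOW_MS):
    """Count words case-insensitively per tumbling window.

    `events` is an iterable of (timestamp_ms, sentence) pairs. This input
    shape is an assumption of the sketch, not part of the Flink API.
    """
    windows = {}
    for timestamp_ms, sentence in events:
        # Align the timestamp to the start of its window.
        window_start = (timestamp_ms // window_ms) * window_ms
        counts = windows.setdefault(window_start, Counter())
        for word in sentence.lower().split():
            counts[word] += 1
    return windows


events = [
    (0, "Hello World"),
    (10, "hello Flink"),
    (60, "World of streams"),
]
result = count_words_per_window(events)
for window_start in sorted(result):
    print(window_start, dict(result[window_start]))
```

The first two sentences fall into the window starting at 0 and the third into the window starting at 50, so "hello" is counted twice in the first window regardless of case.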
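Notes:
- On a cluster, the output of the script ends up in the workers' .out file. If the script is executed inside the IntelliJ IDE, the output is displayed in the console tab.
As we already saw in the example, Flink streaming programs look like regular Python programs. Each program consists of the same basic parts:
- A main(factory) function definition with an environment factory argument, which is the program entry point,
- An Environment obtained from the factory,
- The data sources that load or create the initial data,
- The transformations applied to this data,
- The data sinks that receive the results, and
- A call to execute on the Environment.
We will now give an overview of each of those steps, but please refer to the respective sections for more details.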
The main(factory) function is mandatory. The Flink execution layer uses it to run the given Python streaming program.
The Environment is the basis for all Flink programs. You can obtain one using the methods provided by the factory.
The streaming execution environment has several methods for specifying data sources. To read a text file as a sequence of lines, use read_text_file(path).
This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.
Once you have a DataStream, you can apply transformations to create a new DataStream, which you can then write to a file, transform again, or combine with other DataStreams. You apply transformations by calling methods on the DataStream with your own custom transformation function. For example, a map transformation with a function that doubles its input creates a new DataStream by doubling every value in the original DataStream. For more information and a list of all the transformations, please refer to Transformations.
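To make the shape of such a transformation function concrete, here is a minimal plain-Python sketch. The MapFunction base class and the apply_map helper are hypothetical stand-ins written for this illustration; they are not the Flink classes, and a list takes the place of the DataStream.

```python
class MapFunction(object):
    """Hypothetical stand-in for a map function base class."""

    def map(self, value):
        raise NotImplementedError


class Doubler(MapFunction):
    """User-defined function that doubles every value it receives."""

    def map(self, value):
        return 2 * value


def apply_map(elements, function):
    """Apply function.map to each element and return the new sequence."""
    return [function.map(element) for element in elements]


doubled = apply_map([1, 2, 3], Doubler())
print(doubled)  # [2, 4, 6]
```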
Once you have a DataStream that needs to be written to disk you can call one of these methods on DataStream:
The last method is only useful for developing and debugging on a local machine; it writes the contents of the DataStream to standard output. (Note that in a cluster, the result goes to the standard out stream of the cluster nodes and ends up in the .out files of the workers.) The first two do as their names suggest. Please refer to Data Sinks for more information on writing to files.
Once you have specified the complete program, you need to call execute on the Environment. This will either execute on your local machine or submit your program for execution on a cluster, depending on how Flink was started.
Apart from setting up Flink, no additional work is required. Using Jython to execute the Python script means that no external packages are needed and the program is executed as if it were a JAR file.
The Python API was tested on Windows/Linux/OSX systems.
All Flink programs are executed lazily: when the program's main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program's plan. The operations are actually executed when one of the execute() methods is invoked on the Environment object. Whether the program is executed locally or on a cluster depends on the environment of the program.
The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
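The idea of building a plan first and running it later can be illustrated with a small plain-Python sketch. LazyPipeline is a hypothetical toy class written for this illustration; it is not how Flink builds or optimizes its plan, it only shows that nothing runs until execute() is called.

```python
class LazyPipeline(object):
    """Hypothetical toy pipeline: records operations, runs them on execute()."""

    def __init__(self, source):
        self._source = source
        self._plan = []  # operations recorded in order, not yet applied

    def map(self, fn):
        self._plan.append(("map", fn))
        return self

    def filter(self, fn):
        self._plan.append(("filter", fn))
        return self

    def execute(self):
        """Run every recorded operation over every source element."""
        results = []
        for element in self._source:
            keep = True
            for kind, fn in self._plan:
                if kind == "map":
                    element = fn(element)
                elif not fn(element):
                    keep = False
                    break
            if keep:
                results.append(element)
        return results


calls = []


def double(x):
    calls.append(x)  # record that the function actually ran
    return 2 * x


pipeline = LazyPipeline([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_execute = list(calls)  # still empty: only the plan was built
output = pipeline.execute()
print(calls_before_execute, output)  # [] [6, 8]
```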
Data transformations transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated assemblies.
This section gives a brief overview of the available transformations. The transformations documentation has a full description of all transformations with examples.
Transformation | Description |
---|---|
Map (PythonDataStream → PythonDataStream) | Takes one element and produces one element. |
FlatMap (PythonDataStream → PythonDataStream) | Takes one element and produces zero, one, or more elements. |
Filter (PythonDataStream → PythonDataStream) | Evaluates a boolean function for each element and retains those for which the function returns true. |
KeyBy (PythonDataStream → PythonKeyedStream) | Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a PythonKeyedStream. |
Reduce (PythonKeyedStream → PythonDataStream) | A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. |
Window (PythonKeyedStream → PythonWindowedStream) | Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. |
Window Apply (PythonWindowedStream → PythonDataStream) | Applies a general function to the window as a whole. For example, the function can manually sum the elements of a window. |
Window Reduce (PythonWindowedStream → PythonDataStream) | Applies a reduce function to the window and returns the reduced value. |
Union (PythonDataStream* → PythonDataStream) | Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will get each element twice in the resulting stream. |
Split (PythonDataStream → PythonSplitStream) | Splits the stream into two or more streams according to some criterion. |
Select (PythonSplitStream → PythonDataStream) | Selects one or more streams from a split stream. |
Iterate (PythonDataStream → PythonIterativeStream → PythonDataStream) | Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. For example, an iteration can start with a stream and apply the iteration body continuously, sending elements greater than 0 back to the feedback channel and forwarding the rest downstream. See iterations for a complete description. |
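The KeyBy and Reduce rows describe hash partitioning by key and a "rolling" reduce. The plain-Python sketch below illustrates both ideas on a list of (word, count) pairs. The function names, the use of Python's built-in hash, and the fixed number of partitions are assumptions made for this illustration and do not reflect Flink's internal implementation.

```python
def key_by(elements, key_selector, num_partitions):
    """Assign each element to a partition by hashing its key."""
    partitions = [[] for _ in range(num_partitions)]
    for element in elements:
        index = hash(key_selector(element)) % num_partitions
        partitions[index].append(element)
    return partitions


def rolling_reduce(elements, key_selector, reduce_fn):
    """Emit, for every input element, the running reduced value of its key."""
    last_reduced = {}
    emitted = []
    for element in elements:
        key = key_selector(element)
        if key in last_reduced:
            # Combine the last reduced value with the current element.
            element = reduce_fn(last_reduced[key], element)
        last_reduced[key] = element
        emitted.append(element)
    return emitted


def first(pair):
    return pair[0]


def add_counts(left, right):
    return (left[0], left[1] + right[1])


words = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1)]
partitions = key_by(words, first, 2)
running = rolling_reduce(words, first, add_counts)
print(running)  # [('a', 1), ('b', 1), ('a', 2), ('a', 3), ('b', 2)]
```

All elements with the same key always land in the same partition, which is what allows a per-key reduce to run independently in each partition.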
Certain operations require user-defined functions as arguments. All functions should be defined as Python classes that derive from the relevant Flink function. User-defined functions are serialized and sent to the TaskManagers for execution.
Rich functions (e.g., RichFilterFunction) let you override the optional open and close operations, which can be used for initialization and cleanup. The worker calls open before starting the streaming pipeline and close after the streaming pipeline has stopped.
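The open and close lifecycle can be sketched in plain Python as follows. The RichFilterFunction class here is a hypothetical stand-in defined inside the example, not the Flink class, and the `config` argument of open and the run_filter driver are assumptions made for the illustration.

```python
class RichFilterFunction(object):
    """Hypothetical stand-in mirroring the open/filter/close shape."""

    def open(self, config):
        pass  # optional: override for initialization

    def close(self):
        pass  # optional: override for cleanup

    def filter(self, value):
        raise NotImplementedError


class EvenFilter(RichFilterFunction):
    """Keeps even numbers and records its lifecycle calls."""

    def __init__(self):
        self.events = []
        self.seen = 0

    def open(self, config):
        self.events.append("open")
        self.seen = 0  # initialize per-run state

    def filter(self, value):
        self.seen += 1
        return value % 2 == 0

    def close(self):
        self.events.append("close")


def run_filter(function, elements):
    """Toy driver: open before the first element, close after the last."""
    function.open(None)
    try:
        return [e for e in elements if function.filter(e)]
    finally:
        function.close()


even_filter = EvenFilter()
kept = run_filter(even_filter, [1, 2, 3, 4])
print(kept, even_filter.events)  # [2, 4] ['open', 'close']
```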
Flink’s Python Streaming API offers support for primitive Python types (int, float, bool, string), as well as byte arrays and user-defined classes.
You can use tuples (or lists) for composite types. Python tuples are mapped to the corresponding native Jython types, which are handled by the thin Python wrapper layer.
Data sources create the initial data streams, such as from files or from collections.
File-based:
- read_text_file(path) - Reads files line-wise and returns them as a stream of strings.
Collection-based:
- from_elements(*args) - Creates a data stream from all the elements.
- generate_sequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.
Examples
Data sinks consume DataStreams and are used to store or return them:
- write_as_text() - Writes elements line-wise as strings. The strings are obtained by calling the str() method of each element.
- output() - Prints the str() value of each element to standard output.
- write_to_socket() - Writes the DataStream to a socket [host:port] as a byte array.
A DataStream can be input to multiple operations. Programs can write or print a data stream and at the same time run additional transformations on it.
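The "line-wise as strings" behaviour described for the text sink can be pictured with a few lines of plain Python. The write_lines helper is hypothetical and writes to an in-memory buffer instead of a file. It only illustrates that each element is converted with str() and written on its own line, and is not the Flink sink itself.

```python
import io


def write_lines(elements, stream):
    """Write str(element) followed by a newline for every element."""
    for element in elements:
        stream.write(u"%s\n" % (str(element),))


buffer = io.StringIO()
write_lines([1, ("a", 2), 3.5], buffer)
text = buffer.getvalue()
print(text)
```

Composite elements such as tuples appear in the output exactly as str() renders them.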
Examples
Standard data sink methods:
This section describes how the parallel execution of programs can be configured in Flink. A Flink program consists of multiple tasks (operators, data sources, and sinks). A task is split into several parallel instances for execution and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism or degree of parallelism (DOP).
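As a rough picture of what "each parallel instance processes a subset of the task's input data" means, the sketch below distributes a list of elements over a given number of instances. The round-robin assignment and the function name are assumptions chosen for simplicity. How Flink actually distributes data depends on the operator and the partitioning in use.

```python
def split_for_parallelism(elements, parallelism):
    """Distribute elements round-robin over `parallelism` instances."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    subsets = [[] for _ in range(parallelism)]
    for position, element in enumerate(elements):
        subsets[position % parallelism].append(element)
    return subsets


subsets = split_for_parallelism(list(range(7)), 3)
print(subsets)  # [[0, 3, 6], [1, 4], [2, 5]]
```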
The degree of parallelism of a task can be specified in Flink on different levels.
Flink programs are executed in the context of an execution environment. An execution environment defines a default parallelism for all operators, data sources, and data sinks it executes. Execution environment parallelism can be overridden by explicitly configuring the parallelism of an operator.
The default parallelism of an execution environment can be specified by calling the set_parallelism() method. To execute all operators, data sources, and data sinks of the WordCount example program with a parallelism of 3, set the default parallelism of the execution environment by calling set_parallelism(3).
A system-wide default parallelism for all execution environments can be defined by setting the parallelism.default property in ./conf/flink-conf.yaml. See the Configuration documentation for details.
To run the plan with Flink, go to your Flink distribution and run the pyflink-stream.sh script from the /bin folder. The script containing the plan has to be passed as the first argument, followed by any additional Python packages and finally, separated by -, the additional arguments that will be fed to the script.
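The argument order described above can be sketched as a small helper that assembles the command line as a list. The helper name, the relative path ./bin/pyflink-stream.sh (assuming the current directory is the Flink distribution), and the example file and argument names are all hypothetical. The sketch only builds the list and does not launch anything.

```python
def build_command(script, packages=(), script_args=()):
    """Assemble: launcher, plan script, extra packages, '-', script args."""
    command = ["./bin/pyflink-stream.sh", script]
    command.extend(packages)
    if script_args:
        command.append("-")  # separator before the script's own arguments
        command.extend(script_args)
    return command


command = build_command("word_count.py", ["helpers"], ["arg1", "arg2"])
print(" ".join(command))
```

When no script arguments are given, the separator is left out and the command consists only of the launcher, the plan script, and any packages.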