pyflink.datastream package¶
Module contents¶
Important classes of Flink Streaming API:
StreamExecutionEnvironment
: The context in which a streaming program is executed.
CheckpointConfig
: Configuration that captures all checkpointing related settings.
CheckpointingMode
: Defines what consistency guarantees the system gives in the presence of failures.
CoMapFunction
: Implements a map transformation over two connected streams.
CoFlatMapFunction
: Implements a flat-map transformation over two connected streams.
DataStream
: Represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.
FlatMapFunction
: FlatMap functions take elements and transform them into zero, one, or more elements.
FilterFunction
: A filter function is a predicate applied individually to each record.
KeySelector
: The extractor takes an object and returns the deterministic key for that object.
Partitioner
: Function to implement a custom partition assignment for keys.
ReduceFunction
: Reduce functions combine groups of elements to a single value.
SinkFunction
: Interface for implementing user defined sink functionality.
SourceFunction
: Interface for implementing user defined source functionality.
StateBackend
: Defines how the state of a streaming application is stored and checkpointed.
MapFunction
: Map functions take elements and transform them, element wise.
MemoryStateBackend
: This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
FsStateBackend
: The state backend checkpoints state as files to a file system.
RocksDBStateBackend
: A State Backend that stores its state in RocksDB.
CustomStateBackend
: A wrapper of customized java state backend created from the provided StateBackendFactory.
PredefinedOptions
: Configuration settings for the RocksDBStateBackend.
ExternalizedCheckpointCleanup
: Cleanup behaviour for externalized checkpoints when the job is cancelled.
TimeCharacteristic
: The time characteristic defines how the system determines time for time-dependent order and operations that depend on time (such as time windows).
-
class
pyflink.datastream.
StreamExecutionEnvironment
(j_stream_execution_environment, serializer=PickleSerializer())[source]¶ Bases:
object
The StreamExecutionEnvironment is the context in which a streaming program is executed. A LocalStreamEnvironment will cause execution in the attached JVM, a RemoteStreamEnvironment will cause execution on a remote setup.
The environment provides methods to control the job execution (such as setting the parallelism or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
-
add_classpaths
(*classpaths: str)[source]¶ Adds a list of URLs to the classpath of each user code classloader of the program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
- Parameters
classpaths – Classpaths that will be added.
-
add_default_kryo_serializer
(type_class_name: str, serializer_class_name: str)[source]¶ Adds a new Kryo default serializer to the Runtime.
Example:
>>> env.add_default_kryo_serializer("com.aaa.bbb.TypeClass", "com.aaa.bbb.Serializer")
- Parameters
type_class_name – The fully qualified java class name of the types serialized with the given serializer.
serializer_class_name – The fully qualified java class name of the serializer to use.
-
add_jars
(*jars_path: str)[source]¶ Adds a list of jar files that will be uploaded to the cluster and referenced by the job.
- Parameters
jars_path – Path of jars.
-
add_python_archive
(archive_path: str, target_dir: str = None)[source]¶ Adds a python archive file. The file will be extracted to the working directory of python UDF worker.
If the parameter “target_dir” is specified, the archive file will be extracted to a directory named ${target_dir}. Otherwise, the archive file will be extracted to a directory with the same name as the archive file.
If a python UDF depends on a specific python version which does not exist in the cluster, this method can be used to upload the virtual environment. Note that the path of the python interpreter contained in the uploaded environment should be specified via the method
pyflink.table.TableConfig.set_python_executable()
. The files uploaded via this method are also accessible in UDFs via relative path.
Example:
# command executed in shell
# assume the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env

# python code
>>> stream_env.add_python_archive("py_env.zip")
>>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")

# or
>>> stream_env.add_python_archive("py_env.zip", "myenv")
>>> stream_env.set_python_executable("myenv/py_env/bin/python")

# the files contained in the archive file can be accessed in UDF
>>> def my_udf():
...     with open("myenv/py_env/data/data.txt") as f:
...         ...
Note
Please make sure the uploaded python environment matches the platform that the cluster is running on and that the python version is 3.5 or higher.
Note
Currently only zip-format archives are supported, i.e. zip, jar, whl, egg, etc. Other archive formats such as tar, tar.gz, 7z, rar, etc. are not supported.
- Parameters
archive_path – The archive file path.
target_dir – Optional, the name of the target directory that the archive file is extracted to.
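The directory naming rule described above can be illustrated with a small pure-Python sketch. It does not call PyFlink; `resolve_archive_dir` and `interpreter_path` are hypothetical helper names introduced only for illustration, and the sketch assumes the rule works exactly as the description states.

```python
import posixpath


def resolve_archive_dir(archive_path, target_dir=None):
    """Return the directory name the archive is extracted to (hypothetical helper)."""
    if target_dir is not None:
        return target_dir
    # Without target_dir, the directory carries the archive's own file name.
    return posixpath.basename(archive_path)


def interpreter_path(archive_path, relative_python, target_dir=None):
    """Build the relative interpreter path one would pass to set_python_executable()."""
    return posixpath.join(resolve_archive_dir(archive_path, target_dir), relative_python)


print(interpreter_path("py_env.zip", "py_env/bin/python"))
print(interpreter_path("py_env.zip", "py_env/bin/python", "myenv"))
```

The two printed paths correspond to the two variants shown in the example for this method.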
-
add_python_file
(file_path: str)[source]¶ Adds a python dependency which could be python files, python packages or local directories. They will be added to the PYTHONPATH of the python UDF worker. Please make sure that these dependencies can be imported.
- Parameters
file_path – The path of the python dependency.
-
add_source
(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None) → pyflink.datastream.data_stream.DataStream[source]¶ Adds a data source to the streaming topology.
- Parameters
source_func – the user defined function.
source_name – name of the data source. Optional.
type_info – type of the returned stream. Optional.
- Returns
the data stream constructed.
-
disable_operator_chaining
() → pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment[source]¶ Disables operator chaining for streaming operators. Operator chaining allows non-shuffle operations to be co-located in the same thread fully avoiding serialization and de-serialization.
- Returns
This object.
-
enable_checkpointing
(interval: int, mode: pyflink.datastream.checkpointing_mode.CheckpointingMode = None) → pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment[source]¶ Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint.
The job draws checkpoints periodically, in the given interval. The system uses the given
CheckpointingMode
for the checkpointing (“exactly once” vs “at least once”). The state will be stored in the configured state backend.
Note
Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
Example:
>>> env.enable_checkpointing(300000, CheckpointingMode.AT_LEAST_ONCE)
- Parameters
interval – Time interval between state checkpoints in milliseconds.
mode – The checkpointing mode, selecting between “exactly once” and “at least once” guarantees.
- Returns
This object.
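A minimal sketch of how enabling checkpointing might be combined with the related CheckpointConfig setters documented in this package. The function name `configure_checkpointing` and the chosen values (half the interval as minimum pause, ten times the interval as timeout) are illustrative assumptions, not recommendations from the library.

```python
def configure_checkpointing(env, interval_ms=60000, mode=None):
    """Enable checkpointing on `env` and tune a few related settings.

    `env` is expected to be a StreamExecutionEnvironment. `mode` is a
    CheckpointingMode member, or None to keep the default mode.
    """
    if mode is None:
        env.enable_checkpointing(interval_ms)
    else:
        env.enable_checkpointing(interval_ms, mode)

    config = env.get_checkpoint_config()
    # Illustrative values only: leave half an interval between checkpoints,
    # and discard a checkpoint that takes longer than ten intervals.
    config.set_min_pause_between_checkpoints(interval_ms // 2)
    config.set_checkpoint_timeout(10 * interval_ms)
    config.set_max_concurrent_checkpoints(1)
    return env
```

With PyFlink installed, this would typically be called as `configure_checkpointing(StreamExecutionEnvironment.get_execution_environment(), 300000, CheckpointingMode.AT_LEAST_ONCE)`.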
-
execute
(job_name: str = None) → pyflink.common.job_execution_result.JobExecutionResult[source]¶ Triggers the program execution. The environment will execute all parts of the program that have resulted in a “sink” operation. Sink operations are for example printing results or forwarding them to a message queue.
The program execution will be logged and displayed with the provided name.
- Parameters
job_name – Desired name of the job, optional.
- Returns
The result of the job execution, containing elapsed time and accumulators.
-
execute_async
(job_name: str = 'Flink Streaming Job') → pyflink.common.job_client.JobClient[source]¶ Triggers the program asynchronously. The environment will execute all parts of the program that have resulted in a “sink” operation. Sink operations are for example printing results or forwarding them to a message queue. The program execution will be logged and displayed with a generated default name.
- Parameters
job_name – Desired name of the job.
- Returns
A JobClient that can be used to communicate with the submitted job, completed once the submission has succeeded.
-
from_collection
(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None) → pyflink.datastream.data_stream.DataStream[source]¶ Creates a data stream from the given non-empty collection. The type of the data stream is that of the elements in the collection.
Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with parallelism one.
- Parameters
collection – The collection of elements to create the data stream from.
type_info – The TypeInformation for the produced data stream
- Returns
the data stream representing the given collection.
-
get_buffer_timeout
() → int[source]¶ Gets the maximum time frequency (milliseconds) for the flushing of the output buffers. For clarification on the extremal values see
set_buffer_timeout()
.
- Returns
The timeout of the buffer.
-
get_checkpoint_config
() → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc.
- Returns
The
CheckpointConfig
.
-
get_checkpoint_interval
() → int[source]¶ Returns the checkpointing interval or -1 if checkpointing is disabled.
Shorthand for get_checkpoint_config().get_checkpoint_interval().
- Returns
The checkpointing interval or -1.
-
get_checkpointing_mode
() → pyflink.datastream.checkpointing_mode.CheckpointingMode[source]¶ Returns the checkpointing mode (exactly-once vs. at-least-once).
Shorthand for get_checkpoint_config().get_checkpointing_mode().
- Returns
The
CheckpointingMode
.
-
get_config
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Gets the config object.
- Returns
The
ExecutionConfig
object.
-
get_default_local_parallelism
() → int[source]¶ Gets the default parallelism that will be used for the local execution environment.
- Returns
The default local parallelism.
-
static
get_execution_environment
() → pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment[source]¶ Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment.
- Returns
The execution environment of the context in which the program is executed.
-
get_execution_plan
() → str[source]¶ Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph. Note that this needs to be called before the plan is executed.
If the compiler could not be instantiated, or the master could not be contacted to retrieve information relevant to the execution planning, an exception will be thrown.
- Returns
The execution plan of the program, as a JSON String.
-
get_max_parallelism
() → int[source]¶ Gets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Returns
Maximum degree of parallelism.
-
get_parallelism
() → int[source]¶ Gets the parallelism with which operations are executed by default. Operations can individually override this value to use a specific parallelism.
- Returns
The parallelism used by operations, unless they override that value.
-
get_restart_strategy
() → pyflink.common.restart_strategy.RestartStrategyConfiguration[source]¶ Returns the specified restart strategy configuration.
- Returns
The restart strategy configuration to be used.
-
get_state_backend
() → pyflink.datastream.state_backend.StateBackend[source]¶ Gets the state backend that defines how to store and checkpoint state.
See also
- Returns
The
StateBackend
.
-
get_stream_time_characteristic
() → pyflink.datastream.time_characteristic.TimeCharacteristic[source]¶ Gets the time characteristic.
See also
- Returns
The
TimeCharacteristic
.
-
is_chaining_enabled
() → bool[source]¶ Returns whether operator chaining is enabled.
- Returns
True if chaining is enabled, false otherwise.
-
read_text_file
(file_path: str, charset_name: str = 'UTF-8') → pyflink.datastream.data_stream.DataStream[source]¶ Reads the given file line-by-line and creates a DataStream that contains a string with the contents of each such line. The charset with the given name will be used to read the files.
Note that this interface is not fault tolerant and is intended for testing purposes only.
- Parameters
file_path – The path of the file, as a URI (e.g., “file:///some/local/file” or “hdfs://host:port/file/path”)
charset_name – The name of the character set used to read the file.
- Returns
The DataStream that represents the data read from the given file as text lines.
-
register_type
(type_class_name: str)[source]¶ Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.
Example:
>>> env.register_type("com.aaa.bbb.TypeClass")
- Parameters
type_class_name – The fully qualified java class name of the type to register.
-
register_type_with_kryo_serializer
(type_class_name: str, serializer_class_name: str)[source]¶ Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer.
Example:
>>> env.register_type_with_kryo_serializer("com.aaa.bbb.TypeClass",
...                                        "com.aaa.bbb.Serializer")
- Parameters
type_class_name – The fully qualified java class name of the types serialized with the given serializer.
serializer_class_name – The fully qualified java class name of the serializer to use.
-
set_buffer_timeout
(timeout_millis: int) → pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment[source]¶ Sets the maximum time frequency (milliseconds) for the flushing of the output buffers. By default the output buffers flush frequently to provide low latency and to aid smooth developer experience. Setting the parameter can result in three logical modes:
A positive integer triggers flushing periodically by that integer
0 triggers flushing after every record thus minimizing latency
-1 triggers flushing only when the output buffer is full thus maximizing throughput
- Parameters
timeout_millis – The maximum time between two output flushes.
- Returns
This object.
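The three logical modes listed above can be summarized as a small decision function. This is only an illustration of the documented rule; `describe_buffer_timeout` is a hypothetical name, and rejecting values below -1 is an assumption about how invalid input is treated.

```python
def describe_buffer_timeout(timeout_millis):
    """Describe the flushing mode selected by a buffer timeout value."""
    if timeout_millis < -1:
        # Assumption: values below -1 are not meaningful.
        raise ValueError("timeout_millis must be -1 or greater")
    if timeout_millis == -1:
        return "flush only when the output buffer is full (maximum throughput)"
    if timeout_millis == 0:
        return "flush after every record (minimum latency)"
    return "flush periodically, every {} ms".format(timeout_millis)


for value in (100, 0, -1):
    print(value, "->", describe_buffer_timeout(value))
```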
-
set_default_local_parallelism
(parallelism: int)[source]¶ Sets the default parallelism that will be used for the local execution environment.
- Parameters
parallelism – The parallelism to use as the default local parallelism.
-
set_max_parallelism
(max_parallelism: int) → pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment[source]¶ Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) is 32767.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Parameters
max_parallelism – Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15 - 1.
- Returns
This object.
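The documented bound 0 < maxParallelism <= 2^15 - 1 can be checked before calling the setter. The helper below is a hypothetical pre-check for illustration; it does not reproduce the validation PyFlink itself performs.

```python
# Inclusive upper limit stated in the description: 2^15 - 1.
MAX_PARALLELISM_UPPER_BOUND = 2 ** 15 - 1


def is_valid_max_parallelism(value):
    """Return True if `value` satisfies 0 < value <= 2^15 - 1."""
    return 0 < value <= MAX_PARALLELISM_UPPER_BOUND


print(MAX_PARALLELISM_UPPER_BOUND)
print(is_valid_max_parallelism(128), is_valid_max_parallelism(0))
```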
-
set_parallelism
(parallelism: int) → pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment[source]¶ Sets the parallelism for operations executed through this environment. Setting a parallelism of x here will cause all operators (such as map, batchReduce) to run with x parallel instances. This method overrides the default parallelism for this environment. The LocalStreamEnvironment uses by default a value equal to the number of hardware contexts (CPU cores / threads). When executing the program via the command line client from a JAR file, the default degree of parallelism is the one configured for that setup.
- Parameters
parallelism – The parallelism.
- Returns
This object.
-
set_python_executable
(python_exec: str)[source]¶ Sets the path of the python interpreter which is used to execute the python udf workers.
e.g. “/usr/local/bin/python3”.
If python UDF depends on a specific python version which does not exist in the cluster, the method
pyflink.datastream.StreamExecutionEnvironment.add_python_archive()
can be used to upload a virtual environment. The path of the python interpreter contained in the uploaded environment can be specified via this method.
Example:
# command executed in shell
# assume that the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env

# python code
>>> stream_env.add_python_archive("py_env.zip")
>>> stream_env.set_python_executable("py_env.zip/py_env/bin/python")
Note
Please make sure the uploaded python environment matches the platform that the cluster is running on and that the python version is 3.5 or higher.
Note
The python udf worker depends on Apache Beam (version == 2.23.0). Please ensure that the specified environment meets the above requirements.
- Parameters
python_exec – The path of python interpreter.
-
set_python_requirements
(requirements_file_path: str, requirements_cache_dir: str = None)[source]¶ Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the python UDF worker.
For dependencies which cannot be accessed from the cluster, a directory which contains the installation packages of these dependencies can be specified using the parameter “requirements_cache_dir”. It will be uploaded to the cluster to support offline installation.
Example:
# commands executed in shell
$ echo numpy==1.16.5 > requirements.txt
$ pip download -d cached_dir -r requirements.txt --no-binary :all:

# python code
>>> stream_env.set_python_requirements("requirements.txt", "cached_dir")
Note
Please make sure the installation packages match the platform of the cluster and the python version used. These packages will be installed using pip, so also make sure that the versions of Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0) meet the requirements.
- Parameters
requirements_file_path – The path of “requirements.txt” file.
requirements_cache_dir – The path of the local directory which contains the installation packages.
-
set_restart_strategy
(restart_strategy_configuration: pyflink.common.restart_strategy.RestartStrategyConfiguration)[source]¶ Sets the restart strategy configuration. The configuration specifies which restart strategy will be used for the execution graph in case of a restart.
Example:
>>> env.set_restart_strategy(RestartStrategies.no_restart())
- Parameters
restart_strategy_configuration – Restart strategy configuration to be set.
- Returns
-
set_state_backend
(state_backend: pyflink.datastream.state_backend.StateBackend) → pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment[source]¶ Sets the state backend that describes how to store and checkpoint operator state. It defines both which data structures hold state during execution (for example hash tables, RocksDB, or other data stores) as well as where checkpointed data will be persisted.
The
MemoryStateBackend
for example maintains the state in heap memory, as objects. It is lightweight without extra dependencies, but can checkpoint only small states (some counters).
In contrast, the
FsStateBackend
stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated file system (like HDFS, S3, MapR FS, Alluxio, etc.) this will guarantee that state is not lost upon failures of individual nodes and that the streaming program can be executed in a highly available and strongly consistent manner (assuming that Flink is run in high-availability mode).
- The built-in state backends include:
See also
Example:
>>> env.set_state_backend(RocksDBStateBackend("file:///var/checkpoints/"))
- Parameters
state_backend – The
StateBackend
.
- Returns
This object.
-
set_stream_time_characteristic
(characteristic: pyflink.datastream.time_characteristic.TimeCharacteristic)[source]¶ Sets the time characteristic for all streams created from this environment, e.g., processing time, event time, or ingestion time.
If you set the characteristic to IngestionTime or EventTime this will set a default watermark update interval of 200 ms. If this is not applicable for your application you should change it using
pyflink.common.ExecutionConfig.set_auto_watermark_interval()
.
Example:
>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
- Parameters
characteristic – The time characteristic, which could be TimeCharacteristic.ProcessingTime, TimeCharacteristic.IngestionTime, or TimeCharacteristic.EventTime.
-
-
class
pyflink.datastream.
CheckpointConfig
(j_checkpoint_config)[source]¶ Bases:
object
Configuration that captures all checkpointing related settings.
DEFAULT_MODE
: The default checkpoint mode: exactly once.
DEFAULT_TIMEOUT
: The default timeout of a checkpoint attempt: 10 minutes.
DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS
: The default minimum pause to be made between checkpoints: none.
DEFAULT_MAX_CONCURRENT_CHECKPOINTS
: The default limit of concurrently happening checkpoints: one.
-
DEFAULT_MAX_CONCURRENT_CHECKPOINTS
= 1¶
-
DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS
= 0¶
-
DEFAULT_MODE
= 0¶
-
DEFAULT_TIMEOUT
= 600000¶
-
disable_unaligned_checkpoints
() → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Disables unaligned checkpoints (experimental). When enabled, unaligned checkpoints greatly reduce checkpointing times under backpressure.
Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
Unaligned checkpoints can only be enabled if get_checkpointing_mode() is CheckpointingMode.EXACTLY_ONCE.
-
enable_externalized_checkpoints
(cleanup_mode: pyflink.datastream.checkpoint_config.ExternalizedCheckpointCleanup) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Enables checkpoints to be persisted externally.
Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status FAILED or SUSPENDED). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.
The ExternalizedCheckpointCleanup mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation, you have to handle checkpoint clean-up manually when you cancel the job as well (terminating with job status CANCELED).
The target directory for externalized checkpoints is configured via org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY.
Example:
>>> config.enable_externalized_checkpoints(
...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
- Parameters
cleanup_mode – Externalized checkpoint cleanup behaviour, the mode could be ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION or ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION.
-
enable_unaligned_checkpoints
(enabled: bool = True) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
Unaligned checkpoints can only be enabled if get_checkpointing_mode() is CheckpointingMode.EXACTLY_ONCE.
- Parameters
enabled – True if checkpoints should be taken in unaligned mode.
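The precondition above (unaligned checkpoints require exactly-once mode) could be guarded in user code as sketched below. `enable_unaligned_if_supported` is a hypothetical helper; it only uses get_checkpointing_mode() and enable_unaligned_checkpoints() as documented in this class, and the exactly-once constant is passed in by the caller.

```python
def enable_unaligned_if_supported(config, exactly_once_mode):
    """Enable unaligned checkpoints only when the checkpointing mode allows it.

    `config` is expected to be a CheckpointConfig and `exactly_once_mode`
    the CheckpointingMode.EXACTLY_ONCE member. Returns True if enabled.
    """
    if config.get_checkpointing_mode() != exactly_once_mode:
        # At-least-once mode: leave the setting untouched.
        return False
    config.enable_unaligned_checkpoints(True)
    return True
```

With PyFlink installed, a call might look like `enable_unaligned_if_supported(env.get_checkpoint_config(), CheckpointingMode.EXACTLY_ONCE)`.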
-
get_checkpoint_interval
() → int[source]¶ Gets the interval in which checkpoints are periodically scheduled.
This setting defines the base interval. Checkpoint triggering may be delayed by the settings get_max_concurrent_checkpoints() and get_min_pause_between_checkpoints().
- Returns
The checkpoint interval, in milliseconds.
-
get_checkpoint_timeout
() → int[source]¶ Gets the maximum time that a checkpoint may take before being discarded.
- Returns
The checkpoint timeout, in milliseconds.
-
get_checkpointing_mode
() → pyflink.datastream.checkpointing_mode.CheckpointingMode[source]¶ Gets the checkpointing mode (exactly-once vs. at-least-once).
See also
- Returns
The
CheckpointingMode
.
-
get_externalized_checkpoint_cleanup
() → Optional[pyflink.datastream.checkpoint_config.ExternalizedCheckpointCleanup][source]¶ Returns the cleanup behaviour for externalized checkpoints.
- Returns
The cleanup behaviour for externalized checkpoints or
None
if none is configured.
-
get_max_concurrent_checkpoints
() → int[source]¶ Gets the maximum number of checkpoint attempts that may be in progress at the same time. If this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or expire.
- Returns
The maximum number of concurrent checkpoint attempts.
-
get_min_pause_between_checkpoints
() → int[source]¶ Gets the minimal pause between checkpointing attempts. This setting defines how soon the checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger another checkpoint with respect to the maximum number of concurrent checkpoints (see get_max_concurrent_checkpoints()).
- Returns
The minimal pause before the next checkpoint is triggered.
-
is_checkpointing_enabled
() → bool[source]¶ Checks whether checkpointing is enabled.
- Returns
True if checkpointing is enabled, false otherwise.
-
is_externalized_checkpoints_enabled
() → bool[source]¶ Returns whether checkpoints should be persisted externally.
- Returns
True
if checkpoints should be externalized, false otherwise.
-
is_fail_on_checkpointing_errors
() → bool[source]¶ This determines the behaviour of tasks if there is an error in their local checkpointing. If this returns true, tasks will fail as a reaction. If this returns false, task will only decline the failed checkpoint.
- Returns
True
if failing on checkpointing errors, false otherwise.
-
is_prefer_checkpoint_for_recovery
() → bool[source]¶ Returns whether a job recovery should fall back to checkpoint when there is a more recent savepoint.
- Returns
True
if a job recovery should fall back to checkpoint, false otherwise.
-
is_unaligned_checkpoints_enabled
() → bool[source]¶ Returns whether unaligned checkpoints are enabled.
- Returns
True
if unaligned checkpoints are enabled.
-
set_checkpoint_interval
(checkpoint_interval: int) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Sets the interval in which checkpoints are periodically scheduled.
This setting defines the base interval. Checkpoint triggering may be delayed by the settings set_max_concurrent_checkpoints() and set_min_pause_between_checkpoints().
- Parameters
checkpoint_interval – The checkpoint interval, in milliseconds.
-
set_checkpoint_timeout
(checkpoint_timeout: int) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Sets the maximum time that a checkpoint may take before being discarded.
- Parameters
checkpoint_timeout – The checkpoint timeout, in milliseconds.
-
set_checkpointing_mode
(checkpointing_mode: pyflink.datastream.checkpointing_mode.CheckpointingMode) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Sets the checkpointing mode (CheckpointingMode.EXACTLY_ONCE vs. CheckpointingMode.AT_LEAST_ONCE).
Example:
>>> config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
- Parameters
checkpointing_mode – The
CheckpointingMode
.
-
set_fail_on_checkpointing_errors
(fail_on_checkpointing_errors: bool) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Sets the expected behaviour for tasks in case they encounter an error in their checkpointing procedure. If this is set to true, the task will fail on a checkpointing error. If this is set to false, the task will only decline the checkpoint and continue running. The default is true.
Example:
>>> config.set_fail_on_checkpointing_errors(False)
- Parameters
fail_on_checkpointing_errors –
True
if failing on checkpointing errors, false otherwise.
-
set_max_concurrent_checkpoints
(max_concurrent_checkpoints: int) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Sets the maximum number of checkpoint attempts that may be in progress at the same time. If this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or expire.
- Parameters
max_concurrent_checkpoints – The maximum number of concurrent checkpoint attempts.
-
set_min_pause_between_checkpoints
(min_pause_between_checkpoints: int) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Sets the minimal pause between checkpointing attempts. This setting defines how soon the checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger another checkpoint with respect to the maximum number of concurrent checkpoints (see set_max_concurrent_checkpoints()).
If the maximum number of concurrent checkpoints is set to one, this setting effectively ensures that a minimum amount of time passes where no checkpoint is in progress at all.
- Parameters
min_pause_between_checkpoints – The minimal pause before the next checkpoint is triggered.
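How the base interval and the minimum pause interact can be illustrated with a toy scheduling model. This is a deliberately simplified sketch, not the checkpoint coordinator's actual algorithm: it assumes at most one concurrent checkpoint, known checkpoint durations, and that the next trigger happens at the later of "previous trigger plus interval" and "previous completion plus minimum pause". The name `simulate_triggers` is hypothetical.

```python
def simulate_triggers(interval_ms, min_pause_ms, durations_ms):
    """Return trigger times (ms) for consecutive checkpoints, max concurrency 1."""
    triggers = []
    next_by_interval = 0      # earliest trigger allowed by the base interval
    earliest_after_pause = 0  # earliest trigger allowed by the minimum pause
    for duration in durations_ms:
        start = max(next_by_interval, earliest_after_pause)
        triggers.append(start)
        next_by_interval = start + interval_ms
        earliest_after_pause = start + duration + min_pause_ms
    return triggers


# The second checkpoint takes 900 ms, so the third one is pushed back
# from 2000 ms to 1900 ms + 500 ms = 2400 ms.
print(simulate_triggers(1000, 500, [200, 900, 100]))  # [0, 1000, 2400]
# Without a minimum pause the base interval alone decides.
print(simulate_triggers(1000, 0, [200, 900, 100]))    # [0, 1000, 2000]
```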
-
set_prefer_checkpoint_for_recovery
(prefer_checkpoint_for_recovery: bool) → pyflink.datastream.checkpoint_config.CheckpointConfig[source]¶ Sets whether a job recovery should fall back to checkpoint when there is a more recent savepoint.
- Parameters
prefer_checkpoint_for_recovery –
True
if a job recovery should fall back to checkpoint, false otherwise.
-
-
class
pyflink.datastream.
CheckpointingMode
[source]¶ Bases:
enum.Enum
The checkpointing mode defines what consistency guarantees the system gives in the presence of failures.
When checkpointing is activated, the data streams are replayed such that lost parts of the processing are repeated. For stateful operations and functions, the checkpointing mode defines whether the system draws checkpoints such that a recovery behaves as if the operators/functions see each record “exactly once” (CheckpointingMode.EXACTLY_ONCE), or whether the checkpoints are drawn in a simpler fashion that typically encounters some duplicates upon recovery (CheckpointingMode.AT_LEAST_ONCE).
EXACTLY_ONCE
: Sets the checkpointing mode to “exactly once”. This mode means that the system will checkpoint the operator and user function state in such a way that, upon recovery, every record will be reflected exactly once in the operator state.
For example, if a user function counts the number of elements in a stream, this number will consistently be equal to the number of actual elements in the stream, regardless of failures and recovery.
Note that this does not mean that each record flows through the streaming data flow only once. It means that upon recovery, the state of operators/functions is restored such that the resumed data streams pick up exactly after the last modification to the state.
Note that this mode does not guarantee exactly-once behavior in the interaction with external systems (only state in Flink’s operators and user functions). The reason for that is that a certain level of “collaboration” is required between two systems to achieve exactly-once guarantees. However, for certain systems, connectors can be written that facilitate this collaboration.
This mode sustains high throughput. Depending on the data flow graph and operations, this mode may increase the record latency, because operators need to align their input streams, in order to create a consistent snapshot point. The latency increase for simple dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average latency remains small, but the slowest records typically have an increased latency.
Sets the checkpointing mode to “at least once”. This mode means that the system will checkpoint the operator and user function state in a simpler way. Upon failure and recovery, some records may be reflected multiple times in the operator state.
For example, if a user function counts the number of elements in a stream, this number will be equal to, or larger than, the actual number of elements in the stream, in the presence of failure and recovery.
This mode has minimal impact on latency and may be preferable in very-low latency scenarios, where a sustained very-low latency (such as a few milliseconds) is needed, and where occasional duplicate messages (on recovery) do not matter.
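The counting example used for both modes can be illustrated with a toy model in plain Python. This is only a sketch and uses no PyFlink API: `count_after_recovery` and its parameters are hypothetical names, and the model assumes a single failure followed by a replay from the snapshotted source position.

```python
def count_after_recovery(total_records, snapshot_offset, extra_in_state=0):
    """Toy model of a counting function that fails once and then recovers.

    snapshot_offset: source position the job replays from after recovery.
    extra_in_state:  records already counted in the snapshotted state beyond
                     that position (0 models a consistent, exactly-once snapshot).
    """
    # State restored from the checkpoint.
    count = snapshot_offset + extra_in_state
    # The source replays every record from the snapshotted position onward.
    for _ in range(snapshot_offset, total_records):
        count += 1
    return count


exactly_once = count_after_recovery(total_records=10, snapshot_offset=4)
at_least_once = count_after_recovery(total_records=10, snapshot_offset=4,
                                     extra_in_state=2)
```

With a consistent snapshot the restored count matches the number of records; when the state already reflects records past the replay position, those records are counted twice.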
-
AT_LEAST_ONCE
= 1¶
-
EXACTLY_ONCE
= 0¶
-
-
class
pyflink.datastream.
CoMapFunction
[source]¶ Bases:
pyflink.datastream.functions.Function
A CoMapFunction implements a map() transformation over two connected streams.
The same instance of the transformation function is used to transform both of the connected streams. That way, the stream transformations can share state.
The basic syntax for using a CoMapFunction is as follows:
- ::
>>> ds1 = ...
>>> ds2 = ...
>>> new_ds = ds1.connect(ds2).map(MyCoMapFunction())
-
class
pyflink.datastream.
CoFlatMapFunction
[source]¶ Bases:
pyflink.datastream.functions.Function
A CoFlatMapFunction implements a flat-map transformation over two connected streams.
The same instance of the transformation function is used to transform both of the connected streams. That way, the stream transformations can share state.
An example for the use of connected streams would be to apply rules that change over time onto elements of a stream. One of the connected streams has the rules, the other stream the elements to apply the rules to. The operation on the connected stream maintains the current set of rules in the state. It may receive either a rule update (from the first stream) and update the state, or a data element (from the second stream) and apply the rules in the state to the element. The result of applying the rules would be emitted.
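The rule-update scenario described above can be sketched in plain Python. The class below is a duck-typed stand-in, not a real CoFlatMapFunction subclass, so that it runs without a Flink cluster; the name `RuleApplier`, the `(name, threshold)` rule format, and the "value exceeds threshold" check are all assumptions made for illustration.

```python
class RuleApplier:
    """Hypothetical stand-in for a CoFlatMapFunction subclass."""

    def __init__(self):
        # Shared by both inputs because the same instance handles both streams.
        self.rules = {}

    def flat_map1(self, value):
        # First stream: rule updates, assumed to be (name, threshold) pairs.
        name, threshold = value
        self.rules[name] = threshold
        return []  # a rule update emits no output

    def flat_map2(self, value):
        # Second stream: data elements; emit one result per violated rule.
        for name, threshold in self.rules.items():
            if value > threshold:
                yield (name, value)


applier = RuleApplier()
out = []
out.extend(applier.flat_map2(50))            # no rules yet, nothing emitted
out.extend(applier.flat_map1(("high", 10)))  # rule update
out.extend(applier.flat_map2(50))            # violates "high"
out.extend(applier.flat_map2(5))             # below the threshold
```

In a real job the class would extend CoFlatMapFunction and be passed to `ds1.connect(ds2).flat_map(...)`.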
The basic syntax for using a CoFlatMapFunction is as follows:
- ::
>>> ds1 = ...
>>> ds2 = ...
>>> class MyCoFlatMapFunction(CoFlatMapFunction):
...     def flat_map1(self, value):
...         for i in range(value):
...             yield i
...     def flat_map2(self, value):
...         for i in range(value):
...             yield i
>>> new_ds = ds1.connect(ds2).flat_map(MyCoFlatMapFunction())
-
class
pyflink.datastream.
DataStream
(j_data_stream)[source]¶ Bases:
object
A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation, for example:
- ::
>>> DataStream.map(MapFunctionImpl())
>>> DataStream.filter(FilterFunctionImpl())
-
add_sink
(sink_func: pyflink.datastream.functions.SinkFunction) → pyflink.datastream.data_stream.DataStreamSink[source]¶ Adds the given sink to this DataStream. Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.
- Parameters
sink_func – The SinkFunction object.
- Returns
The closed DataStream.
-
assign_timestamps_and_watermarks
(watermark_strategy: pyflink.common.watermark_strategy.WatermarkStrategy) → pyflink.datastream.data_stream.DataStream[source]¶ Assigns timestamps to the elements in the data stream and generates watermarks to signal event time progress. The given WatermarkStrategy is used to create a TimestampAssigner and WatermarkGenerator.
- Parameters
watermark_strategy – The strategy to generate watermarks based on event timestamps.
- Returns
The stream after the transformation, with assigned timestamps and watermarks.
-
broadcast
() → pyflink.datastream.data_stream.DataStream[source]¶ Sets the partitioning of the DataStream so that the output elements are broadcast to every parallel instance of the next operation.
- Returns
The DataStream with broadcast partitioning set.
-
connect
(ds: pyflink.datastream.data_stream.DataStream) → pyflink.datastream.data_stream.ConnectedStreams[source]¶ Creates a new ‘ConnectedStreams’ by connecting ‘DataStream’ outputs of (possibly) different types with each other. The DataStreams connected using this operator can be used with CoFunctions to apply joint transformations.
- Parameters
ds – The DataStream with which this stream will be connected.
- Returns
The ConnectedStreams.
-
disable_chaining
() → pyflink.datastream.data_stream.DataStream[source]¶ Turns off chaining for this operator so thread co-location will not be used as an optimization. Chaining can be turned off for the whole job by StreamExecutionEnvironment.disable_operator_chaining(); however, this is not advised for performance reasons.
- Returns
The operator with chaining disabled.
-
filter
(func: Union[Callable, pyflink.datastream.functions.FilterFunction]) → pyflink.datastream.data_stream.DataStream[source]¶ Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction for each element of the DataStream and retains only those elements for which the function returns true. Elements for which the function returns false are filtered out. The user can also extend RichFilterFunction to gain access to other features provided by the RichFunction interface.
- Parameters
func – The FilterFunction that is called for each element of the DataStream.
- Returns
The filtered DataStream.
-
flat_map
(func: Union[Callable, pyflink.datastream.functions.FlatMapFunction], result_type: pyflink.common.typeinfo.TypeInformation = None) → pyflink.datastream.data_stream.DataStream[source]¶ Applies a FlatMap transformation on a DataStream. The transformation calls a FlatMapFunction for each element of the DataStream. Each FlatMapFunction call can return any number of elements including none. The user can also extend RichFlatMapFunction to gain access to other features provided by the RichFunction interface.
- Parameters
func – The FlatMapFunction that is called for each element of the DataStream.
result_type – The type information of output data.
- Returns
The transformed DataStream.
-
force_non_parallel
() → pyflink.datastream.data_stream.DataStream[source]¶ Sets the parallelism and maximum parallelism of this operator to one, and marks the operator so that it cannot be set to a parallelism other than one.
- Returns
The operator with a parallelism of one.
-
forward
() → pyflink.datastream.data_stream.DataStream[source]¶ Sets the partitioning of the DataStream so that the output elements are forwarded to the local sub-task of the next operation.
- Returns
The DataStream with forward partitioning set.
-
get_execution_environment
()[source]¶ Returns the StreamExecutionEnvironment that was used to create this DataStream.
- Returns
The Execution Environment.
-
get_name
() → str[source]¶ Gets the name of the current data stream. This name is used by the visualization and logging during runtime.
- Returns
Name of the stream.
-
get_type
() → pyflink.common.typeinfo.TypeInformation[source]¶ Gets the type of the stream.
- Returns
The type of the DataStream.
-
key_by
(key_selector: Union[Callable, pyflink.datastream.functions.KeySelector], key_type_info: pyflink.common.typeinfo.TypeInformation = None) → pyflink.datastream.data_stream.KeyedStream[source]¶ Creates a new KeyedStream that uses the provided key for partitioning its operator states.
- Parameters
key_selector – The KeySelector to be used for extracting the key for partitioning.
key_type_info – The type information describing the key type.
- Returns
The DataStream with partitioned state (i.e. KeyedStream).
-
map
(func: Union[Callable, pyflink.datastream.functions.MapFunction], output_type: pyflink.common.typeinfo.TypeInformation = None) → pyflink.datastream.data_stream.DataStream[source]¶ Applies a Map transformation on a DataStream. The transformation calls a MapFunction for each element of the DataStream. Each MapFunction call returns exactly one element. The user can also extend RichMapFunction to gain access to other features provided by the RichFunction interface.
Note that if the user does not specify the output data type, the output data will be serialized as a pickled primitive byte array.
- Parameters
func – The MapFunction that is called for each element of the DataStream.
output_type – The type information of the MapFunction output data.
- Returns
The transformed DataStream.
-
name
(name: str) → pyflink.datastream.data_stream.DataStream[source]¶ Sets the name of the current data stream. This name is used by the visualization and logging during runtime.
- Parameters
name – Name of the stream.
- Returns
The named operator.
-
partition_custom
(partitioner: Union[Callable, pyflink.datastream.functions.Partitioner], key_selector: Union[Callable, pyflink.datastream.functions.KeySelector]) → pyflink.datastream.data_stream.DataStream[source]¶ Partitions a DataStream on the key returned by the selector, using a custom partitioner. This method takes the key selector to get the key to partition on, and a partitioner that accepts the key type.
Note that this method works only on single field keys, i.e. the selector cannot return tuples of fields.
- Parameters
partitioner – The partitioner to assign partitions to keys.
key_selector – The KeySelector with which the DataStream is partitioned.
- Returns
The partitioned DataStream.
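How a key selector and a custom partitioner cooperate can be sketched in plain Python. The class below is a duck-typed stand-in rather than a real Partitioner subclass; the method signature `partition(key, num_partitions)` is assumed here, and `Crc32Partitioner` and `key_selector` are hypothetical names.

```python
import zlib


class Crc32Partitioner:
    """Hypothetical stand-in for a Partitioner subclass."""

    def partition(self, key, num_partitions):
        # crc32 is stable across processes, unlike Python's built-in str hash.
        return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def key_selector(record):
    # Single-field key: the first tuple field.
    return record[0]


partitioner = Crc32Partitioner()
records = [("a", 1), ("b", 2), ("a", 3)]
# Partition index chosen for each record when there are 4 downstream partitions.
assignments = [partitioner.partition(key_selector(r), 4) for r in records]
```

Records with the same key always land in the same partition, which is why the partitioner must be deterministic.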
-
print
(sink_identifier: str = None) → pyflink.datastream.data_stream.DataStreamSink[source]¶ Writes a DataStream to the standard output stream (stdout). For each element of the DataStream the object string is written.
NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink worker, and is not fault tolerant.
- Parameters
sink_identifier – The string to prefix the output with.
- Returns
The closed DataStream.
-
process
(func: pyflink.datastream.functions.ProcessFunction, output_type: pyflink.common.typeinfo.TypeInformation = None) → pyflink.datastream.data_stream.DataStream[source]¶ Applies the given ProcessFunction on the input stream, thereby creating a transformed output stream.
The function will be called for every element in the input streams and can produce zero or more output elements.
- Parameters
func – The ProcessFunction that is called for each element in the stream.
output_type – TypeInformation for the result type of the function.
- Returns
The transformed DataStream.
-
project
(*field_indexes: int) → pyflink.datastream.data_stream.DataStream[source]¶ Initiates a Project transformation on a Tuple DataStream.
Note that only Tuple DataStreams can be projected.
- Parameters
field_indexes – The field indexes of the input tuples that are retained. The order of fields in the output tuple corresponds to the order of field indexes.
- Returns
The projected DataStream.
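The effect of a projection on tuple records can be modelled in plain Python. This is a sketch of the semantics only, not of how PyFlink implements `project`; the helper below is a hypothetical stand-in that operates on an ordinary list.

```python
def project(records, *field_indexes):
    """Keep only the given tuple fields, in the order the indexes are listed."""
    for record in records:
        yield tuple(record[i] for i in field_indexes)


rows = [(1, "a", 0.5), (2, "b", 1.5)]
# Retain field 2 first, then field 0; field 1 is dropped.
projected = list(project(rows, 2, 0))
```

The output order follows the order of the indexes, not the order of the fields in the input tuple.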
-
rebalance
() → pyflink.datastream.data_stream.DataStream[source]¶ Sets the partitioning of the DataStream so that the output elements are distributed evenly to instances of the next operation in a round-robin fashion.
- Returns
The DataStream with rebalance partition set.
-
rescale
() → pyflink.datastream.data_stream.DataStream[source]¶ Sets the partitioning of the DataStream so that the output elements are distributed evenly to a subset of instances of the next operation in a round-robin fashion.
The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4, then two upstream operations will distribute to one downstream operation while the other two upstream operations will distribute to the other downstream operation.
In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.
- Returns
The DataStream with rescale partitioning set.
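The subset assignment for rescale partitioning can be sketched in plain Python for the case where one parallelism is a multiple of the other. `rescale_targets` is a hypothetical helper; which specific downstream subtasks Flink pairs with each upstream subtask is an assumption here, only the group sizes follow from the description.

```python
def rescale_targets(upstream_parallelism, downstream_parallelism):
    """Map each upstream subtask index to the downstream subtasks it feeds."""
    up, down = upstream_parallelism, downstream_parallelism
    if max(up, down) % min(up, down) != 0:
        raise ValueError("this sketch only covers parallelisms that are "
                         "multiples of each other")
    if down >= up:
        # Each upstream subtask round-robins over its own block of targets.
        fan_out = down // up
        return {u: list(range(u * fan_out, (u + 1) * fan_out))
                for u in range(up)}
    # Several upstream subtasks share a single downstream subtask.
    fan_in = up // down
    return {u: [u // fan_in] for u in range(up)}


two_to_four = rescale_targets(2, 4)
four_to_two = rescale_targets(4, 2)
```

Unlike rebalance, no upstream subtask sends to every downstream subtask, which keeps the data exchange local to a subset.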
-
set_buffer_timeout
(timeout_millis: int) → pyflink.datastream.data_stream.DataStream[source]¶ Sets the buffering timeout for data produced by this operation. The timeout defines how long data may linger in a partially full buffer before being sent over the network.
Lower timeouts lead to lower tail latencies, but may affect throughput. Timeouts of 1 ms still sustain high throughput, even for jobs with high parallelism.
A value of ‘-1’ means that the default buffer timeout should be used. A value of ‘0’ indicates that no buffering should happen, and all records/events should be immediately sent through the network, without additional buffering.
- Parameters
timeout_millis – The maximum time between two output flushes.
- Returns
The operator with buffer timeout set.
-
set_max_parallelism
(max_parallelism: int) → pyflink.datastream.data_stream.DataStream[source]¶ Sets the maximum parallelism of this operator.
The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Parameters
max_parallelism – Maximum parallelism.
- Returns
The operator with set maximum parallelism.
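The relationship between maximum parallelism, key groups, and subtasks can be sketched as follows. This is a simplified model under stated assumptions: the hash function is a stand-in (crc32, not the hash Flink uses), and the index arithmetic reflects the commonly described key-group range assignment rather than anything stated on this page. Both function names are hypothetical.

```python
import zlib


def key_group_for(key, max_parallelism):
    # Stand-in hash; the number of key groups equals the maximum parallelism.
    return zlib.crc32(str(key).encode("utf-8")) % max_parallelism


def subtask_for_key_group(key_group, max_parallelism, parallelism):
    # Contiguous ranges of key groups are assigned to each parallel subtask.
    return key_group * parallelism // max_parallelism


max_parallelism, parallelism = 128, 4
group = key_group_for("user-42", max_parallelism)
subtask = subtask_for_key_group(group, max_parallelism, parallelism)
```

Because keys are bound to key groups rather than to subtasks, the parallelism can later change up to the maximum parallelism without re-hashing keys.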
-
set_parallelism
(parallelism: int) → pyflink.datastream.data_stream.DataStream[source]¶ Sets the parallelism for this operator.
- Parameters
parallelism – The parallelism for this operator.
- Returns
The operator with set parallelism.
-
set_uid_hash
(uid_hash: str) → pyflink.datastream.data_stream.DataStream[source]¶ Sets a user-provided hash for this operator. This will be used as is to create the JobVertexID. The user-provided hash is an alternative to the generated hashes, and is considered when identifying an operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
Important: this should be used as a workaround or for troubleshooting. The provided hash needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot assign a user-specified hash to intermediate nodes in an operator chain, and attempting to do so will cause your job to fail.
A use case for this is in migration between Flink versions or changing the jobs in a way that changes the automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
- Parameters
uid_hash – The user provided hash for this operator. This will become the jobVertexID, which is shown in the logs and web ui.
- Returns
The operator with the user provided hash.
-
shuffle
() → pyflink.datastream.data_stream.DataStream[source]¶ Sets the partitioning of the DataStream so that the output elements are shuffled uniformly randomly to the next operation.
- Returns
The DataStream with shuffle partitioning set.
-
slot_sharing_group
(slot_sharing_group: str) → pyflink.datastream.data_stream.DataStream[source]¶ Sets the slot sharing group of this operation. Parallel instances of operations that are in the same slot sharing group will be co-located in the same TaskManager slot, if possible.
Operations inherit the slot sharing group of input operations if all input operations are in the same slot sharing group and no slot sharing group was explicitly specified.
Initially an operation is in the default slot sharing group. An operation can be put into the default group explicitly by setting the slot sharing group to ‘default’.
- Parameters
slot_sharing_group – The slot sharing group name.
- Returns
This operator.
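The inheritance rule for slot sharing groups can be written as a small decision function. This is a plain-Python sketch of the rule as described above, not PyFlink code; `resolve_slot_sharing_group` is a hypothetical name, and falling back to the default group when the inputs disagree is an assumption.

```python
def resolve_slot_sharing_group(explicit_group, input_groups):
    """Return the slot sharing group an operation ends up in."""
    if explicit_group is not None:
        # An explicitly specified group always wins.
        return explicit_group
    unique = set(input_groups)
    if len(unique) == 1:
        # All inputs agree, so the group is inherited.
        return unique.pop()
    # No inputs, or inputs in different groups: assumed to use the default.
    return "default"


inherited = resolve_slot_sharing_group(None, ["io", "io"])
mixed = resolve_slot_sharing_group(None, ["io", "cpu"])
explicit = resolve_slot_sharing_group("default", ["io", "io"])
```

The last call shows how an operation can be moved back into the default group even though all of its inputs share another group.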
-
start_new_chain
() → pyflink.datastream.data_stream.DataStream[source]¶ Starts a new task chain beginning at this operator. This operator will not be chained (thread co-located for increased performance) to any previous tasks even if possible.
- Returns
The operator with chaining set.
-
uid
(uid: str) → pyflink.datastream.data_stream.DataStream[source]¶ Sets an ID for this operator. The specified ID is used to assign the same operator ID across job submissions (for example when starting a job from a savepoint).
Important: this ID needs to be unique per transformation and job. Otherwise, job submission will fail.
- Parameters
uid – The unique user-specified ID of this transformation.
- Returns
The operator with the specified ID.
-
union
(*streams: pyflink.datastream.data_stream.DataStream) → pyflink.datastream.data_stream.DataStream[source]¶ Creates a new DataStream by merging DataStream outputs of the same type with each other. The DataStreams merged using this operator will be transformed simultaneously.
- Parameters
streams – The DataStreams to union output with.
- Returns
The DataStream.
-
class
pyflink.datastream.
FlatMapFunction
[source]¶ Bases:
pyflink.datastream.functions.Function
Base class for flatMap functions. FlatMap functions take elements and transform them into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists and arrays. Operations that produce strictly one result element per input element can also use the MapFunction. The basic syntax for using a FlatMapFunction is as follows:
- ::
>>> ds = ...
>>> new_ds = ds.flat_map(MyFlatMapFunction())
-
abstract
flat_map
(value)[source]¶ The core method of the FlatMapFunction. Takes an element from the input data and transforms it into zero, one, or more elements. A basic implementation of flat map is as follows:
- ::
>>> class MyFlatMapFunction(FlatMapFunction):
...     def flat_map(self, value):
...         for i in range(value):
...             yield i
- Parameters
value – The input value.
- Returns
A generator
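The "zero, one, or more elements" contract of `flat_map` can be tried out without a cluster by calling the generator directly. The class below is a duck-typed stand-in rather than a real FlatMapFunction subclass, and `SplitWords` is a hypothetical name.

```python
class SplitWords:
    """Hypothetical stand-in for a FlatMapFunction subclass."""

    def flat_map(self, value):
        # An empty line yields nothing; other lines yield one word at a time.
        for word in value.split():
            yield word


splitter = SplitWords()
lines = ["to be", "", "or not"]
words = [word for line in lines for word in splitter.flat_map(line)]
```

The empty input line contributes no output elements, which a MapFunction could not express.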
-
class
pyflink.datastream.
FilterFunction
[source]¶ Bases:
pyflink.datastream.functions.Function
A filter function is a predicate applied individually to each record. The predicate decides whether to keep the element, or to discard it.
The basic syntax for using a FilterFunction is as follows:
- ::
>>> ds = ...
>>> result = ds.filter(MyFilterFunction())
Note that the system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption can lead to incorrect results.
-
class
pyflink.datastream.
KeySelector
[source]¶ Bases:
pyflink.datastream.functions.Function
The KeySelector allows the use of deterministic objects for operations such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times on the same object, the returned key must be the same. The extractor takes an object and returns the deterministic key for that object.
-
class
pyflink.datastream.
Partitioner
[source]¶ Bases:
pyflink.datastream.functions.Function
Function to implement a custom partition assignment for keys.
-
class
pyflink.datastream.
ReduceFunction
[source]¶ Bases:
pyflink.datastream.functions.Function
Base interface for Reduce functions. Reduce functions combine groups of elements to a single value, by always taking two elements and combining them into one. Reduce functions may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced individually.
The basic syntax for using a ReduceFunction is as follows:
- ::
>>> ds = ...
>>> new_ds = ds.key_by(lambda x: x[1]).reduce(MyReduceFunction())
-
abstract
reduce
(value1, value2)[source]¶ The core method of ReduceFunction, combining two values into one value of the same type. The reduce function is consecutively applied to all values of a group until only a single value remains.
- Parameters
value1 – The first value to combine.
value2 – The second value to combine.
- Returns
The combined value of both input values.
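The pairwise combination per group can be modelled in plain Python. This is a sketch of the semantics, not of the PyFlink runtime: `SumCounts` is a duck-typed stand-in for a ReduceFunction subclass, and `reduce_by_key` is a hypothetical helper that plays the role of the keyed stream.

```python
class SumCounts:
    """Hypothetical stand-in for a ReduceFunction subclass."""

    def reduce(self, value1, value2):
        # Both inputs and the result share the same (key, count) shape.
        return (value1[0], value1[1] + value2[1])


def reduce_by_key(records, key_selector, reduce_function):
    """Fold each incoming record into the running value of its key."""
    state = {}
    for record in records:
        key = key_selector(record)
        if key in state:
            state[key] = reduce_function.reduce(state[key], record)
        else:
            state[key] = record
    return state


totals = reduce_by_key([("a", 1), ("b", 2), ("a", 3)],
                       lambda record: record[0], SumCounts())
```

Each group is reduced independently, so the value for one key never influences another.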
-
class
pyflink.datastream.
RuntimeContext
(task_name: str, task_name_with_subtasks: str, number_of_parallel_subtasks: int, max_number_of_parallel_subtasks: int, index_of_this_subtask: int, attempt_number: int, job_parameters: Dict[str, str])[source]¶ Bases:
object
A RuntimeContext contains information about the context in which functions are executed. Each parallel instance of the function will have a context through which it can access static contextual information (such as the current parallelism).
-
get_attempt_number
() → int[source]¶ Gets the attempt number of this parallel subtask. First attempt is numbered 0.
-
get_index_of_this_subtask
() → int[source]¶ Gets the number of this parallel subtask. The numbering starts from 0 and goes up to parallelism-1 (parallelism as returned by
get_number_of_parallel_subtasks()
).
-
get_job_parameter
(key: str, default_value: str)[source]¶ Gets the global job parameter value associated with the given key as a string.
-
get_max_number_of_parallel_subtasks
() → int[source]¶ Gets the maximum parallelism with which the parallel task runs.
-
get_number_of_parallel_subtasks
() → int[source]¶ Gets the parallelism with which the parallel task runs.
-
get_task_name
() → str[source]¶ Returns the name of the task in which the UDF runs, as assigned during plan construction.
-
get_task_name_with_subtasks
() → str[source]¶ Returns the name of the task, appended with the subtask indicator, such as “MyTask (3/6)”, where 3 would be (get_index_of_this_subtask() + 1), and 6 would be get_number_of_parallel_subtasks().
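The naming scheme can be reproduced with a one-line formatter. The function below is a hypothetical illustration of how the three values relate, not the RuntimeContext implementation.

```python
def task_name_with_subtasks(task_name, index_of_this_subtask,
                            number_of_parallel_subtasks):
    # The subtask index is zero-based, the displayed position is one-based.
    return "{} ({}/{})".format(task_name, index_of_this_subtask + 1,
                               number_of_parallel_subtasks)


label = task_name_with_subtasks("MyTask", 2, 6)
```

Subtask index 2 of a task running with parallelism 6 is therefore displayed as the third of six.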
-
-
class
pyflink.datastream.
SinkFunction
(sink_func: Union[str, py4j.java_gateway.JavaObject])[source]¶ Bases:
pyflink.datastream.functions.JavaFunctionWrapper
The base class for SinkFunctions.
-
class
pyflink.datastream.
SourceFunction
(source_func: Union[str, py4j.java_gateway.JavaObject])[source]¶ Bases:
pyflink.datastream.functions.JavaFunctionWrapper
Base class for all stream data sources in Flink.
-
class
pyflink.datastream.
StateBackend
(j_state_backend)[source]¶ Bases:
object
A State Backend defines how the state of a streaming application is stored and checkpointed. Different State Backends store their state in different fashions, and use different data structures to hold the state of a running application.
For example, the MemoryStateBackend keeps working state in the memory of the TaskManager and stores checkpoints in the memory of the JobManager. The backend is lightweight and without additional dependencies, but not highly available and supports only small state.
The FsStateBackend keeps working state in the memory of the TaskManager and stores state checkpoints in a filesystem (typically a replicated highly-available filesystem, like HDFS, Ceph, S3, GCS, etc).
The RocksDBStateBackend stores working state in RocksDB, and checkpoints the state by default to a filesystem (similar to the FsStateBackend).
Raw Bytes Storage and Backends
The StateBackend creates services for raw bytes storage and for keyed state and operator state.
The raw bytes storage (through the org.apache.flink.runtime.state.CheckpointStreamFactory) is the fundamental service that simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to store checkpoint and recovery metadata and is typically also used by the keyed- and operator state backends to store checkpointed state.
The org.apache.flink.runtime.state.AbstractKeyedStateBackend and org.apache.flink.runtime.state.OperatorStateBackend created by this state backend define how to hold the working state for keys and operators. They also define how to checkpoint that state, frequently using the raw bytes storage (via the org.apache.flink.runtime.state.CheckpointStreamFactory). However, it is also possible that for example a keyed state backend simply implements the bridge to a key/value store, and that it does not need to store anything in the raw byte storage upon a checkpoint.
Serializability
State Backends need to be serializable (java.io.Serializable), because they are distributed across parallel processes (for distributed execution) together with the streaming application code.
Because of that, StateBackend implementations are meant to be like factories that create the proper state stores that provide access to the persistent storage and hold the keyed- and operator state data structures. That way, the State Backend can be very lightweight (contain only configurations), which makes it easier to serialize.
Thread Safety
State backend implementations have to be thread-safe. Multiple threads may be creating streams and keyed-/operator state backends concurrently.
-
class
pyflink.datastream.
MapFunction
[source]¶ Bases:
pyflink.datastream.functions.Function
Base class for Map functions. Map functions take elements and transform them, element wise. A Map function always produces a single result element for each input element. Typical applications are parsing elements, converting data types, or projecting out fields. Operations that produce multiple result elements from a single input element can be implemented using the FlatMapFunction. The basic syntax for using a MapFunction is as follows:
- ::
>>> ds = ... >>> new_ds = ds.map(MyMapFunction())
-
class
pyflink.datastream.
MemoryStateBackend
(checkpoint_path=None, savepoint_path=None, max_state_size=None, using_asynchronous_snapshots=None, j_memory_state_backend=None)[source]¶ Bases:
pyflink.datastream.state_backend.StateBackend
This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The state backend checkpoints state directly to the JobManager’s memory (hence the backend’s name), but the checkpoints will be persisted to a file system for high-availability setups and savepoints. The MemoryStateBackend is consequently a FileSystem-based backend that can work without a file system dependency in simple setups.
This state backend should be used only for experimentation, quick local setups, or for streaming applications that have very small state: because it requires checkpoints to go through the JobManager’s memory, larger state will occupy larger portions of the JobManager’s main memory, reducing operational stability. For any other setup, the FsStateBackend should be used. The FsStateBackend holds the working state on the TaskManagers in the same way, but checkpoints state directly to files rather than to the JobManager’s memory, thus supporting large state sizes.
State Size Considerations
State checkpointing with this state backend is subject to the following conditions:
Each individual state must not exceed the configured maximum state size (see get_max_state_size()).
All state from one task (i.e., the sum of all operator states and keyed states from all chained operators of the task) must not exceed what the RPC system supports, which is by default < 10 MB. That limit can be configured up, but that is typically not advised.
The sum of all states in the application times all retained checkpoints must comfortably fit into the JobManager’s JVM heap space.
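The three conditions above can be expressed as a rough sanity check in plain Python. This is a sketch under assumptions, not a PyFlink utility: `violated_conditions` is a hypothetical helper, the RPC limit is taken as exactly 10 MiB, and "comfortably fit" is reduced to a plain comparison against the heap size.

```python
DEFAULT_MAX_STATE_SIZE = 5242880       # value listed for MemoryStateBackend
ASSUMED_RPC_LIMIT = 10 * 1024 * 1024   # "< 10 MB by default"; exact figure assumed


def violated_conditions(task_states, retained_checkpoints, jobmanager_heap_bytes,
                        max_state_size=DEFAULT_MAX_STATE_SIZE,
                        rpc_limit=ASSUMED_RPC_LIMIT):
    """task_states maps a task name to its individual state sizes in bytes."""
    problems = []
    for task, sizes in task_states.items():
        if any(size > max_state_size for size in sizes):
            problems.append("individual state too large in " + task)
        if sum(sizes) >= rpc_limit:
            problems.append("task state exceeds RPC limit in " + task)
    total = sum(sum(sizes) for sizes in task_states.values())
    if total * retained_checkpoints >= jobmanager_heap_bytes:
        problems.append("retained checkpoints do not fit JobManager heap")
    return problems


MIB = 1024 * 1024
ok = violated_conditions({"t": [1 * MIB]}, 2, 1024 * MIB)
too_big = violated_conditions({"t": [6 * MIB]}, 1, 1024 * MIB)
rpc = violated_conditions({"t": [4 * MIB, 4 * MIB, 4 * MIB]}, 1, 1024 * MIB)
```

A real deployment would leave generous headroom in the heap check, since the JobManager needs memory for much more than checkpoint data.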
Persistence Guarantees
For the use cases where the state sizes can be handled by this backend, the backend does guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints (when high-availability is configured).
Configuration
As for all state backends, this backend can either be configured within the application (by creating the backend with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the state backend was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the backend is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure() method.
-
DEFAULT_MAX_STATE_SIZE
= 5242880¶
-
get_max_state_size
() → int[source]¶ Gets the maximum size that an individual state can have, as configured in the constructor (by default DEFAULT_MAX_STATE_SIZE).
- Returns
The maximum size that an individual state can have.
-
is_using_asynchronous_snapshots
() → bool[source]¶ Gets whether the key/value data structures are asynchronously snapshotted.
If not explicitly configured, this is the default value of org.apache.flink.configuration.CheckpointingOptions.ASYNC_SNAPSHOTS.
- Returns
True if the key/value data structures are asynchronously snapshotted, false otherwise.
-
class
pyflink.datastream.
FsStateBackend
(checkpoint_directory_uri=None, default_savepoint_directory_uri=None, file_state_size_threshold=None, write_buffer_size=None, using_asynchronous_snapshots=None, j_fs_state_backend=None)[source]¶ Bases:
pyflink.datastream.state_backend.StateBackend
This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The state backend checkpoints state as files to a file system (hence the backend’s name).
Each checkpoint individually will store all its files in a subdirectory that includes the checkpoint number, such as hdfs://namenode:port/flink-checkpoints/chk-17/.
State Size Considerations
Working state is kept on the TaskManager heap. If a TaskManager executes multiple tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) then the aggregate state of all tasks needs to fit into that TaskManager’s memory.
This state backend stores small state chunks directly with the metadata, to avoid creating many small files. The threshold for that is configurable. When increasing this threshold, the size of the checkpoint metadata increases. The checkpoint metadata of all retained completed checkpoints needs to fit into the JobManager’s heap memory. This is typically not a problem, unless the threshold get_min_file_size_threshold() is increased significantly.
Persistence Guarantees
Checkpoints from this state backend are as persistent and available as the filesystem that is written to. If the file system is a persistent distributed file system, this state backend supports highly available setups. The backend additionally supports savepoints and externalized checkpoints.
Configuration
As for all state backends, this backend can either be configured within the application (by creating the backend with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the state backend was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the backend is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure() method.
-
get_checkpoint_path
() → str[source]¶ Gets the base directory where all the checkpoints are stored. The job-specific checkpoint directory is created inside this directory.
- Returns
The base directory for checkpoints.
-
get_min_file_size_threshold
() → int[source]¶ Gets the threshold below which state is stored as part of the metadata, rather than in files. This threshold ensures that the backend does not create a large amount of very small files, where potentially the file pointers are larger than the state itself.
If not explicitly configured, this is the default value of
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD
.
- Returns
The file size threshold, in bytes.
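As a rough illustration of the threshold described above, the following plain-Python sketch splits state chunks into those stored with the checkpoint metadata and those written as separate files. The function name `place_state_chunks`, the chunk names and the 1024-byte threshold are hypothetical. The sketch only models the documented behaviour and is not how Flink implements it.

```python
from typing import Dict, List, Tuple


def place_state_chunks(chunk_sizes: Dict[str, int],
                       threshold: int) -> Tuple[List[str], List[str]]:
    """Split chunks into (inlined_in_metadata, written_as_files)."""
    inlined, files = [], []
    for name, size in chunk_sizes.items():
        # Chunks strictly below the threshold travel with the checkpoint metadata.
        (inlined if size < threshold else files).append(name)
    return inlined, files


# Hypothetical chunk names and sizes in bytes.
inlined, files = place_state_chunks(
    {"counter": 16, "window-buffer": 4096, "flag": 1}, threshold=1024)
print(inlined)  # ['counter', 'flag']
print(files)    # ['window-buffer']
```

Raising the threshold moves more chunks into the first list, which is why the metadata held in the JobManager's heap grows with it.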
-
get_write_buffer_size
() → int[source]¶ Gets the write buffer size for the created checkpoint stream.
If not explicitly configured, this is the default value of
org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFER_SIZE
.
- Returns
The write buffer size, in bytes.
-
is_using_asynchronous_snapshots
() → bool[source]¶ Gets whether the key/value data structures are asynchronously snapshotted.
If not explicitly configured, this is the default value of
org.apache.flink.configuration.CheckpointingOptions.ASYNC_SNAPSHOTS
.
- Returns
True if the key/value data structures are asynchronously snapshotted, false otherwise.
-
-
class
pyflink.datastream.
RocksDBStateBackend
(checkpoint_data_uri=None, enable_incremental_checkpointing=None, checkpoint_stream_backend=None, j_rocks_db_state_backend=None)[source]¶ Bases:
pyflink.datastream.state_backend.StateBackend
A State Backend that stores its state in
RocksDB
. This state backend can store very large state that exceeds memory and spills to disk.
All key/value state (including windows) is stored in the key/value index of RocksDB. For persistence against loss of machines, checkpoints take a snapshot of the RocksDB database and persist that snapshot in a file system (by default) or another configurable state backend.
The behavior of the RocksDB instances can be parametrized by setting RocksDB Options using the methods
set_predefined_options()
and
set_options()
.
-
get_checkpoint_backend
()[source]¶ Gets the state backend that this RocksDB state backend uses to persist its bytes to.
This RocksDB state backend only implements the RocksDB-specific parts; it relies on the ‘CheckpointBackend’ to persist the checkpoint and savepoint byte streams.
- Returns
The state backend used to persist the checkpoint and savepoint byte streams.
-
get_db_storage_paths
() → List[str][source]¶ Gets the configured local DB storage paths, or None if none were configured.
Under these directories on the TaskManager, RocksDB stores its SST files and metadata files. These directories do not need to be persistent. They can be ephemeral, meaning that they are lost on a machine failure, because state in RocksDB is persisted in checkpoints.
If nothing is configured, these directories default to the TaskManager’s local temporary file directories.
- Returns
The list of configured local DB storage paths.
-
get_number_of_transfering_threads
() → int[source]¶ Gets the number of threads used to transfer files while snapshotting/restoring.
- Returns
The number of threads used to transfer files while snapshotting/restoring.
-
get_options
() → Optional[str][source]¶ Gets the fully-qualified class name of the options factory in Java that lazily creates the RocksDB options.
- Returns
The fully-qualified class name of the options factory in Java.
-
get_predefined_options
() → pyflink.datastream.state_backend.PredefinedOptions[source]¶ Gets the current predefined options for RocksDB. The default options (if nothing was set via
set_predefined_options()
) are
PredefinedOptions.DEFAULT
.
If user-configured options within
RocksDBConfigurableOptions
are set (through flink-conf.yaml) or a user-defined options factory is set (via
set_options()
), then the options from the factory are applied on top of the predefined and customized options.
- Returns
Current predefined options.
-
is_incremental_checkpoints_enabled
() → bool[source]¶ Gets whether incremental checkpoints are enabled for this state backend.
- Returns
True if incremental checkpoints are enabled, false otherwise.
-
set_db_storage_paths
(*paths: str)[source]¶ Sets the directories in which the local RocksDB database puts its files (like SST and metadata files). These directories do not need to be persistent, they can be ephemeral, meaning that they are lost on a machine failure, because state in RocksDB is persisted in checkpoints.
If nothing is configured, these directories default to the TaskManager’s local temporary file directories.
Each distinct state will be stored in one path, but when the state backend creates multiple states, they will store their files on different paths.
Passing
None
to this function restores the default behavior, where the configured temp directories will be used.
- Parameters
paths – The paths across which the local RocksDB database files will be spread. This parameter is optional.
-
set_number_of_transfering_threads
(number_of_transfering_threads: int)[source]¶ Sets the number of threads used to transfer files while snapshotting/restoring.
- Parameters
number_of_transfering_threads – The number of threads used to transfer files while snapshotting/restoring.
-
set_options
(options_factory_class_name: str)[source]¶ Sets
org.rocksdb.Options
for the RocksDB instances. Because the options are not serializable and hold native code references, they must be specified through a factory.
The options created by the factory here are applied on top of the pre-defined options profile selected via
set_predefined_options()
. If the pre-defined options profile is the default (
PredefinedOptions.DEFAULT
), then the factory fully controls the RocksDB options.
- Parameters
options_factory_class_name – The fully-qualified class name of the options factory in Java that lazily creates the RocksDB options. The options factory must have a default constructor.
-
set_predefined_options
(options: pyflink.datastream.state_backend.PredefinedOptions)[source]¶ Sets the predefined options for RocksDB.
If user-configured options within
RocksDBConfigurableOptions
are set (through flink-conf.yaml) or a user-defined options factory is set (via
set_options()
), then the options from the factory are applied on top of the predefined options specified here and the customized options.
Example:
>>> state_backend.set_predefined_options(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
- Parameters
options – The options to set (must not be None), see
PredefinedOptions
.
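The setters documented above can be combined in one small helper. The sketch below is hedged: `configure_rocksdb_backend` is a hypothetical name, and the helper accepts any object exposing the three setter methods listed in this section, so it can be exercised without a running Flink installation.

```python
from typing import Sequence


def configure_rocksdb_backend(backend, predefined_options,
                              db_paths: Sequence[str],
                              transfer_threads: int):
    """Apply common settings to a RocksDBStateBackend-like object."""
    # Start from a predefined profile, e.g. PredefinedOptions.SPINNING_DISK_OPTIMIZED.
    backend.set_predefined_options(predefined_options)
    # Spread the local (ephemeral) RocksDB files across the given directories.
    backend.set_db_storage_paths(*db_paths)
    # Threads used to transfer files while snapshotting/restoring.
    backend.set_number_of_transfering_threads(transfer_threads)
    return backend
```

With PyFlink installed, a call might look like `configure_rocksdb_backend(RocksDBStateBackend("file:///tmp/checkpoints"), PredefinedOptions.SPINNING_DISK_OPTIMIZED, ["/mnt/disk1/rocksdb", "/mnt/disk2/rocksdb"], 4)`. The checkpoint URI and directory paths here are placeholders.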
-
-
class
pyflink.datastream.
CustomStateBackend
(j_custom_state_backend)[source]¶ Bases:
pyflink.datastream.state_backend.StateBackend
A wrapper of customized java state backend created from the provided StateBackendFactory.
-
class
pyflink.datastream.
PredefinedOptions
[source]¶ Bases:
enum.Enum
The
PredefinedOptions
are configuration settings for the
RocksDBStateBackend
. The various pre-defined choices are configurations that have been empirically determined to be beneficial for performance under different settings.
Some of these settings are based on experiments by the Flink community, some follow guides from the RocksDB project.
DEFAULT
: Default options for all settings, except that writes are not forced to the disk.
Note
Because Flink does not rely on RocksDB data on disk for recovery, there is no need to sync data to stable storage.
SPINNING_DISK_OPTIMIZED
: Pre-defined options for regular spinning hard disks.
This constant configures RocksDB with some options that lead empirically to better performance when the machines executing the system use regular spinning hard disks.
The following options are set:
setCompactionStyle(CompactionStyle.LEVEL)
setLevelCompactionDynamicLevelBytes(true)
setIncreaseParallelism(4)
setUseFsync(false)
setDisableDataSync(true)
setMaxOpenFiles(-1)
Note
Because Flink does not rely on RocksDB data on disk for recovery, there is no need to sync data to stable storage.
SPINNING_DISK_OPTIMIZED_HIGH_MEM
: Pre-defined options for better performance on regular spinning hard disks, at the cost of a higher memory consumption.
Note
These settings will cause RocksDB to consume a lot of memory for block caching and compactions. If you experience out-of-memory problems related to RocksDB, consider switching back to
SPINNING_DISK_OPTIMIZED
.
The following options are set:
setLevelCompactionDynamicLevelBytes(true)
setTargetFileSizeBase(256 MBytes)
setMaxBytesForLevelBase(1 GByte)
setWriteBufferSize(64 MBytes)
setIncreaseParallelism(4)
setMinWriteBufferNumberToMerge(3)
setMaxWriteBufferNumber(4)
setUseFsync(false)
setMaxOpenFiles(-1)
BlockBasedTableConfig.setBlockCacheSize(256 MBytes)
BlockBasedTableConfig.setBlockSize(128 KBytes)
Note
Because Flink does not rely on RocksDB data on disk for recovery, there is no need to sync data to stable storage.
FLASH_SSD_OPTIMIZED
: Pre-defined options for Flash SSDs.
This constant configures RocksDB with some options that lead empirically to better performance when the machines executing the system use SSDs.
The following options are set:
setIncreaseParallelism(4)
setUseFsync(false)
setDisableDataSync(true)
setMaxOpenFiles(-1)
Note
Because Flink does not rely on RocksDB data on disk for recovery, there is no need to sync data to stable storage.
-
DEFAULT
= 0¶
-
FLASH_SSD_OPTIMIZED
= 3¶
-
SPINNING_DISK_OPTIMIZED
= 1¶
-
SPINNING_DISK_OPTIMIZED_HIGH_MEM
= 2¶
-
class
pyflink.datastream.
ExternalizedCheckpointCleanup
[source]¶ Bases:
enum.Enum
Cleanup behaviour for externalized checkpoints when the job is cancelled.
DELETE_ON_CANCELLATION
: Delete externalized checkpoints on job cancellation.
All checkpoint state will be deleted when you cancel the owning job, both the meta data and actual program state. Therefore, you cannot resume from externalized checkpoints after the job has been cancelled.
Note that checkpoint state is always kept if the job terminates with state
FAILED
.
RETAIN_ON_CANCELLATION
: Retain externalized checkpoints on job cancellation.
All checkpoint state is kept when you cancel the owning job. You have to manually delete both the checkpoint meta data and actual program state after cancelling the job.
Note that checkpoint state is always kept if the job terminates with state
FAILED
.
-
DELETE_ON_CANCELLATION
= 0¶
-
RETAIN_ON_CANCELLATION
= 1¶
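The retention rules above fit in a small decision function. This is a sketch only: `Cleanup` is a local stand-in that mirrors the two enum members, the function name is hypothetical, and the job state strings ("CANCELED", "FAILED") are assumed spellings of Flink's terminal job states.

```python
from enum import Enum


class Cleanup(Enum):
    """Local stand-in mirroring ExternalizedCheckpointCleanup."""
    DELETE_ON_CANCELLATION = 0
    RETAIN_ON_CANCELLATION = 1


def checkpoint_state_kept(cleanup: Cleanup, job_state: str) -> bool:
    """Return True if externalized checkpoint state survives the job's end."""
    if job_state == "FAILED":
        # State is always kept when the job fails, whatever the cleanup mode.
        return True
    if job_state == "CANCELED":
        return cleanup is Cleanup.RETAIN_ON_CANCELLATION
    raise ValueError(f"state not covered by this sketch: {job_state}")


for mode in Cleanup:
    print(mode.name, checkpoint_state_kept(mode, "CANCELED"),
          checkpoint_state_kept(mode, "FAILED"))
# DELETE_ON_CANCELLATION False True
# RETAIN_ON_CANCELLATION True True
```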
-
-
class
pyflink.datastream.
TimeCharacteristic
[source]¶ Bases:
enum.Enum
The time characteristic defines how the system determines time for time-dependent order and operations that depend on time (such as time windows).
Processing time for operators means that the operator uses the system clock of the machine to determine the current time of the data stream. Processing-time windows trigger based on wall-clock time and include whatever elements happen to have arrived at the operator at that point in time.
Using processing time for window operations generally produces non-deterministic results, because the contents of the windows depend on the speed at which elements arrive. It is, however, the cheapest method of forming windows and the method that introduces the least latency.
Ingestion time means that the time of each individual element in the stream is determined when the element enters the Flink streaming data flow. Operations like windows group the elements based on that time, meaning that processing speed within the streaming dataflow does not affect windowing, but only the speed at which sources receive elements.
Ingestion time is often a good compromise between processing time and event time. It does not need any special manual form of watermark generation, and events are typically not too much out of order when they arrive at operators; in fact, out-of-orderness can only be introduced by streaming shuffles or split/join/union operations. The fact that elements are not very much out of order means that the latency increase is moderate, compared to event time.
Event time means that the time of each individual element in the stream (also called event) is determined by the event’s individual custom timestamp. These timestamps either exist in the elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources. The big implication of this is that it allows for elements to arrive in the sources and in all operators out of order, meaning that elements with earlier timestamps may arrive after elements with later timestamps.
Operators that window or order data with respect to event time must buffer data until they can be sure that all timestamps for a certain time interval have been received. This is handled by the so-called “time watermarks”.
Operations based on event time are very predictable - the result of windowing operations is typically identical no matter when the window is executed and how fast the streams operate. At the same time, the buffering and tracking of event time is also costlier than operating with processing time, and typically also introduces more latency. The amount of extra cost depends mostly on how much out of order the elements arrive, i.e., how long the time span between the arrival of early and late elements is. With respect to the “time watermarks”, this means that the cost typically depends on how early or late the watermarks can be generated for their timestamp.
In relation to
IngestionTime
, the event time is similar, but refers to the event’s original time, rather than the time assigned at the data source. Practically, that means that event time has generally more meaning, but also that it takes longer to determine that all elements for a certain time have arrived.
-
EventTime
= 2¶
-
IngestionTime
= 1¶
-
ProcessingTime
= 0¶
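The difference between event time and processing time can be seen in a small plain-Python sketch that assigns records to tumbling windows. It does not use the Flink API. The record layout, the function name `tumbling_windows` and the sample timestamps are all hypothetical, and the arrival timestamp stands in for the operator's wall-clock time.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, int, int]  # (payload, event_ts_ms, arrival_ts_ms)


def tumbling_windows(records: List[Record], size_ms: int,
                     use_event_time: bool) -> Dict[int, List[str]]:
    """Group payloads by the start of the tumbling window they fall into."""
    windows = defaultdict(list)
    for payload, event_ts, arrival_ts in records:
        ts = event_ts if use_event_time else arrival_ts
        # Window start is the timestamp rounded down to a multiple of size_ms.
        windows[ts - ts % size_ms].append(payload)
    return {start: sorted(items) for start, items in sorted(windows.items())}


# "b" was created at t=900 but, delayed in transit, arrived at t=1100.
records = [("a", 100, 150), ("c", 1050, 1060), ("b", 900, 1100)]
print(tumbling_windows(records, 1000, use_event_time=True))
# {0: ['a', 'b'], 1000: ['c']}
print(tumbling_windows(records, 1000, use_event_time=False))
# {0: ['a'], 1000: ['b', 'c']}
```

The event-time grouping depends only on the timestamps carried by the records, so it is the same however late "b" arrives. The arrival-time grouping changes with transit delay.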
-
-
class
pyflink.datastream.
TimeDomain
[source]¶ Bases:
enum.Enum
TimeDomain specifies whether a firing timer is based on event time or processing time.
EVENT_TIME: Time is based on the timestamp of events.
PROCESSING_TIME: Time is based on the current processing time of the machine where processing happens.
-
EVENT_TIME
= 0¶
-
PROCESSING_TIME
= 1¶
-
-
class
pyflink.datastream.
ProcessFunction
[source]¶ Bases:
pyflink.datastream.functions.Function
A function that processes elements of a stream.
For every element in the input stream process_element(value, ctx, out) is invoked. This can produce zero or more elements as output. Implementations can also query the time and set timers through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This can again produce zero or more elements as output and register further timers.
Note that access to keyed state and timers (which are also scoped to a key) is only available if the ProcessFunction is applied on a KeyedStream.
-
class
Context
[source]¶ Bases:
abc.ABC
Information available in an invocation of process_element(value, ctx, out) or on_timer(value, ctx, out).
-
abstract
process_element
(value, ctx: pyflink.datastream.functions.ProcessFunction.Context)[source]¶ Process one element from the input stream.
This function can output zero or more elements using the Collector parameter and also update internal state or set timers using the Context parameter.
- Parameters
value – The input value.
ctx – A Context that allows querying the timestamp of the element and getting a TimerService for registering timers and querying the time. The context is only valid during the invocation of this method; do not store it.
-
class
-
class
pyflink.datastream.
TimerService
[source]¶ Bases:
abc.ABC
Interface for working with time and timers.
-
delete_event_time_timer
(time: int)[source]¶ Deletes the event-time timer with the given trigger time. This method only has an effect if such a timer was previously registered and has not already expired.
Timers can internally be scoped to keys and/or windows. When you delete a timer, it is removed from the current keyed context.
- Parameters
time – The given trigger time of timer to be deleted.
-
delete_processing_time_timer
(time: int)[source]¶ Deletes the processing-time timer with the given trigger time. This method only has an effect if such a timer was previously registered and has not already expired.
Timers can internally be scoped to keys and/or windows. When you delete a timer, it is removed from the current keyed context.
- Parameters
time – The given trigger time of timer to be deleted.
-
abstract
register_event_time_timer
(time: int)[source]¶ Registers a timer to be fired when the event time watermark passes the given time.
Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed context, such as in an operation on KeyedStream, that context will also be active when you receive the timer notification.
- Parameters
time – The event time of the timer to be registered.
-
abstract
register_processing_time_timer
(time: int)[source]¶ Registers a timer to be fired when processing time passes the given time.
Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed context, such as in an operation on KeyedStream, that context will also be active when you receive the timer notification.
- Parameters
time – The processing time of the timer to be registered.
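To make the register/delete semantics above concrete, here is a toy in-memory model of event-time timers for a single key. It is a simplified stand-in, not PyFlink's TimerService implementation. The class name `InMemoryEventTimers` and the `advance_watermark` method are hypothetical, and the sketch assumes a timer fires once the watermark is greater than or equal to its trigger time.

```python
from typing import List, Set


class InMemoryEventTimers:
    """Toy model of event-time timers for a single key."""

    def __init__(self) -> None:
        self._timers: Set[int] = set()

    def register_event_time_timer(self, time: int) -> None:
        # Registering the same trigger time twice keeps a single timer.
        self._timers.add(time)

    def delete_event_time_timer(self, time: int) -> None:
        # No effect if the timer was never registered or has already fired.
        self._timers.discard(time)

    def advance_watermark(self, watermark: int) -> List[int]:
        """Fire, in timestamp order, every timer the watermark has reached."""
        due = sorted(t for t in self._timers if t <= watermark)
        self._timers.difference_update(due)
        return due


timers = InMemoryEventTimers()
for t in (3000, 1000, 2000, 1000):
    timers.register_event_time_timer(t)
timers.delete_event_time_timer(2000)
print(timers.advance_watermark(1500))  # [1000]
print(timers.advance_watermark(5000))  # [3000]
```

Deleting the timer at 2000 before the watermark reaches it removes it from the pending set, so it never fires.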
-