pyflink.common package
Module contents
Common classes used by both Flink DataStream API and Table API:
Configuration
: Lightweight configuration object which stores key/value pairs.
ExecutionConfig
: A config to define the behavior of the program execution.
ExecutionMode
: Specifies how a batch program is executed in terms of data exchange: pipelining or batched.
TypeInformation
: TypeInformation is the core class of Flink’s type system. Flink requires type information for all types that are used as the input or return type of a user function.
Types
: Contains utilities to access the TypeInformation of the most common types for which Flink provides a built-in implementation.
WatermarkStrategy
: Defines how to generate Watermarks in the stream sources.
Row
: A row is a fixed-length, null-aware composite type for storing multiple values in a deterministic field order.
SerializationSchema
: Base class that describes how to turn a data object into a different serialized representation. Most data sinks (for example Apache Kafka) require the data to be handed to them in a specific format (for example as byte strings). See JsonRowSerializationSchema, JsonRowDeserializationSchema, CsvRowSerializationSchema, CsvRowDeserializationSchema, AvroRowSerializationSchema, AvroRowDeserializationSchema and SimpleStringSchema for more details.
-
class
pyflink.common.
AssignerWithPeriodicWatermarksWrapper
(j_assigner_with_periodic_watermarks)[source]¶ Bases:
object
The AssignerWithPeriodicWatermarks assigns event time timestamps to elements, and generates low watermarks that signal event time progress within the stream. These timestamps and watermarks are used by functions and operators that operate on event time, for example event time windows.
-
class
pyflink.common.
CompletableFuture
(j_completable_future, py_class=None)[source]¶ Bases:
object
A Future that may be explicitly completed (setting its value and status), supporting dependent functions and actions that trigger upon its completion.
When two or more threads attempt to set_result, set_exception, or cancel a CompletableFuture, only one of them succeeds.
New in version 1.11.0.
-
cancel
() → bool[source]¶ Completes this CompletableFuture if not already completed.
- Returns
true if this task is now cancelled
New in version 1.11.0.
-
cancelled
() → bool[source]¶ Returns true if this CompletableFuture was cancelled before it completed normally.
New in version 1.11.0.
-
done
() → bool[source]¶ Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
New in version 1.11.0.
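The three status methods behave much like their namesakes on Python's standard-library `concurrent.futures.Future`. The sketch below uses that stdlib class purely as a stand-in (it is not the PyFlink wrapper, which delegates to a Java object) to show how cancel, cancelled and done relate to each other.

```python
from concurrent.futures import Future

# Stand-in only: concurrent.futures.Future, not pyflink.common.CompletableFuture.
pending = Future()
cancel_succeeded = pending.cancel()      # a pending future can be cancelled
state_after_cancel = (pending.cancelled(), pending.done())

finished = Future()
finished.set_result("savepoint-path")    # complete normally first
late_cancel = finished.cancel()          # cancelling a completed future fails
state_after_result = (finished.cancelled(), finished.done())
```

A cancelled future counts as done, while a normally completed future is done but not cancelled.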
-
-
class
pyflink.common.
ConfigOption
(*args, **kwds)[source]¶ Bases:
typing.Generic
A ConfigOption describes a configuration parameter. It encapsulates the configuration key, deprecated older versions of the key, and an optional default value for the configuration parameter.
Config options are built via the ConfigOptions class. Once created, a config option is immutable.
-
class
pyflink.common.
ConfigOptions
(j_config_options)[source]¶ Bases:
object
ConfigOptions are used to build a ConfigOption. The option is typically built in one of the following patterns:
Example:
# simple string-valued option with a default value
>>> ConfigOptions.key("tmp.dir").string_type().default_value("/tmp")
# simple integer-valued option with a default value
>>> ConfigOptions.key("application.parallelism").int_type().default_value(100)
# option with no default value
>>> ConfigOptions.key("user.name").string_type().no_default_value()
-
class
OptionBuilder
(j_option_builder)[source]¶ Bases:
object
-
boolean_type
() → pyflink.common.config_options.ConfigOptions.TypedConfigOptionBuilder[bool][source]¶ Defines that the value of the option should be of bool type.
-
double_type
() → pyflink.common.config_options.ConfigOptions.TypedConfigOptionBuilder[float][source]¶ Defines that the value of the option should be of float type (8-byte double precision floating point number).
-
float_type
() → pyflink.common.config_options.ConfigOptions.TypedConfigOptionBuilder[float][source]¶ Defines that the value of the option should be of float type (4-byte single precision floating point number).
-
int_type
() → pyflink.common.config_options.ConfigOptions.TypedConfigOptionBuilder[int][source]¶ Defines that the value of the option should be of int type (from -2,147,483,648 to 2,147,483,647).
-
-
class
-
class
pyflink.common.
Configuration
(other: Optional[pyflink.common.configuration.Configuration] = None, j_configuration: py4j.java_gateway.JavaObject = None)[source]¶ Bases:
object
Lightweight configuration object which stores key/value pairs.
-
add_all
(other: pyflink.common.configuration.Configuration, prefix: str = None) → pyflink.common.configuration.Configuration[source]¶ Adds all entries from the given configuration into this configuration. If a prefix is given, it is prepended to each key.
- Parameters
other – The configuration whose entries are added to this configuration.
prefix – Optional, the prefix to prepend.
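As a rough mental model of the prefix behaviour, the sketch below uses plain dicts in place of Configuration objects. The helper name `add_all_model` is hypothetical and not part of PyFlink.

```python
from typing import Dict, Optional


def add_all_model(target: Dict[str, str], other: Dict[str, str],
                  prefix: Optional[str] = None) -> Dict[str, str]:
    """Copy every entry of `other` into `target`, optionally prefixing keys.

    Hypothetical dict-based model of the documented behaviour; the real
    method operates on Configuration objects backed by the JVM.
    """
    for key, value in other.items():
        target[key if prefix is None else prefix + key] = value
    return target


base = {"parallelism.default": "4"}
extra = {"tmp.dir": "/tmp"}
merged = add_all_model(base, extra, prefix="job.")
```

In this model the prefix is concatenated as given, so any separator such as a trailing dot has to be part of the prefix itself.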
-
add_all_to_dict
(target_dict: Dict)[source]¶ Adds all entries in this configuration to the given dict.
- Parameters
target_dict – The dict to be updated.
-
contains_key
(key: str) → bool[source]¶ Checks whether there is an entry with the specified key.
- Parameters
key – Key of entry.
- Returns
True if the key is stored, False otherwise.
-
get_boolean
(key: str, default_value: bool) → bool[source]¶ Returns the value associated with the given key as a boolean.
- Parameters
key – The key pointing to the associated value.
default_value – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
-
get_bytearray
(key: str, default_value: bytearray) → bytearray[source]¶ Returns the value associated with the given key as a byte array.
- Parameters
key – The key pointing to the associated value.
default_value – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
-
get_float
(key: str, default_value: float) → float[source]¶ Returns the value associated with the given key as a float.
- Parameters
key – The key pointing to the associated value.
default_value – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
-
get_integer
(key: str, default_value: int) → int[source]¶ Returns the value associated with the given key as an integer.
- Parameters
key – The key pointing to the associated value.
default_value – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
-
get_string
(key: str, default_value: str) → str[source]¶ Returns the value associated with the given key as a string.
- Parameters
key – The key pointing to the associated value.
default_value – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
-
key_set
() → Set[str][source]¶ Returns the keys of all key/value pairs stored inside this configuration object.
- Returns
The keys of all key/value pairs stored inside this configuration object.
-
remove_config
(key: str) → bool[source]¶ Removes given config key from the configuration.
- Parameters
key – The config key to remove.
- Returns
True if the config has been removed, False otherwise.
-
set_boolean
(key: str, value: bool) → pyflink.common.configuration.Configuration[source]¶ Adds the given key/value pair to the configuration object.
- Parameters
key – The key of the key/value pair to be added.
value – The value of the key/value pair to be added.
-
set_bytearray
(key: str, value: bytearray) → pyflink.common.configuration.Configuration[source]¶ Adds the given byte array to the configuration object.
- Parameters
key – The key under which the bytes are added.
value – The byte array to be added.
-
set_float
(key: str, value: float) → pyflink.common.configuration.Configuration[source]¶ Adds the given key/value pair to the configuration object.
- Parameters
key – The key of the key/value pair to be added.
value – The value of the key/value pair to be added.
-
set_integer
(key: str, value: int) → pyflink.common.configuration.Configuration[source]¶ Adds the given key/value pair to the configuration object.
- Parameters
key – The key of the key/value pair to be added.
value – The value of the key/value pair to be added.
-
-
class
pyflink.common.
DeserializationSchema
(j_deserialization_schema=None)[source]¶ Bases:
object
Base class for DeserializationSchema. The deserialization schema describes how to turn the byte messages delivered by certain data sources (for example Apache Kafka) into data types (Java/Scala objects) that are processed by Flink.
In addition, the DeserializationSchema describes the produced type which lets Flink create internal serializers and structures to handle the type.
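To make the direction of each schema concrete, here is a plain-Python sketch of a JSON round trip: one function turns a record into bytes (the serialization side), the other turns bytes back into a record (the deserialization side). The function names are hypothetical. In PyFlink the schema objects wrap Java implementations and are handed to connectors rather than called like this.

```python
import json
from typing import Any, Dict


def serialize_record(record: Dict[str, Any]) -> bytes:
    # Data object -> byte string, the shape a sink such as Kafka expects.
    return json.dumps(record, sort_keys=True).encode("utf-8")


def deserialize_message(message: bytes) -> Dict[str, Any]:
    # Byte message from a source -> data object processed downstream.
    return json.loads(message.decode("utf-8"))


payload = serialize_record({"user": "alice", "clicks": 3})
restored = deserialize_message(payload)
```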
-
class
pyflink.common.
Duration
(j_duration)[source]¶ Bases:
object
A time-based amount of time, such as ‘34.5 seconds’.
-
class
pyflink.common.
Encoder
(j_encoder)[source]¶ Bases:
object
Encoder is used by the file sink to perform the actual writing of the incoming elements to the files in a bucket.
-
class
pyflink.common.
ExecutionConfig
(j_execution_config)[source]¶ Bases:
object
A config to define the behavior of the program execution. It allows defining (among other options) the following settings:
The default parallelism of the program, i.e., how many parallel tasks to use for all functions that do not define a specific value directly.
The number of retries in the case of failed executions.
The delay between execution retries.
The ExecutionMode of the program: Batch or Pipelined. The default execution mode is ExecutionMode.PIPELINED.
Enabling or disabling the “closure cleaner”. The closure cleaner pre-processes the implementations of functions. In case they are (anonymous) inner classes, it removes unused references to the enclosing class to fix certain serialization-related problems and to reduce the size of the closure.
The config also allows registering types and serializers to increase the efficiency of handling generic types and POJOs. This is usually only needed when the functions return not only the types declared in their signature, but also subclasses of those types.
PARALLELISM_DEFAULT: The flag value indicating use of the default parallelism. This value can be used to reset the parallelism back to the default state.
PARALLELISM_UNKNOWN: The flag value indicating an unknown or unset parallelism. This value is not a valid parallelism and indicates that the parallelism should remain unchanged.
-
PARALLELISM_DEFAULT
= -1¶
-
PARALLELISM_UNKNOWN
= -2¶
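One way to read the two sentinel values: -1 asks for the environment default, while -2 means "leave whatever is already set". The helper below is a hypothetical illustration of that reading, not code taken from Flink.

```python
PARALLELISM_DEFAULT = -1   # same values as the class attributes
PARALLELISM_UNKNOWN = -2


def resolve_parallelism(requested: int, current: int, env_default: int) -> int:
    """Hypothetical helper showing how the sentinels could be interpreted."""
    if requested == PARALLELISM_UNKNOWN:
        return current            # not a valid value: keep what is already set
    if requested == PARALLELISM_DEFAULT:
        return env_default        # reset to the environment default
    if requested < 1:
        raise ValueError("parallelism must be positive or a sentinel value")
    return requested
```

For example, with a current parallelism of 4 and an environment default of 2, requesting 8 yields 8, requesting -1 yields 2, and requesting -2 yields 4.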
-
add_default_kryo_serializer
(type_class_name: str, serializer_class_name: str) → pyflink.common.execution_config.ExecutionConfig[source]¶ Adds a new Kryo default serializer to the Runtime.
Example:
>>> config.add_default_kryo_serializer("com.aaa.bbb.PojoClass",
...                                    "com.aaa.bbb.Serializer")
- Parameters
type_class_name – The fully qualified Java class name of the types serialized with the given serializer.
serializer_class_name – The fully qualified Java class name of the serializer to use.
-
disable_auto_generated_uids
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Disables auto-generated UIDs. Forces users to manually specify UIDs on DataStream applications.
It is highly recommended that users specify UIDs before deploying to production since they are used to match state in savepoints to operators in a job. Because auto-generated IDs are likely to change when a job is modified, specifying custom IDs allows an application to evolve over time without discarding state.
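The toy model below shows why this matters. It derives "auto-generated" IDs from each operator's position in the job (an assumption made purely for illustration; Flink's real ID generation works differently) and checks which operators would still find their saved state after a new operator is inserted. All names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Operator = Tuple[str, Optional[str]]   # (name, explicit uid or None)


def assign_ids(operators: List[Operator]) -> Dict[str, str]:
    """Map operator name -> state ID; positional IDs stand in for auto-generated ones."""
    return {name: uid if uid is not None else f"auto-{index}"
            for index, (name, uid) in enumerate(operators)}


def restorable(old: List[Operator], new: List[Operator]) -> List[str]:
    """Names of operators whose state ID is unchanged between job versions."""
    old_ids, new_ids = assign_ids(old), assign_ids(new)
    return [name for name in old_ids if new_ids.get(name) == old_ids[name]]


# Version 2 inserts a "filter" operator before the stateful "count" operator.
auto_v1 = [("source", None), ("count", None)]
auto_v2 = [("source", None), ("filter", None), ("count", None)]
uid_v1 = [("source", "src"), ("count", "cnt")]
uid_v2 = [("source", "src"), ("filter", "flt"), ("count", "cnt")]

kept_with_auto_ids = restorable(auto_v1, auto_v2)
kept_with_uids = restorable(uid_v1, uid_v2)
```

In this model the inserted operator shifts the positional ID of "count", so its state is no longer matched, whereas explicit UIDs keep both operators restorable.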
-
disable_auto_type_registration
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Disables the automatic registration of all types in the user program with Kryo.
-
disable_closure_cleaner
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Disables the ClosureCleaner.
- Returns
This object.
-
disable_force_avro
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Disables the Apache Avro serializer as the forced serializer for POJOs.
-
disable_force_kryo
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Disable use of Kryo serializer for all POJOs.
-
disable_generic_types
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Disables the use of generic types (types that would be serialized via Kryo). If this option is used, Flink will throw an UnsupportedOperationException whenever it encounters a data type that would go through Kryo for serialization.
Disabling generic types can be helpful to eagerly find and eliminate the use of types that would go through Kryo serialization during runtime. Rather than checking types individually, using this option will throw exceptions eagerly in the places where generic types are used.
Important: We recommend using this option only during development and pre-production phases, not during actual production use. The application program and/or the input data may be such that new, previously unseen, types occur at some point. In that case, setting this option would cause the program to fail.
-
disable_object_reuse
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Disables reusing objects that Flink internally uses for deserialization and passing data to user-code functions.
- Returns
This object.
-
enable_auto_generated_uids
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Enables the Flink runtime to auto-generate UIDs for operators.
-
enable_closure_cleaner
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Enables the ClosureCleaner. This analyzes user code functions and sets fields to null that are not used. This will in most cases make closures or anonymous inner classes serializable that were not serializable due to some Scala or Java implementation artifact. User code must be serializable because it needs to be sent to worker nodes.
- Returns
This object.
-
enable_force_avro
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Forces Flink to use the Apache Avro serializer for POJOs.
Important: Make sure to include the flink-avro module.
-
enable_force_kryo
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Forces the TypeExtractor to use the Kryo serializer for POJOs even when they could be analyzed as POJOs. In some cases this might be preferable, for example when using interfaces with subclasses that cannot be analyzed as POJOs.
-
enable_generic_types
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Enables the use of generic types, which are serialized via Kryo.
Generic types are enabled by default.
-
enable_object_reuse
() → pyflink.common.execution_config.ExecutionConfig[source]¶ Enables reusing objects that Flink internally uses for deserialization and passing data to user-code functions. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behaviour.
- Returns
This object.
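The kind of bug meant here can be reproduced in plain Python, without Flink: if a producer hands out the same mutable object for every record, a function that stores references instead of copies ends up with several aliases of the last record. The names below are hypothetical and only illustrate the hazard.

```python
from typing import Iterator, List


def records_with_reuse(values: List[int]) -> Iterator[List[int]]:
    """Yield each value inside one shared, mutable buffer (object reuse)."""
    buffer = [0]
    for value in values:
        buffer[0] = value
        yield buffer              # same object every time


# Unaware of reuse: keeps references, so every entry aliases the buffer.
naive = [record for record in records_with_reuse([1, 2, 3])]

# Aware of reuse: copies each record before holding on to it.
safe = [list(record) for record in records_with_reuse([1, 2, 3])]
```

Here `naive` ends up holding three references to the same buffer, while `safe` preserves the three distinct records.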
-
get_auto_watermark_interval
() → int[source]¶ Returns the interval of the automatic watermark emission.
- Returns
The integer value interval in milliseconds of the automatic watermark emission.
-
get_default_input_dependency_constraint
() → pyflink.common.input_dependency_constraint.InputDependencyConstraint[source]¶ Gets the default input dependency constraint for vertex scheduling. It indicates when a task should be scheduled considering its inputs status.
The default constraint is InputDependencyConstraint.ANY.
- Returns
The input dependency constraint of this job. The possible constraints are InputDependencyConstraint.ANY and InputDependencyConstraint.ALL.
Note
Deprecated in 1.13. InputDependencyConstraint is not used anymore in the current scheduler implementations.
-
get_default_kryo_serializer_classes
() → Dict[str, str][source]¶ Returns the registered default Kryo Serializer classes.
- Returns
A dict whose keys are the fully qualified Java class names of the registered types and whose values are the fully qualified Java class names of the Kryo default serializer classes.
-
get_execution_mode
() → pyflink.common.execution_mode.ExecutionMode[source]¶ Gets the execution mode used to execute the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner.
The default execution mode is ExecutionMode.PIPELINED.
- Returns
The execution mode for the program.
-
get_global_job_parameters
() → Dict[str, str][source]¶ Gets current configuration dict.
- Returns
The configuration dict.
-
get_latency_tracking_interval
() → int[source]¶ Returns the latency tracking interval.
- Returns
The latency tracking interval in milliseconds.
-
get_max_parallelism
() → int[source]¶ Gets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Returns
Maximum degree of parallelism.
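The link between maximum parallelism and key groups can be sketched as follows: each key is hashed into one of `max_parallelism` key groups, and contiguous ranges of key groups are assigned to the parallel operator instances. The sketch uses `zlib.crc32` as a stand-in hash (an assumption; Flink's runtime uses its own hash function), and the function names are hypothetical.

```python
import zlib


def key_group_for(key: str, max_parallelism: int) -> int:
    # Stand-in hash; only the "hash modulo max_parallelism" shape matters here.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index_for(key_group: int, max_parallelism: int, parallelism: int) -> int:
    # Contiguous ranges of key groups map to each parallel instance.
    return key_group * parallelism // max_parallelism


max_parallelism = 128
keys = [f"user-{i}" for i in range(1000)]
groups = [key_group_for(k, max_parallelism) for k in keys]
indices_p4 = [operator_index_for(g, max_parallelism, 4) for g in groups]
```

Because the key-to-group mapping in this sketch depends only on `max_parallelism`, changing the parallelism moves whole key groups between instances rather than rehashing individual keys, which is also why the parallelism cannot usefully exceed the number of key groups.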
-
get_parallelism
() → int[source]¶ Gets the parallelism with which operations are executed by default. Operations can individually override this value to use a specific parallelism.
Other operations may need to run with a different parallelism - for example calling a reduce operation over the entire data set will involve an operation that runs with a parallelism of one (the final reduce to the single result value).
- Returns
The parallelism used by operations, unless they override that value. This method returns ExecutionConfig.PARALLELISM_DEFAULT if the environment’s default parallelism should be used.
-
get_registered_kryo_types
() → List[str][source]¶ Returns the registered Kryo types.
- Returns
The list of fully qualified Java class names of the registered Kryo types.
-
get_registered_pojo_types
() → List[str][source]¶ Returns the registered POJO types.
- Returns
The list of fully qualified Java class names of the registered POJO types.
-
get_registered_types_with_kryo_serializer_classes
() → Dict[str, str][source]¶ Returns the registered types with their Kryo Serializer classes.
- Returns
A dict whose keys are the fully qualified Java class names of the registered types and whose values are the fully qualified Java class names of the Kryo serializer classes.
-
get_restart_strategy
() → pyflink.common.restart_strategy.RestartStrategyConfiguration[source]¶ Returns the restart strategy which has been set for the current job.
- Returns
The specified restart configuration.
-
get_task_cancellation_interval
() → int[source]¶ Gets the interval (in milliseconds) between consecutive attempts to cancel a running task.
- Returns
The integer value interval in milliseconds.
-
get_task_cancellation_timeout
() → int[source]¶ Returns the timeout (in milliseconds) after which an ongoing task cancellation leads to a fatal TaskManager error.
The value 0 means that the timeout is disabled. In this case a stuck cancellation will not lead to a fatal error.
- Returns
The timeout in milliseconds.
-
has_auto_generated_uids_enabled
() → bool[source]¶ Checks whether auto generated UIDs are supported.
Auto generated UIDs are enabled by default.
- Returns
Boolean value that represents whether auto generated UIDs are supported.
-
has_generic_types_disabled
() → bool[source]¶ Checks whether generic types are supported. Generic types are types that go through Kryo during serialization.
Generic types are enabled by default.
- Returns
Boolean value that represents whether generic types are disabled (True means generic types are not supported).
-
is_auto_type_registration_disabled
() → bool[source]¶ Returns whether Flink is automatically registering all types in the user programs with Kryo.
- Returns
True means auto type registration is disabled and False means enabled.
-
is_closure_cleaner_enabled
() → bool[source]¶ Returns whether the ClosureCleaner is enabled.
- Returns
True means enabled and False means disabled.
-
is_force_avro_enabled
() → bool[source]¶ Returns whether Apache Avro is the default serializer for POJOs.
- Returns
Boolean value that represents whether Apache Avro is the default serializer for POJOs.
-
is_force_kryo_enabled
() → bool[source]¶ - Returns
Boolean value that represents whether the usage of the Kryo serializer for all POJOs is enabled.
-
is_object_reuse_enabled
() → bool[source]¶ Returns whether object reuse has been enabled or disabled.
- Returns
Boolean value that represents whether object reuse is enabled.
-
is_use_snapshot_compression
() → bool[source]¶ Returns whether the compression (snappy) for keyed state in full checkpoints and savepoints is enabled.
- Returns
True means enabled and False means disabled.
-
register_kryo_type
(type_class_name: str) → pyflink.common.execution_config.ExecutionConfig[source]¶ Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.
Example:
>>> config.register_kryo_type("com.aaa.bbb.KryoClass")
- Parameters
type_class_name – The fully qualified Java class name of the type to register.
-
register_pojo_type
(type_class_name: str) → pyflink.common.execution_config.ExecutionConfig[source]¶ Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.
Example:
>>> config.register_pojo_type("com.aaa.bbb.PojoClass")
- Parameters
type_class_name – The fully qualified Java class name of the type to register.
-
register_type_with_kryo_serializer
(type_class_name: str, serializer_class_name: str) → pyflink.common.execution_config.ExecutionConfig[source]¶ Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer.
Example:
>>> config.register_type_with_kryo_serializer("com.aaa.bbb.PojoClass",
...                                           "com.aaa.bbb.Serializer")
- Parameters
type_class_name – The fully qualified Java class name of the types serialized with the given serializer.
serializer_class_name – The fully qualified Java class name of the serializer to use.
-
set_auto_watermark_interval
(interval: int) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing.
- Parameters
interval – The integer value interval between watermarks in milliseconds.
- Returns
This object.
-
set_default_input_dependency_constraint
(input_dependency_constraint: pyflink.common.input_dependency_constraint.InputDependencyConstraint) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the default input dependency constraint for vertex scheduling. It indicates when a task should be scheduled considering its inputs status.
The default constraint is InputDependencyConstraint.ANY.
Example:
>>> config.set_default_input_dependency_constraint(InputDependencyConstraint.ALL)
- Parameters
input_dependency_constraint – The input dependency constraint. The constraints could be InputDependencyConstraint.ANY or InputDependencyConstraint.ALL.
Note
Deprecated in 1.13. InputDependencyConstraint is not used anymore in the current scheduler implementations.
-
set_execution_mode
(execution_mode: pyflink.common.execution_mode.ExecutionMode) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner.
The default execution mode is ExecutionMode.PIPELINED.
Example:
>>> config.set_execution_mode(ExecutionMode.BATCH)
- Parameters
execution_mode – The execution mode to use. The execution mode could be ExecutionMode.PIPELINED, ExecutionMode.PIPELINED_FORCED, ExecutionMode.BATCH or ExecutionMode.BATCH_FORCED.
-
set_global_job_parameters
(global_job_parameters_dict: Dict) → pyflink.common.execution_config.ExecutionConfig[source]¶ Register a custom, serializable user configuration dict.
Example:
>>> config.set_global_job_parameters({"environment.checkpoint_interval": "1000"})
- Parameters
global_job_parameters_dict – Custom user configuration dict.
-
set_latency_tracking_interval
(interval: int) → pyflink.common.execution_config.ExecutionConfig[source]¶ Interval for sending latency tracking marks from the sources to the sinks.
Flink will send latency tracking marks from the sources at the specified interval. Setting a tracking interval <= 0 disables the latency tracking.
- Parameters
interval – Integer value interval in milliseconds.
- Returns
This object.
-
set_max_parallelism
(max_parallelism: int) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Parameters
max_parallelism – Maximum degree of parallelism to be used for the program.
-
set_parallelism
(parallelism: int) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the parallelism for operations executed through this environment. Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
This method overrides the default parallelism for this environment. The local execution environment uses by default a value equal to the number of hardware contexts (CPU cores / threads). When executing the program via the command line client from a JAR/Python file, the default parallelism is the one configured for that setup.
- Parameters
parallelism – The parallelism to use.
- Returns
This object.
-
set_restart_strategy
(restart_strategy_configuration: pyflink.common.restart_strategy.RestartStrategyConfiguration) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the restart strategy to be used for recovery.
>>> config = env.get_config()
>>> config.set_restart_strategy(RestartStrategies.fixed_delay_restart(10, 1000))
The restart strategy configurations are all created from RestartStrategies.
- Parameters
restart_strategy_configuration – Configuration defining the restart strategy to use.
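As a conceptual model of a fixed-delay strategy, the plain-Python loop below retries a failing callable a bounded number of times with a constant pause in between. It assumes the two arguments in the example above are the number of restart attempts and the delay between attempts in milliseconds. The helper is hypothetical and much simpler than Flink's actual recovery, which restores operator state rather than just re-running a function.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_fixed_delay_restart(job: Callable[[], T], restart_attempts: int,
                                 delay_ms: int) -> T:
    """Run `job`, restarting it up to `restart_attempts` times after failures."""
    failures = 0
    while True:
        try:
            return job()
        except Exception:
            failures += 1
            if failures > restart_attempts:
                raise                      # restarts exhausted: give up
            time.sleep(delay_ms / 1000.0)  # fixed pause before the next try


attempts = []


def flaky_job() -> str:
    # Simulated job that fails on its first two runs.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("simulated task failure")
    return "finished"


result = run_with_fixed_delay_restart(flaky_job, restart_attempts=10, delay_ms=1)
```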
-
set_task_cancellation_interval
(interval: int) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the configuration parameter specifying the interval (in milliseconds) between consecutive attempts to cancel a running task.
- Parameters
interval – The integer value interval in milliseconds.
- Returns
This object.
-
set_task_cancellation_timeout
(timeout: int) → pyflink.common.execution_config.ExecutionConfig[source]¶ Sets the timeout (in milliseconds) after which an ongoing task cancellation is considered failed, leading to a fatal TaskManager error.
The cluster default is configured via TaskManagerOptions#TASK_CANCELLATION_TIMEOUT.
The value 0 disables the timeout. In this case a stuck cancellation will not lead to a fatal error.
- Parameters
timeout – The task cancellation timeout (in milliseconds).
- Returns
This object.
-
set_use_snapshot_compression
(use_snapshot_compression: bool) → pyflink.common.execution_config.ExecutionConfig[source]¶ Control whether the compression (snappy) for keyed state in full checkpoints and savepoints is enabled.
- Parameters
use_snapshot_compression – True means enabled and False means disabled.
-
class
pyflink.common.
ExecutionMode
(value)[source]¶ Bases:
enum.Enum
The execution mode specifies how a batch program is executed in terms of data exchange: pipelining or batched.
PIPELINED
: Executes the program in a pipelined fashion (including shuffles and broadcasts), except for data exchanges that are susceptible to deadlocks when pipelining. These data exchanges are performed in a batch manner.
An example of a situation that is susceptible to deadlocks (when executed in a pipelined manner) is a data flow that branches (one data set consumed by multiple operations) and re-joins later.
PIPELINED_FORCED
: Executes the program in a pipelined fashion (including shuffles and broadcasts), including data exchanges that are susceptible to deadlocks when executed via pipelining.
Usually, PIPELINED is the preferable option, which pipelines most data exchanges and only uses batch data exchanges in situations that are susceptible to deadlocks.
This option should only be used with care and only in situations where the programmer is sure that the program is safe for full pipelining and that Flink was too conservative when choosing the batch exchange at a certain point.
BATCH
: This mode executes all shuffles and broadcasts in a batch fashion, while pipelining data between operations that exchange data only locally between one producer and one consumer.
BATCH_FORCED
: This mode executes the program in a strict batch way, including all points where data is forwarded locally from one producer to one consumer. This mode is typically more expensive to execute than the BATCH mode. It does guarantee that no successive operations are ever executed concurrently.
-
BATCH
= 2¶
-
BATCH_FORCED
= 3¶
-
PIPELINED
= 0¶
-
PIPELINED_FORCED
= 1¶
-
-
class
pyflink.common.
InputDependencyConstraint
(value)[source]¶ Bases:
enum.Enum
This constraint indicates when a task should be scheduled considering its inputs status.
ANY
: Schedule the task if any input is consumable.
ALL
: Schedule the task if all the inputs are consumable.
-
ALL
= 1¶
-
ANY
= 0¶
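The two constraints correspond closely to Python's built-in `any` and `all`. The sketch below mirrors the enum locally (it does not import the PyFlink class), and the `should_schedule` helper is hypothetical.

```python
from enum import Enum
from typing import Sequence


class Constraint(Enum):
    """Local mirror of the two enum values (not the PyFlink class)."""
    ANY = 0
    ALL = 1


def should_schedule(constraint: Constraint, inputs_consumable: Sequence[bool]) -> bool:
    # ANY: one consumable input is enough; ALL: every input must be consumable.
    if constraint is Constraint.ANY:
        return any(inputs_consumable)
    return all(inputs_consumable)


partly_ready = [True, False]
schedule_under_any = should_schedule(Constraint.ANY, partly_ready)
schedule_under_all = should_schedule(Constraint.ALL, partly_ready)
```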
-
-
class
pyflink.common.
Instant
(seconds, nanos)[source]¶ Bases:
object
An instantaneous point on the time-line. Similar to java.time.Instant.
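A small stand-in class can make the (seconds, nanos) representation concrete. The dataclass below is hypothetical: it mirrors the constructor arguments shown in the signature, and the two conversion method names are assumptions modelled on java.time.Instant rather than a confirmed part of the PyFlink class.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class InstantModel:
    """Hypothetical stand-in mirroring the (seconds, nanos) constructor."""
    seconds: int   # whole seconds since 1970-01-01T00:00:00Z
    nanos: int     # nanosecond adjustment, 0..999_999_999

    def to_epoch_milli(self) -> int:
        # Sub-millisecond precision is truncated.
        return self.seconds * 1000 + self.nanos // 1_000_000

    @staticmethod
    def of_epoch_milli(epoch_milli: int) -> "InstantModel":
        seconds, millis = divmod(epoch_milli, 1000)
        return InstantModel(seconds, millis * 1_000_000)


t = InstantModel(seconds=1_600_000_000, nanos=123_456_789)
millis = t.to_epoch_milli()
round_tripped = InstantModel.of_epoch_milli(millis)
```

Converting to milliseconds and back keeps the seconds but drops everything below one millisecond from the nanosecond part.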
-
class
pyflink.common.
JobClient
(j_job_client)[source]¶ Bases:
object
A client that is scoped to a specific job.
New in version 1.11.0.
-
cancel
() → pyflink.common.completable_future.CompletableFuture[source]¶ Cancels the associated job.
- Returns
A CompletableFuture for canceling the associated job.
New in version 1.11.0.
-
get_accumulators
() → pyflink.common.completable_future.CompletableFuture[source]¶ Requests the accumulators of the associated job. Accumulators can be requested while the job is running or after it has finished. The class loader is used to deserialize the incoming accumulator results.
- Parameters
class_loader – Class loader used to deserialize the incoming accumulator results.
- Returns
A CompletableFuture containing the accumulators of the associated job.
New in version 1.11.0.
-
get_job_execution_result
() → pyflink.common.completable_future.CompletableFuture[source]¶ Returns the JobExecutionResult result of the job execution of the submitted job.
- Returns
A CompletableFuture containing the JobExecutionResult result of the job execution.
New in version 1.11.0.
-
get_job_id
() → pyflink.common.job_id.JobID[source]¶ Returns the JobID that uniquely identifies the job this client is scoped to.
- Returns
JobID, or null if the job has been executed on a runtime without JobIDs or if the execution failed.
New in version 1.11.0.
-
get_job_status
() → pyflink.common.completable_future.CompletableFuture[source]¶ Requests the JobStatus of the associated job.
- Returns
A CompletableFuture containing the JobStatus of the associated job.
New in version 1.11.0.
-
stop_with_savepoint
(advance_to_end_of_event_time: bool, savepoint_directory: str = None) → pyflink.common.completable_future.CompletableFuture[source]¶ Stops the associated job on Flink cluster.
Stopping works only for streaming programs. Be aware that the job might continue to run for a while after the stop command is sent, because once the sources stop emitting data all operators still need to finish processing.
- Parameters
advance_to_end_of_event_time – Flag indicating if the source should inject a MAX_WATERMARK in the pipeline.
savepoint_directory – Directory the savepoint should be written to.
- Returns
A CompletableFuture containing the path where the savepoint is located.
New in version 1.11.0.
-
trigger_savepoint
(savepoint_directory: str = None) → pyflink.common.completable_future.CompletableFuture[source]¶ Triggers a savepoint for the associated job. The savepoint will be written to the given savepoint directory.
- Parameters
savepoint_directory – Directory the savepoint should be written to.
- Returns
A CompletableFuture containing the path where the savepoint is located.
New in version 1.11.0.
-
-
class
pyflink.common.
JobExecutionResult
(j_job_execution_result)[source]¶ Bases:
object
The result of a job execution. Gives access to the execution time of the job, and to all accumulators created by this job.
New in version 1.11.0.
-
get_accumulator_result
(accumulator_name: str)[source]¶ Gets the accumulator with the given name. Returns None, if no accumulator with that name was produced.
- Parameters
accumulator_name – The name of the accumulator.
- Returns
The value of the accumulator with the given name.
New in version 1.11.0.
-
get_all_accumulator_results
() → Dict[str, Any][source]¶ Gets all accumulators produced by the job. The map contains the accumulators as mappings from the accumulator name to the accumulator value.
- Returns
A dict whose keys are the accumulator names and whose values are the accumulator values produced by the job.
New in version 1.11.0.
-
-
class
pyflink.common.
JobID
(j_job_id)[source]¶ Bases:
object
Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond to dataflow graphs.
Jobs act simultaneously as sessions, because jobs can be created and submitted incrementally in different parts. Newer fragments of a graph can be attached to existing graphs, thereby extending the current data flow graphs.
New in version 1.11.0.
-
class
pyflink.common.
JobStatus
(value)[source]¶ Bases:
enum.Enum
Possible states of a job once it has been accepted by the job manager.
CREATED: Job is newly created, no task has started to run.
RUNNING: Some tasks are scheduled or running, some may be pending, some may be finished.
FAILING: The job has failed and is currently waiting for the cleanup to complete.
FAILED: The job has failed with a non-recoverable task failure.
CANCELLING: Job is being cancelled.
CANCELED: Job has been cancelled.
FINISHED: All of the job’s tasks have successfully finished.
RESTARTING: The job is currently undergoing a reset and total restart.
SUSPENDED: The job has been suspended which means that it has been stopped but not been removed from a potential HA job store.
RECONCILING: The job is currently reconciling and waits for task execution report to recover state.
New in version 1.11.0.
-
CANCELED
= 5¶
-
CANCELLING
= 4¶
-
CREATED
= 0¶
-
FAILED
= 3¶
-
FAILING
= 2¶
-
FINISHED
= 6¶
-
RECONCILING
= 9¶
-
RESTARTING
= 7¶
-
RUNNING
= 1¶
-
SUSPENDED
= 8¶
-
is_globally_terminal_state
() → bool[source]¶ Checks whether this state is globally terminal. A globally terminal job is complete and cannot fail any more and will not be restarted or recovered by another standby master node.
When a globally terminal state has been reached, all recovery data for the job is dropped from the high-availability services.
- Returns
True if this job status is globally terminal, False otherwise.
New in version 1.11.0.
-
is_terminal_state
() → bool[source]¶ Checks whether this state is locally terminal. Locally terminal refers to the state of a job’s execution graph within an executing JobManager. If the execution graph is locally terminal, the JobManager will not continue executing or recovering the job.
The only state that is locally terminal, but not globally terminal is SUSPENDED, which is typically entered when the executing JobManager loses its leader status.
- Returns
True if this job status is terminal, False otherwise.
New in version 1.11.0.
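The relationship between the two checks can be sketched with a plain-Python stand-in for the enum. JobStatusModel is a hypothetical model, not the PyFlink class. Treating FAILED, CANCELED and FINISHED as the globally terminal states is an assumption read off the state descriptions above, while SUSPENDED being the only locally terminal state comes from the text.

```python
from enum import Enum


class JobStatusModel(Enum):
    """Hypothetical stand-in mirroring the names and values of JobStatus."""

    CREATED = 0
    RUNNING = 1
    FAILING = 2
    FAILED = 3
    CANCELLING = 4
    CANCELED = 5
    FINISHED = 6
    RESTARTING = 7
    SUSPENDED = 8
    RECONCILING = 9

    def is_globally_terminal_state(self) -> bool:
        # Assumption: these three states are the globally terminal ones.
        return self in _GLOBALLY_TERMINAL

    def is_terminal_state(self) -> bool:
        # Locally terminal = globally terminal states plus SUSPENDED.
        return self.is_globally_terminal_state() or self is JobStatusModel.SUSPENDED


_GLOBALLY_TERMINAL = frozenset(
    {JobStatusModel.FAILED, JobStatusModel.CANCELED, JobStatusModel.FINISHED}
)

# States that stop the current JobManager but may still be recovered elsewhere.
only_locally_terminal = [
    s.name
    for s in JobStatusModel
    if s.is_terminal_state() and not s.is_globally_terminal_state()
]
print(only_locally_terminal)
```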
-
-
class
pyflink.common.
RestartStrategies
[source]¶ Bases:
object
This class defines methods to generate RestartStrategyConfigurations. These configurations are used to create RestartStrategies at runtime.
The RestartStrategyConfigurations are used to decouple the core module from the runtime module.
-
class
FailureRateRestartStrategyConfiguration
(max_failure_rate=None, failure_interval=None, delay_between_attempts_interval=None, j_restart_strategy=None)[source]¶ Bases:
pyflink.common.restart_strategy.RestartStrategyConfiguration
Configuration representing a failure rate restart strategy.
-
class
FallbackRestartStrategyConfiguration
(j_restart_strategy=None)[source]¶ Bases:
pyflink.common.restart_strategy.RestartStrategyConfiguration
Restart strategy configuration that lets a job use the cluster-level restart strategy. Especially useful when a custom restart strategy implementation is set via flink-conf.yaml.
-
class
FixedDelayRestartStrategyConfiguration
(restart_attempts=None, delay_between_attempts_interval=None, j_restart_strategy=None)[source]¶ Bases:
pyflink.common.restart_strategy.RestartStrategyConfiguration
Configuration representing a fixed delay restart strategy.
-
class
NoRestartStrategyConfiguration
(j_restart_strategy=None)[source]¶ Bases:
pyflink.common.restart_strategy.RestartStrategyConfiguration
Configuration representing no restart strategy.
-
static
failure_rate_restart
(failure_rate: int, failure_interval: int, delay_interval: int) → FailureRateRestartStrategyConfiguration[source]¶ Generates a FailureRateRestartStrategyConfiguration.
- Parameters
failure_rate – Maximum number of restarts in the given interval failure_interval before failing a job.
failure_interval – Time interval for failures; either an integer value in milliseconds or a datetime.timedelta object.
delay_interval – Delay in-between restart attempts; either an integer value in milliseconds or a datetime.timedelta object.
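Because the interval parameters accept either milliseconds or a datetime.timedelta, it can help to normalize values before passing them on. The sketch below is one way to do that. The helpers to_millis and build_failure_rate_strategy are hypothetical and not part of PyFlink, and the PyFlink import is deferred so the conversion can be tried without PyFlink installed.

```python
from datetime import timedelta


def to_millis(interval):
    """Normalize an int (already milliseconds) or a timedelta to whole milliseconds."""
    if isinstance(interval, timedelta):
        return interval // timedelta(milliseconds=1)
    return int(interval)


def build_failure_rate_strategy():
    """Not called in this sketch: requires PyFlink to be installed."""
    from pyflink.common import RestartStrategies  # deferred import on purpose

    return RestartStrategies.failure_rate_restart(
        3,                                 # failure_rate
        to_millis(timedelta(minutes=5)),   # failure_interval
        to_millis(timedelta(seconds=10)),  # delay_interval
    )


print(to_millis(timedelta(minutes=5)), to_millis(10_000))
```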
-
static
fixed_delay_restart
(restart_attempts: int, delay_between_attempts: int) → FixedDelayRestartStrategyConfiguration[source]¶ Generates a FixedDelayRestartStrategyConfiguration.
- Parameters
restart_attempts – Number of restart attempts for the FixedDelayRestartStrategy.
delay_between_attempts – Delay in-between restart attempts for the FixedDelayRestartStrategy; either an integer value in milliseconds or a datetime.timedelta object.
- Returns
-
class
-
class
pyflink.common.
RestartStrategyConfiguration
(j_restart_strategy_configuration)[source]¶ Bases:
object
Abstract configuration for restart strategies.
-
class
pyflink.common.
Row
(*args, **kwargs)[source]¶ Bases:
object
A row in Table. The fields in it can be accessed:
like attributes (row.key)
like dictionary values (row[key])
key in row will search through row keys.
Row can be used to create a row object by using named arguments; the fields will be sorted by names. A named argument must not be omitted to indicate that its value is None or missing; set it explicitly to None instead.
>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)
>>> 'name' in row
True
>>> 'wrong_key' in row
False
Row can also be used to create another Row-like class, which can then be used to create Row objects, for example:
>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> 'name' in Person
True
>>> 'wrong_key' in Person
False
>>> Person("Alice", 11)
Row(name='Alice', age=11)
-
as_dict
(recursive=False)[source]¶ Returns the row as a dict.
Example:
>>> Row(name="Alice", age=11).as_dict() == {'name': 'Alice', 'age': 11}
True
>>> row = Row(key=1, value=Row(name='a', age=2))
>>> row.as_dict() == {'key': 1, 'value': Row(age=2, name='a')}
True
>>> row.as_dict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
True
- Parameters
recursive – converts nested Rows to dicts (default: False).
-
class
pyflink.common.
RowKind
(value)[source]¶ Bases:
enum.Enum
An enumeration.
-
DELETE
= 3¶
-
INSERT
= 0¶
-
UPDATE_AFTER
= 2¶
-
UPDATE_BEFORE
= 1¶
-
-
class
pyflink.common.
SerializationSchema
(j_serialization_schema=None)[source]¶ Bases:
object
Base class for SerializationSchema. The serialization schema describes how to turn a data object into a different serialized representation. Most data sinks (for example Apache Kafka) require the data to be handed to them in a specific format (for example as byte strings).
-
class
pyflink.common.
SimpleStringSchema
(charset: str = 'UTF-8')[source]¶ Bases:
pyflink.common.serialization.SerializationSchema, pyflink.common.serialization.DeserializationSchema
Very simple serialization/deserialization schema for strings. By default, the serializer uses ‘UTF-8’ for string/byte conversion.
-
class
pyflink.common.
Time
(milliseconds: int)[source]¶ Bases:
object
The definition of a time interval.
-
class
pyflink.common.
TypeInformation
[source]¶ Bases:
object
TypeInformation is the core class of Flink’s type system. Flink requires type information for all types that are used as the input or return type of a user function. This type information class acts as the tool to generate serializers and comparators, and to perform semantic checks such as whether the fields that are used as join/grouping keys actually exist.
The type information also bridges between the programming language's object model and a logical flat schema. It maps fields from the types to columns (fields) in a flat schema. Not all fields from a type are mapped to separate fields in the flat schema; often, entire types are mapped to one field. It is important to note that the schema must hold for all instances of a type. For that reason, elements in lists and arrays are not assigned to individual fields, but the lists and arrays are considered to be one field in total, to account for different lengths in the arrays.
Basic types are indivisible and are considered as a single field.
Arrays and collections are one field.
Tuples represent as many fields as the class has fields.
To represent this properly, each type has an arity (the number of fields it contains directly), and a total number of fields (number of fields in the entire schema of this type, including nested types).
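The difference between arity and total number of fields can be sketched with a small stand-alone model. BasicType, ArrayType and TupleType are hypothetical classes written for this illustration only. They follow the three rules listed above and are not PyFlink's TypeInformation implementation.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class BasicType:
    """Indivisible type: a single field."""

    name: str

    def arity(self) -> int:
        return 1

    def total_fields(self) -> int:
        return 1


@dataclass
class ArrayType:
    """Arrays and collections count as one field, whatever their length."""

    element: object

    def arity(self) -> int:
        return 1

    def total_fields(self) -> int:
        return 1


@dataclass
class TupleType:
    """Composite type: one direct field per member, nested types flattened for the total."""

    fields: List[object]

    def arity(self) -> int:
        return len(self.fields)

    def total_fields(self) -> int:
        return sum(f.total_fields() for f in self.fields)


inner = TupleType([BasicType("string"), BasicType("double")])
outer = TupleType([BasicType("int"), inner, ArrayType(BasicType("int"))])

# outer has 3 direct fields; flattened, it has 1 + 2 + 1 = 4 fields.
print(outer.arity(), outer.total_fields())
```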
-
class
pyflink.common.
TypeSerializer
(*args, **kwds)[source]¶ Bases:
abc.ABC, typing.Generic
This interface describes the methods that are required for a data type to be handled by the Flink runtime. Specifically, this interface contains the serialization and deserialization methods.
-
class
pyflink.common.
Types
[source]¶ Bases:
object
This class gives access to the type information of the most common types for which Flink has built-in serializers and comparators.
-
static
BASIC_ARRAY
(element_type: pyflink.common.typeinfo.TypeInformation) → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for arrays of boxed primitive type (such as Integer[]).
- Parameters
element_type – element type of the array (e.g. Types.BOOLEAN(), Types.INT(), Types.DOUBLE())
-
static
BIG_DEC
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for BigDecimal. Supports a None value.
-
static
BIG_INT
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for BigInteger. Supports a None value.
-
static
BOOLEAN
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for bool. Does not support a None value.
-
static
BYTE
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for byte. Does not support a None value.
-
static
CHAR
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for char. Does not support a None value.
-
static
DOUBLE
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for double. Does not support a None value.
-
static
FLOAT
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for float. Does not support a None value.
-
static
INSTANT
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for Instant. Supports a None value.
-
static
INT
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for int. Does not support a None value.
-
static
LIST
(element_type_info: pyflink.common.typeinfo.TypeInformation) → pyflink.common.typeinfo.TypeInformation[source]¶ A TypeInformation for the list type.
- Parameters
element_type_info – The type of the elements in the list
-
static
LONG
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for long. Does not support a None value.
-
static
MAP
(key_type_info: pyflink.common.typeinfo.TypeInformation, value_type_info: pyflink.common.typeinfo.TypeInformation) → pyflink.common.typeinfo.TypeInformation[source]¶ Special TypeInformation used by MapStateDescriptor
- Parameters
key_type_info – Element type of key (e.g. Types.BOOLEAN(), Types.INT(), Types.DOUBLE())
value_type_info – Element type of value (e.g. Types.BOOLEAN(), Types.INT(), Types.DOUBLE())
-
static
OBJECT_ARRAY
(element_type: pyflink.common.typeinfo.TypeInformation) → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for arrays of non-primitive types. The array itself must not be None. None values for elements are supported.
- Parameters
element_type – element type of the array
-
static
PICKLED_BYTE_ARRAY
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information which uses pickle for serialization/deserialization.
-
static
PRIMITIVE_ARRAY
(element_type: pyflink.common.typeinfo.TypeInformation)[source]¶ Returns type information for arrays of primitive type (such as byte[]). The array must not be None.
- Parameters
element_type – element type of the array (e.g. Types.BOOLEAN(), Types.INT(), Types.DOUBLE())
-
static
ROW
(field_types: List[pyflink.common.typeinfo.TypeInformation])[source]¶ Returns type information for Row with fields of the given types. A row itself must not be None.
- Parameters
field_types – the types of the row fields, e.g., Types.STRING(), Types.INT()
-
static
ROW_NAMED
(field_names: List[str], field_types: List[pyflink.common.typeinfo.TypeInformation])[source]¶ Returns type information for Row with fields of the given types and with given names. A row must not be None.
- Parameters
field_names – array of field names.
field_types – array of field types.
-
static
SHORT
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for short. Does not support a None value.
-
static
SQL_DATE
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for Date. Supports a None value.
-
static
SQL_TIME
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for Time. Supports a None value.
-
static
SQL_TIMESTAMP
() → pyflink.common.typeinfo.TypeInformation[source]¶ Returns type information for Timestamp. Supports a None value.
-
static
-
class
pyflink.common.
WatermarkStrategy
(j_watermark_strategy)[source]¶ Bases:
object
The WatermarkStrategy defines how to generate Watermarks in the stream sources. The WatermarkStrategy is a builder/factory for the WatermarkGenerator that generates the watermarks and the TimestampAssigner which assigns the internal timestamp of a record.
The convenience methods, for example for_bounded_out_of_orderness(Duration), create a WatermarkStrategy for common built-in strategies.
-
static
for_bounded_out_of_orderness
(max_out_of_orderness: pyflink.common.time.Duration) → pyflink.common.watermark_strategy.WatermarkStrategy[source]¶ Creates a watermark strategy for situations where records are out of order, but you can place an upper bound on how far the events are out of order. An out-of-order bound B means that once an event with timestamp T was encountered, no events older than (T - B) will follow any more.
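The (T - B) rule can be illustrated with a small plain-Python model. BoundedOutOfOrdernessModel is a hypothetical class for illustration only. It tracks the highest timestamp seen and reports T - B as the watermark, without modeling periodic emission or any boundary adjustment the built-in generator may apply.

```python
class BoundedOutOfOrdernessModel:
    """Hypothetical model of the bound: watermark = max timestamp seen - bound."""

    def __init__(self, bound_ms):
        self.bound_ms = bound_ms
        self.max_timestamp = None

    def on_event(self, timestamp_ms):
        # Only ever move the maximum forward; out-of-order events do not lower it.
        if self.max_timestamp is None or timestamp_ms > self.max_timestamp:
            self.max_timestamp = timestamp_ms

    def current_watermark(self):
        if self.max_timestamp is None:
            return None
        return self.max_timestamp - self.bound_ms

    def is_late(self, timestamp_ms):
        # An event older than the current watermark violates the bound.
        watermark = self.current_watermark()
        return watermark is not None and timestamp_ms < watermark


model = BoundedOutOfOrdernessModel(bound_ms=20_000)
for ts in (100_000, 95_000, 130_000):
    model.on_event(ts)

print(model.current_watermark())
print(model.is_late(105_000), model.is_late(115_000))
```

Here the largest timestamp is 130 000 ms, so with a 20 second bound the watermark sits at 110 000 ms and an event stamped 105 000 ms would count as late.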
-
static
for_monotonous_timestamps
() → pyflink.common.watermark_strategy.WatermarkStrategy[source]¶ Creates a watermark strategy for situations with monotonously ascending timestamps.
The watermarks are generated periodically and tightly follow the latest timestamp in the data. The delay introduced by this strategy is mainly the periodic interval in which the watermarks are generated.
-
static
no_watermarks
() → pyflink.common.watermark_strategy.WatermarkStrategy[source]¶ Creates a watermark strategy that generates no watermarks at all. This may be useful in scenarios that do pure processing-time based stream processing.
New in version 1.16.0.
-
with_idleness
(idle_timeout: pyflink.common.time.Duration) → pyflink.common.watermark_strategy.WatermarkStrategy[source]¶ Creates a new enriched WatermarkStrategy that also does idleness detection in the created WatermarkGenerator.
Example:
>>> WatermarkStrategy \
...     .for_bounded_out_of_orderness(Duration.of_seconds(20)) \
...     .with_idleness(Duration.of_minutes(1))
- Parameters
idle_timeout – The idle timeout.
- Returns
A new WatermarkStrategy with idle detection configured.
-
with_timestamp_assigner
(timestamp_assigner: pyflink.common.watermark_strategy.TimestampAssigner) → pyflink.common.watermark_strategy.WatermarkStrategy[source]¶ Creates a new WatermarkStrategy that wraps this strategy but instead uses the given TimestampAssigner, which is supplied by implementing the TimestampAssigner interface.
Example:
>>> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
...     .with_timestamp_assigner(MyTimestampAssigner())
- Parameters
timestamp_assigner – The given TimestampAssigner.
- Returns
A WatermarkStrategy that wraps a TimestampAssigner.
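The example above refers to a MyTimestampAssigner class without defining it. A minimal sketch of what such a class might look like follows. It assumes the interface consists of an extract_timestamp(value, record_timestamp) method and that each record is a (key, epoch_millis) tuple, which is a made-up record layout. The fallback to object is only there so the sketch runs where PyFlink is not installed.

```python
try:
    from pyflink.common.watermark_strategy import TimestampAssigner
except ImportError:
    # Fallback for environments without PyFlink; illustration only.
    TimestampAssigner = object


class MyTimestampAssigner(TimestampAssigner):
    """Reads the event time from the second element of each record."""

    def extract_timestamp(self, value, record_timestamp):
        # Assumption: records look like (key, epoch_millis).
        return int(value[1])


assigner = MyTimestampAssigner()
extracted = assigner.extract_timestamp(("sensor-1", 1_700_000_000_000), -1)
print(extracted)
```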
-
with_watermark_alignment
(watermark_group: str, max_allowed_watermark_drift: pyflink.common.time.Duration, update_interval: Optional[pyflink.common.time.Duration] = None) → pyflink.common.watermark_strategy.WatermarkStrategy[source]¶ Creates a new WatermarkStrategy that configures the maximum watermark drift from other sources/tasks/partitions in the same watermark group. The group may contain completely independent sources (e.g. File and Kafka).
Once configured, Flink will “pause” consuming from a source/task/partition that is ahead of the emitted watermark in the group by more than max_allowed_watermark_drift.
Example:
>>> WatermarkStrategy \
...     .for_bounded_out_of_orderness(Duration.of_seconds(20)) \
...     .with_watermark_alignment("alignment-group-1", Duration.of_seconds(20),
...                               Duration.of_seconds(1))
- Parameters
watermark_group – A group of sources to align watermarks
max_allowed_watermark_drift – Maximal drift, before we pause consuming from the source/task/partition
update_interval – How often tasks should notify coordinator about the current watermark and how often the coordinator should announce the maximal aligned watermark. If None, the default update interval (1000 ms) is used.
- Returns
A new WatermarkStrategy with watermark alignment configured.
New in version 1.16.0.
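The pausing rule can be sketched as a plain-Python function. paused_sources is a hypothetical helper, and it assumes that the group's emitted watermark is the minimum of the members' current watermarks. A member is then paused when it runs ahead of that minimum by more than the allowed drift.

```python
def paused_sources(watermarks_ms, max_allowed_drift_ms):
    """Return the names of group members that would be paused.

    watermarks_ms maps a source name to its current watermark in milliseconds.
    """
    # Assumption: the group watermark is the slowest member's watermark.
    group_watermark = min(watermarks_ms.values())
    return sorted(
        name
        for name, watermark in watermarks_ms.items()
        if watermark - group_watermark > max_allowed_drift_ms
    )


group = {"kafka": 100_000, "file": 130_000, "socket": 115_000}
paused = paused_sources(group, max_allowed_drift_ms=20_000)
print(paused)
```

In this example only the file source is 30 seconds ahead of the slowest member, which exceeds the 20 second drift, so it is the one that would be paused until the others catch up.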
-
static