State#

State#

ValueState(*args, **kwds)

State interface for partitioned single-value state.

AppendingState(*args, **kwds)

Base interface for partitioned state that supports adding elements and inspecting the current state.

MergingState(*args, **kwds)

Extension of AppendingState that allows merging of state.

ReducingState(*args, **kwds)

State interface for reducing state.

AggregatingState(*args, **kwds)

State interface for aggregating state, based on an AggregateFunction.

ListState(*args, **kwds)

State interface for partitioned list state in Operations.

MapState(*args, **kwds)

State interface for partitioned key-value state.

StateDescriptor#

ValueStateDescriptor(name, value_type_info)

StateDescriptor for ValueState.

ListStateDescriptor(name, elem_type_info)

StateDescriptor for ListState.

MapStateDescriptor(name, key_type_info, ...)

StateDescriptor for MapState.

ReducingStateDescriptor(name, ...)

StateDescriptor for ReducingState.

AggregatingStateDescriptor(name, ...)

A StateDescriptor for AggregatingState.

StateTtlConfig#

class StateTtlConfig.UpdateType(value)[source]#

This option configures when the last access timestamp is updated, which prolongs the state TTL.

Disabled = 0#

TTL is disabled. State does not expire.

OnCreateAndWrite = 1#

Last access timestamp is initialised when state is created and updated on every write operation.

OnReadAndWrite = 2#

The same as OnCreateAndWrite but also updated on read.

class StateTtlConfig.StateVisibility(value)[source]#

This option configures whether an expired user value can be returned.

NeverReturnExpired = 1#

Never return expired user value.

ReturnExpiredIfNotCleanedUp = 0#

Return expired user value if it is not cleaned up yet.
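The interaction between UpdateType and StateVisibility can be easier to follow with a toy model. The sketch below is plain Python and is not Flink code: `ToyTtlValue`, its explicit `now` argument and the integer clock are all assumptions made for illustration. It mirrors the enum values listed above and drops an expired entry when it is read.

```python
from enum import Enum


class UpdateType(Enum):
    OnCreateAndWrite = 1
    OnReadAndWrite = 2


class StateVisibility(Enum):
    ReturnExpiredIfNotCleanedUp = 0
    NeverReturnExpired = 1


class ToyTtlValue:
    """Hypothetical single-value state with a TTL (a model, not Flink code)."""

    def __init__(self, ttl, update_type, visibility):
        self.ttl = ttl
        self.update_type = update_type
        self.visibility = visibility
        self._value = None
        self._last_access = None  # None means that nothing is stored

    def update(self, value, now):
        # Creating or writing the value always refreshes the timestamp.
        self._value = value
        self._last_access = now

    def value(self, now):
        if self._last_access is None:
            return None
        stored = self._value
        if now - self._last_access >= self.ttl:
            # An expired entry is dropped when it is read.
            self._value, self._last_access = None, None
            if self.visibility is StateVisibility.NeverReturnExpired:
                return None
            return stored  # expired, but not cleaned up before this read
        if self.update_type is UpdateType.OnReadAndWrite:
            self._last_access = now  # a read prolongs the TTL
        return stored


a = ToyTtlValue(10, UpdateType.OnCreateAndWrite, StateVisibility.NeverReturnExpired)
a.update("x", now=0)
a.value(now=5)   # "x"; the read does not refresh the timestamp
a.value(now=12)  # None; the entry expired at time 10

b = ToyTtlValue(10, UpdateType.OnReadAndWrite, StateVisibility.NeverReturnExpired)
b.update("x", now=0)
b.value(now=5)   # "x"; the read moves the timestamp to 5
b.value(now=12)  # "x"; only 7 time units have passed since the last read
```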

class StateTtlConfig.TtlTimeCharacteristic(value)[source]#

This option configures the time scale to use for the TTL.

ProcessingTime = 0#

Processing time

class StateTtlConfig.CleanupStrategies(strategies: Dict[pyflink.datastream.state.StateTtlConfig.CleanupStrategies.Strategies, pyflink.datastream.state.StateTtlConfig.CleanupStrategies.CleanupStrategy], is_cleanup_in_background: bool)[source]#

TTL cleanup strategies.

This class configures when expired state with TTL is cleaned up. By default, state is always cleaned up on explicit read access if it is found to be expired. Cleanup during a full state snapshot can additionally be activated.

class CleanupStrategy[source]#

Base interface for cleanup strategies configurations.

class EmptyCleanupStrategy[source]#

class IncrementalCleanupStrategy(cleanup_size: int, run_cleanup_for_every_record: int)[source]#

Configuration of the incremental cleanup strategy.

class RocksdbCompactFilterCleanupStrategy(query_time_after_num_entries: int)[source]#

Configuration of cleanup strategy using custom compaction filter in RocksDB.

class Strategies(value)[source]#

Fixed strategy ordinals in the strategies config field.

StateTtlConfig.new_builder(ttl)

StateTtlConfig.get_update_type()

StateTtlConfig.get_state_visibility()

StateTtlConfig.get_ttl()

StateTtlConfig.get_ttl_time_characteristic()

StateTtlConfig.is_enabled()

StateTtlConfig.get_cleanup_strategies()

StateTtlConfig.Builder.set_update_type(...)

Sets the TTL update type.

StateTtlConfig.Builder.update_ttl_on_create_and_write()

StateTtlConfig.Builder.update_ttl_on_read_and_write()

StateTtlConfig.Builder.set_state_visibility(...)

Sets the state visibility.

StateTtlConfig.Builder.return_expired_if_not_cleaned_up()

StateTtlConfig.Builder.never_return_expired()

StateTtlConfig.Builder.set_ttl_time_characteristic(...)

Sets the time characteristic.

StateTtlConfig.Builder.use_processing_time()

StateTtlConfig.Builder.cleanup_full_snapshot()

Clean up expired state in the full snapshot on checkpoint.

StateTtlConfig.Builder.cleanup_incrementally(...)

Clean up expired state incrementally by cleaning up local state.

StateTtlConfig.Builder.cleanup_in_rocksdb_compact_filter(...)

Clean up expired state while RocksDB compaction is running.

StateTtlConfig.Builder.disable_cleanup_in_background()

Disable the default cleanup of expired state in the background (enabled by default).

StateTtlConfig.Builder.set_ttl(ttl)

Sets the TTL.

StateTtlConfig.Builder.build()

StateBackend#

A State Backend defines how the state of a streaming application is stored locally within the cluster. Different state backends store their state in different fashions, and use different data structures to hold the state of running applications.

For example, the HashMapStateBackend keeps working state in the memory of the TaskManager. The backend is lightweight and has no additional dependencies.

The EmbeddedRocksDBStateBackend stores working state in an embedded RocksDB instance and can scale working state to many terabytes in size, limited only by the available disk space across all TaskManagers. State checkpoints are stored in a filesystem (typically a replicated, highly available filesystem such as HDFS, Ceph, S3, or GCS).

Raw Bytes Storage and Backends

The StateBackend creates services for raw bytes storage and for keyed state and operator state.

The org.apache.flink.runtime.state.AbstractKeyedStateBackend and org.apache.flink.runtime.state.OperatorStateBackend created by this state backend define how to hold the working state for keys and operators. They also define how to checkpoint that state, frequently using the raw bytes storage (via the org.apache.flink.runtime.state.CheckpointStreamFactory). However, it is also possible that, for example, a keyed state backend simply implements a bridge to a key/value store and does not need to store anything in the raw bytes storage upon a checkpoint.

Serializability

State Backends need to be serializable (java.io.Serializable), because they are distributed across parallel processes (for distributed execution) together with the streaming application code.

Because of that, StateBackend implementations are meant to act as factories that create the state stores, which provide access to the persistent storage and hold the keyed and operator state data structures. That way, the State Backend can be very lightweight (containing only configuration), which makes it easier to serialize.

Thread Safety

State backend implementations have to be thread-safe. Multiple threads may be creating streams and keyed-/operator state backends concurrently.

HashMapStateBackend([j_hashmap_state_backend])

This state backend holds the working state in the memory (JVM heap) of the TaskManagers and checkpoints based on the configured CheckpointStorage.

EmbeddedRocksDBStateBackend([...])

A State Backend that stores its state in an embedded RocksDB instance.

MemoryStateBackend([checkpoint_path, ...])

IMPORTANT MemoryStateBackend is deprecated in favor of HashMapStateBackend and JobManagerCheckpointStorage.

FsStateBackend([checkpoint_directory_uri, ...])

IMPORTANT FsStateBackend is deprecated in favor of HashMapStateBackend and FileSystemCheckpointStorage.

RocksDBStateBackend([checkpoint_data_uri, ...])

IMPORTANT RocksDBStateBackend is deprecated in favor of EmbeddedRocksDBStateBackend and FileSystemCheckpointStorage.

CustomStateBackend(j_custom_state_backend)

A wrapper for a custom Java state backend.

PredefinedOptions(value)

The PredefinedOptions are configuration settings for the RocksDBStateBackend.