pyflink.datastream.state_backend.FsStateBackend
class FsStateBackend(checkpoint_directory_uri=None, default_savepoint_directory_uri=None, file_state_size_threshold=None, write_buffer_size=None, using_asynchronous_snapshots=None, j_fs_state_backend=None)
IMPORTANT: FsStateBackend is deprecated in favor of HashMapStateBackend and FileSystemCheckpointStorage. This change does not affect the runtime characteristics of your jobs; it is purely an API change that better communicates how Flink separates local state storage from fault tolerance. Jobs can be upgraded without loss of state. If you configure your state backend via the StreamExecutionEnvironment, please make the following changes:
>> env.set_state_backend(HashMapStateBackend())
>> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints")
If you configure your state backend via flink-conf.yaml, please set your state backend type to hashmap.
This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The state backend checkpoints state as files to a file system (hence the backend’s name).
Each checkpoint stores all of its files in a subdirectory that includes the checkpoint number, such as
hdfs://namenode:port/flink-checkpoints/chk-17/
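The directory layout above can be illustrated with a small sketch. The helper name `checkpoint_directory` is hypothetical and not part of PyFlink; it only mirrors the naming pattern shown in the example path.

```python
def checkpoint_directory(base_uri: str, checkpoint_id: int) -> str:
    """Build the per-checkpoint subdirectory URI (illustrative helper only).

    Assumes the pattern <base>/chk-<checkpoint number>/ from the example path.
    """
    # Drop a trailing slash on the base so the result never contains "//chk-".
    return f"{base_uri.rstrip('/')}/chk-{checkpoint_id}/"


print(checkpoint_directory("hdfs://namenode:port/flink-checkpoints", 17))
```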
State Size Considerations
Working state is kept on the TaskManager heap. If a TaskManager executes multiple tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) then the aggregate state of all tasks needs to fit into that TaskManager’s memory.
This state backend stores small state chunks directly with the metadata to avoid creating many small files. The threshold for this is configurable. Increasing the threshold increases the size of the checkpoint metadata. The checkpoint metadata of all retained completed checkpoints needs to fit into the JobManager’s heap memory. This is typically not a problem unless the threshold (see get_min_file_size_threshold()) is increased significantly.
Persistence Guarantees
Checkpoints from this state backend are as persistent and available as the file system they are written to. If the file system is a persistent distributed file system, this state backend supports highly available setups. The backend additionally supports savepoints and externalized checkpoints.
Configuration
As for all state backends, this backend can either be configured within the application (by creating the backend with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
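Because a setting can come from either the application or the Flink configuration, it may help to picture how the two sources combine. The following is a minimal sketch, assuming that a value set in the application takes precedence and the Flink configuration is consulted only when the application leaves the value unset. The function `resolve_savepoint_directory` is hypothetical, and the key name `state.savepoints.dir` is an assumption used for illustration. This is not the backend's actual implementation.

```python
from typing import Mapping, Optional

# Assumed configuration key for the default savepoint directory (illustrative).
SAVEPOINT_DIR_KEY = "state.savepoints.dir"


def resolve_savepoint_directory(
    app_value: Optional[str], flink_config: Mapping[str, str]
) -> Optional[str]:
    """Return the savepoint directory, preferring the application's value."""
    if app_value is not None:
        # The application specified a directory explicitly; keep it.
        return app_value
    # Otherwise fall back to the cluster/job configuration, if it has one.
    return flink_config.get(SAVEPOINT_DIR_KEY)


cluster_config = {SAVEPOINT_DIR_KEY: "hdfs://namenode:port/flink-savepoints"}
print(resolve_savepoint_directory(None, cluster_config))
print(resolve_savepoint_directory("file:///tmp/savepoints", cluster_config))
```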
If the state backend was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the backend is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure() method.
Methods
get_checkpoint_path()
Gets the base directory where all the checkpoints are stored.
get_min_file_size_threshold()
Gets the threshold below which state is stored as part of the metadata, rather than in files.
get_write_buffer_size()
Gets the write buffer size for created checkpoint stream.
is_using_asynchronous_snapshots()
Gets whether the key/value data structures are asynchronously snapshotted.
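As a rough model of what the value returned by get_min_file_size_threshold() controls, the sketch below splits state chunks into those kept inline with the checkpoint metadata and those written as separate files. The function `place_state_chunks` and the chunk names are hypothetical. The real backend's bookkeeping is more involved, so treat this only as an illustration of why a larger threshold means larger metadata.

```python
from typing import Dict, Tuple


def place_state_chunks(
    chunks: Dict[str, bytes], threshold: int
) -> Tuple[Dict[str, bytes], Dict[str, bytes]]:
    """Split chunks into (inline_with_metadata, written_as_files).

    Assumes a chunk is stored inline when its size is below the threshold.
    """
    inline: Dict[str, bytes] = {}
    files: Dict[str, bytes] = {}
    for name, data in chunks.items():
        target = inline if len(data) < threshold else files
        target[name] = data
    return inline, files


chunks = {"counter": b"\x00" * 16, "window": b"\x00" * 4096}
inline, files = place_state_chunks(chunks, threshold=1024)
# Bytes that would travel with the checkpoint metadata in this model.
metadata_bytes = sum(len(data) for data in inline.values())
print(sorted(inline), sorted(files), metadata_bytes)
```

Raising the threshold in this model moves more chunks into the metadata, which is the reason the metadata of all retained checkpoints must still fit on the JobManager heap.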