Programs written in the Data Stream API often hold state in various forms. For example, functions may implement the Checkpointed interface to make their local variables fault tolerant.
See also Working with State in the streaming API guide.
When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and recover consistently. How the state is represented internally, and how and where it is persisted upon checkpoints depends on the chosen State Backend.
Out of the box, Flink bundles these state backends: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend.
If nothing else is configured, the system will use the MemoryStateBackend.
The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.
Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.
The MemoryStateBackend can be configured to use asynchronous snapshots. While we strongly encourage the use of asynchronous snapshots to avoid blocking pipelines, please note that this is a new feature and currently not enabled by default. To enable this feature, users can instantiate a MemoryStateBackend with the corresponding boolean flag in the constructor set to true, e.g.:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
Limitations of the MemoryStateBackend:
The MemoryStateBackend is encouraged for:
The FsStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
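To make the three parts of such a URL concrete, the following sketch uses only the JDK's java.net.URI to split the two example URLs into scheme (type), authority (address), and path. The class name CheckpointUrlParts is made up for this illustration and is not part of Flink; how Flink itself validates the URL may differ.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not a Flink class.
public class CheckpointUrlParts {

    // Returns "scheme|authority|path". The authority is empty for URLs
    // such as file:///..., which name no host.
    public static String describe(String url) {
        URI uri = URI.create(url);
        String authority = uri.getAuthority() == null ? "" : uri.getAuthority();
        return uri.getScheme() + "|" + authority + "|" + uri.getPath();
    }

    public static void main(String[] args) {
        System.out.println(describe("hdfs://namenode:40010/flink/checkpoints"));
        System.out.println(describe("file:///data/flink/checkpoints"));
    }
}
```

The HDFS URL carries an address (namenode:40010), while the file URL has none and consists only of a type and a path.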
The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The FsStateBackend can be configured to use asynchronous snapshots. While we strongly encourage the use of asynchronous snapshots to avoid blocking pipelines, please note that this is a new feature and currently not enabled by default. To enable this feature, users can instantiate a FsStateBackend with the corresponding boolean flag in the constructor set to true, e.g.:
new FsStateBackend(path, true);
The FsStateBackend is encouraged for:
The RocksDBStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
The RocksDBStateBackend holds in-flight data in a RocksDB database that is (by default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database will be checkpointed into the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The RocksDBStateBackend always performs asynchronous snapshots.
Limitations of the RocksDBStateBackend:
The RocksDBStateBackend is encouraged for:
Note that the amount of state that you can keep is limited only by the amount of disk space available. This allows keeping very large state, compared to the FsStateBackend, which keeps state in memory. This also means, however, that the maximum achievable throughput will be lower with this state backend.
RocksDBStateBackend is currently the only backend that offers incremental checkpoints.
The default state backend, if you specify nothing, is jobmanager (the MemoryStateBackend). If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in flink-conf.yaml. The default state backend can be overridden on a per-job basis, as shown below.
The per-job state backend is set on the StreamExecutionEnvironment of the job, as shown in the Java and Scala examples below:
// Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
A default state backend can be configured in the flink-conf.yaml, using the configuration key state.backend.
Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend), or the fully qualified class name of a class that implements the state backend factory interface StateBackendFactory, such as org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory for RocksDBStateBackend.
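The mapping from configuration value to backend described above can be sketched in plain Java. This is only an illustration of the documented choices: the class StateBackendNames and its resolve method are hypothetical, and Flink's actual loading logic (for example, how it instantiates a factory class) is not shown here and may differ.

```java
// Illustrative sketch only; not Flink's configuration loading code.
public class StateBackendNames {

    // Maps a state.backend value to the backend it selects.
    public static String resolve(String configValue) {
        switch (configValue) {
            case "jobmanager":
                return "MemoryStateBackend";
            case "filesystem":
                return "FsStateBackend";
            case "rocksdb":
                return "RocksDBStateBackend";
            default:
                // Any other value is treated as the fully qualified
                // name of a state backend factory class.
                return "factory:" + configValue;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("filesystem"));
        System.out.println(resolve(
            "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory"));
    }
}
```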
In the case where the default state backend is set to filesystem, the entry state.backend.fs.checkpointdir defines the directory where the checkpoint data will be stored.
A sample section in the configuration file could look as follows:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
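The two keys in the sample depend on each other: choosing filesystem only makes sense together with a checkpoint directory. The sketch below, which is not Flink's configuration loader, reads simple "key: value" lines like those above and flags a filesystem backend that lacks a directory. The class SampleConfigCheck and both of its methods are assumptions made for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only; not Flink's configuration loader.
public class SampleConfigCheck {

    // Parses "key: value" lines, skipping blank lines and '#' comments.
    public static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            // Split at the first ": " so that URLs in the value stay intact.
            int sep = trimmed.indexOf(": ");
            if (sep > 0) {
                conf.put(trimmed.substring(0, sep).trim(),
                         trimmed.substring(sep + 2).trim());
            }
        }
        return conf;
    }

    // True if the filesystem backend is selected without a checkpoint directory.
    public static boolean missingCheckpointDir(Map<String, String> conf) {
        return "filesystem".equals(conf.get("state.backend"))
                && !conf.containsKey("state.backend.fs.checkpointdir");
    }

    public static void main(String[] args) {
        String sample = "# Directory for storing checkpoints\n"
                + "state.backend: filesystem\n"
                + "state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints\n";
        Map<String, String> conf = parse(sample);
        System.out.println(conf.get("state.backend.fs.checkpointdir"));
        System.out.println(missingCheckpointDir(conf));
    }
}
```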