Programs written in the Data Stream API often hold state in various forms:
- Windows gather elements or aggregates until they are triggered
- Transformation functions may use the key/value state interface to store values
- Transformation functions may implement the Checkpointed interface to make their local variables fault tolerant
See also Working with State in the streaming API guide.
When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and to recover consistently. How the state is represented internally, and how and where it is persisted upon checkpoints, depend on the chosen State Backend.
- Available State Backends
- Configuring a State Backend
Available State Backends
Out of the box, Flink bundles these state backends:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
If nothing else is configured, the system will use the MemoryStateBackend.
The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.
Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.
Limitations of the MemoryStateBackend:
- The size of each individual state is by default limited to 5 MB. This value can be increased in the constructor of the MemoryStateBackend.
- Irrespective of the configured maximal state size, the state cannot be larger than the Akka frame size (see Configuration).
- The aggregate state must fit into the JobManager memory.
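The first two limits above combine into a single effective bound: the smaller of the configured maximal state size and the Akka frame size. The following is a minimal sketch of that arithmetic, not Flink code. The class `MemoryBackendLimits` and its methods are hypothetical, and reading "5 MB" as 5 * 1024 * 1024 bytes is an assumption. The frame size is passed in as a parameter because its value comes from your configuration.

```java
/** Hypothetical helper, not part of Flink: models the per-state size limits listed above. */
final class MemoryBackendLimits {

    /** Default per-state limit named in the text (5 MB, assumed to mean 5 * 1024 * 1024 bytes). */
    static final long DEFAULT_MAX_STATE_BYTES = 5L * 1024 * 1024;

    /** The effective per-state limit is the smaller of the two bounds. */
    static long effectiveLimit(long maxStateBytes, long akkaFrameBytes) {
        return Math.min(maxStateBytes, akkaFrameBytes);
    }

    /** Returns true if a single state of the given size stays within both bounds. */
    static boolean fits(long stateBytes, long maxStateBytes, long akkaFrameBytes) {
        return stateBytes <= effectiveLimit(maxStateBytes, akkaFrameBytes);
    }
}
```

Raising the limit in the MemoryStateBackend constructor therefore only helps up to the frame size. Beyond that, the frame size has to be raised as well, or a different backend chosen.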
The MemoryStateBackend is encouraged for:
- Local development and debugging
- Jobs that hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, …). The Kafka Consumer requires very little state.
The FsStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
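To make the three URL parts concrete, here is a small sketch that splits a checkpoint URL into type, address, and path using the standard `java.net.URI` class. The `CheckpointUrl` class is a hypothetical illustration and not how Flink itself parses the URL.

```java
import java.net.URI;

/** Hypothetical helper, not part of Flink: splits a checkpoint URL into its parts. */
final class CheckpointUrl {
    final String type;     // file system type, e.g. "hdfs" or "file"
    final String address;  // "host:port", or null when the URL has no address part
    final String path;     // directory inside the file system

    private CheckpointUrl(String type, String address, String path) {
        this.type = type;
        this.address = address;
        this.path = path;
    }

    static CheckpointUrl parse(String url) {
        URI uri = URI.create(url);
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException("Missing file system type: " + url);
        }
        if (uri.getPath() == null || uri.getPath().isEmpty()) {
            throw new IllegalArgumentException("Missing path: " + url);
        }
        return new CheckpointUrl(uri.getScheme(), uri.getAuthority(), uri.getPath());
    }
}
```

For the HDFS example the address is `namenode:40010`. For the `file:///` example there is no address, so the field is `null`.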
The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The FsStateBackend is encouraged for:
- Jobs with large state, long windows, large key/value states.
- All high-availability setups.
The RocksDBStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
The RocksDBStateBackend holds in-flight data in a RocksDB database that is (by default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database will be checkpointed into the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The RocksDBStateBackend is encouraged for:
- Jobs with very large state, long windows, large key/value states.
- All high-availability setups.
Note that the amount of state that you can keep is limited only by the amount of disk space available. This allows keeping very large state, compared to the FsStateBackend, which keeps state in memory. This also means, however, that the maximum achievable throughput will be lower with this state backend.
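The recommendations for the three backends can be summarized as a rough decision rule. The sketch below is only an illustration. The class `BackendChoice`, its thresholds, and the idea of comparing state size against TaskManager memory are assumptions made for this example, not rules that Flink applies. For high-availability setups the text recommends both the FsStateBackend and the RocksDBStateBackend, and the sketch picks between them by size alone.

```java
/** Hypothetical helper, not part of Flink: a rough rule of thumb for picking a backend. */
final class BackendChoice {

    enum Backend { MEMORY, FILESYSTEM, ROCKSDB }

    /** Assumed cut-off for "little state": the default 5 MB limit of the MemoryStateBackend. */
    static final long SMALL_STATE_BYTES = 5L * 1024 * 1024;

    static Backend suggest(long stateBytes, long taskManagerMemoryBytes, boolean highAvailability) {
        if (stateBytes > taskManagerMemoryBytes) {
            // State does not fit in memory, so it has to live on disk.
            return Backend.ROCKSDB;
        }
        if (!highAvailability && stateBytes <= SMALL_STATE_BYTES) {
            // Little state and no high availability: heap-only storage is enough.
            return Backend.MEMORY;
        }
        // Larger state that still fits in memory, or any high-availability setup.
        return Backend.FILESYSTEM;
    }
}
```

In practice the throughput cost of RocksDB mentioned above also plays a role, so treat the result as a starting point rather than a final answer.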
NOTE: To use the RocksDBStateBackend you also have to add the correct Maven dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
  <version>1.0.3</version>
</dependency>
The backend is currently not part of the binary distribution. See here for an explanation of how to include it for cluster execution.
Configuring a State Backend
State backends can be configured per job. In addition, you can define a default state backend to be used when the job does not explicitly define a state backend.
Setting the Per-job State Backend
The per-job state backend is set on the StreamExecutionEnvironment of the job, as shown in the example below:
// Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

// Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
Setting Default State Backend
A default state backend can be configured in flink-conf.yaml, using the configuration key state.backend.
Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), or the fully qualified class name of a class that implements a state backend factory, such as FsStateBackendFactory.
If the default state backend is set to filesystem, the entry state.backend.fs.checkpointdir defines the directory where the checkpoint data will be stored.
A sample section in the configuration file could look as follows:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem

# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
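The way the state.backend entry maps to a backend can be sketched in a few lines of plain Java. This is a simplified model of the rules described in this section, not Flink's actual loading code. The class `DefaultBackendConfig` is hypothetical, and rejecting a filesystem setting without a checkpoint directory is an assumption of the sketch.

```java
import java.util.Map;

/** Hypothetical helper, not part of Flink: models how the default backend entry is interpreted. */
final class DefaultBackendConfig {

    static String describe(Map<String, String> conf) {
        String backend = conf.get("state.backend");
        if (backend == null || backend.equals("jobmanager")) {
            // Nothing configured, or "jobmanager": the MemoryStateBackend is used.
            return "MemoryStateBackend";
        }
        if (backend.equals("filesystem")) {
            String dir = conf.get("state.backend.fs.checkpointdir");
            if (dir == null || dir.isEmpty()) {
                // Assumption of this sketch: a missing directory is treated as an error.
                throw new IllegalArgumentException(
                        "state.backend.fs.checkpointdir must be set for the filesystem backend");
            }
            return "FsStateBackend(" + dir + ")";
        }
        // Any other value is read as the class name of a state backend factory.
        return "factory class " + backend;
    }
}
```

With the sample configuration above, `describe` returns `FsStateBackend(hdfs://namenode:40010/flink/checkpoints)`.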