The keyed state managed by Flink is a sort of sharded, key/value store, and the working copy of each item of keyed state is kept somewhere local to the task manager responsible for that key. Operator state is also local to the machine(s) that need(s) it. Flink periodically takes persistent snapshots of all of this state and copies these snapshots somewhere more durable, such as a distributed file system.
In the event of a failure, Flink can restore the complete state of your application and resume processing as though nothing had gone wrong.
This state that Flink manages is stored in a state backend. Two implementations of state backends are available – one based on RocksDB, an embedded key/value store that keeps its working state on disk, and another heap-based state backend that keeps its working state in memory, on the Java heap. This heap-based state backend comes in two flavors: the FsStateBackend that persists its state snapshots to a distributed file system, and the MemoryStateBackend that uses the JobManager’s heap.
| Name | Working State | State Backup | Snapshotting |
|---------------------|-----------------------|--------------------------|--------------------|
| RocksDBStateBackend | Local disk (tmp dir) | Distributed file system | Full / Incremental |
| FsStateBackend | JVM Heap | Distributed file system | Full |
| MemoryStateBackend | JVM Heap | JobManager JVM Heap | Full |
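As a minimal sketch (not part of this page's examples), the state backend can be chosen programmatically on the execution environment. This assumes the flink-statebackend-rocksdb dependency is on the classpath and uses a placeholder checkpoint URI:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Working state lives in local RocksDB instances; snapshots go to the
        // given (placeholder) checkpoint URI. The second argument enables
        // incremental snapshotting.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    }
}
```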
When working with state kept in a heap-based state backend, accesses and updates involve reading and writing objects on the heap. But for objects kept in the RocksDBStateBackend, accesses and updates involve serialization and deserialization, and so are much more expensive. On the other hand, the amount of state you can have with RocksDB is limited only by the size of the local disk. Note also that only the RocksDBStateBackend is able to do incremental snapshotting, which is a significant benefit for applications with large amounts of slowly changing state.
All of these state backends are able to do asynchronous snapshotting, meaning that they can take a snapshot without impeding the ongoing stream processing.
Flink uses a variant of the Chandy-Lamport algorithm known as asynchronous barrier snapshotting.
When a task manager is instructed by the checkpoint coordinator (part of the job manager) to begin a checkpoint, it has all of the sources record their offsets and insert numbered checkpoint barriers into their streams. These barriers flow through the job graph, indicating the part of the stream before and after each checkpoint.
Checkpoint n will contain the state of each operator that resulted from having consumed every event before checkpoint barrier n, and none of the events after it.
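Checkpointing is disabled by default. As a sketch, it can be enabled on the execution environment; the 10-second interval below is just an example value:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointing {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The checkpoint coordinator will trigger a checkpoint every 10 seconds,
        // causing the sources to record their offsets and inject barriers.
        env.enableCheckpointing(10_000L);
    }
}
```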
As each operator in the job graph receives one of these barriers, it records its state. Operators
with two input streams (such as a
CoProcessFunction) perform barrier alignment so that the
snapshot will reflect the state resulting from consuming events from both input streams up to (but
not past) both barriers.
Flink’s state backends use a copy-on-write mechanism to allow stream processing to continue unimpeded while older versions of the state are being asynchronously snapshotted. Only when the snapshots have been durably persisted will these older versions of the state be garbage collected.
When things go wrong in a stream processing application, it is possible to have either lost or duplicated results. With Flink, depending on the choices you make for your application and the cluster you run it on, any of these outcomes is possible:

- Flink makes no effort to recover from failures (at most once)
- nothing is lost, but you may experience duplicated results (at least once)
- nothing is lost or duplicated (exactly once)
Given that Flink recovers from faults by rewinding and replaying the source data streams, when the ideal situation is described as exactly once this does not mean that every event will be processed exactly once. Instead, it means that every event will affect the state being managed by Flink exactly once.
Barrier alignment is only needed for providing exactly once guarantees. If you don’t need this, you
can gain some performance by configuring Flink to use
CheckpointingMode.AT_LEAST_ONCE, which has
the effect of disabling barrier alignment.
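As a sketch of that configuration (the interval value is illustrative):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceCheckpointing {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AT_LEAST_ONCE disables barrier alignment: operators keep processing
        // events from inputs whose barrier has already arrived, so an event may
        // affect state more than once after recovery.
        env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    }
}
```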
To achieve exactly once end-to-end, so that every event from the sources affects the sinks exactly once, the following must be true:

1. your sources must be replayable, and
2. your sinks must be transactional (or idempotent)