Apache Flink’s State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
Due to the interoperability of the DataSet and Table APIs, you can even use relational Table API or SQL queries to analyze and process state data.
For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly.
Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application.
It is also possible to fix inconsistent state entries.
Finally, the State Processor API makes it possible to evolve a stateful application in many ways that were previously blocked by parameter and design choices that could not be changed after the application was started without losing all of its state.
For example, you can now arbitrarily modify the data types of states, adjust the maximum parallelism of operators, split or merge operator state, re-assign operator UIDs, and so on.
To get started with the State Processor API, include the flink-state-processor-api library in your application.
The State Processor API maps the state of a streaming application to one or more data sets that can be processed separately.
In order to be able to use the API, you need to understand how this mapping works.
But let us first have a look at what a stateful Flink job looks like.
A Flink job is composed of operators; typically one or more source operators, a few operators for the actual processing, and one or more sink operators.
Each operator runs in parallel in one or more tasks and can work with different types of state.
An operator can have zero, one, or more “operator states” which are organized as lists that are scoped to the operator’s tasks.
If the operator is applied on a keyed stream, it can also have zero, one, or more “keyed states” which are scoped to a key that is extracted from each processed record.
You can think of keyed state as a distributed key-value map.
The following figure shows the application “MyApp” which consists of three operators called “Src”, “Proc”, and “Snk”.
Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2) and Snk is stateless.
A savepoint or checkpoint of MyApp consists of the data of all states, organized in a way that the states of each task can be restored.
When processing the data of a savepoint (or checkpoint) with a batch job, we need a mental model that maps the data of the individual tasks’ states into data sets or tables.
In fact, we can think of a savepoint as a database. Every operator (identified by its UID) represents a namespace.
Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state’s data of all tasks.
All keyed states of an operator are mapped to a single table consisting of a column for the key, and one column for each keyed state.
The following figure shows how a savepoint of MyApp is mapped to a database.
The figure shows how the values of Src’s operator state are mapped to a table with one column and five rows, one row for each of the list entries across all parallel tasks of Src.
Operator state os2 of the operator “Proc” is similarly mapped to an individual table.
The keyed states ks1 and ks2 are combined into a single table with three columns: one for the key, one for ks1, and one for ks2.
The keyed table holds one row for each distinct key of both keyed states.
Since the operator “Snk” does not have any state, its namespace is empty.
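To make the keyed-table mapping concrete, here is a plain-Java sketch (no Flink involved) of the mental model only. It assumes, hypothetically, that ks1 holds Integer values and ks2 holds String values, models each keyed state as a map, and combines them into one row per distinct key, leaving a column null when a key has no entry in that state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class KeyedTableModel {

    // One row of the keyed table: the key plus one column per keyed state.
    public record Row(String key, Integer ks1, String ks2) {}

    // Combines two keyed states into a single table with one row per distinct key.
    // A key missing from one state gets null in that state's column.
    public static List<Row> toKeyedTable(Map<String, Integer> ks1, Map<String, String> ks2) {
        Set<String> keys = new TreeSet<>(ks1.keySet()); // sorted for deterministic output
        keys.addAll(ks2.keySet());
        List<Row> rows = new ArrayList<>();
        for (String key : keys) {
            rows.add(new Row(key, ks1.get(key), ks2.get(key)));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Hypothetical contents of Proc's keyed states.
        Map<String, Integer> ks1 = Map.of("a", 1, "b", 2);
        Map<String, String> ks2 = Map.of("b", "x", "c", "y");
        // Prints, one per line: Row[key=a, ks1=1, ks2=null], Row[key=b, ks1=2, ks2=x], Row[key=c, ks1=null, ks2=y]
        toKeyedTable(ks1, ks2).forEach(System.out::println);
    }
}
```

Key "b" appears in both states and yields a single row, which is what "one row for each distinct key of both keyed states" means.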
Reading State
Reading state begins by specifying the path to a valid savepoint or checkpoint along with the StateBackend that should be used to restore the data.
The compatibility guarantees for restoring state are identical to those when restoring a DataStream application.
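A minimal sketch of loading a savepoint, assuming the DataSet-based Savepoint.load(ExecutionEnvironment, String, StateBackend) entry point. MemoryStateBackend is only an example choice, and the class and method names are hypothetical.

```java
import java.io.IOException;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class LoadSavepointExample {

    // Loads an existing savepoint or checkpoint so its state can be read as DataSets.
    // `path` is the savepoint (or retained checkpoint) directory, e.g. a hypothetical
    // "file:///tmp/savepoints/savepoint-123".
    public static ExistingSavepoint load(ExecutionEnvironment env, String path) throws IOException {
        // The state backend is used to restore the data; MemoryStateBackend is an example choice.
        return Savepoint.load(env, path, new MemoryStateBackend());
    }
}
```

The returned ExistingSavepoint is the starting point for all of the readers described below.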
Operator State
Operator state is any non-keyed state in Flink.
This includes, but is not limited to, any use of CheckpointedFunction or BroadcastState within an application.
When reading operator state, users specify the operator UID, the state name, and the type information.
Operator List State
Operator state stored in a CheckpointedFunction using getListState can be read using ExistingSavepoint#readListState.
The state name and type information should match those used to define the ListStateDescriptor that declared this state in the DataStream application.
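A sketch of reading operator list state, assuming the streaming job registered getListState(new ListStateDescriptor<>("os1", Types.INT)) on the operator with UID "src". The UID, state name, and element type are hypothetical and must be replaced with the ones your job actually uses.

```java
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.state.api.ExistingSavepoint;

public class ReadListStateExample {

    // Reads the list state "os1" of the operator with UID "src" (hypothetical names).
    public static DataSet<Integer> readOs1(ExistingSavepoint savepoint) throws IOException {
        // One DataSet element per list entry, across all parallel tasks of the operator.
        return savepoint.readListState("src", "os1", Types.INT);
    }
}
```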
Operator Union List State
Operator state stored in a CheckpointedFunction using getUnionListState can be read using ExistingSavepoint#readUnionState.
The state name and type information should match those used to define the ListStateDescriptor that declared this state in the DataStream application.
The framework will return a single copy of the state, equivalent to restoring a DataStream with parallelism 1.
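Reading union list state looks the same apart from the method name. In this sketch the UID "src" and the state name "union-state" are hypothetical.

```java
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.state.api.ExistingSavepoint;

public class ReadUnionStateExample {

    // Reads union list state declared with getUnionListState(new ListStateDescriptor<>("union-state", Types.INT)).
    public static DataSet<Integer> readUnion(ExistingSavepoint savepoint) throws IOException {
        // A single copy of the union state is returned, not one copy per task.
        return savepoint.readUnionState("src", "union-state", Types.INT);
    }
}
```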
Broadcast State
BroadcastState can be read using ExistingSavepoint#readBroadcastState.
The state name and type information should match those used to define the MapStateDescriptor that declared this state in the DataStream application.
The framework will return a single copy of the state, equivalent to restoring a DataStream with parallelism 1.
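For broadcast state, the reader takes separate key and value type information and returns the map entries as tuples. This sketch assumes a hypothetical MapStateDescriptor<String, Double> named "currency-rates" on an operator with UID "rates-uid".

```java
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.state.api.ExistingSavepoint;

public class ReadBroadcastStateExample {

    // Reads broadcast state declared with new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE).
    public static DataSet<Tuple2<String, Double>> readRates(ExistingSavepoint savepoint) throws IOException {
        // Each map entry of the broadcast state becomes one Tuple2(key, value).
        return savepoint.readBroadcastState("rates-uid", "currency-rates", Types.STRING, Types.DOUBLE);
    }
}
```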
Using Custom Serializers
Each of the operator state readers supports using a custom TypeSerializer if one was used to define the StateDescriptor that wrote out the state.
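A sketch of passing a serializer explicitly. Flink's built-in IntSerializer.INSTANCE only stands in here for whatever custom TypeSerializer the DataStream job passed to its ListStateDescriptor; the UID and state name are hypothetical.

```java
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.state.api.ExistingSavepoint;

public class ReadWithSerializerExample {

    // Reads list state using an explicit serializer instead of one derived from the type information.
    public static DataSet<Integer> readOs1(ExistingSavepoint savepoint) throws IOException {
        // Replace IntSerializer.INSTANCE with the serializer that originally wrote the state.
        return savepoint.readListState("src", "os1", Types.INT, IntSerializer.INSTANCE);
    }
}
```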
Keyed State
Keyed state, or partitioned state, is any state that is partitioned relative to a key.
When reading a keyed state, users specify the operator UID and a KeyedStateReaderFunction<KeyType, OutputType>.
The KeyedStateReaderFunction allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState.
This means that if an operator contains a stateful process function that registers keyed state, that state can be read by defining an output type and a corresponding KeyedStateReaderFunction.
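The sketch below shows both sides. It assumes a hypothetical KeyedProcessFunction on an operator with UID "proc" that keeps a ValueState named "state" and a ListState named "times". The reader registers descriptors with the same names and types in open and emits one KeyedState object per key.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ReadKeyedStateExample {

    // Streaming side (hypothetical): keeps a ValueState and a ListState per key.
    public static class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
        private ValueState<Integer> state;
        private ListState<Long> updateTimes;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Types.INT));
            updateTimes = getRuntimeContext().getListState(new ListStateDescriptor<>("times", Types.LONG));
        }

        @Override
        public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
            state.update(value + 1);
            updateTimes.add(System.currentTimeMillis());
        }
    }

    // Output type: one instance per key, with one field per keyed state.
    public static class KeyedState {
        public Integer key;
        public Integer value;
        public List<Long> times;
    }

    // Batch side: registers the same descriptors (same names and types) in open.
    public static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
        private ValueState<Integer> state;
        private ListState<Long> updateTimes;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Types.INT));
            updateTimes = getRuntimeContext().getListState(new ListStateDescriptor<>("times", Types.LONG));
        }

        @Override
        public void readKey(Integer key, Context ctx, Collector<KeyedState> out) throws Exception {
            KeyedState data = new KeyedState();
            data.key = key;
            data.value = state.value(); // may be null if this key has no value state
            data.times = new ArrayList<>();
            for (Long t : updateTimes.get()) {
                data.times.add(t);
            }
            out.collect(data);
        }
    }

    // Produces one KeyedState row per key of the operator with UID "proc".
    public static DataSet<KeyedState> read(ExistingSavepoint savepoint) throws IOException {
        return savepoint.readKeyedState("proc", new ReaderFunction());
    }
}
```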
Along with reading registered state values, each key has access to a Context with metadata such as registered event time and processing time timers.
Note: When using a KeyedStateReaderFunction, all state descriptors must be registered eagerly inside of open. Any later attempt to call RuntimeContext#get*State, for example from readKey, will result in a RuntimeException.
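A sketch of a reader that also inspects each key's registered timers through the Context. The state name "state" and the String output format are hypothetical. The descriptor is registered in open, as the note above requires.

```java
import java.util.Set;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class TimerReaderFunction extends KeyedStateReaderFunction<Integer, String> {

    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        // Register every descriptor here; doing so from readKey would throw a RuntimeException.
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Types.INT));
    }

    @Override
    public void readKey(Integer key, Context ctx, Collector<String> out) throws Exception {
        // Timers registered for the current key at the time the savepoint was taken.
        Set<Long> eventTimers = ctx.registeredEventTimeTimers();
        Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();
        out.collect("key=" + key
                + " value=" + state.value()
                + " eventTimers=" + eventTimers
                + " processingTimers=" + processingTimers);
    }
}
```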
Writing New Savepoints
Savepoints may also be written, which enables use cases such as bootstrapping state based on historical data.
Each savepoint is made up of one or more BootstrapTransformations (explained below), each of which defines the state for an individual operator.
The UIDs associated with each operator must match one to one with the UIDs assigned to the operators in your DataStream application; this is how Flink knows which state maps to which operator.
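A minimal sketch of assembling and writing a new savepoint, assuming two BootstrapTransformations built as in the sections below. The UIDs "src" and "proc", the max parallelism of 128, and the class name are hypothetical.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.Savepoint;

public class WriteSavepointExample {

    // Writes a new savepoint containing state for the operators with UIDs "src" and "proc".
    // These UIDs must equal the uid(...) values set on the operators of the DataStream job.
    public static void write(
            ExecutionEnvironment env,
            BootstrapTransformation<?> srcState,
            BootstrapTransformation<?> procState,
            String path) throws Exception {
        int maxParallelism = 128; // example value recorded in the new savepoint
        Savepoint
            .create(new MemoryStateBackend(), maxParallelism)
            .withOperator("src", srcState)
            .withOperator("proc", procState)
            .write(path);
        // write(...) only defines the batch job; executing it produces the savepoint files.
        env.execute("bootstrap savepoint");
    }
}
```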
Operator State
Simple operator state, using CheckpointedFunction, can be created using the StateBootstrapFunction.
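A sketch of a StateBootstrapFunction that writes every input element into operator list state. The state name "os1" and the sample input are hypothetical. The descriptor must match the one used by the CheckpointedFunction in the streaming job.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.functions.StateBootstrapFunction;

public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {

    private ListState<Integer> state;

    @Override
    public void processElement(Integer value, Context ctx) throws Exception {
        // Every input element is appended to the operator list state.
        state.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing extra to do: the list state is written out by the framework.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Name and type must match the ListStateDescriptor used by the streaming job.
        state = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("os1", Types.INT));
    }

    // Builds the transformation from a small, hypothetical input data set.
    public static BootstrapTransformation<Integer> bootstrap(ExecutionEnvironment env) {
        DataSet<Integer> data = env.fromElements(1, 2, 3);
        return OperatorTransformation
            .bootstrapWith(data)
            .transform(new SimpleBootstrapFunction());
    }
}
```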
Broadcast State
BroadcastState can be written using a BroadcastStateBootstrapFunction. Similar to broadcast state in the DataStream API, the full state must fit in memory.
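A sketch of bootstrapping broadcast state from (currency, rate) pairs. The descriptor name "currency-rates" and the Tuple2<String, Double> input type are hypothetical; the descriptor must match the one used for the broadcast stream in the DataStream job.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class RatesBootstrapFunction extends BroadcastStateBootstrapFunction<Tuple2<String, Double>> {

    // Must match the MapStateDescriptor used by the broadcast stream in the DataStream job.
    public static final MapStateDescriptor<String, Double> DESCRIPTOR =
        new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE);

    @Override
    public void processElement(Tuple2<String, Double> value, Context ctx) throws Exception {
        // Each (currency, rate) pair becomes one entry of the broadcast map state.
        ctx.getBroadcastState(DESCRIPTOR).put(value.f0, value.f1);
    }

    public static BootstrapTransformation<Tuple2<String, Double>> bootstrap(
            DataSet<Tuple2<String, Double>> rates) {
        return OperatorTransformation
            .bootstrapWith(rates)
            .transform(new RatesBootstrapFunction());
    }
}
```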
Keyed State
Keyed state for ProcessFunctions and other RichFunction types can be written using a KeyedStateBootstrapFunction.
The KeyedStateBootstrapFunction supports setting event time and processing time timers.
The timers will not fire inside the bootstrap function and only become active once restored within a DataStream application.
If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediately upon start.
Attention: If your bootstrap function creates timers, the state can only be restored using one of the process type functions, such as a KeyedProcessFunction.
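A sketch of a KeyedStateBootstrapFunction that writes one value per key and registers an event time timer. The input type Tuple3<accountId, balance, lastUpdateTimestamp>, the state name "balance", and the one-hour timer offset are hypothetical. An explicit KeySelector is used instead of a lambda so Flink can extract the key type without extra hints.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

// Input records are (accountId, balance, lastUpdateTimestamp).
public class AccountBootstrapper
        extends KeyedStateBootstrapFunction<Integer, Tuple3<Integer, Double, Long>> {

    private ValueState<Double> balance;

    @Override
    public void open(Configuration parameters) {
        balance = getRuntimeContext().getState(new ValueStateDescriptor<>("balance", Types.DOUBLE));
    }

    @Override
    public void processElement(Tuple3<Integer, Double, Long> value, Context ctx) throws Exception {
        // Writes keyed state for the current key (value.f0).
        balance.update(value.f1);
        // The timer does not fire here; it becomes active once the state is restored
        // by a process function in a DataStream job.
        ctx.timerService().registerEventTimeTimer(value.f2 + 60 * 60 * 1000L);
    }

    public static BootstrapTransformation<Tuple3<Integer, Double, Long>> bootstrap(
            DataSet<Tuple3<Integer, Double, Long>> accounts) {
        return OperatorTransformation
            .bootstrapWith(accounts)
            .keyBy(new KeySelector<Tuple3<Integer, Double, Long>, Integer>() {
                @Override
                public Integer getKey(Tuple3<Integer, Double, Long> value) {
                    return value.f0; // partition by account id
                }
            })
            .transform(new AccountBootstrapper());
    }
}
```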
Modifying Savepoints
Besides creating a savepoint from scratch, you can base one on an existing savepoint, for example when bootstrapping a single new operator for an existing job.
Note: When basing a new savepoint on existing state, the State Processor API makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state, and one cannot be deleted without corrupting the other!
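A sketch of adding state for one new operator to an existing savepoint, assuming ExistingSavepoint#withOperator and a BootstrapTransformation built as in the previous sections. The paths and the new operator UID are parameters supplied by the caller; MemoryStateBackend is an example choice.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.Savepoint;

public class ModifySavepointExample {

    // Loads the savepoint at oldPath, adds state for one new operator, and writes the result to newPath.
    // State of the existing operators is referenced, not copied, so oldPath must be kept.
    public static void addOperator(
            ExecutionEnvironment env,
            String oldPath,
            String newPath,
            String newOperatorUid,
            BootstrapTransformation<?> newOperatorState) throws Exception {
        Savepoint
            .load(env, oldPath, new MemoryStateBackend())
            .withOperator(newOperatorUid, newOperatorState)
            .write(newPath);
        env.execute("modify savepoint");
    }
}
```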