Interface CheckpointedFunction
All Known Implementing Classes:
ArrowSourceFunction, ArtificalOperatorStateMapper, ArtificialKeyedStateMapper, CollectSinkFunction, ContinuousFileMonitoringFunction, DataGeneratorSource, FastTop1Function, FromElementsFunction, SequenceGeneratorSource, StateBootstrapFunction, StatefulSequenceSource, StreamingFileSink, StreamSQLTestProgram.Generator, StreamSQLTestProgram.KillMapper, TwoPhaseCommitSinkFunction, UpdatableTopNFunction
@Public public interface CheckpointedFunction
This is the core interface for stateful transformation functions, meaning functions that maintain state across individual stream records. While more lightweight interfaces exist as shortcuts for various types of state, this interface offers the greatest flexibility in managing both keyed state and operator state.

The section Shortcuts illustrates the common lightweight ways to set up stateful functions that are typically used instead of the full-fledged abstraction represented by this interface.
Initialization
The initializeState(FunctionInitializationContext) method is called when the parallel instance of the transformation function is created during distributed execution. The method gives access to the FunctionInitializationContext, which in turn gives access to the OperatorStateStore and the KeyedStateStore.

The OperatorStateStore and KeyedStateStore give access to the data structures in which state should be stored so that Flink can transparently manage and checkpoint it, such as ValueState or ListState.

Note: The KeyedStateStore can only be used when the transformation supports keyed state, i.e., when it is applied on a keyed stream (after a keyBy(...)).

Snapshot
The snapshotState(FunctionSnapshotContext) method is called whenever a checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically make sure that the checkpointed data structures (obtained in the initialization phase) are up to date for a snapshot to be taken. The given snapshot context gives access to the metadata of the checkpoint.

In addition, functions can use this method as a hook to flush, commit, or synchronize with external systems.
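To make the "up to date" requirement concrete, here is a minimal plain-Java sketch with no Flink dependency. The class names BufferingSinkModel and BufferingSinkDemo are hypothetical, checkpointedState is an ordinary list standing in for a ListState<String> that a real function would obtain during initialization, and externalSystem stands in for an external store. The snapshot step copies only the records that have not been flushed yet, so the checkpoint reflects exactly what would otherwise be lost on failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a buffering sink (not Flink API).
class BufferingSinkModel {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();       // records not yet flushed
    final List<String> checkpointedState = new ArrayList<>();    // stand-in for ListState<String>
    final List<String> externalSystem = new ArrayList<>();       // stand-in for an external store

    BufferingSinkModel(int threshold) {
        this.threshold = threshold;
    }

    // Per-record processing: buffer the record and flush once the threshold is reached.
    void invoke(String value) {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            externalSystem.addAll(buffer);
            buffer.clear();
        }
    }

    // Counterpart of snapshotState: make the checkpointed structure hold exactly
    // the records that have not reached the external system yet.
    void snapshotState() {
        checkpointedState.clear();
        checkpointedState.addAll(buffer);
    }
}

class BufferingSinkDemo {
    public static void main(String[] args) {
        BufferingSinkModel sink = new BufferingSinkModel(2);
        sink.invoke("a");
        sink.invoke("b"); // threshold reached: "a" and "b" are flushed
        sink.invoke("c"); // still buffered
        sink.snapshotState();
        System.out.println(sink.externalSystem + " " + sink.checkpointedState);
    }
}
```

Running main prints `[a, b] [c]`: the first two records reached the external system at the threshold, so only `c` has to be captured in the checkpoint.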
Example
The code example below illustrates how to use this interface for a function that keeps counts of events per key and per parallel partition (parallel instance of the transformation function during distributed execution). The example also handles changes of parallelism, which affect the count-per-parallel-partition: on scale-down, the counters of the partitions that get merged are added up. Note that this is a toy example, but it should illustrate the basic skeleton for a stateful function.
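import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {

    private ReducingState<Long> countPerKey;
    private ListState<Long> countPerPartition;

    private long localCount;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // get the state data structure for the per-key state;
        // the reduce function sums the per-key counts
        countPerKey = context.getKeyedStateStore().getReducingState(
                new ReducingStateDescriptor<>("perKeyCount", (a, b) -> a + b, Long.class));

        // get the state data structure for the per-partition state
        countPerPartition = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("perPartitionCount", Long.class));

        // initialize the "local count variable" based on the operator state;
        // after a scale-down, several restored entries are summed here
        for (Long l : countPerPartition.get()) {
            localCount += l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // the keyed state is always up to date anyway;
        // just bring the per-partition state in shape
        countPerPartition.clear();
        countPerPartition.add(localCount);
    }

    @Override
    public T map(T value) throws Exception {
        // update the states
        countPerKey.add(1L);
        localCount++;

        return value;
    }
}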
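The scale-down behavior can be illustrated without a cluster. The plain-Java sketch below (RescaleModel is a hypothetical name, not Flink API) takes the localCount that each of four old instances wrote into its list state and hands those entries to two new instances, which sum them exactly like the loop in initializeState. Operator list state obtained through getListState is redistributed by even splitting; the round-robin dealing used here is an assumption about the exact assignment, and the only property the sketch relies on is that every entry lands on exactly one new instance, so the total count is preserved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of per-partition counters during a scale-down.
class RescaleModel {
    // Assumption: entries are dealt out round-robin to the new instances.
    static List<List<Long>> redistribute(long[] oldLocalCounts, int newParallelism) {
        List<List<Long>> assigned = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assigned.add(new ArrayList<>());
        }
        for (int i = 0; i < oldLocalCounts.length; i++) {
            assigned.get(i % newParallelism).add(oldLocalCounts[i]);
        }
        return assigned;
    }

    // Mirrors the restore loop: localCount += l for every restored entry.
    static long restoreLocalCount(List<Long> restoredEntries) {
        long localCount = 0;
        for (Long l : restoredEntries) {
            localCount += l;
        }
        return localCount;
    }

    public static void main(String[] args) {
        long[] before = {10, 20, 30, 40};                 // parallelism 4
        List<List<Long>> after = redistribute(before, 2); // parallelism 2
        for (List<Long> entries : after) {
            System.out.println(entries + " -> localCount = " + restoreLocalCount(entries));
        }
    }
}
```

With this assignment the two new instances restore 40 and 60, which add up to the original total of 100.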
Shortcuts
There are various ways that transformation functions can use state without implementing the full-fledged CheckpointedFunction interface:

Operator State
Checkpointing some state that is part of the function object itself is possible in a simpler way by directly implementing the ListCheckpointed interface.

Keyed State
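Access to keyed state is possible via the RuntimeContext's methods:

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {

    private ValueState<Long> count;

    @Override
    public void open(OpenContext ctx) throws Exception {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
    }

    @Override
    public T map(T value) throws Exception {
        // value() returns null the first time a key is seen
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);

        return value;
    }
}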
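One detail of the keyed-state shortcut is that ValueState#value() returns null for a key that has no stored value yet (when the descriptor has no default), which is why the function checks for null. The plain-Java sketch below models that per-key scoping with a HashMap. ValueStateModel and CountPerKeyModel are hypothetical names, and setCurrentKey stands in for the key context that Flink's runtime establishes before each map() call on a keyed stream.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a keyed ValueState<Long>: one value slot per key.
class ValueStateModel {
    private final Map<String, Long> slots = new HashMap<>();
    private String currentKey;

    // In Flink the runtime sets the current key; here it is explicit.
    void setCurrentKey(String key) {
        currentKey = key;
    }

    // Like ValueState#value(): null if nothing was stored for the current key.
    Long value() {
        return slots.get(currentKey);
    }

    // Like ValueState#update(T): overwrite the value for the current key.
    void update(Long newValue) {
        slots.put(currentKey, newValue);
    }
}

class CountPerKeyModel {
    // Same counting logic as CountPerKeyFunction.map, applied to the model.
    static void count(ValueStateModel state, String key) {
        state.setCurrentKey(key);
        Long current = state.value();
        state.update(current == null ? 1L : current + 1);
    }

    public static void main(String[] args) {
        ValueStateModel state = new ValueStateModel();
        for (String key : new String[] {"a", "b", "a"}) {
            count(state, key);
        }
        state.setCurrentKey("a");
        System.out.println("a -> " + state.value()); // a -> 2
        state.setCurrentKey("b");
        System.out.println("b -> " + state.value()); // b -> 1
    }
}
```

Each key sees only its own slot, so the counts for `a` and `b` never interfere even though a single function instance processes both.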
See Also:
ListCheckpointed, RuntimeContext
Method Summary
void initializeState(FunctionInitializationContext context)
This method is called when the parallel function instance is created during distributed execution.
void snapshotState(FunctionSnapshotContext context)
This method is called when a snapshot for a checkpoint is requested.
Method Detail
snapshotState
void snapshotState(FunctionSnapshotContext context) throws Exception
This method is called when a snapshot for a checkpoint is requested. It acts as a hook for the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the function was initialized, or offered now by FunctionSnapshotContext itself.

Parameters:
context - the context for drawing a snapshot of the operator
Throws:
Exception - Thrown if the state could not be snapshotted.
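As an illustration of using the checkpoint metadata offered by FunctionSnapshotContext, the plain-Java sketch below stages the records written since the previous snapshot under the checkpoint ID, and commits a batch only once its checkpoint is reported complete. This is loosely the idea behind transactional sinks such as TwoPhaseCommitSinkFunction. StagedCommitModel is a hypothetical name; the long parameter of snapshotState stands in for FunctionSnapshotContext#getCheckpointId(), and notifyCheckpointComplete mirrors the method of the same name on Flink's CheckpointListener interface. A real function would also write the staged batches into its checkpointed state so that they survive a failure; that part is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model (not Flink API) of staging data per checkpoint ID.
class StagedCommitModel {
    private List<String> open = new ArrayList<>();                        // written since the last snapshot
    private final TreeMap<Long, List<String>> staged = new TreeMap<>();  // checkpoint ID -> batch
    final List<String> committed = new ArrayList<>();                    // visible in the external system

    void write(String record) {
        open.add(record);
    }

    // Use the checkpoint metadata to label the batch this snapshot covers.
    void snapshotState(long checkpointId) {
        staged.put(checkpointId, open);
        open = new ArrayList<>();
    }

    // Commit every batch staged at or before the completed checkpoint, in ID order.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = staged.headMap(checkpointId, true);
        for (List<String> batch : done.values()) {
            committed.addAll(batch);
        }
        done.clear(); // clearing the view removes these entries from 'staged'
    }

    int stagedBatches() {
        return staged.size();
    }

    public static void main(String[] args) {
        StagedCommitModel sink = new StagedCommitModel();
        sink.write("a");
        sink.snapshotState(1L);
        sink.write("b");
        sink.snapshotState(2L);
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.committed + ", staged batches: " + sink.stagedBatches());
    }
}
```

After checkpoint 1 completes, only `a` is committed and the batch for checkpoint 2 stays staged until that checkpoint completes as well.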
initializeState
void initializeState(FunctionInitializationContext context) throws Exception
This method is called when the parallel function instance is created during distributed execution. Functions typically set up their state storing data structures in this method.

Parameters:
context - the context for initializing the operator
Throws:
Exception - Thrown if state could not be created or restored.
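A minimal plain-Java sketch of typical setup logic, assuming the function needs different behavior on a fresh start than on a restore. The real context inherits isRestored() from ManagedInitializationContext; here InitContextModel is a hypothetical stand-in that carries only that flag and the list-state entries, and startValue is a hypothetical configuration value that is used only when nothing was restored.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FunctionInitializationContext (not Flink API).
class InitContextModel {
    private final boolean restored;
    private final List<Long> entries;

    InitContextModel(boolean restored, List<Long> entries) {
        this.restored = restored;
        this.entries = entries;
    }

    boolean isRestored() {
        return restored;
    }

    List<Long> listStateEntries() {
        return entries;
    }
}

class CounterModel {
    private final long startValue; // hypothetical configuration, used only on a fresh start
    private long localCount;

    CounterModel(long startValue) {
        this.startValue = startValue;
    }

    // Same shape as initializeState: rebuild local variables from the
    // restored state, or fall back to configuration on a fresh start.
    void initializeState(InitContextModel context) {
        if (context.isRestored()) {
            localCount = 0;
            for (Long l : context.listStateEntries()) {
                localCount += l;
            }
        } else {
            localCount = startValue;
        }
    }

    long localCount() {
        return localCount;
    }

    public static void main(String[] args) {
        CounterModel fresh = new CounterModel(100L);
        fresh.initializeState(new InitContextModel(false, Collections.<Long>emptyList()));
        CounterModel restored = new CounterModel(100L);
        restored.initializeState(new InitContextModel(true, Arrays.asList(3L, 4L)));
        System.out.println(fresh.localCount() + " " + restored.localCount()); // 100 7
    }
}
```

The restored instance ignores the configured start value, because the checkpointed entries already describe where it left off.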