Interface CheckpointedFunction

  • All Known Implementing Classes:
    ArrowSourceFunction, ArtificalOperatorStateMapper, ArtificialKeyedStateMapper, CollectSinkFunction, ContinuousFileMonitoringFunction, DataGeneratorSource, FastTop1Function, FromElementsFunction, SequenceGeneratorSource, StateBootstrapFunction, StatefulSequenceSource, StreamingFileSink, StreamSQLTestProgram.Generator, StreamSQLTestProgram.KillMapper, TwoPhaseCommitSinkFunction, UpdatableTopNFunction

    @Public
    public interface CheckpointedFunction
    This is the core interface for stateful transformation functions, meaning functions that maintain state across individual stream records. While more lightweight interfaces exist as shortcuts for various types of state, this interface offers the greatest flexibility in managing both keyed state and operator state.

    The Shortcuts section illustrates the common lightweight ways to set up stateful functions, which are typically used instead of the full-fledged abstraction represented by this interface.

    Initialization

    The initializeState(FunctionInitializationContext) method is called when the parallel instance of the transformation function is created during distributed execution. The method gives access to the FunctionInitializationContext, which in turn gives access to the OperatorStateStore and the KeyedStateStore.

    The OperatorStateStore and KeyedStateStore give access to the data structures in which state should be stored for Flink to transparently manage and checkpoint it, such as ValueState or ListState.

    Note: The KeyedStateStore can only be used when the transformation supports keyed state, i.e., when it is applied on a keyed stream (after a keyBy(...)).
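    The initialization contract (state is obtained by name from a store and, after a recovery, comes back holding the checkpointed contents) can be modeled without Flink. The sketch below is a plain-Java stand-in, not Flink's API: InMemoryOperatorStateStore, its getListState(String) method and its isRestored() flag are hypothetical names that only loosely echo the real OperatorStateStore and FunctionInitializationContext.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for an operator state store (not Flink's class). */
final class InMemoryOperatorStateStore {

    private final Map<String, List<Long>> states = new HashMap<>();
    private final boolean restored;

    /** Creates a store; a non-null snapshot means we are recovering from a checkpoint. */
    InMemoryOperatorStateStore(Map<String, List<Long>> snapshot) {
        restored = snapshot != null;
        if (snapshot != null) {
            // copy each checkpointed list back under its registration name
            snapshot.forEach((name, values) -> states.put(name, new ArrayList<>(values)));
        }
    }

    /** Returns the list registered under this name, creating an empty one on first use. */
    List<Long> getListState(String name) {
        return states.computeIfAbsent(name, n -> new ArrayList<>());
    }

    /** True if this store was created from a snapshot, i.e. after a recovery. */
    boolean isRestored() {
        return restored;
    }
}
```

    A function would call getListState in its initialization hook and, when isRestored() is true, fold the restored entries back into its local fields, as the example further down does with localCount.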

    Snapshot

    The snapshotState(FunctionSnapshotContext) method is called whenever a checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically make sure that the checkpointed data structures (obtained in the initialization phase) are up to date for a snapshot to be taken. The given snapshot context gives access to the metadata of the checkpoint.

    In addition, functions can use this method as a hook to flush/commit/synchronize with external systems.
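    The two roles of the snapshot hook (syncing cheap local fields into managed state, and flushing to an external system) can be sketched in plain Java. Everything here is hypothetical: BufferingCounter, countState (standing in for a ListState<Long>) and externalSystem (standing in for a real sink) are illustration names, and the checkpointId parameter stands in for the metadata a FunctionSnapshotContext provides.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: managed state is synced and a buffer is flushed on every snapshot. */
final class BufferingCounter {

    private long localCount;                                // plain field, not checkpointed by itself
    private long lastCheckpointId = -1;                     // metadata from the "snapshot context"
    private final List<String> pending = new ArrayList<>(); // records not yet handed to the sink

    final List<Long> countState = new ArrayList<>();        // stands in for a ListState<Long>
    final List<String> externalSystem = new ArrayList<>();  // stands in for an external sink

    /** Per-record processing touches only the cheap local fields. */
    void process(String record) {
        localCount++;
        pending.add(record);
    }

    /** Snapshot hook: bring the managed state up to date, then flush the buffer. */
    void snapshotState(long checkpointId) {
        lastCheckpointId = checkpointId;
        countState.clear();
        countState.add(localCount);
        externalSystem.addAll(pending);
        pending.clear();
    }

    long lastCheckpointId() {
        return lastCheckpointId;
    }

    int pendingSize() {
        return pending.size();
    }
}
```

    Flushing at snapshot time like this generally gives at-least-once delivery to the external system, because records flushed for a checkpoint that later fails are replayed after recovery; exactly-once output needs a transactional scheme such as the one TwoPhaseCommitSinkFunction is built around.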

    Example

    The code example below illustrates how to use this interface for a function that keeps counts of events per key and per parallel partition (the parallel instance of the transformation function during distributed execution). The example also handles changes of parallelism, which affect the count per parallel partition, by adding up the counters of partitions that get merged on scale-down. Note that this is a toy example, but it should illustrate the basic skeleton of a stateful function.

    
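     import org.apache.flink.api.common.functions.MapFunction;
     import org.apache.flink.api.common.state.ListState;
     import org.apache.flink.api.common.state.ListStateDescriptor;
     import org.apache.flink.api.common.state.ReducingState;
     import org.apache.flink.api.common.state.ReducingStateDescriptor;
     import org.apache.flink.runtime.state.FunctionInitializationContext;
     import org.apache.flink.runtime.state.FunctionSnapshotContext;
     import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

     public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {

         private ReducingState<Long> countPerKey;
         private ListState<Long> countPerPartition;

         private long localCount;

         @Override
         public void initializeState(FunctionInitializationContext context) throws Exception {
             // get the state data structure for the per-key state (requires a keyed stream)
             countPerKey = context.getKeyedStateStore().getReducingState(
                     new ReducingStateDescriptor<Long>("perKeyCount", (a, b) -> a + b, Long.class));

             // get the state data structure for the per-partition (operator) state
             countPerPartition = context.getOperatorStateStore().getListState(
                     new ListStateDescriptor<>("perPartitionCount", Long.class));

             // initialize the local count from the operator state; after a scale-down
             // this instance may receive several entries, which are summed up
             for (Long l : countPerPartition.get()) {
                 localCount += l;
             }
         }

         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
             // the keyed state is always up to date anyway;
             // just bring the per-partition state in shape
             countPerPartition.clear();
             countPerPartition.add(localCount);
         }

         @Override
         public T map(T value) throws Exception {
             // update the states
             countPerKey.add(1L);
             localCount++;

             return value;
         }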
     }
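    The scale-down behavior relies on the loop in initializeState summing whatever entries an instance receives. The plain-Java sketch below models that step; RescaleModel and its methods are hypothetical, and the contiguous split is an assumption chosen for simplicity, since Flink's own even-split redistribution of list state may assign entries in a different order. Only the idea that totals are preserved is taken from the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of redistributing per-partition counters on rescale (not Flink's code). */
final class RescaleModel {

    private RescaleModel() {}

    /** Splits the entries of all old instances into newParallelism contiguous chunks. */
    static List<List<Long>> redistribute(List<Long> allEntries, int newParallelism) {
        List<List<Long>> result = new ArrayList<>();
        int size = allEntries.size();
        for (int i = 0; i < newParallelism; i++) {
            int from = i * size / newParallelism;
            int to = (i + 1) * size / newParallelism;
            result.add(new ArrayList<>(allEntries.subList(from, to)));
        }
        return result;
    }

    /** Mirrors the loop in initializeState: sum every restored entry into localCount. */
    static long restoreLocalCount(List<Long> restoredEntries) {
        long localCount = 0;
        for (Long l : restoredEntries) {
            localCount += l;
        }
        return localCount;
    }
}
```

    Going from parallelism 4 with counters [5, 7, 1, 2] to parallelism 2, this model gives the two new instances local counts of 12 and 3. On scale-up some instances receive no entries and start from 0, but the overall total is unchanged in both directions.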
     

    Shortcuts

    There are various ways that transformation functions can use state without implementing the full-fledged CheckpointedFunction interface:

    Operator State

    Checkpointing some state that is part of the function object itself is possible in a simpler way by directly implementing the ListCheckpointed interface.
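    ListCheckpointed replaces descriptors with two callbacks: snapshotState(long checkpointId, long timestamp), which returns the state as a List, and restoreState(List), which receives it back. So that the sketch compiles without Flink, it declares its own hypothetical ListCheckpointedLike interface with the same two method shapes; FieldCounter is likewise an illustration name.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical mirror of the ListCheckpointed method shapes (not the Flink interface). */
interface ListCheckpointedLike<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

/** Counter whose only state is a field of the function object itself. */
final class FieldCounter implements ListCheckpointedLike<Long> {

    private long count;

    void increment() {
        count++;
    }

    long count() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // hand the field to the runtime as a one-element list
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // after rescaling, one instance may receive several entries: sum them
        count = 0;
        for (Long c : state) {
            count += c;
        }
    }
}
```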

    Keyed State

    Access to keyed state is possible via the RuntimeContext's methods:

    
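     import org.apache.flink.api.common.functions.OpenContext;
     import org.apache.flink.api.common.functions.RichMapFunction;
     import org.apache.flink.api.common.state.ValueState;
     import org.apache.flink.api.common.state.ValueStateDescriptor;

     public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {

         private ValueState<Long> count;

         @Override
         public void open(OpenContext ctx) throws Exception {
             count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
         }

         @Override
         public T map(T value) throws Exception {
             // value() returns null until the first update for the current key
             Long current = count.value();
             count.update(current == null ? 1L : current + 1);

             return value;
         }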
     }
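    Keyed state such as the ValueState above holds one slot per key, and the runtime selects the slot by the key of the record being processed. The plain-Java model below illustrates that scoping; KeyedCountModel and its setCurrentKey and valueFor methods are hypothetical names, with a HashMap standing in for Flink's keyed state backend.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-key ValueState<Long>: one slot per key, null until first update. */
final class KeyedCountModel {

    private final Map<String, Long> slots = new HashMap<>();
    private String currentKey;

    /** Models the runtime setting the key of the current record before map() runs. */
    void setCurrentKey(String key) {
        currentKey = key;
    }

    /** Same update logic as CountPerKeyFunction.map. */
    void map() {
        Long current = slots.get(currentKey);                      // like value(): null if unset
        slots.put(currentKey, current == null ? 1L : current + 1); // like update(...)
    }

    Long valueFor(String key) {
        return slots.get(key);
    }
}
```

    Feeding records with keys a, b, a leaves a count of 2 for a and 1 for b, while a key that was never seen still reads as null.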
     
    See Also:
    ListCheckpointed, RuntimeContext
    • Method Detail

      • snapshotState

        void snapshotState(FunctionSnapshotContext context)
                    throws Exception
        This method is called when a snapshot for a checkpoint is requested. It acts as a hook for the function to ensure that all state is exposed through the means previously offered by the FunctionInitializationContext when the function was initialized, or offered now by the FunctionSnapshotContext itself.
        Parameters:
        context - the context for drawing a snapshot of the operator
        Throws:
        Exception - Thrown if state could not be created or restored.
      • initializeState

        void initializeState(FunctionInitializationContext context)
                      throws Exception
        This method is called when the parallel function instance is created during distributed execution. Functions typically set up their state storing data structures in this method.
        Parameters:
        context - the context for initializing the operator
        Throws:
        Exception - Thrown if state could not be created or restored.