@Public public interface CheckpointedFunction
The Shortcuts section illustrates the common lightweight ways to set up stateful functions, which are typically used instead of the full-fledged abstraction represented by this interface.
The initializeState(FunctionInitializationContext) method is called when the parallel instance of the transformation function is created during distributed execution. The method gives access to the FunctionInitializationContext, which in turn gives access to the OperatorStateStore and KeyedStateStore.
The OperatorStateStore and KeyedStateStore give access to the data structures in which state should be stored for Flink to transparently manage and checkpoint it, such as ValueState or ListState.
Note: The KeyedStateStore can only be used when the transformation supports keyed state, i.e., when it is applied on a keyed stream (after a keyBy(...)).
The snapshotState(FunctionSnapshotContext) method is called whenever a checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically make sure that the checkpointed data structures (obtained in the initialization phase) are up to date for a snapshot to be taken. The given snapshot context gives access to the metadata of the checkpoint.
In addition, functions can use this method as a hook to flush/commit/synchronize with external systems.
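The flush-on-snapshot idea can be sketched in plain Java without any Flink dependency. This is a hypothetical model, not Flink API: the class `BufferingWriter`, its methods `write`, `snapshot`, and `pendingCount`, and the `external` list that stands in for an external system are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (not Flink API) of a function that buffers
// records and uses the snapshot call as a hook to flush them.
class BufferingWriter {
    private final List<String> pending = new ArrayList<>(); // buffered, not yet flushed
    private final List<String> external;                    // stands in for an external system
    private long lastCheckpointId = -1L;                     // checkpoint metadata seen last

    BufferingWriter(List<String> external) {
        this.external = external;
    }

    // Called once per record, in the role that map() or invoke() would play.
    void write(String record) {
        pending.add(record);
    }

    // Called when a checkpoint is taken: push everything buffered so far to
    // the external system and remember the checkpoint id.
    // Returns the number of records flushed.
    int snapshot(long checkpointId) {
        int flushed = pending.size();
        external.addAll(pending);
        pending.clear();
        lastCheckpointId = checkpointId;
        return flushed;
    }

    int pendingCount() {
        return pending.size();
    }

    long lastCheckpointId() {
        return lastCheckpointId;
    }
}
```

In a real function the same step would live inside snapshotState(FunctionSnapshotContext), and whether a flush at that point is sufficient depends on the guarantees of the external system.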
The code example below illustrates how to use this interface for a function that keeps counts of events per key and per parallel partition (that is, per parallel instance of the transformation function during distributed execution). The example also handles changes of parallelism, which affect the count per parallel partition: the counters of partitions that get merged on scale-down are added up. Note that this is a toy example, but it should illustrate the basic skeleton of a stateful function.
public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {

    private ReducingState<Long> countPerKey;
    private ListState<Long> countPerPartition;

    private long localCount;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // get the state data structure for the per-key state
        // (AddFunction is assumed to be a user-defined ReduceFunction that adds two values)
        countPerKey = context.getKeyedStateStore().getReducingState(
                new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));

        // get the state data structure for the per-partition state
        countPerPartition = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("perPartitionCount", Long.class));

        // initialize the "local count variable" based on the operator state;
        // after a scale-down this sums the counters of the merged partitions
        for (Long l : countPerPartition.get()) {
            localCount += l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // the keyed state is always up to date anyway,
        // so just bring the per-partition state into shape
        countPerPartition.clear();
        countPerPartition.add(localCount);
    }

    @Override
    public T map(T value) throws Exception {
        // update the states
        countPerKey.add(1L);
        localCount++;
        return value;
    }
}
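How the per-partition counters behave when the parallelism changes can be modeled in plain Java. The sketch below is hypothetical and not Flink API: `RescaleSketch`, `redistribute`, and `restoreLocalCount` are names invented here, and the round-robin assignment is an assumption. The only property the example relies on is that every snapshotted list element is handed to exactly one new parallel instance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (not Flink API) of list-style operator state
// being redistributed when the parallelism changes.
class RescaleSketch {

    // Collect the snapshotted elements of all old instances and hand them out
    // round-robin to the new instances. The round-robin order is an assumption;
    // what matters is that each element goes to exactly one new instance.
    static List<List<Long>> redistribute(List<List<Long>> oldStates, int newParallelism) {
        List<List<Long>> newStates = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newStates.add(new ArrayList<>());
        }
        int next = 0;
        for (List<Long> state : oldStates) {
            for (Long element : state) {
                newStates.get(next % newParallelism).add(element);
                next++;
            }
        }
        return newStates;
    }

    // Mirrors the loop in initializeState(): sum up whatever elements were restored.
    static long restoreLocalCount(List<Long> restored) {
        long localCount = 0;
        for (Long l : restored) {
            localCount += l;
        }
        return localCount;
    }
}
```

With four old instances that each snapshotted one counter (3, 5, 7, 11), scaling down to two instances gives each new instance two elements to add up. The individual local counts change, but their total of 26 is preserved. On scale-up, some new instances receive no element and start from zero.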
There are various ways that transformation functions can use state without implementing the full-fledged CheckpointedFunction interface:
Checkpointing some state that is part of the function object itself is possible in a simpler way by directly implementing the ListCheckpointed interface.
Access to keyed state is possible via the RuntimeContext's methods:
public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {

    private ValueState<Long> count;

    @Override
    public void open(Configuration cfg) throws Exception {
        // obtain the per-key state handle from the runtime context
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
    }

    @Override
    public T map(T value) throws Exception {
        // value() returns null if no count has been stored for the current key yet
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        return value;
    }
}
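The per-key scoping that makes the null check above necessary can be illustrated with a small plain-Java model. It is hypothetical and not Flink API: `KeyedCounterSketch`, `setCurrentKey`, and `countRecord` are invented names, and a `HashMap` stands in for the state backend.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model (not Flink API) of keyed value state:
// one value slot per key, selected by the key of the record being processed.
class KeyedCounterSketch<K> {
    private final Map<K, Long> valuePerKey = new HashMap<>();
    private K currentKey;

    // In this model the caller sets the key context before accessing the value.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns null if nothing has been stored for the current key yet.
    Long value() {
        return valuePerKey.get(currentKey);
    }

    void update(Long value) {
        valuePerKey.put(currentKey, value);
    }

    // Same counting logic as map() in CountPerKeyFunction.
    long countRecord(K key) {
        setCurrentKey(key);
        Long current = value();
        long updated = current == null ? 1L : current + 1;
        update(updated);
        return updated;
    }
}
```

Each key sees only its own counter: counting "a" twice and "b" once leaves "a" at 2 and "b" at 1.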
See Also: ListCheckpointed, RuntimeContext
Modifier and Type | Method and Description |
---|---|
void | initializeState(FunctionInitializationContext context): This method is called when the parallel function instance is created during distributed execution. |
void | snapshotState(FunctionSnapshotContext context): This method is called when a snapshot for a checkpoint is requested. |
void snapshotState(FunctionSnapshotContext context) throws Exception
This method is called when a snapshot for a checkpoint is requested. It acts as a hook for the function to ensure that all state is exposed by the means previously offered through the FunctionInitializationContext when the function was initialized, or offered now by the FunctionSnapshotContext itself.
Parameters: context - the context for drawing a snapshot of the operator
Throws: Exception - Thrown if state could not be created or restored.
void initializeState(FunctionInitializationContext context) throws Exception
Parameters: context - the context for initializing the operator
Throws: Exception - Thrown if state could not be created or restored.
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.