@PublicEvolving public interface CheckpointedFunction
The section Shortcuts illustrates the common lightweight ways to setup stateful functions typically used instead of the full fledged abstraction represented by this interface.
initializeState(FunctionInitializationContext)
is called when
the parallel instance of the transformation function is created during distributed execution.
The method gives access to the FunctionInitializationContext
which in turn gives access
to the to the OperatorStateStore
and KeyedStateStore
.
The OperatorStateStore
and KeyedStateStore
give access to the data structures
in which state should be stored for Flink to transparently manage and checkpoint it, such as
ValueState
or
ListState
.
Note: The KeyedStateStore
can only be used when the transformation supports
keyed state, i.e., when it is applied on a keyed stream (after a keyBy(...)
).
snapshotState(FunctionSnapshotContext)
is called whenever a
checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically
make sure that the checkpointed data structures (obtained in the initialization phase) are up
to date for a snapshot to be taken. The given snapshot context gives access to the metadata
of the checkpoint.
In addition, functions can use this method as a hook to flush/commit/synchronize with external systems.
public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {
private ReducingState<Long> countPerKey;
private ListState<Long> countPerPartition;
private long localCount;
public void initializeState(FunctionInitializationContext context) throws Exception {
// get the state data structure for the per-key state
countPerKey = context.getKeyedStateStore().getReducingState(
new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));
// get the state data structure for the per-partition state
countPerPartition = context.getOperatorStateStore().getOperatorState(
new ListStateDescriptor<>("perPartitionCount", Long.class));
// initialize the "local count variable" based on the operator state
for (Long l : countPerPartition.get()) {
localCount += l;
}
}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// the keyed state is always up to date anyways
// just bring the per-partition state in shape
countPerPartition.clear();
countPerPartition.add(localCount);
}
public T map(T value) throws Exception {
// update the states
countPerKey.add(1L);
localCount++;
return value;
}
}
CheckpointedFunction
interface:
ListCheckpointed
interface.
That mechanism is similar to the previously used Checkpointed
interface.
RuntimeContext
's methods:
public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {
private ValueState<Long> count;
public void open(Configuration cfg) throws Exception {
count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
}
public T map(T value) throws Exception {
Long current = count.get();
count.update(current == null ? 1L : current + 1);
return value;
}
}
ListCheckpointed
,
RuntimeContext
Modifier and Type | Method and Description |
---|---|
void |
initializeState(FunctionInitializationContext context)
This method is called when the parallel function instance is created during distributed
execution.
|
void |
snapshotState(FunctionSnapshotContext context)
This method is called when a snapshot for a checkpoint is requested.
|
void snapshotState(FunctionSnapshotContext context) throws Exception
FunctionInitializationContext
when
the Function was initialized, or offered now by FunctionSnapshotContext
itself.context
- the context for drawing a snapshot of the operatorException
void initializeState(FunctionInitializationContext context) throws Exception
context
- the context for initializing the operatorException
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.