Type Parameters:
T - The type of the operator state.
Deprecated. If you need to do non-keyed state snapshots of your operator, use CheckpointedFunction. This should only be needed in rare cases, though.
@PublicEvolving @Deprecated public interface ListCheckpointed<T extends Serializable>
Implementing this interface is a shortcut for obtaining the default ListState from the OperatorStateStore. Using the OperatorStateStore directly gives more flexible options for using operator state, for example controlling the serialization of the state objects, or having multiple named states.
State redistribution happens when the parallelism of the operator is changed. Redistribution of operator state (the category to which the state handled by this interface belongs) always goes through a checkpoint, so to the transformation functions it looks like a failure/recovery combination in which recovery happens with a different parallelism.
Conceptually, the state in the checkpoint is the concatenated list of all lists returned by the parallel transformation function instances. When restoring from a checkpoint, the list is divided into sub-lists that are assigned to each parallel function instance.
The following sketch illustrates the state redistribution. The function runs with parallelism 3. The first two parallel instances of the function return lists with two state elements each, and the third returns a list with one element.
      func_1        func_2      func_3
    +----+----+   +----+----+   +----+
    | S1 | S2 |   | S3 | S4 |   | S5 |
    +----+----+   +----+----+   +----+
Recovering the checkpoint with parallelism = 5 yields the following state assignment:
    func_1   func_2   func_3   func_4   func_5
    +----+   +----+   +----+   +----+   +----+
    | S1 |   | S2 |   | S3 |   | S4 |   | S5 |
    +----+   +----+   +----+   +----+   +----+
Recovering the checkpoint with parallelism = 2 yields the following state assignment:
         func_1          func_2
    +----+----+----+   +----+----+
    | S1 | S2 | S3 |   | S4 | S5 |
    +----+----+----+   +----+----+
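The concatenate-then-split behavior shown in the sketches above can be simulated in plain Java. This is a minimal sketch, not Flink's repartitioning code: the class `StateRedistribution` and its method `redistribute` are hypothetical, and the rule that earlier instances receive the extra element when the list does not divide evenly is an assumption chosen to reproduce the two assignments illustrated.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of list-style operator state redistribution.
 * Assumption: the concatenated list is cut into contiguous chunks whose sizes
 * differ by at most one, with earlier instances receiving the extra elements.
 */
final class StateRedistribution {

    private StateRedistribution() {}

    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Step 1: conceptually, the checkpoint holds one concatenated list.
        List<T> all = new ArrayList<>();
        for (List<T> perInstance : snapshots) {
            if (perInstance != null) { // a null snapshot is treated as "no state"
                all.addAll(perInstance);
            }
        }
        // Step 2: divide it into newParallelism contiguous sub-lists.
        List<List<T>> result = new ArrayList<>(newParallelism);
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> checkpoint = List.of(
                List.of("S1", "S2"), List.of("S3", "S4"), List.of("S5"));
        System.out.println(redistribute(checkpoint, 5)); // [[S1], [S2], [S3], [S4], [S5]]
        System.out.println(redistribute(checkpoint, 2)); // [[S1, S2, S3], [S4, S5]]
    }
}
```

Restoring with the original parallelism of 3 reproduces the original per-instance lists, which is consistent with redistribution looking like an ordinary failure/recovery when nothing is rescaled.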
The following example illustrates how to implement a MapFunction that counts all elements passing through it, keeping the total count accurate under re-scaling (changes of parallelism):
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

public class CountingFunction<T> implements MapFunction<T, Tuple2<T, Long>>, ListCheckpointed<Long> {
    // this count is the number of elements in the parallel subtask
    private long count;
    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // return a single element - our count
        return Collections.singletonList(count);
    }
    @Override
    public void restoreState(List<Long> state) throws Exception {
        // in case of scale in, this adds up counters from different original subtasks
        // in case of scale out, this list may be empty
        for (Long l : state) {
            count += l;
        }
    }
    @Override
    public Tuple2<T, Long> map(T value) {
        count++;
        return new Tuple2<>(value, count);
    }
}
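The claim that the total count stays accurate under re-scaling can be checked without a Flink cluster. The sketch below is hypothetical: `CountingSimulation`, `Counter`, `run` and `rescale` are illustrative stand-ins for the function and the framework's snapshot/restore step, and `rescale` assumes a contiguous split of the concatenated snapshot list, which is not necessarily Flink's exact assignment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Flink-free simulation of CountingFunction under rescaling. Counter, run()
 * and rescale() are hypothetical stand-ins, not Flink APIs.
 */
final class CountingSimulation {

    private CountingSimulation() {}

    /** Mirrors CountingFunction's state handling without Flink types. */
    static final class Counter {
        long count;

        List<Long> snapshotState() {
            return Collections.singletonList(count); // one indivisible entry
        }

        void restoreState(List<Long> state) {
            // scale in: several entries to add up; scale out: possibly none
            for (Long l : state) {
                count += l;
            }
        }

        void process() {
            count++;
        }
    }

    /** Creates one Counter per subtask and feeds it the given number of elements. */
    static List<Counter> run(int... elementsPerSubtask) {
        List<Counter> counters = new ArrayList<>();
        for (int n : elementsPerSubtask) {
            Counter c = new Counter();
            for (int i = 0; i < n; i++) {
                c.process();
            }
            counters.add(c);
        }
        return counters;
    }

    /** Snapshots all counters, then restores the entries into fresh counters (contiguous split). */
    static List<Counter> rescale(List<Counter> old, int newParallelism) {
        List<Long> all = new ArrayList<>();
        for (Counter c : old) {
            all.addAll(c.snapshotState());
        }
        List<Counter> fresh = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            Counter c = new Counter();
            c.restoreState(all.subList(start, start + size));
            fresh.add(c);
            start += size;
        }
        return fresh;
    }

    static long total(List<Counter> counters) {
        long sum = 0;
        for (Counter c : counters) {
            sum += c.count;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Counter> before = run(4, 3, 2); // 9 elements across 3 subtasks
        System.out.println(total(rescale(before, 2))); // 9
        System.out.println(total(rescale(before, 5))); // 9
    }
}
```

Scaling in from 3 to 2 gives the first new counter 4 + 3 = 7 and the second 2; scaling out to 5 leaves the last two counters with empty restore lists, which is the case the comment in restoreState() anticipates.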
Modifier and Type | Method and Description |
---|---|
void | restoreState(List<T> state): Deprecated. Restores the state of the function or operator to that of a previous checkpoint. |
List<T> | snapshotState(long checkpointId, long timestamp): Deprecated. Gets the current state of the function. |
List<T> snapshotState(long checkpointId, long timestamp) throws Exception
The returned list should contain one entry per redistributable unit of state. See the class docs for an illustration of how list-style state redistribution works.
As a special case, the returned list may be null or empty (if the operator has no state) or it may contain a single element (if the operator state is indivisible).
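To make the granularity rule concrete, here is a hypothetical, Flink-free sketch. `SnapshotGranularity`, `PartitionOffset` and the static helpers are illustrative names, not Flink APIs. A function tracking per-partition read offsets could return one entry per partition, so redistribution can move whole partitions between instances; state that only makes sense as a whole, such as a running {sum, count} pair, goes into a single element. Each entry must be Serializable, matching the `T extends Serializable` bound.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical illustration of snapshot granularity. PartitionOffset and the
 * static helpers are illustrative names, not Flink APIs.
 */
final class SnapshotGranularity {

    private SnapshotGranularity() {}

    /** One redistributable unit: the read offset of a single input partition. */
    static final class PartitionOffset implements Serializable {
        private static final long serialVersionUID = 1L;
        final int partition;
        final long offset;

        PartitionOffset(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return "p" + partition + "@" + offset;
        }
    }

    /** Divisible state: one entry per partition, so partitions can move between instances. */
    static List<PartitionOffset> snapshotOffsets(Map<Integer, Long> offsets) {
        List<PartitionOffset> state = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
            state.add(new PartitionOffset(e.getKey(), e.getValue()));
        }
        return state;
    }

    /** Indivisible state: a {sum, count} pair that is only meaningful as a whole. */
    static List<long[]> snapshotAverage(long sum, long count) {
        return Collections.singletonList(new long[] {sum, count});
    }

    /** On restore (e.g. after scale in), whole pairs are merged, never split. */
    static long[] restoreAverage(List<long[]> state) {
        long sum = 0;
        long count = 0;
        for (long[] pair : state) {
            sum += pair[0];
            count += pair[1];
        }
        return new long[] {sum, count};
    }

    /** No state at all: an empty list. */
    static List<PartitionOffset> snapshotNothing() {
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new TreeMap<>(Map.of(0, 10L, 1, 20L, 2, 30L));
        System.out.println(snapshotOffsets(offsets)); // [p0@10, p1@20, p2@30]
        System.out.println(snapshotAverage(60L, 3L).size()); // 1
    }
}
```

The trade-off: a single-element snapshot can never be spread across more instances, so on scale out some new instances receive an empty list. Returning an empty list rather than null for "no state" is a choice made here to keep restore-side code free of null checks.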
Parameters:
checkpointId - The ID of the checkpoint, a unique and monotonically increasing value.
timestamp - The wall clock timestamp when the checkpoint was triggered by the master.
Throws:
Exception - Thrown if the creation of the state object failed. This causes the checkpoint to fail. The system may decide to fail the operation (and trigger recovery), or to discard this checkpoint attempt, continue running, and try again with the next checkpoint attempt.

void restoreState(List<T> state) throws Exception
The given state list will contain all the sub-states that this parallel instance of the function needs to handle. Refer to the class docs for an illustration of how list-style state redistribution works.
Important: When this interface is implemented together with RichFunction, the restoreState() method is called before RichFunction.open(OpenContext).
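One practical consequence of this ordering: open() must not reinitialize fields that restoreState() has just filled. The sketch below is hypothetical and Flink-free; `RestoreBeforeOpen`, `RestorableCounter`, `BrokenCounter`, `SafeCounter` and `startUp` are illustrative names, and `startUp` only mimics the documented call order rather than Flink's actual task lifecycle.

```java
import java.util.List;

/**
 * Hypothetical sketch of why restoreState() running before open() matters.
 * startUp() only mimics the documented call order; it is not Flink code.
 */
final class RestoreBeforeOpen {

    private RestoreBeforeOpen() {}

    interface RestorableCounter {
        void restoreState(List<Long> state);
        void open();
        long count();
    }

    /** Resets its counter in open(), which discards the restored value. */
    static final class BrokenCounter implements RestorableCounter {
        private long count;

        @Override
        public void restoreState(List<Long> state) {
            for (Long l : state) {
                count += l;
            }
        }

        @Override
        public void open() {
            count = 0; // BUG: open() runs after restoreState()
        }

        @Override
        public long count() {
            return count;
        }
    }

    /** Relies on the field default (0) and leaves restored state alone in open(). */
    static final class SafeCounter implements RestorableCounter {
        private long count; // 0 for a fresh instance, before any restore

        @Override
        public void restoreState(List<Long> state) {
            for (Long l : state) {
                count += l;
            }
        }

        @Override
        public void open() {
            // acquire resources here, but do not reinitialize restored state
        }

        @Override
        public long count() {
            return count;
        }
    }

    /** Mimics the documented order: restoreState() first, then open(). */
    static long startUp(RestorableCounter f, List<Long> restored) {
        f.restoreState(restored);
        f.open();
        return f.count();
    }

    public static void main(String[] args) {
        System.out.println(startUp(new BrokenCounter(), List.of(5L, 7L))); // 0
        System.out.println(startUp(new SafeCounter(), List.of(5L, 7L)));   // 12
    }
}
```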
Parameters:
state - The state to be restored as a list of atomic sub-states.
Throws:
Exception - Throwing an exception in this method causes the recovery to fail. The exact consequence depends on the configured failure handling strategy, but typically the system will re-attempt the recovery, or try recovering from a different checkpoint.

Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.