Class DefaultOperatorStateBackend
- java.lang.Object
  - org.apache.flink.runtime.state.DefaultOperatorStateBackend

All Implemented Interfaces: Closeable, AutoCloseable, OperatorStateStore, OperatorStateBackend, Snapshotable<SnapshotResult<OperatorStateHandle>>, Disposable
@Internal public class DefaultOperatorStateBackend extends Object implements OperatorStateBackend
Default implementation of OperatorStateStore that provides the ability to make snapshots.
-
-
Field Summary
Modifier and Type, Field, Description
static String DEFAULT_OPERATOR_STATE_NAME
    The default namespace for state in cases where no state name is provided.
-
Constructor Summary
DefaultOperatorStateBackend(ExecutionConfig executionConfig, CloseableRegistry closeStreamOnCancelRegistry, Map<String,PartitionableListState<?>> registeredOperatorStates, Map<String,BackendWritableBroadcastState<?,?>> registeredBroadcastStates, Map<String,PartitionableListState<?>> accessedStatesByName, Map<String,BackendWritableBroadcastState<?,?>> accessedBroadcastStatesByName, SnapshotStrategyRunner<OperatorStateHandle,?> snapshotStrategyRunner)
-
Method Summary
Modifier and Type, Method, Description
void close()
void dispose()
    Disposes the object and releases all resources.
<K,V> BroadcastState<K,V> getBroadcastState(MapStateDescriptor<K,V> stateDescriptor)
    Creates (or restores) a broadcast state.
ExecutionConfig getExecutionConfig()
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)
    Creates (or restores) a list state.
Set<String> getRegisteredBroadcastStateNames()
    Returns a set with the names of all currently registered broadcast states.
Set<String> getRegisteredStateNames()
    Returns a set with the names of all currently registered states.
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
    Creates (or restores) a list state.
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions)
    Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a RunnableFuture that gives a state handle to the snapshot.
-
-
-
Field Detail
-
DEFAULT_OPERATOR_STATE_NAME
public static final String DEFAULT_OPERATOR_STATE_NAME
The default namespace for state in cases where no state name is provided.
- See Also: Constant Field Values
-
-
Constructor Detail
-
DefaultOperatorStateBackend
public DefaultOperatorStateBackend(ExecutionConfig executionConfig, CloseableRegistry closeStreamOnCancelRegistry, Map<String,PartitionableListState<?>> registeredOperatorStates, Map<String,BackendWritableBroadcastState<?,?>> registeredBroadcastStates, Map<String,PartitionableListState<?>> accessedStatesByName, Map<String,BackendWritableBroadcastState<?,?>> accessedBroadcastStatesByName, SnapshotStrategyRunner<OperatorStateHandle,?> snapshotStrategyRunner)
-
-
Method Detail
-
getExecutionConfig
public ExecutionConfig getExecutionConfig()
-
getRegisteredStateNames
public Set<String> getRegisteredStateNames()
Description copied from interface: OperatorStateStore
Returns a set with the names of all currently registered states.
- Specified by: getRegisteredStateNames in interface OperatorStateStore
- Returns: set of names for all registered states.
-
getRegisteredBroadcastStateNames
public Set<String> getRegisteredBroadcastStateNames()
Description copied from interface: OperatorStateStore
Returns a set with the names of all currently registered broadcast states.
- Specified by: getRegisteredBroadcastStateNames in interface OperatorStateStore
- Returns: set of names for all registered broadcast states.
-
close
public void close() throws IOException
- Specified by: close in interface AutoCloseable
- Specified by: close in interface Closeable
- Throws: IOException
-
dispose
public void dispose()
Description copied from interface: Disposable
Disposes the object and releases all resources. After calling this method, calling any methods on the object may result in undefined behavior.
- Specified by: dispose in interface Disposable
- Specified by: dispose in interface OperatorStateBackend
-
getBroadcastState
public <K,V> BroadcastState<K,V> getBroadcastState(MapStateDescriptor<K,V> stateDescriptor) throws StateMigrationException
Description copied from interface: OperatorStateStore
Creates (or restores) a broadcast state. This type of state can only be created to store the state of a BroadcastStream. Each state is registered under a unique name. The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore). The returned broadcast state has key-value format.
CAUTION: the user has to guarantee that all task instances store the same elements in this type of state.
Each operator instance individually maintains and stores elements in the broadcast state. The fact that the incoming stream is a broadcast one guarantees that all instances see all the elements. Upon recovery or re-scaling, the same state is given to each of the instances. To avoid hotspots, each task reads its previous partition, and if there are more tasks (scale up), then the new instances read from the old instances in a round-robin fashion. This is why each instance has to guarantee that it stores the same elements as the rest. If not, upon recovery or rescaling you may have unpredictable redistribution of the partitions, thus unpredictable results.
- Specified by: getBroadcastState in interface OperatorStateStore
- Type Parameters:
K - The type of the keys in the broadcast state.
V - The type of the values in the broadcast state.
- Parameters:
stateDescriptor - The descriptor for this state, providing a name, a serializer for the keys and one for the values.
- Returns: The Broadcast State
- Throws: StateMigrationException
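The CAUTION above can be illustrated with a toy model. The sketch below uses only the Java standard library and is not Flink code: the class `BroadcastRestoreSketch`, the method `restore`, and the rule "new instance i reads the snapshot of old instance i modulo the old parallelism" are assumptions made for illustration, loosely following the round-robin description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of restoring broadcast state after rescaling. Hypothetical, not Flink code. */
final class BroadcastRestoreSketch {

    /**
     * Assumed restore rule: new instance i copies the snapshot written by
     * old instance (i % oldParallelism).
     */
    static List<Map<String, Integer>> restore(
            List<Map<String, Integer>> oldSnapshots, int newParallelism) {
        List<Map<String, Integer>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            Map<String, Integer> source = oldSnapshots.get(i % oldSnapshots.size());
            restored.add(new HashMap<>(source));
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, Integer> rules = Map.of("threshold", 10);

        // Every old instance stored the same elements: all new instances agree.
        System.out.println(restore(List.of(rules, rules), 3));

        // One old instance diverged: what a new instance sees now depends on
        // which old snapshot it happens to read.
        System.out.println(restore(List.of(rules, Map.of("threshold", 99)), 3));
    }
}
```

In the second call the restored instances no longer agree with each other, which is the "unpredictable results" case the description warns about.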
-
getListState
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception
Description copied from interface: OperatorStateStore
Creates (or restores) a list state. Each state is registered under a unique name. The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
Note the semantic differences between an operator list state and a keyed list state (see KeyedStateStore.getListState(ListStateDescriptor)). Under the context of operator state, the list is a collection of state items that are independent of each other and eligible for redistribution across operator instances in case of changed operator parallelism. In other words, these state items are the finest granularity at which non-keyed state can be redistributed, and should not be correlated with each other.
The redistribution scheme of this list state upon operator rescaling is a round-robin pattern, such that the logical whole state (a concatenation of all the lists of state elements previously managed by each operator before the restore) is evenly divided into as many sublists as there are parallel operators.
- Specified by: getListState in interface OperatorStateStore
- Type Parameters:
S - The generic type of the state
- Parameters:
stateDescriptor - The descriptor for this state, providing a name and serializer.
- Returns: A list for all state partitions.
- Throws: Exception
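The round-robin redistribution described above can be sketched with plain Java collections. This is a simplified model, not Flink's repartitioning code: the class `RoundRobinSplitSketch` and its `redistribute` method are hypothetical, and the exact rule used here (item i of the concatenated state goes to subtask i modulo the new parallelism) is an assumption chosen to give an even split.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of even, round-robin redistribution of operator list state. Not Flink code. */
final class RoundRobinSplitSketch {

    static <S> List<List<S>> redistribute(List<List<S>> oldLists, int newParallelism) {
        // The "logical whole state": concatenation of every old subtask's list.
        List<S> whole = new ArrayList<>();
        for (List<S> oldList : oldLists) {
            whole.addAll(oldList);
        }

        // One empty sublist per new parallel operator instance.
        List<List<S>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }

        // Assumed assignment rule: deal the items out one at a time.
        for (int i = 0; i < whole.size(); i++) {
            result.get(i % newParallelism).add(whole.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks holding 3 and 2 items, rescaled to 3 subtasks.
        List<List<Integer>> old = List.of(List.of(1, 2, 3), List.of(4, 5));
        System.out.println(redistribute(old, 3)); // [[1, 4], [2, 5], [3]]
    }
}
```

Because any item may land on any subtask after rescaling, the items must not depend on one another, which is the point the description makes about finest-granularity state items.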
-
getUnionListState
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception
Description copied from interface: OperatorStateStore
Creates (or restores) a list state. Each state is registered under a unique name. The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
Note the semantic differences between an operator list state and a keyed list state (see KeyedStateStore.getListState(ListStateDescriptor)). Under the context of operator state, the list is a collection of state items that are independent of each other and eligible for redistribution across operator instances in case of changed operator parallelism. In other words, these state items are the finest granularity at which non-keyed state can be redistributed, and should not be correlated with each other.
The redistribution scheme of this list state upon operator rescaling is a broadcast pattern, such that the logical whole state (a concatenation of all the lists of state elements previously managed by each operator before the restore) is restored to all parallel operators so that each of them will get the union of all state items before the restore.
- Specified by: getUnionListState in interface OperatorStateStore
- Type Parameters:
S - The generic type of the state
- Parameters:
stateDescriptor - The descriptor for this state, providing a name and serializer.
- Returns: A list for all state partitions.
- Throws: Exception
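For contrast with the even split of getListState, the broadcast (union) pattern described above can be modeled as follows. Again this is a standard-library sketch and not Flink code; the class `UnionRestoreSketch` and its `redistribute` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of union redistribution of operator list state. Not Flink code. */
final class UnionRestoreSketch {

    static <S> List<List<S>> redistribute(List<List<S>> oldLists, int newParallelism) {
        // The "logical whole state": concatenation of every old subtask's list.
        List<S> whole = new ArrayList<>();
        for (List<S> oldList : oldLists) {
            whole.addAll(oldList);
        }

        // Every new subtask receives its own copy of the complete union.
        List<List<S>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(whole));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks holding 2 and 1 items, restored onto 2 subtasks.
        List<List<Integer>> old = List.of(List.of(1, 2), List.of(3));
        System.out.println(redistribute(old, 2)); // [[1, 2, 3], [1, 2, 3]]
    }
}
```

In this model each subtask holds the full state after a restore, so the restored size per subtask grows with the total state rather than shrinking with parallelism.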
-
snapshot
@Nonnull public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception
Description copied from interface: Snapshotable
Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a RunnableFuture that gives a state handle to the snapshot. It is up to the implementation whether the operation is performed synchronously or asynchronously. In the latter case, the returned Runnable must be executed before obtaining the handle.
- Specified by: snapshot in interface Snapshotable<SnapshotResult<OperatorStateHandle>>
- Parameters:
checkpointId - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
streamFactory - The factory that we can use for writing our state to streams.
checkpointOptions - Options for how to perform this checkpoint.
- Returns: A runnable future that will yield a StateObject.
- Throws: Exception
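The requirement that the returned Runnable be executed before the handle is obtained follows from how java.util.concurrent.RunnableFuture works. The sketch below shows that contract with a plain FutureTask. It does not call the Flink snapshot method: `RunnableFutureSketch`, `snapshotLike`, and the string standing in for a state handle are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Shows the run-before-get contract of a RunnableFuture. Not Flink code. */
final class RunnableFutureSketch {

    /** Hypothetical stand-in for a snapshot call: the work is only prepared, not executed. */
    static RunnableFuture<String> snapshotLike(long checkpointId) {
        Callable<String> work = () -> "handle-for-checkpoint-" + checkpointId;
        return new FutureTask<>(work);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        RunnableFuture<String> future = snapshotLike(42L);

        // Nothing has executed yet; calling get() here would block indefinitely.
        System.out.println(future.isDone()); // false

        // The caller (or an executor) has to run the future first.
        future.run();

        System.out.println(future.isDone()); // true
        System.out.println(future.get());    // handle-for-checkpoint-42
    }
}
```

A caller that wants the work off the current thread can hand the same object to an executor instead of calling run() directly, since a RunnableFuture is also a Runnable.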
-
-