Class BatchExecutionKeyedStateBackend<K>
- java.lang.Object
-
- org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend<K>
-
- All Implemented Interfaces:
Closeable
,AutoCloseable
,CheckpointableKeyedStateBackend<K>
,KeyedStateBackend<K>
,KeyedStateFactory
,PriorityQueueSetFactory
,Snapshotable<SnapshotResult<KeyedStateHandle>>
,Disposable
public class BatchExecutionKeyedStateBackend<K> extends Object implements CheckpointableKeyedStateBackend<K>
ACheckpointableKeyedStateBackend
which keeps values for a single key at a time.IMPORTANT: Requires the incoming records to be sorted/grouped by the key. Used in a BATCH style execution.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.state.KeyedStateBackend
KeyedStateBackend.KeySelectionListener<K>
-
-
Constructor Summary
Constructors Constructor Description BatchExecutionKeyedStateBackend(TypeSerializer<K> keySerializer, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description <N,S extends State,T>
voidapplyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor, KeyedStateFunction<K,S> function)
Applies the providedKeyedStateFunction
to the state with the providedStateDescriptor
of all the currently active keys.void
close()
<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
KeyGroupedInternalPriorityQueue<T>create(String stateName, TypeSerializer<T> byteOrderedElementSerializer)
Creates aKeyGroupedInternalPriorityQueue
.<N,SV,SEV,S extends State,IS extends S>
IScreateOrUpdateInternalState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
Creates or updates internal state and returns a newInternalKvState
.boolean
deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Stop calling listener registered inKeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>)
.void
dispose()
Disposes the object and releases all resources.K
getCurrentKey()
KeyGroupRange
getKeyGroupRange()
Returns the key groups which this state backend is responsible for.<N> Stream<K>
getKeys(String state, N namespace)
<N> Stream<Tuple2<K,N>>
getKeysAndNamespaces(String state)
TypeSerializer<K>
getKeySerializer()
<N,S extends State,T>
SgetOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor)
Creates or retrieves a keyed state backed by this state backend.<N,S extends State>
SgetPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,?> stateDescriptor)
Creates or retrieves a partitioned state backed by this state backend.void
registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
State backend will callKeyedStateBackend.KeySelectionListener.keySelected(K)
when key context is switched if supported.SavepointResources<K>
savepoint()
Returns aSavepointResources
that can be used bySavepointSnapshotStrategy
to write out a savepoint in the common/unified format.void
setCurrentKey(K newKey)
Sets the current key that is used for partitioned state.void
setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex)
Act as a fast path forKeyedStateBackend.setCurrentKey(K)
when the key group is known.RunnableFuture<SnapshotResult<KeyedStateHandle>>
snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions)
Operation that writes a snapshot into a stream that is provided by the givenCheckpointStreamFactory
and returns a @RunnableFuture
that gives a state handle to the snapshot.-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.state.KeyedStateBackend
isSafeToReuseKVState
-
Methods inherited from interface org.apache.flink.runtime.state.KeyedStateFactory
createOrUpdateInternalState, createOrUpdateInternalState
-
Methods inherited from interface org.apache.flink.runtime.state.PriorityQueueSetFactory
create
-
-
-
-
Constructor Detail
-
BatchExecutionKeyedStateBackend
public BatchExecutionKeyedStateBackend(TypeSerializer<K> keySerializer, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig)
-
-
Method Detail
-
setCurrentKey
public void setCurrentKey(K newKey)
Description copied from interface:KeyedStateBackend
Sets the current key that is used for partitioned state.- Specified by:
setCurrentKey
in interfaceKeyedStateBackend<K>
- Parameters:
newKey
- The new current key.
-
getCurrentKey
public K getCurrentKey()
- Specified by:
getCurrentKey
in interfaceKeyedStateBackend<K>
- Returns:
- Current key.
-
setCurrentKeyAndKeyGroup
public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex)
Description copied from interface:KeyedStateBackend
Act as a fast path forKeyedStateBackend.setCurrentKey(K)
when the key group is known.- Specified by:
setCurrentKeyAndKeyGroup
in interfaceKeyedStateBackend<K>
-
getKeySerializer
public TypeSerializer<K> getKeySerializer()
- Specified by:
getKeySerializer
in interfaceKeyedStateBackend<K>
- Returns:
- Serializer of the key.
-
applyToAllKeys
public <N,S extends State,T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor, KeyedStateFunction<K,S> function)
Description copied from interface:KeyedStateBackend
Applies the providedKeyedStateFunction
to the state with the providedStateDescriptor
of all the currently active keys.- Specified by:
applyToAllKeys
in interfaceKeyedStateBackend<K>
- Type Parameters:
N
- The type of the namespace.S
- The type of the state.- Parameters:
namespace
- the namespace of the state.namespaceSerializer
- the serializer for the namespace.stateDescriptor
- the descriptor of the state to which the function is going to be applied.function
- the function to be applied to the keyed state.
-
getKeys
public <N> Stream<K> getKeys(String state, N namespace)
- Specified by:
getKeys
in interfaceKeyedStateBackend<K>
- Parameters:
state
- State variable for which existing keys will be returned.namespace
- Namespace for which existing keys will be returned.- Returns:
- A stream of all keys for the given state and namespace. Modifications to the state during iterating over it keys are not supported.
-
getKeysAndNamespaces
public <N> Stream<Tuple2<K,N>> getKeysAndNamespaces(String state)
- Specified by:
getKeysAndNamespaces
in interfaceKeyedStateBackend<K>
- Parameters:
state
- State variable for which existing keys will be returned.- Returns:
- A stream of all keys for the given state and namespace. Modifications to the state during iterating over it keys are not supported. Implementations go not make any ordering guarantees about the returned tupes. Two records with the same key or namespace may not be returned near each other in the stream.
-
getOrCreateKeyedState
public <N,S extends State,T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor) throws Exception
Description copied from interface:KeyedStateBackend
Creates or retrieves a keyed state backed by this state backend.- Specified by:
getOrCreateKeyedState
in interfaceKeyedStateBackend<K>
- Type Parameters:
N
- The type of the namespace.S
- The type of the state.- Parameters:
namespaceSerializer
- The serializer used for the namespace type of the statestateDescriptor
- The identifier for the state. This contains name and can create a default state value.- Returns:
- A new key/value state backed by this backend.
- Throws:
Exception
- Exceptions may occur during initialization of the state and should be forwarded.
-
getPartitionedState
public <N,S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,?> stateDescriptor) throws Exception
Description copied from interface:KeyedStateBackend
Creates or retrieves a partitioned state backed by this state backend.TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace. This method should be removed for the sake of namespaces being lazily fetched from the keyed state backend, or being set on the state directly.
- Specified by:
getPartitionedState
in interfaceKeyedStateBackend<K>
- Type Parameters:
N
- The type of the namespace.S
- The type of the state.stateDescriptor
- The identifier for the state. This contains name and can create a default state value.- Returns:
- A new key/value state backed by this backend.
- Throws:
Exception
- Exceptions may occur during initialization of the state and should be forwarded.
-
dispose
public void dispose()
Description copied from interface:Disposable
Disposes the object and releases all resources. After calling this method, calling any methods on the object may result in undefined behavior.- Specified by:
dispose
in interfaceDisposable
- Specified by:
dispose
in interfaceKeyedStateBackend<K>
-
registerKeySelectionListener
public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Description copied from interface:KeyedStateBackend
State backend will callKeyedStateBackend.KeySelectionListener.keySelected(K)
when key context is switched if supported.- Specified by:
registerKeySelectionListener
in interfaceKeyedStateBackend<K>
-
deregisterKeySelectionListener
public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Description copied from interface:KeyedStateBackend
Stop calling listener registered inKeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>)
.- Specified by:
deregisterKeySelectionListener
in interfaceKeyedStateBackend<K>
- Returns:
- returns true iff listener was registered before.
-
createOrUpdateInternalState
@Nonnull public <N,SV,SEV,S extends State,IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S,SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception
Description copied from interface:KeyedStateFactory
Creates or updates internal state and returns a newInternalKvState
.- Specified by:
createOrUpdateInternalState
in interfaceKeyedStateFactory
- Type Parameters:
N
- The type of the namespace.SV
- The type of the stored state value.SEV
- The type of the stored state value or entry for collection types (list or map).S
- The type of the public API state.IS
- The type of internal state.- Parameters:
namespaceSerializer
- TypeSerializer for the state namespace.stateDesc
- TheStateDescriptor
that contains the name of the state.snapshotTransformFactory
- factory of state snapshot transformer.- Throws:
Exception
-
create
@Nonnull public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer)
Description copied from interface:PriorityQueueSetFactory
Creates aKeyGroupedInternalPriorityQueue
.- Specified by:
create
in interfacePriorityQueueSetFactory
- Type Parameters:
T
- type of the stored elements.- Parameters:
stateName
- unique name for associated with this queue.byteOrderedElementSerializer
- a serializer that with a format that is lexicographically ordered in alignment with elementPriorityComparator.- Returns:
- the queue with the specified unique name.
-
getKeyGroupRange
public KeyGroupRange getKeyGroupRange()
Description copied from interface:CheckpointableKeyedStateBackend
Returns the key groups which this state backend is responsible for.- Specified by:
getKeyGroupRange
in interfaceCheckpointableKeyedStateBackend<K>
-
close
public void close() throws IOException
- Specified by:
close
in interfaceAutoCloseable
- Specified by:
close
in interfaceCloseable
- Throws:
IOException
-
snapshot
@Nonnull public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions)
Description copied from interface:Snapshotable
Operation that writes a snapshot into a stream that is provided by the givenCheckpointStreamFactory
and returns a @RunnableFuture
that gives a state handle to the snapshot. It is up to the implementation if the operation is performed synchronous or asynchronous. In the later case, the returned Runnable must be executed first before obtaining the handle.- Specified by:
snapshot
in interfaceSnapshotable<K>
- Parameters:
checkpointId
- The ID of the checkpoint.timestamp
- The timestamp of the checkpoint.streamFactory
- The factory that we can use for writing our state to streams.checkpointOptions
- Options for how to perform this checkpoint.- Returns:
- A runnable future that will yield a
StateObject
.
-
savepoint
@Nonnull public SavepointResources<K> savepoint() throws Exception
Description copied from interface:CheckpointableKeyedStateBackend
Returns aSavepointResources
that can be used bySavepointSnapshotStrategy
to write out a savepoint in the common/unified format.- Specified by:
savepoint
in interfaceCheckpointableKeyedStateBackend<K>
- Throws:
Exception
-
-