Class ChangelogKeyedStateBackend<K>
- java.lang.Object
-
- org.apache.flink.state.changelog.ChangelogKeyedStateBackend<K>
-
- Type Parameters:
K - The key by which state is keyed.
- All Implemented Interfaces:
Closeable, AutoCloseable, CheckpointListener, InternalCheckpointListener, CheckpointableKeyedStateBackend<K>, KeyedStateBackend<K>, KeyedStateFactory, PriorityQueueSetFactory, Snapshotable<SnapshotResult<KeyedStateHandle>>, TestableKeyedStateBackend<K>, PeriodicMaterializationManager.MaterializationTarget, Disposable
@Internal public class ChangelogKeyedStateBackend<K> extends Object implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend<K>, InternalCheckpointListener, PeriodicMaterializationManager.MaterializationTarget
A KeyedStateBackend that keeps state on the underlying delegated keyed state backend as well as on the state change log.
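The dual-write idea in this description can be pictured with a toy model. The sketch below is not Flink code: `ToyChangelogStore`, its nested `Change` type, and the in-memory map and list are hypothetical stand-ins for the delegated backend and the state change log. It only illustrates that every update is applied to both, so that replaying the log reproduces the delegate's contents.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model: every update goes to a delegate store and to an append-only log. */
class ToyChangelogStore {
    /** One logged change; a null value marks a removal. */
    static final class Change {
        final String key;
        final Integer value;

        Change(String key, Integer value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stands in for the delegated keyed state backend (assumption for illustration).
    private final Map<String, Integer> delegate = new HashMap<>();
    // Stands in for the state change log (assumption for illustration).
    private final List<Change> changelog = new ArrayList<>();

    void put(String key, int value) {
        delegate.put(key, value);
        changelog.add(new Change(key, value));
    }

    void remove(String key) {
        delegate.remove(key);
        changelog.add(new Change(key, null));
    }

    Integer get(String key) {
        return delegate.get(key);
    }

    int changelogSize() {
        return changelog.size();
    }

    /** Returns a copy of the delegate's current contents. */
    Map<String, Integer> delegateContents() {
        return new HashMap<>(delegate);
    }

    /** Rebuilds a key/value view purely by replaying the logged changes in order. */
    Map<String, Integer> replay() {
        Map<String, Integer> rebuilt = new HashMap<>();
        for (Change c : changelog) {
            if (c.value == null) {
                rebuilt.remove(c.key);
            } else {
                rebuilt.put(c.key, c.value);
            }
        }
        return rebuilt;
    }
}
```

In this toy model, reads are served from the delegate only, while the log exists so that the same contents can be rebuilt later.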
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.state.KeyedStateBackend
KeyedStateBackend.KeySelectionListener<K>
-
-
Field Summary
-
Fields inherited from interface org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationTarget
NO_OP
-
-
Constructor Summary
Constructors
ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView)
ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView, ChangelogStateFactory changelogStateFactory)
-
Method Summary
Modifier and Type, Method, Description
<N,S extends State,T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor, KeyedStateFunction<K,S> function)
Applies the provided KeyedStateFunction to the state with the provided StateDescriptor of all the currently active keys.
void close()
<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(String stateName, TypeSerializer<T> byteOrderedElementSerializer)
Creates a KeyGroupedInternalPriorityQueue.
<N,SV,SEV,S extends State,IS extends S> IS createOrUpdateInternalState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
Creates or updates internal state and returns a new InternalKvState.
boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Stop calling listener registered in KeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>).
void dispose()
Disposes the object and releases all resources.
ChangelogRestoreTarget<K> getChangelogRestoreTarget()
K getCurrentKey()
KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive)
KeyGroupRange getKeyGroupRange()
Returns the key groups which this state backend is responsible for.
<N> Stream<K> getKeys(String state, N namespace)
<N> Stream<Tuple2<K,N>> getKeysAndNamespaces(String state)
TypeSerializer<K> getKeySerializer()
<N,S extends State,T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor)
Creates or retrieves a keyed state backed by this state backend.
<N,S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,?> stateDescriptor)
Creates or retrieves a partitioned state backed by this state backend.
void handleMaterializationFailureOrCancellation(long materializationID, SequenceNumber upTo, Throwable cause)
void handleMaterializationResult(SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo)
This method is not thread safe.
Optional<PeriodicMaterializationManager.MaterializationRunnable> initMaterialization()
Initialize state materialization so that materialized data can be persisted durably and included into the checkpoint.
boolean isSafeToReuseKVState()
Whether it's safe to reuse key-values from the state-backend, e.g. for the purpose of optimization.
void notifyCheckpointAborted(long checkpointId)
This method is called as a notification once a distributed checkpoint has been aborted.
void notifyCheckpointComplete(long checkpointId)
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
void notifyCheckpointSubsumed(long checkpointId)
This method is called as a notification once a distributed checkpoint has been subsumed.
int numKeyValueStateEntries()
Returns the total number of state entries across all keys/namespaces.
void registerCloseable(Closeable closeable)
void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
State backend will call KeyedStateBackend.KeySelectionListener.keySelected(K) when key context is switched if supported.
SavepointResources<K> savepoint()
Returns a SavepointResources that can be used by SavepointSnapshotStrategy to write out a savepoint in the common/unified format.
void setCurrentKey(K newKey)
Sets the current key that is used for partitioned state.
void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex)
Act as a fast path for KeyedStateBackend.setCurrentKey(K) when the key group is known.
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions)
Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a RunnableFuture that gives a state handle to the snapshot.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.state.KeyedStateFactory
createOrUpdateInternalState, createOrUpdateInternalState
-
Methods inherited from interface org.apache.flink.runtime.state.PriorityQueueSetFactory
create
-
-
-
-
Constructor Detail
-
ChangelogKeyedStateBackend
public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView)
-
ChangelogKeyedStateBackend
public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView, ChangelogStateFactory changelogStateFactory)
-
-
Method Detail
-
getKeyGroupRange
public KeyGroupRange getKeyGroupRange()
Description copied from interface: CheckpointableKeyedStateBackend
Returns the key groups which this state backend is responsible for.
- Specified by:
getKeyGroupRange in interface CheckpointableKeyedStateBackend<K>
-
close
public void close() throws IOException
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
- Throws:
IOException
-
setCurrentKey
public void setCurrentKey(K newKey)
Description copied from interface: KeyedStateBackend
Sets the current key that is used for partitioned state.
- Specified by:
setCurrentKey in interface KeyedStateBackend<K>
- Parameters:
newKey - The new current key.
-
getCurrentKey
public K getCurrentKey()
- Specified by:
getCurrentKey in interface KeyedStateBackend<K>
- Returns:
Current key.
-
setCurrentKeyAndKeyGroup
public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex)
Description copied from interface: KeyedStateBackend
Acts as a fast path for KeyedStateBackend.setCurrentKey(K) when the key group is known.
- Specified by:
setCurrentKeyAndKeyGroup in interface KeyedStateBackend<K>
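A minimal sketch of why a "fast path" can exist, under stated assumptions: `ToyKeyContext` is hypothetical, and the key-group assignment here is a plain `Math.floorMod` over `hashCode()`, which is a simplification and not Flink's actual assignment function. The point is only that a caller who already knows the key group can skip recomputing it.

```java
/** Hypothetical key context that tracks the current key and its key group. */
class ToyKeyContext<K> {
    private final int numberOfKeyGroups;
    private K currentKey;
    private int currentKeyGroup = -1;
    private int groupComputations = 0; // counts how often the group had to be derived

    ToyKeyContext(int numberOfKeyGroups) {
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    /** Simplified stand-in for a key-group assignment (assumption, not Flink's function). */
    static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    /** Regular path: derives the key group from the key, then delegates. */
    void setCurrentKey(K newKey) {
        groupComputations++;
        setCurrentKeyAndKeyGroup(newKey, assignToKeyGroup(newKey, numberOfKeyGroups));
    }

    /** Fast path: the caller already knows the key group, so nothing is recomputed. */
    void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) {
        this.currentKey = newKey;
        this.currentKeyGroup = newKeyGroupIndex;
    }

    K getCurrentKey() {
        return currentKey;
    }

    int getCurrentKeyGroup() {
        return currentKeyGroup;
    }

    int getGroupComputations() {
        return groupComputations;
    }
}
```

In this sketch both paths leave the context in the same state; they differ only in whether the group index is derived or supplied.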
-
getKeySerializer
public TypeSerializer<K> getKeySerializer()
- Specified by:
getKeySerializer in interface KeyedStateBackend<K>
- Returns:
Serializer of the key.
-
getKeys
public <N> Stream<K> getKeys(String state, N namespace)
- Specified by:
getKeys in interface KeyedStateBackend<K>
- Parameters:
state - State variable for which existing keys will be returned.
namespace - Namespace for which existing keys will be returned.
- Returns:
A stream of all keys for the given state and namespace. Modifying the state while iterating over its keys is not supported.
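Because modifying state while iterating over the key stream is not supported, one common way to stay on the safe side is to drain the stream into a collection first and only then touch the state. The sketch below shows that pattern with a plain `Map` standing in for the state; `KeySnapshotExample` and its methods are hypothetical helpers, not part of the Flink API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: materialize keys first, modify afterwards. */
class KeySnapshotExample {
    /** Drains the key stream into a list and closes the stream before returning. */
    static <K> List<K> collectKeys(Stream<K> keys) {
        try (Stream<K> s = keys) {
            return s.collect(Collectors.toList());
        }
    }

    /** Removes every entry, iterating over a collected copy of the keys rather than the live stream. */
    static int clearAll(Map<String, Integer> state) {
        List<String> keys = collectKeys(state.keySet().stream());
        for (String key : keys) {
            state.remove(key); // safe: the stream has been fully consumed and closed
        }
        return keys.size();
    }
}
```

The trade-off is memory: the collected list holds all keys at once, which may matter for very large key sets.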
-
getKeysAndNamespaces
public <N> Stream<Tuple2<K,N>> getKeysAndNamespaces(String state)
- Specified by:
getKeysAndNamespaces in interface KeyedStateBackend<K>
- Parameters:
state - State variable for which existing keys will be returned.
- Returns:
A stream of all (key, namespace) tuples for the given state. Modifying the state while iterating over its keys is not supported. Implementations do not make any ordering guarantees about the returned tuples. Two records with the same key or namespace may not be returned near each other in the stream.
-
dispose
public void dispose()
Description copied from interface: Disposable
Disposes the object and releases all resources. After calling this method, calling any methods on the object may result in undefined behavior.
- Specified by:
dispose in interface Disposable
- Specified by:
dispose in interface KeyedStateBackend<K>
-
registerKeySelectionListener
public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Description copied from interface: KeyedStateBackend
State backend will call KeyedStateBackend.KeySelectionListener.keySelected(K) when key context is switched if supported.
- Specified by:
registerKeySelectionListener in interface KeyedStateBackend<K>
-
deregisterKeySelectionListener
public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Description copied from interface: KeyedStateBackend
Stops calling the listener registered in KeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>).
- Specified by:
deregisterKeySelectionListener in interface KeyedStateBackend<K>
- Returns:
true iff the listener was registered before.
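The register/deregister contract can be sketched with a small standalone registry. `ToyKeySelectionRegistry` and its nested `Listener` interface are hypothetical names chosen for illustration; the sketch only mirrors the documented behavior that listeners are notified on key switches and that deregistering returns true iff the listener was registered before.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical registry mirroring the listener contract described above. */
class ToyKeySelectionRegistry<K> {
    /** Callback invoked whenever the key context switches. */
    interface Listener<K> {
        void keySelected(K newKey);
    }

    private final List<Listener<K>> listeners = new ArrayList<>();

    void registerKeySelectionListener(Listener<K> listener) {
        listeners.add(listener);
    }

    /** Returns true iff the listener was registered before. */
    boolean deregisterKeySelectionListener(Listener<K> listener) {
        return listeners.remove(listener);
    }

    /** Switches the key context and notifies every registered listener. */
    void setCurrentKey(K newKey) {
        for (Listener<K> listener : listeners) {
            listener.keySelected(newKey);
        }
    }
}
```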
-
applyToAllKeys
public <N,S extends State,T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor, KeyedStateFunction<K,S> function) throws Exception
Description copied from interface: KeyedStateBackend
Applies the provided KeyedStateFunction to the state with the provided StateDescriptor of all the currently active keys.
- Specified by:
applyToAllKeys in interface KeyedStateBackend<K>
- Type Parameters:
N - The type of the namespace.
S - The type of the state.
- Parameters:
namespace - the namespace of the state.
namespaceSerializer - the serializer for the namespace.
stateDescriptor - the descriptor of the state to which the function is going to be applied.
function - the function to be applied to the keyed state.
- Throws:
Exception
-
getPartitionedState
public <N,S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,?> stateDescriptor) throws Exception
Description copied from interface: KeyedStateBackend
Creates or retrieves a partitioned state backed by this state backend.
TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace. This method should be removed for the sake of namespaces being lazily fetched from the keyed state backend, or being set on the state directly.
- Specified by:
getPartitionedState in interface KeyedStateBackend<K>
- Type Parameters:
N - The type of the namespace.
S - The type of the state.
- Parameters:
stateDescriptor - The identifier for the state. This contains name and can create a default state value.
- Returns:
A new key/value state backed by this backend.
- Throws:
Exception - Exceptions may occur during initialization of the state and should be forwarded.
-
snapshot
@Nonnull public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception
Description copied from interface: Snapshotable
Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a RunnableFuture that gives a state handle to the snapshot. It is up to the implementation whether the operation is performed synchronously or asynchronously. In the latter case, the returned Runnable must be executed first before obtaining the handle.
- Specified by:
snapshot in interface Snapshotable<K>
- Parameters:
checkpointId - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
streamFactory - The factory that we can use for writing our state to streams.
checkpointOptions - Options for how to perform this checkpoint.
- Returns:
A runnable future that will yield a StateObject.
- Throws:
Exception
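The "run first, then obtain the handle" rule follows from how `java.util.concurrent.RunnableFuture` works: calling `get()` on a future that nobody has run blocks indefinitely. The sketch below uses only the JDK; `fakeSnapshot` is a hypothetical stand-in for a snapshot call and returns a plain `String` instead of a state handle.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Demonstrates running a RunnableFuture before asking it for its result. */
class RunnableFutureExample {
    /** Hypothetical stand-in for a snapshot call: returns a future that has not been run yet. */
    static RunnableFuture<String> fakeSnapshot(long checkpointId) {
        return new FutureTask<>(() -> "handle-for-checkpoint-" + checkpointId);
    }

    /** Runs the future if it is not done yet, then returns its result. */
    static String runAndGet(RunnableFuture<String> future) {
        if (!future.isDone()) {
            future.run(); // executes the deferred work on the calling thread
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("the deferred work failed", e.getCause());
        }
    }
}
```

A caller could equally hand the future to an executor instead of running it inline; the only requirement shown here is that something runs it before `get()` is expected to return.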
-
create
@Nonnull public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer)
Description copied from interface: PriorityQueueSetFactory
Creates a KeyGroupedInternalPriorityQueue.
- Specified by:
create in interface PriorityQueueSetFactory
- Type Parameters:
T - type of the stored elements.
- Parameters:
stateName - unique name associated with this queue.
byteOrderedElementSerializer - a serializer with a format that is lexicographically ordered in alignment with elementPriorityComparator.
- Returns:
the queue with the specified unique name.
-
numKeyValueStateEntries
@VisibleForTesting public int numKeyValueStateEntries()
Description copied from interface: TestableKeyedStateBackend
Returns the total number of state entries across all keys/namespaces.
- Specified by:
numKeyValueStateEntries in interface TestableKeyedStateBackend<K>
-
isSafeToReuseKVState
public boolean isSafeToReuseKVState()
Description copied from interface: KeyedStateBackend
Whether it's safe to reuse key-values from the state backend, e.g. for the purpose of optimization.
NOTE: this method should not be used to check for InternalPriorityQueue, as the priority queue could be stored in different locations, e.g. the RocksDB state backend could store it on the JVM heap if HEAP is configured as the time-service factory.
- Specified by:
isSafeToReuseKVState in interface KeyedStateBackend<K>
- Returns:
true if it is safe to reuse the key-values from the state backend.
-
savepoint
@Nonnull public SavepointResources<K> savepoint() throws Exception
Description copied from interface: CheckpointableKeyedStateBackend
Returns a SavepointResources that can be used by SavepointSnapshotStrategy to write out a savepoint in the common/unified format.
- Specified by:
savepoint in interface CheckpointableKeyedStateBackend<K>
- Throws:
Exception
-
notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Properly implementing the "Checkpoint Subsuming Contract" (see above) handles this situation as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
- Specified by:
notifyCheckpointComplete in interface CheckpointListener
- Parameters:
checkpointId - The ID of the checkpoint that has been completed.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
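Since notifications may be skipped or arrive for an earlier checkpoint, one way to read the subsuming idea is: on a completion notice for checkpoint N, finalize everything pending for checkpoints up to and including N. The sketch below is an illustration under that reading, not Flink's implementation; `ToySubsumingCommitter` and its string "side effects" are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical committer that treats a completed checkpoint as covering all earlier ones. */
class ToySubsumingCommitter {
    // Pending side effects keyed by the checkpoint that recorded them, in ascending order.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    /** Records a side effect that becomes final once its checkpoint (or a later one) completes. */
    void onSnapshot(long checkpointId, String sideEffect) {
        pending.put(checkpointId, sideEffect);
    }

    /** Commits everything pending for checkpoints up to and including the completed one. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, String> covered = pending.headMap(checkpointId, true);
        committed.addAll(covered.values());
        covered.clear(); // the view is backed by 'pending', so this removes the committed entries
    }

    List<String> committed() {
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With this shape, a skipped notification for checkpoint 1 is harmless once checkpoint 2 is reported, and a late notification for checkpoint 1 simply finds nothing left to commit.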
-
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources. One example is to pro-actively clear a local per-checkpoint state cache upon checkpoint failure.
- Specified by:
notifyCheckpointAborted in interface CheckpointListener
- Parameters:
checkpointId - The ID of the checkpoint that has been aborted.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
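The cache-cleanup example mentioned in the description can be sketched as follows. `ToyPerCheckpointCache` is a hypothetical class; it holds only auxiliary, per-checkpoint artifacts, so dropping an entry on abort frees resources early without discarding any actual data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical local cache of auxiliary artifacts, grouped by checkpoint ID. */
class ToyPerCheckpointCache {
    private final Map<Long, List<String>> artifactsByCheckpoint = new HashMap<>();

    /** Remembers an auxiliary artifact produced while preparing the given checkpoint. */
    void remember(long checkpointId, String artifact) {
        artifactsByCheckpoint
                .computeIfAbsent(checkpointId, id -> new ArrayList<>())
                .add(artifact);
    }

    /** Early cleanup: drops the cached artifacts of the aborted checkpoint only. */
    void notifyCheckpointAborted(long checkpointId) {
        artifactsByCheckpoint.remove(checkpointId);
    }

    boolean hasEntriesFor(long checkpointId) {
        return artifactsByCheckpoint.containsKey(checkpointId);
    }
}
```

Because the notification is best effort, a real implementation would presumably also need another cleanup path for entries whose abort notice never arrives.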
-
getOrCreateKeyedState
public <N,S extends State,T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor) throws Exception
Description copied from interface: KeyedStateBackend
Creates or retrieves a keyed state backed by this state backend.
- Specified by:
getOrCreateKeyedState in interface KeyedStateBackend<K>
- Type Parameters:
N - The type of the namespace.
S - The type of the state.
- Parameters:
namespaceSerializer - The serializer used for the namespace type of the state.
stateDescriptor - The identifier for the state. This contains name and can create a default state value.
- Returns:
A new key/value state backed by this backend.
- Throws:
Exception - Exceptions may occur during initialization of the state and should be forwarded.
-
createOrUpdateInternalState
@Nonnull public <N,SV,SEV,S extends State,IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S,SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception
Description copied from interface: KeyedStateFactory
Creates or updates internal state and returns a new InternalKvState.
- Specified by:
createOrUpdateInternalState in interface KeyedStateFactory
- Type Parameters:
N - The type of the namespace.
SV - The type of the stored state value.
SEV - The type of the stored state value or entry for collection types (list or map).
S - The type of the public API state.
IS - The type of internal state.
- Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
snapshotTransformFactory - factory of state snapshot transformer.
- Throws:
Exception
-
registerCloseable
public void registerCloseable(@Nullable Closeable closeable)
-
initMaterialization
public Optional<PeriodicMaterializationManager.MaterializationRunnable> initMaterialization() throws Exception
Initialize state materialization so that materialized data can be persisted durably and included in the checkpoint.
This method is not thread safe. It should be called either under a lock or through the task mailbox executor.
- Specified by:
initMaterialization in interface PeriodicMaterializationManager.MaterializationTarget
- Returns:
a tuple of: the future snapshot result from the underlying state backend, and a SequenceNumber identifying the latest change in the changelog
- Throws:
Exception
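The thread-safety note can be illustrated by funnelling every call through a single thread. In the sketch below a JDK single-thread executor plays the role of the task mailbox executor; that substitution, the class `ToyMailboxTarget`, and the counter it protects are all assumptions made for illustration and do not reflect Flink's mailbox API.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical target whose non-thread-safe method is only ever run on one "mailbox" thread. */
class ToyMailboxTarget {
    // Single-thread executor standing in for a task mailbox executor (assumption).
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Plain field without synchronization: only the mailbox thread touches it.
    private long nextMaterializationId = 0;

    /** Not thread safe on its own; must run on the mailbox thread. */
    private long initMaterializationUnsafe() {
        return ++nextMaterializationId;
    }

    /** Safe entry point: enqueues the call on the mailbox and waits for its result. */
    long initMaterialization() {
        Callable<Long> call = this::initMaterializationUnsafe;
        try {
            return mailbox.submit(call).get();
        } catch (Exception e) {
            throw new IllegalStateException("mailbox call failed", e);
        }
    }

    void shutdown() {
        mailbox.shutdown();
    }

    /** Calls initMaterialization() from several threads and returns the IDs that were handed out. */
    static Set<Long> callFromManyThreads(int callers) {
        ToyMailboxTarget target = new ToyMailboxTarget();
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> ids.add(target.initMaterialization()));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining callers", e);
            }
        }
        target.shutdown();
        return ids;
    }
}
```

Because the increments are serialized on one thread, every caller receives a distinct ID even though the counter itself is unsynchronized; guarding the method with a lock would be the other option the note mentions.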
-
handleMaterializationResult
public void handleMaterializationResult(SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo)
This method is not thread safe. It should be called either under a lock or through the task mailbox executor.
- Specified by:
handleMaterializationResult in interface PeriodicMaterializationManager.MaterializationTarget
-
handleMaterializationFailureOrCancellation
public void handleMaterializationFailureOrCancellation(long materializationID, SequenceNumber upTo, Throwable cause)
- Specified by:
handleMaterializationFailureOrCancellation in interface PeriodicMaterializationManager.MaterializationTarget
-
getDelegatedKeyedStateBackend
public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive)
- Specified by:
getDelegatedKeyedStateBackend in interface TestableKeyedStateBackend<K>
- Parameters:
recursive - true if the call should be recursive
- Returns:
the delegated KeyedStateBackend if this backend delegates its responsibilities.
-
notifyCheckpointSubsumed
public void notifyCheckpointSubsumed(long checkpointId) throws Exception
Description copied from interface: InternalCheckpointListener
This method is called as a notification once a distributed checkpoint has been subsumed.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources.
- Specified by:
notifyCheckpointSubsumed in interface InternalCheckpointListener
- Parameters:
checkpointId - The ID of the checkpoint that has been subsumed.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
-
getChangelogRestoreTarget
public ChangelogRestoreTarget<K> getChangelogRestoreTarget()
-
-