K - The key by which state is keyed.
@Internal public class ChangelogKeyedStateBackend<K> extends Object implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend<K>, InternalCheckpointListener, PeriodicMaterializationManager.MaterializationTarget
A KeyedStateBackend that keeps state on the underlying delegated keyed state backend as well as on the state change log.
KeyedStateBackend.KeySelectionListener<K>
NO_OP
| Constructor and Description |
|---|
| ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView, ChangelogStateFactory changelogStateFactory) |
| ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView) |
| Modifier and Type | Method and Description |
|---|---|
| <N,S extends State,T> void | applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor, KeyedStateFunction<K,S> function) Applies the provided KeyedStateFunction to the state with the provided StateDescriptor of all the currently active keys. |
| void | close() |
| <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> | create(String stateName, TypeSerializer<T> byteOrderedElementSerializer) Creates a KeyGroupedInternalPriorityQueue. |
| <N,SV,SEV,S extends State,IS extends S> IS | createOrUpdateInternalState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) Creates or updates internal state and returns a new InternalKvState. |
| boolean | deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) Stop calling listener registered in KeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>). |
| void | dispose() Disposes the object and releases all resources. |
| ChangelogRestoreTarget<K> | getChangelogRestoreTarget() |
| K | getCurrentKey() |
| KeyedStateBackend<K> | getDelegatedKeyedStateBackend(boolean recursive) |
| KeyGroupRange | getKeyGroupRange() Returns the key groups which this state backend is responsible for. |
| <N> Stream<K> | getKeys(String state, N namespace) |
| <N> Stream<Tuple2<K,N>> | getKeysAndNamespaces(String state) |
| TypeSerializer<K> | getKeySerializer() |
| <N,S extends State,T> S | getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor) Creates or retrieves a keyed state backed by this state backend. |
| <N,S extends State> S | getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,?> stateDescriptor) Creates or retrieves a partitioned state backed by this state backend. |
| void | handleMaterializationFailureOrCancellation(long materializationID, SequenceNumber upTo, Throwable cause) |
| void | handleMaterializationResult(SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo) This method is not thread safe. |
| Optional<PeriodicMaterializationManager.MaterializationRunnable> | initMaterialization() Initialize state materialization so that materialized data can be persisted durably and included into the checkpoint. |
| boolean | isSafeToReuseKVState() Whether it's safe to reuse key-values from the state-backend, e.g. for the purpose of optimization. |
| void | notifyCheckpointAborted(long checkpointId) This method is called as a notification once a distributed checkpoint has been aborted. |
| void | notifyCheckpointComplete(long checkpointId) Notifies the listener that the checkpoint with the given checkpointId completed and was committed. |
| void | notifyCheckpointSubsumed(long checkpointId) This method is called as a notification once a distributed checkpoint has been subsumed. |
| int | numKeyValueStateEntries() Returns the total number of state entries across all keys/namespaces. |
| void | registerCloseable(Closeable closeable) |
| void | registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) State backend will call KeyedStateBackend.KeySelectionListener.keySelected(K) when key context is switched if supported. |
| SavepointResources<K> | savepoint() Returns a SavepointResources that can be used by SavepointSnapshotStrategy to write out a savepoint in the common/unified format. |
| void | setCurrentKey(K newKey) Sets the current key that is used for partitioned state. |
| RunnableFuture<SnapshotResult<KeyedStateHandle>> | snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a @RunnableFuture that gives a state handle to the snapshot. |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
isStateImmutableInStateBackend
createOrUpdateInternalState, createOrUpdateInternalState
create
public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView)
public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView, ChangelogStateFactory changelogStateFactory)
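public KeyGroupRange getKeyGroupRange()
Description copied from interface: CheckpointableKeyedStateBackend
Returns the key groups which this state backend is responsible for.
Specified by: getKeyGroupRange in interface CheckpointableKeyedStateBackend<K>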
public void close() throws IOException
Specified by: close in interface Closeable
Specified by: close in interface AutoCloseable
Throws:
IOException
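public void setCurrentKey(K newKey)
Description copied from interface: KeyedStateBackend
Sets the current key that is used for partitioned state.
Specified by: setCurrentKey in interface KeyedStateBackend<K>
Parameters:
newKey - The new current key.
public K getCurrentKey()
Specified by: getCurrentKey in interface KeyedStateBackend<K>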
public TypeSerializer<K> getKeySerializer()
Specified by: getKeySerializer in interface KeyedStateBackend<K>
public <N> Stream<K> getKeys(String state, N namespace)
Specified by: getKeys in interface KeyedStateBackend<K>
Parameters:
state - State variable for which existing keys will be returned.
namespace - Namespace for which existing keys will be returned.
public <N> Stream<Tuple2<K,N>> getKeysAndNamespaces(String state)
Specified by: getKeysAndNamespaces in interface KeyedStateBackend<K>
Parameters:
state - State variable for which existing keys will be returned.
public void dispose()
Description copied from interface: Disposable
Disposes the object and releases all resources.
Specified by: dispose in interface KeyedStateBackend<K>
Specified by: dispose in interface Disposable
public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Description copied from interface: KeyedStateBackend
State backend will call KeyedStateBackend.KeySelectionListener.keySelected(K) when key context is switched if supported.
Specified by: registerKeySelectionListener in interface KeyedStateBackend<K>
public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Description copied from interface: KeyedStateBackend
Stop calling listener registered in KeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>).
Specified by: deregisterKeySelectionListener in interface KeyedStateBackend<K>
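The register/deregister pair above can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: `KeySelectionListenerSketch` and `KeyContextSketch` are hypothetical stand-ins, and the meaning of the boolean return value (true if the listener had been registered) is an assumption, since this page only shows the return type.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for KeyedStateBackend.KeySelectionListener<K>; not a Flink type. */
interface KeySelectionListenerSketch<K> {
    void keySelected(K newKey);
}

/** Hypothetical, simplified backend that only tracks the current key and its listeners. */
class KeyContextSketch<K> {
    private final List<KeySelectionListenerSketch<K>> listeners = new ArrayList<>();
    private K currentKey;

    void registerKeySelectionListener(KeySelectionListenerSketch<K> listener) {
        listeners.add(listener);
    }

    /** Assumption: returns true if the listener had been registered before. */
    boolean deregisterKeySelectionListener(KeySelectionListenerSketch<K> listener) {
        return listeners.remove(listener);
    }

    /** Switches the key context and notifies every registered listener. */
    void setCurrentKey(K newKey) {
        currentKey = newKey;
        for (KeySelectionListenerSketch<K> listener : listeners) {
            listener.keySelected(newKey);
        }
    }

    K getCurrentKey() {
        return currentKey;
    }
}

class KeySelectionDemo {
    public static void main(String[] args) {
        KeyContextSketch<String> backend = new KeyContextSketch<>();
        KeySelectionListenerSketch<String> printer = key -> System.out.println("selected " + key);
        backend.registerKeySelectionListener(printer);
        backend.setCurrentKey("user-1");   // listener is notified
        backend.deregisterKeySelectionListener(printer);
        backend.setCurrentKey("user-2");   // no notification after deregistration
    }
}
```

After deregistration the key context still changes; only the callback stops.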
public <N,S extends State,T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor, KeyedStateFunction<K,S> function) throws Exception
Description copied from interface: KeyedStateBackend
Applies the provided KeyedStateFunction to the state with the provided StateDescriptor of all the currently active keys.
Specified by: applyToAllKeys in interface KeyedStateBackend<K>
Type Parameters:
N - The type of the namespace.
S - The type of the state.
Parameters:
namespace - the namespace of the state.
namespaceSerializer - the serializer for the namespace.
stateDescriptor - the descriptor of the state to which the function is going to be applied.
function - the function to be applied to the keyed state.
Throws:
Exception
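As a rough illustration of what "applying a function to the state of all currently active keys" means, the following sketch iterates over every key that has state in one namespace. All types here are hypothetical simplifications (a plain nested map instead of a StateDescriptor lookup, a `CounterState` instead of a Flink State, and no checked exceptions), so it shows the idea rather than the backend's actual implementation.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for KeyedStateFunction<K, S>; the sketch omits checked exceptions. */
interface KeyedStateFunctionSketch<K, S> {
    void process(K key, S state);
}

/** Hypothetical mutable state object used in place of a Flink State. */
class CounterState {
    long value;
}

class ApplyToAllKeysSketch<K> {
    // namespace -> (key -> state); a real backend would resolve this through the StateDescriptor.
    private final Map<String, Map<K, CounterState>> stateByNamespace = new LinkedHashMap<>();

    /** Returns the state for (namespace, key), creating it on first access. */
    CounterState stateFor(String namespace, K key) {
        return stateByNamespace
                .computeIfAbsent(namespace, ns -> new LinkedHashMap<>())
                .computeIfAbsent(key, k -> new CounterState());
    }

    /** Invokes the function once per key that currently has state in the given namespace. */
    void applyToAllKeys(String namespace, KeyedStateFunctionSketch<K, CounterState> function) {
        Map<K, CounterState> states =
                stateByNamespace.getOrDefault(namespace, Collections.emptyMap());
        for (Map.Entry<K, CounterState> entry : states.entrySet()) {
            function.process(entry.getKey(), entry.getValue());
        }
    }
}

class ApplyToAllKeysDemo {
    public static void main(String[] args) {
        ApplyToAllKeysSketch<String> backend = new ApplyToAllKeysSketch<>();
        backend.stateFor("window-1", "a").value = 1;
        backend.stateFor("window-1", "b").value = 2;
        // Reset every counter in the namespace, regardless of the current key.
        backend.applyToAllKeys("window-1", (key, state) -> state.value = 0);
        System.out.println(backend.stateFor("window-1", "b").value);
    }
}
```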
public <N,S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,?> stateDescriptor) throws Exception
Description copied from interface: KeyedStateBackend
Creates or retrieves a partitioned state backed by this state backend.
TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace. This method should be removed for the sake of namespaces being lazily fetched from the keyed state backend, or being set on the state directly.
Specified by: getPartitionedState in interface KeyedStateBackend<K>
Type Parameters:
N - The type of the namespace.
S - The type of the state.
Parameters:
stateDescriptor - The identifier for the state. This contains name and can create a default state value.
Throws:
Exception - Exceptions may occur during initialization of the state and should be forwarded.
@Nonnull public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception
Description copied from interface: Snapshotable
Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a @RunnableFuture that gives a state handle to the snapshot. It is up to the implementation if the operation is performed synchronously or asynchronously. In the latter case, the returned Runnable must be executed first before obtaining the handle.
Specified by: snapshot in interface Snapshotable<SnapshotResult<KeyedStateHandle>>
Parameters:
checkpointId - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
streamFactory - The factory that we can use for writing our state to streams.
checkpointOptions - Options for how to perform this checkpoint.
StateObject.
Throws:
Exception
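The remark that the returned Runnable must be executed before the handle can be obtained follows from how java.util.concurrent.RunnableFuture works. The sketch below uses only the JDK: a FutureTask (which implements RunnableFuture) stands in for the snapshot future, and a plain String stands in for the state handle. `SnapshotFutureSketch` and its methods are hypothetical names, not part of Flink.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class SnapshotFutureSketch {

    /** Returns a future whose work has not started yet; a String stands in for a state handle. */
    static RunnableFuture<String> snapshot(long checkpointId) {
        // Nothing is computed until run() is invoked on the returned future.
        return new FutureTask<>(() -> "handle-for-checkpoint-" + checkpointId);
    }

    /** Runs the future on the calling thread, then obtains its result. */
    static String runAndGet(RunnableFuture<String> future) {
        future.run();
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        RunnableFuture<String> future = snapshot(42L);
        System.out.println(future.isDone());   // false: the snapshot has not been executed yet
        System.out.println(runAndGet(future)); // handle-for-checkpoint-42
    }
}
```

Calling get() without a prior run() (on this or another thread) would block, which is why the caller is responsible for executing the future.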
@Nonnull public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer)
Description copied from interface: PriorityQueueSetFactory
Creates a KeyGroupedInternalPriorityQueue.
Specified by: create in interface PriorityQueueSetFactory
Type Parameters:
T - type of the stored elements.
Parameters:
stateName - unique name associated with this queue.
byteOrderedElementSerializer - a serializer with a format that is lexicographically ordered in alignment with elementPriorityComparator.
@VisibleForTesting public int numKeyValueStateEntries()
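Description copied from interface: TestableKeyedStateBackend
Returns the total number of state entries across all keys/namespaces.
Specified by: numKeyValueStateEntries in interface TestableKeyedStateBackend<K>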
public boolean isSafeToReuseKVState()
Description copied from interface: KeyedStateBackend
Whether it's safe to reuse key-values from the state-backend, e.g. for the purpose of optimization.
NOTE: this method should not be used to check for InternalPriorityQueue, as the priority queue could be stored in different locations, e.g. the RocksDB state-backend could store it on the JVM heap if HEAP is configured as the time-service factory.
Specified by: isSafeToReuseKVState in interface KeyedStateBackend<K>
@Nonnull public SavepointResources<K> savepoint() throws Exception
Description copied from interface: CheckpointableKeyedStateBackend
Returns a SavepointResources that can be used by SavepointSnapshotStrategy to write out a savepoint in the common/unified format.
Specified by: savepoint in interface CheckpointableKeyedStateBackend<K>
Throws:
Exception
public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
Specified by: notifyCheckpointComplete in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
public void notifyCheckpointAborted(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanups of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
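The interplay between completion and abort notifications can be sketched with a small, self-contained example. It assumes the "Checkpoint Subsuming Contract" means that a completion notification for checkpoint N also covers every earlier pending checkpoint, and that an abort notification must not discard data. `SubsumingListenerSketch` and its `stage` method are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical listener that stages data per checkpoint and commits it on completion. */
class SubsumingListenerSketch {
    // checkpointId -> data staged when that checkpoint was taken, not yet committed.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void stage(long checkpointId, String data) {
        pending.put(checkpointId, data);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Commit everything up to and including this checkpoint, so that earlier
        // checkpoints whose notification was skipped are covered as well.
        NavigableMap<Long, String> covered = pending.headMap(checkpointId, true);
        committed.addAll(covered.values());
        covered.clear(); // clearing the view removes the entries from 'pending'
    }

    void notifyCheckpointAborted(long checkpointId) {
        // Deliberately keeps the staged data: the next successful checkpoint covers it.
    }

    List<String> committed() {
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }
}

class SubsumingDemo {
    public static void main(String[] args) {
        SubsumingListenerSketch listener = new SubsumingListenerSketch();
        listener.stage(1, "a");
        listener.stage(2, "b");
        listener.stage(3, "c");
        listener.notifyCheckpointAborted(2);   // nothing is discarded
        listener.notifyCheckpointComplete(2);  // commits the data staged for checkpoints 1 and 2
        System.out.println(listener.committed());    // [a, b]
        System.out.println(listener.pendingCount()); // 1
    }
}
```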
Specified by: notifyCheckpointAborted in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been aborted.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
public <N,S extends State,T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor) throws Exception
Description copied from interface: KeyedStateBackend
Creates or retrieves a keyed state backed by this state backend.
Specified by: getOrCreateKeyedState in interface KeyedStateBackend<K>
Type Parameters:
N - The type of the namespace.
S - The type of the state.
Parameters:
namespaceSerializer - The serializer used for the namespace type of the state.
stateDescriptor - The identifier for the state. This contains name and can create a default state value.
Throws:
Exception - Exceptions may occur during initialization of the state and should be forwarded.
@Nonnull public <N,SV,SEV,S extends State,IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S,SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception
Description copied from interface: KeyedStateFactory
Creates or updates internal state and returns a new InternalKvState.
Specified by: createOrUpdateInternalState in interface KeyedStateFactory
Type Parameters:
N - The type of the namespace.
SV - The type of the stored state value.
SEV - The type of the stored state value or entry for collection types (list or map).
S - The type of the public API state.
IS - The type of internal state.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
snapshotTransformFactory - factory of state snapshot transformer.
Throws:
Exception
public Optional<PeriodicMaterializationManager.MaterializationRunnable> initMaterialization() throws Exception
Initialize state materialization so that materialized data can be persisted durably and included into the checkpoint.
This method is not thread safe. It should be called either under a lock or through task mailbox executor.
Specified by: initMaterialization in interface PeriodicMaterializationManager.MaterializationTarget
SequenceNumber identifying the latest change in the changelog
Throws:
Exception
public void handleMaterializationResult(SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo)
This method is not thread safe.
Specified by: handleMaterializationResult in interface PeriodicMaterializationManager.MaterializationTarget
public void handleMaterializationFailureOrCancellation(long materializationID, SequenceNumber upTo, Throwable cause)
Specified by: handleMaterializationFailureOrCancellation in interface PeriodicMaterializationManager.MaterializationTarget
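How the three materialization methods might fit together with the dual write described in the class summary can be sketched as follows. This is a simplified model under stated assumptions, not the actual implementation: every write goes to both a delegate map and an in-memory changelog, materialization captures a copy of the delegate together with the changelog position it covers, and after a successful result only the changes recorded past that position are needed on top of the materialized copy. `MaterializationSketch`, `Pending`, and the single-argument method signatures are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a backend that writes to a delegate and to a change log. */
class MaterializationSketch {
    private final Map<String, Integer> delegate = new HashMap<>(); // stands in for the delegated backend
    private final List<String> changelog = new ArrayList<>();      // stands in for the state change log
    private Map<String, Integer> materialized = new HashMap<>();   // last materialized copy
    private int materializedTo = 0;                                // changelog position covered by it

    /** Every state change is applied to the delegate and appended to the changelog. */
    void put(String key, int value) {
        delegate.put(key, value);
        changelog.add(key + "=" + value);
    }

    /** Hypothetical result holder: a state copy plus the changelog position it covers. */
    static final class Pending {
        final Map<String, Integer> snapshot;
        final int upTo;

        Pending(Map<String, Integer> snapshot, int upTo) {
            this.snapshot = snapshot;
            this.upTo = upTo;
        }
    }

    /** Captures the current state and the position of the latest change in the changelog. */
    Pending initMaterialization() {
        return new Pending(new HashMap<>(delegate), changelog.size());
    }

    /** Adopts the materialized copy; changes before 'upTo' are now covered by it. */
    void handleMaterializationResult(Pending result) {
        if (result.upTo < materializedTo) {
            return; // stale result, a newer materialization is already in place
        }
        materialized = result.snapshot;
        materializedTo = result.upTo;
    }

    /** On failure or cancellation the previous materialized copy simply stays in use. */
    void handleMaterializationFailureOrCancellation(Pending result) {
    }

    Map<String, Integer> materialized() {
        return materialized;
    }

    /** Changes that are needed in addition to the materialized copy. */
    List<String> changesSinceMaterialization() {
        return new ArrayList<>(changelog.subList(materializedTo, changelog.size()));
    }
}
```

In this model, restoring state amounts to loading the materialized copy and replaying `changesSinceMaterialization()` on top of it.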
public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive)
Specified by: getDelegatedKeyedStateBackend in interface TestableKeyedStateBackend<K>
Parameters:
recursive - true if the call should be recursive
Returns:
KeyedStateBackend if this backend delegates its responsibilities.
public void notifyCheckpointSubsumed(long checkpointId) throws Exception
Description copied from interface: InternalCheckpointListener
This method is called as a notification once a distributed checkpoint has been subsumed.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanups of auxiliary resources.
Specified by: notifyCheckpointSubsumed in interface InternalCheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been subsumed.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
public ChangelogRestoreTarget<K> getChangelogRestoreTarget()
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.