K
- The key by which state is keyed.@Internal public class ChangelogKeyedStateBackend<K> extends Object implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend<K>
KeyedStateBackend
that keeps state on the underlying delegated keyed state backend as
well as on the state change log.KeyedStateBackend.KeySelectionListener<K>
Constructor and Description |
---|
ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
Collection<ChangelogStateBackendHandle> initialState,
MailboxExecutor mainMailboxExecutor,
ExecutorService asyncOperationsThreadPool) |
Modifier and Type | Method and Description |
---|---|
<N,S extends State,T> |
applyToAllKeys(N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S,T> stateDescriptor,
KeyedStateFunction<K,S> function)
Applies the provided
KeyedStateFunction to the state with the provided StateDescriptor of all the currently active keys. |
void |
close() |
<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> |
create(String stateName,
TypeSerializer<T> byteOrderedElementSerializer)
Creates a
KeyGroupedInternalPriorityQueue . |
<N,SV,SEV,S extends State,IS extends S> |
createInternalState(TypeSerializer<N> namespaceSerializer,
StateDescriptor<S,SV> stateDesc,
StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
Creates and returns a new
InternalKvState . |
boolean |
deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
Stop calling listener registered in
KeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>) . |
void |
dispose()
Disposes the object and releases all resources.
|
K |
getCurrentKey() |
KeyedStateBackend<K> |
getDelegatedKeyedStateBackend(boolean recursive) |
ChangelogState |
getExistingStateForRecovery(String name,
StateMetaInfoSnapshot.BackendStateType type) |
KeyGroupRange |
getKeyGroupRange()
Returns the key groups which this state backend is responsible for.
|
<N> java.util.stream.Stream<K> |
getKeys(String state,
N namespace) |
<N> java.util.stream.Stream<Tuple2<K,N>> |
getKeysAndNamespaces(String state) |
TypeSerializer<K> |
getKeySerializer() |
<N,S extends State,T> |
getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer,
StateDescriptor<S,T> stateDescriptor)
Creates or retrieves a keyed state backed by this state backend.
|
<N,S extends State> |
getPartitionedState(N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S,?> stateDescriptor)
Creates or retrieves a partitioned state backed by this state backend.
|
boolean |
isStateImmutableInStateBackend(CheckpointType checkpointOptions) |
void |
notifyCheckpointAborted(long checkpointId)
This method is called as a notification once a distributed checkpoint has been aborted.
|
void |
notifyCheckpointComplete(long checkpointId)
Notifies the listener that the checkpoint with the given
checkpointId completed and
was committed. |
int |
numKeyValueStateEntries()
Returns the total number of state entries across all keys/namespaces.
|
void |
registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
State backend will call
KeyedStateBackend.KeySelectionListener.keySelected(K) when key context is switched
if supported. |
SavepointResources<K> |
savepoint()
Returns a
SavepointResources that can be used by SavepointSnapshotStrategy to
write out a savepoint in the common/unified format. |
void |
setCurrentKey(K newKey)
Sets the current key that is used for partitioned state.
|
RunnableFuture<SnapshotResult<KeyedStateHandle>> |
snapshot(long checkpointId,
long timestamp,
CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions)
Operation that writes a snapshot into a stream that is provided by the given
CheckpointStreamFactory and returns a @RunnableFuture that gives a state handle to
the snapshot. |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
createInternalState
public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, MailboxExecutor mainMailboxExecutor, ExecutorService asyncOperationsThreadPool)
public KeyGroupRange getKeyGroupRange()
CheckpointableKeyedStateBackend
getKeyGroupRange
in interface CheckpointableKeyedStateBackend<K>
public void close() throws IOException
close
in interface Closeable
close
in interface AutoCloseable
IOException
public void setCurrentKey(K newKey)
KeyedStateBackend
setCurrentKey
in interface KeyedStateBackend<K>
newKey
- The new current key.public K getCurrentKey()
getCurrentKey
in interface KeyedStateBackend<K>
public TypeSerializer<K> getKeySerializer()
getKeySerializer
in interface KeyedStateBackend<K>
public <N> java.util.stream.Stream<K> getKeys(String state, N namespace)
getKeys
in interface KeyedStateBackend<K>
state
- State variable for which existing keys will be returned.namespace
- Namespace for which existing keys will be returned.public <N> java.util.stream.Stream<Tuple2<K,N>> getKeysAndNamespaces(String state)
getKeysAndNamespaces
in interface KeyedStateBackend<K>
state
- State variable for which existing keys will be returned.public void dispose()
Disposable
dispose
in interface KeyedStateBackend<K>
dispose
in interface Disposable
public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
KeyedStateBackend
KeyedStateBackend.KeySelectionListener.keySelected(K)
when key context is switched
if supported.registerKeySelectionListener
in interface KeyedStateBackend<K>
public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener)
KeyedStateBackend
KeyedStateBackend.registerKeySelectionListener(org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K>)
.deregisterKeySelectionListener
in interface KeyedStateBackend<K>
public <N,S extends State,T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor, KeyedStateFunction<K,S> function) throws Exception
KeyedStateBackend
KeyedStateFunction
to the state with the provided StateDescriptor
of all the currently active keys.applyToAllKeys
in interface KeyedStateBackend<K>
N
- The type of the namespace.S
- The type of the state.namespace
- the namespace of the state.namespaceSerializer
- the serializer for the namespace.stateDescriptor
- the descriptor of the state to which the function is going to be
applied.function
- the function to be applied to the keyed state.Exception
public <N,S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S,?> stateDescriptor) throws Exception
KeyedStateBackend
TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace. This method should be removed for the sake of namespaces being lazily fetched from the keyed state backend, or being set on the state directly.
getPartitionedState
in interface KeyedStateBackend<K>
N
- The type of the namespace.S
- The type of the state.stateDescriptor
- The identifier for the state. This contains name and can create a
default state value.Exception
- Exceptions may occur during initialization of the state and should be
forwarded.@Nonnull public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception
Snapshotable
CheckpointStreamFactory
and returns a @RunnableFuture
that gives a state handle to
the snapshot. It is up to the implementation if the operation is performed synchronous or
asynchronous. In the later case, the returned Runnable must be executed first before
obtaining the handle.snapshot
in interface Snapshotable<SnapshotResult<KeyedStateHandle>>
checkpointId
- The ID of the checkpoint.timestamp
- The timestamp of the checkpoint.streamFactory
- The factory that we can use for writing our state to streams.checkpointOptions
- Options for how to perform this checkpoint.StateObject
.Exception
@Nonnull public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer)
PriorityQueueSetFactory
KeyGroupedInternalPriorityQueue
.create
in interface PriorityQueueSetFactory
T
- type of the stored elements.stateName
- unique name for associated with this queue.byteOrderedElementSerializer
- a serializer that with a format that is lexicographically
ordered in alignment with elementPriorityComparator.@VisibleForTesting public int numKeyValueStateEntries()
TestableKeyedStateBackend
numKeyValueStateEntries
in interface TestableKeyedStateBackend<K>
public boolean isStateImmutableInStateBackend(CheckpointType checkpointOptions)
isStateImmutableInStateBackend
in interface KeyedStateBackend<K>
@Nonnull public SavepointResources<K> savepoint() throws Exception
CheckpointableKeyedStateBackend
SavepointResources
that can be used by SavepointSnapshotStrategy
to
write out a savepoint in the common/unified format.savepoint
in interface CheckpointableKeyedStateBackend<K>
Exception
public void notifyCheckpointComplete(long checkpointId) throws Exception
CheckpointListener
checkpointId
completed and
was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave
properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the
class-level JavaDocs
for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete()
call is always for the latest prior checkpoint (or snapshot) that
was taken on the function/operator implementing this interface. It might be for a checkpoint
that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above)
properly handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
notifyCheckpointComplete
in interface CheckpointListener
checkpointId
- The ID of the checkpoint that has been completed.Exception
- This method can propagate exceptions, which leads to a failure/recovery for
the task. Not that this will NOT lead to the checkpoint being revoked.public void notifyCheckpointAborted(long checkpointId) throws Exception
CheckpointListener
Important: The fact that a checkpoint has been aborted does NOT mean that the data
and artifacts produced between the previous checkpoint and the aborted checkpoint are to be
discarded. The expected behavior is as if this checkpoint was never triggered in the first
place, and the next successful checkpoint simply covers a longer time span. See the
"Checkpoint Subsuming Contract" in the class-level JavaDocs
for
details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources. One example is to pro-actively clear a local per-checkpoint state cache upon checkpoint failure.
notifyCheckpointAborted
in interface CheckpointListener
checkpointId
- The ID of the checkpoint that has been aborted.Exception
- This method can propagate exceptions, which leads to a failure/recovery for
the task or job.public <N,S extends State,T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,T> stateDescriptor) throws Exception
KeyedStateBackend
getOrCreateKeyedState
in interface KeyedStateBackend<K>
N
- The type of the namespace.S
- The type of the state.namespaceSerializer
- The serializer used for the namespace type of the statestateDescriptor
- The identifier for the state. This contains name and can create a
default state value.Exception
- Exceptions may occur during initialization of the state and should be
forwarded.@Nonnull public <N,SV,SEV,S extends State,IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S,SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception
KeyedStateFactory
InternalKvState
.createInternalState
in interface KeyedStateFactory
N
- The type of the namespace.SV
- The type of the stored state value.SEV
- The type of the stored state value or entry for collection types (list or map).S
- The type of the public API state.IS
- The type of internal state.namespaceSerializer
- TypeSerializer for the state namespace.stateDesc
- The StateDescriptor
that contains the name of the state.snapshotTransformFactory
- factory of state snapshot transformer.Exception
public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive)
getDelegatedKeyedStateBackend
in interface TestableKeyedStateBackend<K>
recursive
- true if the call should be recursiveKeyedStateBackend
if this backends delegates its
responisibilities..public ChangelogState getExistingStateForRecovery(String name, StateMetaInfoSnapshot.BackendStateType type) throws NoSuchElementException, UnsupportedOperationException
name
- state nametype
- state type (the only supported type currently are: key value
, priority
queue
)NoSuchElementException
- if the state wasn't createdUnsupportedOperationException
- if state type is not supportedCopyright © 2014–2023 The Apache Software Foundation. All rights reserved.