Class ForStKeyedStateBackend<K>
- java.lang.Object
  - org.apache.flink.state.forst.ForStKeyedStateBackend<K>
All Implemented Interfaces:
Closeable, AutoCloseable, CheckpointListener, InternalCheckpointListener, AsyncExecutionController.SwitchContextListener<K>, AsyncKeyedStateBackend<K>, PriorityQueueSetFactory, Snapshotable<SnapshotResult<KeyedStateHandle>>, Disposable
public class ForStKeyedStateBackend<K> extends Object implements AsyncKeyedStateBackend<K>
A KeyedStateBackend that stores its state in ForSt. This state backend can hold very large state that exceeds not only memory but even local disk, by keeping it on remote storage.
-
Field Summary
Modifier and Type | Field | Description
protected org.forstdb.RocksDB | db | Our ForSt database.
protected TypeSerializer<K> | keySerializer | The key serializer.
protected TtlTimeProvider | ttlTimeProvider |
-
Constructor Summary
Constructor | Description
ForStKeyedStateBackend(UUID backendUID, ForStResourceContainer optionsContainer, int keyGroupPrefixBytes, TypeSerializer<K> keySerializer, Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder, Supplier<DataOutputSerializer> valueSerializerView, Supplier<DataInputDeserializer> valueDeserializerView, org.forstdb.RocksDB db, LinkedHashMap<String,ForStOperationUtils.ForStKvStateInfo> kvStateInformation, Map<String,HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, Function<String,org.forstdb.ColumnFamilyOptions> columnFamilyOptionsFactory, org.forstdb.ColumnFamilyHandle defaultColumnFamilyHandle, ForStSnapshotStrategyBase<K,?> snapshotStrategy, PriorityQueueSetFactory priorityQueueFactory, CloseableRegistry cancelStreamRegistry, ForStNativeMetricMonitor nativeMetricMonitor, InternalKeyContext<K> keyContext, TtlTimeProvider ttlTimeProvider, ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) |
-
Method Summary
Modifier and Type | Method | Description
void | close() |
<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> | create(String stateName, TypeSerializer<T> byteOrderedElementSerializer) | Creates a KeyGroupedInternalPriorityQueue.
<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> | create(String stateName, TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) | Creates a KeyGroupedInternalPriorityQueue.
<N,S extends State,SV> S | createState(N defaultNamespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<SV> stateDesc) | Creates and returns a new state.
StateExecutor | createStateExecutor() | Creates a StateExecutor which supports executing a batch of state requests asynchronously.
<N,S extends InternalKeyedState,SV> S | createStateInternal(N defaultNamespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<SV> stateDesc) | Creates and returns a new state for internal usage.
void | dispose() | Should only be called by one thread, and only after all accesses to the DB have happened.
KeyGroupRange | getKeyGroupRange() | Returns the key groups which this state backend is responsible for.
void | notifyCheckpointAborted(long checkpointId) | This method is called as a notification once a distributed checkpoint has been aborted.
void | notifyCheckpointComplete(long checkpointId) | Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
void | notifyCheckpointSubsumed(long checkpointId) | This method is called as a notification once a distributed checkpoint has been subsumed.
void | setup(StateRequestHandler stateRequestHandler) | Initializes the backend with some contexts.
RunnableFuture<SnapshotResult<KeyedStateHandle>> | snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) | Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a RunnableFuture that gives a state handle to the snapshot.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.state.AsyncKeyedStateBackend
requiresLegacySynchronousTimerSnapshots, switchContext
-
Field Detail
-
ttlTimeProvider
protected final TtlTimeProvider ttlTimeProvider
-
keySerializer
protected final TypeSerializer<K> keySerializer
The key serializer.
-
db
protected final org.forstdb.RocksDB db
Our ForSt database. The different k/v states do not each have their own ForSt instance; they all write to this single instance, each into its own column family.
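The "one database, one column family per state" layout can be pictured with a small in-memory model. This is only a sketch: `SharedDb` and its methods are hypothetical stand-ins, not the ForSt or RocksDB API, and each column family is modelled as a sorted map whose byte keys are compared as unsigned bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for one database holding several column families. */
final class SharedDb {
    // Unsigned byte-wise ordering, as used by sorted key-value stores.
    private static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    // One isolated, sorted key space per state name (the "column family").
    private final Map<String, TreeMap<byte[], byte[]>> columnFamilies = new HashMap<>();

    /** Returns the column family for a state, creating it on first use. */
    TreeMap<byte[], byte[]> columnFamily(String stateName) {
        return columnFamilies.computeIfAbsent(
                stateName, name -> new TreeMap<byte[], byte[]>(UNSIGNED));
    }

    void put(String stateName, byte[] key, byte[] value) {
        columnFamily(stateName).put(key, value);
    }

    // Lookups compare key contents via the comparator, so a fresh array with equal bytes matches.
    byte[] get(String stateName, byte[] key) {
        return columnFamily(stateName).get(key);
    }

    int columnFamilyCount() {
        return columnFamilies.size();
    }

    public static void main(String[] args) {
        SharedDb db = new SharedDb();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // Two states reuse the same key bytes without clashing,
        // because each one writes into its own column family.
        db.put("clicks", key, new byte[] {1});
        db.put("views", key, new byte[] {7});
        System.out.println(db.get("clicks", key)[0] + " " + db.get("views", key)[0]);
    }
}
```

The design point is that isolation comes from the column family, not from separate database instances, so all states share one set of native resources.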
-
Constructor Detail
-
ForStKeyedStateBackend
public ForStKeyedStateBackend(UUID backendUID, ForStResourceContainer optionsContainer, int keyGroupPrefixBytes, TypeSerializer<K> keySerializer, Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder, Supplier<DataOutputSerializer> valueSerializerView, Supplier<DataInputDeserializer> valueDeserializerView, org.forstdb.RocksDB db, LinkedHashMap<String,ForStOperationUtils.ForStKvStateInfo> kvStateInformation, Map<String,HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, Function<String,org.forstdb.ColumnFamilyOptions> columnFamilyOptionsFactory, org.forstdb.ColumnFamilyHandle defaultColumnFamilyHandle, ForStSnapshotStrategyBase<K,?> snapshotStrategy, PriorityQueueSetFactory priorityQueueFactory, CloseableRegistry cancelStreamRegistry, ForStNativeMetricMonitor nativeMetricMonitor, InternalKeyContext<K> keyContext, TtlTimeProvider ttlTimeProvider, ForStDBTtlCompactFiltersManager ttlCompactFiltersManager)
-
Method Detail
-
setup
public void setup(@Nonnull StateRequestHandler stateRequestHandler)
Description copied from interface: AsyncKeyedStateBackend
Initializes the backend with some contexts.
- Specified by:
setup in interface AsyncKeyedStateBackend<K>
- Parameters:
stateRequestHandler - the handler that processes state requests.
-
createState
@Nonnull public <N,S extends State,SV> S createState(@Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<SV> stateDesc) throws Exception
Description copied from interface: AsyncKeyedStateBackend
Creates and returns a new state.
- Specified by:
createState in interface AsyncKeyedStateBackend<K>
- Type Parameters:
N - the type of namespace for partitioning.
S - the type of the public API state.
SV - the type of the stored state value.
- Parameters:
defaultNamespace - the default namespace for this state.
namespaceSerializer - the serializer for the namespace.
stateDesc - the StateDescriptor that contains the name of the state.
- Throws:
Exception - exceptions may occur during initialization of the state.
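Because every state shares one database, each stored entry's key must identify the key group, the key, and the namespace. The constructor's keyGroupPrefixBytes parameter points to a key-group prefix; the sketch below assumes a layout of [key-group prefix | key | namespace], similar to the one used by Flink's RocksDB backend, with 1 prefix byte when the job has at most 128 key groups and 2 bytes otherwise. `CompositeKeySketch` is hypothetical, and it sidesteps variable-length keys and namespaces by using a fixed-width long key and int namespace.

```java
import java.nio.ByteBuffer;

/** Hypothetical composite-key builder: [key-group prefix][key][namespace], big-endian. */
final class CompositeKeySketch {

    /** Assumption: 1 byte covers key groups 0..127; larger jobs need a 2-byte prefix. */
    static int requiredPrefixBytes(int numberOfKeyGroups) {
        return numberOfKeyGroups > 128 ? 2 : 1;
    }

    /** Putting the key group first keeps all entries of one key group contiguous. */
    static byte[] build(int keyGroup, int prefixBytes, long key, int namespace) {
        ByteBuffer buf = ByteBuffer.allocate(prefixBytes + Long.BYTES + Integer.BYTES);
        if (prefixBytes == 1) {
            buf.put((byte) keyGroup);
        } else {
            buf.putShort((short) keyGroup);
        }
        buf.putLong(key);       // fixed-width key, so no length marker is needed here
        buf.putInt(namespace);  // namespace last: entries of one key stay adjacent
        return buf.array();
    }

    public static void main(String[] args) {
        int prefix = requiredPrefixBytes(128);
        byte[] composite = build(5, prefix, 42L, 0);
        System.out.println(prefix + " " + composite.length);
    }
}
```

With a key-group-first layout, all entries of a key group form one contiguous range in the sorted key space, which is what makes per-key-group range scans cheap.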
-
createStateInternal
@Nonnull public <N,S extends InternalKeyedState,SV> S createStateInternal(@Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<SV> stateDesc) throws Exception
Description copied from interface: AsyncKeyedStateBackend
Creates and returns a new state for internal usage.
- Specified by:
createStateInternal in interface AsyncKeyedStateBackend<K>
- Type Parameters:
N - the type of namespace for partitioning.
S - the type of the public API state.
SV - the type of the stored state value.
- Parameters:
defaultNamespace - the default namespace for this state.
namespaceSerializer - the serializer for the namespace.
stateDesc - the StateDescriptor that contains the name of the state.
- Throws:
Exception - exceptions may occur during initialization of the state.
-
createStateExecutor
@Nonnull public StateExecutor createStateExecutor()
Description copied from interface: AsyncKeyedStateBackend
Creates a StateExecutor which supports executing a batch of state requests asynchronously.
Notice that the AsyncKeyedStateBackend is responsible for shutting down the StateExecutors it creates once they are no longer in use.
- Specified by:
createStateExecutor in interface AsyncKeyedStateBackend<K>
- Returns:
a StateExecutor which supports executing a batch of state requests asynchronously.
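The two ideas above, executing a batch of state requests asynchronously and having the backend shut down the executors it created, can be sketched with `java.util.concurrent`. Everything here is hypothetical: `BatchExecutorSketch` and `OwningBackendSketch` are illustrative names, requests are modelled as plain `Supplier` values, and the real StateExecutor interface has a different shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical executor that runs a whole batch of requests on one background thread. */
final class BatchExecutorSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Submits the batch as one task; the future completes with all results in order. */
    <T> CompletableFuture<List<T>> executeBatch(List<Supplier<T>> requests) {
        return CompletableFuture.supplyAsync(() -> {
            List<T> results = new ArrayList<>(requests.size());
            for (Supplier<T> request : requests) {
                results.add(request.get());
            }
            return results;
        }, worker);
    }

    void shutdown() {
        worker.shutdown();
    }

    boolean isShutdown() {
        return worker.isShutdown();
    }
}

/** Hypothetical backend that owns, and eventually shuts down, the executors it creates. */
final class OwningBackendSketch {
    private final List<BatchExecutorSketch> created = new ArrayList<>();

    BatchExecutorSketch createStateExecutor() {
        BatchExecutorSketch executor = new BatchExecutorSketch();
        created.add(executor); // remember it so the backend can shut it down later
        return executor;
    }

    /** Shuts down every executor this backend handed out. */
    void dispose() {
        created.forEach(BatchExecutorSketch::shutdown);
        created.clear();
    }

    public static void main(String[] args) {
        OwningBackendSketch backend = new OwningBackendSketch();
        BatchExecutorSketch executor = backend.createStateExecutor();
        List<Supplier<Integer>> batch = List.of(() -> 1, () -> 2, () -> 3);
        System.out.println(executor.executeBatch(batch).join());
        backend.dispose();
    }
}
```

Tracking the created executors in the backend means callers never have to remember to shut them down; the backend's own teardown covers them.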
-
getKeyGroupRange
public KeyGroupRange getKeyGroupRange()
Description copied from interface: AsyncKeyedStateBackend
Returns the key groups which this state backend is responsible for.
- Specified by:
getKeyGroupRange in interface AsyncKeyedStateBackend<K>
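A key-group range is an inclusive interval of key-group indices. The sketch below shows how a key can be mapped to a key group and how a backend can check whether that group falls in its range. It is a simplification under stated assumptions: Flink applies a murmur hash to `key.hashCode()` before taking the modulo by the maximum parallelism, whereas the hypothetical `KeyGroupSketch` uses `Math.floorMod` on the raw hash code. The even split of key groups across subtasks is intended to mirror Flink's range assignment, but is not copied from it.

```java
/** Hypothetical, simplified key-group assignment; not Flink's exact hashing. */
final class KeyGroupSketch {
    final int startKeyGroup; // inclusive
    final int endKeyGroup;   // inclusive

    KeyGroupSketch(int startKeyGroup, int endKeyGroup) {
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
    }

    /** Splits [0, maxParallelism) into contiguous, near-equal ranges, one per subtask. */
    static KeyGroupSketch rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupSketch(start, end);
    }

    /** Simplified: the real assignment hashes key.hashCode() further before this step. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    boolean contains(int keyGroup) {
        return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
    }

    public static void main(String[] args) {
        KeyGroupSketch range = rangeForSubtask(128, 3, 1);
        int group = keyGroupFor("user-42", 128);
        System.out.println(range.startKeyGroup + ".." + range.endKeyGroup
                + " owns " + group + ": " + range.contains(group));
    }
}
```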
-
snapshot
@Nonnull public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception
Description copied from interface: Snapshotable
Operation that writes a snapshot into a stream that is provided by the given CheckpointStreamFactory and returns a RunnableFuture that gives a state handle to the snapshot. It is up to the implementation whether the operation is performed synchronously or asynchronously. In the latter case, the returned Runnable must be executed first before obtaining the handle.
- Specified by:
snapshot in interface Snapshotable<SnapshotResult<KeyedStateHandle>>
- Parameters:
checkpointId - the ID of the checkpoint.
timestamp - the timestamp of the checkpoint.
streamFactory - the factory that we can use for writing our state to streams.
checkpointOptions - options for how to perform this checkpoint.
- Returns:
a runnable future that will yield a StateObject.
- Throws:
Exception
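The run-then-get contract of the returned RunnableFuture can be illustrated with `java.util.concurrent.FutureTask`, which implements `RunnableFuture`. In this hypothetical `SnapshotSketch`, the synchronous part copies the live state and the returned task later "writes" that copy; a `String` stands in for the KeyedStateHandle, and no checkpoint stream is involved. Calling `get()` on a FutureTask that is never run blocks indefinitely, which is why the Runnable must be executed first.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical backend showing the synchronous/asynchronous split of a snapshot. */
final class SnapshotSketch {
    private final Map<String, Integer> liveState = new TreeMap<>();

    void put(String key, int value) {
        liveState.put(key, value);
    }

    /** Synchronous part: capture a consistent copy. Asynchronous part: the returned task. */
    RunnableFuture<String> snapshot(long checkpointId) {
        Map<String, Integer> copy = new TreeMap<>(liveState);
        // A real backend would write to a checkpoint stream; here the "handle" is a string.
        return new FutureTask<>(() -> "chk-" + checkpointId + " " + copy);
    }

    public static void main(String[] args) throws Exception {
        SnapshotSketch backend = new SnapshotSketch();
        backend.put("a", 1);
        RunnableFuture<String> future = backend.snapshot(7L);
        backend.put("b", 2); // written after the copy, so not part of checkpoint 7
        future.run();        // must run before get() can yield the handle
        System.out.println(future.get());
    }
}
```

Copying in the synchronous phase keeps the snapshot consistent while letting the expensive write happen later, off the processing path.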
-
notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs of CheckpointListener for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. A proper implementation of the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
- Specified by:
notifyCheckpointComplete in interface CheckpointListener
- Parameters:
checkpointId - the ID of the checkpoint that has been completed.
- Throws:
Exception - this method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
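Because notifications can be skipped or can refer to an earlier checkpoint, a completion notice for checkpoint N is best treated as covering every checkpoint up to and including N. The hypothetical `SubsumingListenerSketch` below keeps pending per-checkpoint artifacts (modelled as strings), commits all of them up to N on completion, and ignores notifications older than the last one it processed. The class and its resource model are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical listener that commits pending per-checkpoint work under the subsuming contract. */
final class SubsumingListenerSketch {
    // Pending artifacts, ordered by checkpoint id.
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();
    private long lastCompleted = -1L;

    void onSnapshot(long checkpointId, String artifact) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(artifact);
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId <= lastCompleted) {
            return; // stale or duplicate notification, already covered by a later one
        }
        // Completion of N subsumes all earlier checkpoints, even if their own
        // notifications were skipped, so commit everything up to and including N.
        NavigableMap<Long, List<String>> covered = pending.headMap(checkpointId, true);
        covered.values().forEach(committed::addAll);
        covered.clear(); // clearing the view also removes the entries from `pending`
        lastCompleted = checkpointId;
    }

    List<String> committed() {
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        SubsumingListenerSketch listener = new SubsumingListenerSketch();
        listener.onSnapshot(1L, "sst-1");
        listener.onSnapshot(2L, "sst-2");
        listener.onSnapshot(3L, "sst-3");
        listener.notifyCheckpointComplete(2L); // the notification for 1 was skipped
        listener.notifyCheckpointComplete(1L); // a late notification is ignored
        System.out.println(listener.committed() + " pending=" + listener.pendingCount());
    }
}
```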
-
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs of CheckpointListener for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanups of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
- Specified by:
notifyCheckpointAborted in interface CheckpointListener
- Parameters:
checkpointId - the ID of the checkpoint that has been aborted.
- Throws:
Exception - this method can propagate exceptions, which leads to a failure/recovery for the task or job.
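Following the cache example in the text, the hypothetical `PerCheckpointCacheSketch` below clears only the auxiliary entries of the aborted checkpoint and leaves every other checkpoint alone. Nothing is discarded beyond that auxiliary cache, consistent with the rule that an abort must not drop data produced since the previous checkpoint. The class name and the cached entries are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical cache of auxiliary, per-checkpoint data (e.g. locally staged upload notes). */
final class PerCheckpointCacheSketch {
    private final Map<Long, List<String>> cache = new HashMap<>();

    void record(long checkpointId, String entry) {
        cache.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(entry);
    }

    /** Best-effort cleanup: drop only the aborted checkpoint's auxiliary entries. */
    void notifyCheckpointAborted(long checkpointId) {
        cache.remove(checkpointId); // unknown ids are a harmless no-op
    }

    boolean hasEntriesFor(long checkpointId) {
        return cache.containsKey(checkpointId);
    }

    public static void main(String[] args) {
        PerCheckpointCacheSketch c = new PerCheckpointCacheSketch();
        c.record(4L, "upload-a");
        c.record(5L, "upload-b");
        c.notifyCheckpointAborted(5L);
        System.out.println(c.hasEntriesFor(4L) + " " + c.hasEntriesFor(5L));
    }
}
```

Since abort notifications may be skipped, such a cache should also be trimmed when later checkpoints complete or are subsumed, rather than relying on this method alone.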
-
notifyCheckpointSubsumed
public void notifyCheckpointSubsumed(long checkpointId) throws Exception
Description copied from interface: InternalCheckpointListener
This method is called as a notification once a distributed checkpoint has been subsumed.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanups of auxiliary resources.
- Specified by:
notifyCheckpointSubsumed in interface InternalCheckpointListener
- Parameters:
checkpointId - the ID of the checkpoint that has been subsumed.
- Throws:
Exception - this method can propagate exceptions, which leads to a failure/recovery for the task or job.
-
dispose
public void dispose()
Should only be called by one thread, and only after all accesses to the DB have happened.
- Specified by:
dispose in interface AsyncKeyedStateBackend<K>
- Specified by:
dispose in interface Disposable
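Since dispose() runs once, on a single thread, after all DB access has finished, a common way to structure such teardown is to release native handles in reverse order of creation (for example, column-family handles before the database that owns them) and to make a repeated call a no-op. The sketch below uses hypothetical `NativeHandle` objects rather than the ForSt classes and does not claim to reproduce what this backend's dispose() actually does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical backend teardown: release handles in reverse creation order, exactly once. */
final class TeardownSketch {
    /** Stand-in for a native resource such as a DB or column-family handle. */
    static final class NativeHandle implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        NativeHandle(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name); // record the order in which handles are released
        }
    }

    private final Deque<NativeHandle> handles = new ArrayDeque<>();
    final List<String> closeLog = new ArrayList<>();
    private boolean disposed; // no synchronization: dispose() is called by a single thread

    NativeHandle open(String name) {
        NativeHandle handle = new NativeHandle(name, closeLog);
        handles.push(handle); // the most recently opened handle sits on top
        return handle;
    }

    void dispose() {
        if (disposed) {
            return; // idempotent: a second call does nothing
        }
        disposed = true;
        while (!handles.isEmpty()) {
            handles.pop().close(); // LIFO: dependents are closed before their owners
        }
    }

    public static void main(String[] args) {
        TeardownSketch backend = new TeardownSketch();
        backend.open("db");
        backend.open("cf-clicks");
        backend.open("cf-views");
        backend.dispose();
        backend.dispose();
        System.out.println(backend.closeLog);
    }
}
```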
-
close
public void close() throws IOException
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
- Throws:
IOException
-
create
@Nonnull public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer)
Description copied from interface: PriorityQueueSetFactory
Creates a KeyGroupedInternalPriorityQueue.
- Specified by:
create in interface PriorityQueueSetFactory
- Type Parameters:
T - type of the stored elements.
- Parameters:
stateName - unique name associated with this queue.
byteOrderedElementSerializer - a serializer whose format is lexicographically ordered in alignment with elementPriorityComparator.
- Returns:
the queue with the specified unique name.
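The byteOrderedElementSerializer requirement means that comparing serialized elements as unsigned bytes must give the same order as the priority comparator. For a timer-like element prioritized by a long timestamp, one standard encoding is to write the value big-endian with its sign bit flipped, so negative values sort before positive ones. This is an assumption chosen for illustration, not necessarily what Flink's serializers do, and `ByteOrderedLongSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical order-preserving encoding of a long priority (e.g. a timer timestamp). */
final class ByteOrderedLongSketch {

    /** Big-endian with the sign bit flipped: unsigned byte order equals signed numeric order. */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] samples = {Long.MIN_VALUE, -5L, -1L, 0L, 1L, 1_000L, Long.MAX_VALUE};
        boolean aligned = true;
        for (long a : samples) {
            for (long b : samples) {
                int numeric = Integer.signum(Long.compare(a, b));
                int bytewise = Integer.signum(Arrays.compareUnsigned(encode(a), encode(b)));
                aligned &= numeric == bytewise; // the two orders must agree for every pair
            }
        }
        System.out.println("order preserved: " + aligned);
    }
}
```

Without the sign-bit flip, plain big-endian encoding would place negative timestamps after positive ones, because their leading byte is at least 0x80.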
-
create
public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates)
Description copied from interface: PriorityQueueSetFactory
Creates a KeyGroupedInternalPriorityQueue.
- Specified by:
create in interface PriorityQueueSetFactory
- Type Parameters:
T - type of the stored elements.
- Parameters:
stateName - unique name associated with this queue.
byteOrderedElementSerializer - a serializer whose format is lexicographically ordered in alignment with elementPriorityComparator.
allowFutureMetadataUpdates - whether to allow the metadata to be updated in the future.
- Returns:
the queue with the specified unique name.