Class RocksDBKeyedStateBackend<K>
- java.lang.Object
-
- org.apache.flink.runtime.state.AbstractKeyedStateBackend<K>
-
- org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend<K>
-
- All Implemented Interfaces:
Closeable, AutoCloseable, CheckpointListener, InternalCheckpointListener, CheckpointableKeyedStateBackend<K>, InternalKeyContext<K>, KeyedStateBackend<K>, KeyedStateFactory, PriorityQueueSetFactory, Snapshotable<SnapshotResult<KeyedStateHandle>>, TestableKeyedStateBackend<K>, Disposable
public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
An AbstractKeyedStateBackend that stores its state in RocksDB and serializes state to streams provided by a CheckpointStreamFactory upon checkpointing. This state backend can store very large state that exceeds memory and spills to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe.
This class follows the rules for closing/releasing native RocksDB resources as described in this document.
-
-
Nested Class Summary
Nested Classes
| Modifier and Type | Class | Description |
|---|---|---|
| static class | RocksDBKeyedStateBackend.RocksDbKvStateInfo | Rocks DB specific information about the k/v states. |
-
Nested classes/interfaces inherited from class org.apache.flink.runtime.state.AbstractKeyedStateBackend
AbstractKeyedStateBackend.PartitionStateFactory
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.state.KeyedStateBackend
KeyedStateBackend.KeySelectionListener<K>
-
-
Field Summary
Fields
| Modifier and Type | Field | Description |
|---|---|---|
| protected org.rocksdb.RocksDB | db | Our RocksDB database, this is used by the actual subclasses of AbstractRocksDBState to store state. |
| static String | MERGE_OPERATOR_NAME | The name of the merge operator in RocksDB. |
-
Fields inherited from class org.apache.flink.runtime.state.AbstractKeyedStateBackend
cancelStreamRegistry, keyContext, keyGroupCompressionDecorator, keyGroupRange, keySerializer, kvStateRegistry, latencyTrackingStateConfig, numberOfKeyGroups, ttlTimeProvider, userCodeClassLoader
-
-
Constructor Summary
Constructors
| Constructor | Description |
|---|---|
| RocksDBKeyedStateBackend(ClassLoader userCodeClassLoader, File instanceBasePath, RocksDBResourceContainer optionsContainer, Function<String,org.rocksdb.ColumnFamilyOptions> columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, org.rocksdb.RocksDB db, LinkedHashMap<String,RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, Map<String,HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, StreamCompressionDecorator keyGroupCompressionDecorator, ResourceGuard rocksDBResourceGuard, RocksDBSnapshotStrategyBase<K,?> checkpointSnapshotStrategy, RocksDBWriteBatchWrapper writeBatchWrapper, org.rocksdb.ColumnFamilyHandle defaultColumnFamilyHandle, RocksDBNativeMetricMonitor nativeMetricMonitor, SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder, PriorityQueueSetFactory priorityQueueFactory, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext, long writeBatchSize, CompletableFuture<Void> asyncCompactFuture, RocksDBManualCompactionManager rocksDBManualCompactionManager) | |
-
Method Summary
All Methods
| Modifier and Type | Method | Description |
|---|---|---|
| void | compactState(StateDescriptor<?,?> stateDesc) | |
| <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> | create(String stateName, TypeSerializer<T> byteOrderedElementSerializer) | Creates a KeyGroupedInternalPriorityQueue. |
| <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> | create(String stateName, TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) | Creates a KeyGroupedInternalPriorityQueue. |
| <N,SV,SEV,S extends State,IS extends S> IS | createOrUpdateInternalState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) | Creates or updates internal state and returns a new InternalKvState. |
| <N,SV,SEV,S extends State,IS extends S> IS | createOrUpdateInternalState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory, boolean allowFutureMetadataUpdates) | Creates or updates internal state and returns a new InternalKvState. |
| void | dispose() | Should only be called by one thread, and only after all accesses to the DB have happened. |
| Optional<CompletableFuture<Void>> | getAsyncCompactAfterRestoreFuture() | |
| int | getKeyGroupPrefixBytes() | |
| <N> Stream<K> | getKeys(String state, N namespace) | |
| <N> Stream<Tuple2<K,N>> | getKeysAndNamespaces(String state) | |
| org.rocksdb.ReadOptions | getReadOptions() | |
| org.rocksdb.WriteOptions | getWriteOptions() | |
| boolean | isSafeToReuseKVState() | Whether it's safe to reuse key-values from the state-backend, e.g. for the purpose of optimization. |
| void | notifyCheckpointAborted(long checkpointId) | This method is called as a notification once a distributed checkpoint has been aborted. |
| void | notifyCheckpointComplete(long completedCheckpointId) | Notifies the listener that the checkpoint with the given checkpointId completed and was committed. |
| int | numKeyValueStateEntries() | Returns the total number of state entries across all keys/namespaces. |
| boolean | requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) | |
| SavepointResources<K> | savepoint() | Returns a SavepointResources that can be used by SavepointSnapshotStrategy to write out a savepoint in the common/unified format. |
| void | setCurrentKey(K newKey) | Sets the current key that is used for partitioned state. |
| void | setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) | Acts as a fast path for KeyedStateBackend.setCurrentKey(K) when the key group is known. |
| RunnableFuture<SnapshotResult<KeyedStateHandle>> | snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) | Triggers an asynchronous snapshot of the keyed state backend from RocksDB. |
-
Methods inherited from class org.apache.flink.runtime.state.AbstractKeyedStateBackend
applyToAllKeys, applyToAllKeys, close, deregisterKeySelectionListener, getCurrentKey, getCurrentKeyGroupIndex, getKeyContext, getKeyGroupCompressionDecorator, getKeyGroupRange, getKeySerializer, getLatencyTrackingStateConfig, getNumberOfKeyGroups, getOrCreateKeyedState, getPartitionedState, notifyCheckpointSubsumed, numKeyValueStatesByName, publishQueryableStateIfEnabled, registerKeySelectionListener, setCurrentKeyGroupIndex
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.state.KeyedStateFactory
createOrUpdateInternalState
-
Methods inherited from interface org.apache.flink.runtime.state.TestableKeyedStateBackend
getDelegatedKeyedStateBackend
-
-
-
-
Field Detail
-
MERGE_OPERATOR_NAME
public static final String MERGE_OPERATOR_NAME
The name of the merge operator in RocksDB. Do not change this unless you know exactly what you are doing.
- See Also:
- Constant Field Values
-
db
protected final org.rocksdb.RocksDB db
Our RocksDB database. It is used by the actual subclasses of AbstractRocksDBState to store state. The different k/v states do not each have their own RocksDB instance; they all write to this instance, each to its own column family.
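The layout described for db (one shared instance, one column family per state) can be pictured with a small stand-in. This is only a sketch using JDK collections: the class SharedStoreSketch and its methods are hypothetical, and a sorted map merely imitates an isolated key space; no RocksDB API is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in: one shared store, one isolated key space per state name. */
final class SharedStoreSketch {
    // Plays the role of the single shared database instance.
    private final Map<String, TreeMap<String, String>> columnFamilies = new LinkedHashMap<>();

    /** Writes into the key space of the given state, creating it on first use. */
    void put(String stateName, String key, String value) {
        columnFamilies.computeIfAbsent(stateName, name -> new TreeMap<>()).put(key, value);
    }

    /** Reads from the key space of the given state; returns null if absent. */
    String get(String stateName, String key) {
        TreeMap<String, String> family = columnFamilies.get(stateName);
        return family == null ? null : family.get(key);
    }

    int columnFamilyCount() {
        return columnFamilies.size();
    }

    public static void main(String[] args) {
        SharedStoreSketch store = new SharedStoreSketch();
        store.put("count", "user-1", "3");
        store.put("lastSeen", "user-1", "2024-01-01");
        // The same key lives independently in each state's key space.
        System.out.println(store.get("count", "user-1"));
        System.out.println(store.get("lastSeen", "user-1"));
    }
}
```

The point of the design is that states share one set of native resources while their entries never collide, even when they use the same key.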
-
-
Constructor Detail
-
RocksDBKeyedStateBackend
public RocksDBKeyedStateBackend(ClassLoader userCodeClassLoader, File instanceBasePath, RocksDBResourceContainer optionsContainer, Function<String,org.rocksdb.ColumnFamilyOptions> columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, org.rocksdb.RocksDB db, LinkedHashMap<String,RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, Map<String,HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, StreamCompressionDecorator keyGroupCompressionDecorator, ResourceGuard rocksDBResourceGuard, RocksDBSnapshotStrategyBase<K,?> checkpointSnapshotStrategy, RocksDBWriteBatchWrapper writeBatchWrapper, org.rocksdb.ColumnFamilyHandle defaultColumnFamilyHandle, RocksDBNativeMetricMonitor nativeMetricMonitor, SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder, PriorityQueueSetFactory priorityQueueFactory, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext, @Nonnegative long writeBatchSize, @Nullable CompletableFuture<Void> asyncCompactFuture, RocksDBManualCompactionManager rocksDBManualCompactionManager)
-
-
Method Detail
-
getKeys
public <N> Stream<K> getKeys(String state, N namespace)
- Parameters:
state - State variable for which existing keys will be returned.
namespace - Namespace for which existing keys will be returned.
- Returns:
- A stream of all keys for the given state and namespace. Modifying the state while iterating over its keys is not supported.
-
getKeysAndNamespaces
public <N> Stream<Tuple2<K,N>> getKeysAndNamespaces(String state)
- Parameters:
state - State variable for which existing keys will be returned.
- Returns:
- A stream of all key/namespace pairs for the given state. Modifying the state while iterating over its keys is not supported. Implementations do not make any ordering guarantees about the returned tuples. Two records with the same key or namespace may not be returned near each other in the stream.
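Because modifying state while iterating over the key stream is not supported, one conservative pattern is to collect the keys first and modify afterwards. The sketch below shows that pattern on a plain Map standing in for keyed state. KeyIterationSketch is a hypothetical name, and closing the stream with try-with-resources is an assumption made in case the real stream holds native resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: materialize the keys, close the stream, then modify the state. */
final class KeyIterationSketch {

    /** Removes every entry whose value is not positive and returns the removed keys. */
    static List<String> removeNonPositive(Map<String, Integer> state) {
        List<String> keys;
        // Step 1: drain the key stream completely before touching the state.
        try (Stream<String> stream = state.keySet().stream()) {
            keys = stream.collect(Collectors.toList());
        }
        // Step 2: the stream is closed, so modifications are now safe.
        List<String> removed = new ArrayList<>();
        for (String key : keys) {
            if (state.get(key) <= 0) {
                state.remove(key);
                removed.add(key);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new TreeMap<>();
        state.put("a", 1);
        state.put("b", 0);
        state.put("c", -2);
        System.out.println(removeNonPositive(state));
        System.out.println(state.keySet());
    }
}
```

The trade-off is memory: all keys are held at once, which may not be acceptable for very large state.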
-
setCurrentKey
public void setCurrentKey(K newKey)
Description copied from interface: KeyedStateBackend
Sets the current key that is used for partitioned state.
- Specified by:
setCurrentKey in interface InternalKeyContext<K>
- Specified by:
setCurrentKey in interface KeyedStateBackend<K>
- Overrides:
setCurrentKey in class AbstractKeyedStateBackend<K>
- Parameters:
newKey - The new current key.
- See Also:
KeyedStateBackend
-
setCurrentKeyAndKeyGroup
public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex)
Description copied from interface: KeyedStateBackend
Acts as a fast path for KeyedStateBackend.setCurrentKey(K) when the key group is known.
- Specified by:
setCurrentKeyAndKeyGroup in interface KeyedStateBackend<K>
- Overrides:
setCurrentKeyAndKeyGroup in class AbstractKeyedStateBackend<K>
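The relationship between the two setters can be sketched as follows. This is an illustration only: KeyContextSketch is hypothetical, and assignToKeyGroup uses a simplified modulo of hashCode() as a stand-in, not the hash function Flink actually applies.

```java
/** Hypothetical sketch of a key context with a slow path and a fast path. */
final class KeyContextSketch<K> {
    private final int numberOfKeyGroups;
    private K currentKey;
    private int currentKeyGroup;
    private int hashComputations; // how many times the slow path derived a key group

    KeyContextSketch(int numberOfKeyGroups) {
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    /** Simplified stand-in for key-group assignment (an assumption, not Flink's hash). */
    static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    /** Slow path: derives the key group from the key, then delegates. */
    void setCurrentKey(K newKey) {
        hashComputations++;
        setCurrentKeyAndKeyGroup(newKey, assignToKeyGroup(newKey, numberOfKeyGroups));
    }

    /** Fast path: the caller already knows the key group, so nothing is recomputed. */
    void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) {
        this.currentKey = newKey;
        this.currentKeyGroup = newKeyGroupIndex;
    }

    K getCurrentKey() { return currentKey; }
    int getCurrentKeyGroup() { return currentKeyGroup; }
    int getHashComputations() { return hashComputations; }

    public static void main(String[] args) {
        KeyContextSketch<String> context = new KeyContextSketch<>(128);
        context.setCurrentKey("user-42");
        int group = context.getCurrentKeyGroup();
        context.setCurrentKeyAndKeyGroup("user-43", group); // reuses a known group
        System.out.println(context.getHashComputations());
    }
}
```

The fast path trusts the caller: passing a key group that does not match the key would silently place state in the wrong group.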
-
dispose
public void dispose()
Should only be called by one thread, and only after all accesses to the DB have happened.
- Specified by:
dispose in interface Disposable
- Specified by:
dispose in interface KeyedStateBackend<K>
- Overrides:
dispose in class AbstractKeyedStateBackend<K>
-
create
@Nonnull public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer)
Description copied from interface: PriorityQueueSetFactory
Creates a KeyGroupedInternalPriorityQueue.
- Type Parameters:
T - type of the stored elements.
- Parameters:
stateName - unique name associated with this queue.
byteOrderedElementSerializer - a serializer whose format is lexicographically ordered in alignment with elementPriorityComparator.
- Returns:
- the queue with the specified unique name.
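What "lexicographically ordered in alignment with" the priority comparator means can be shown with a small encoding example. The sketch assumes, hypothetically, that elements are prioritized by a signed long such as a timestamp. It encodes the value big-endian with the sign bit flipped, so that unsigned bytewise comparison of the encoded form agrees with numeric order. ByteOrderedLongSketch is an illustrative name and not a Flink serializer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of a byte-ordered encoding for signed long priorities. */
final class ByteOrderedLongSketch {

    /** Big-endian bytes with the sign bit flipped, so negatives sort before positives. */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Inverse of encode. */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Compares two values by their encoded bytes, unsigned and lexicographic. */
    static int compareEncoded(long a, long b) {
        return Arrays.compareUnsigned(encode(a), encode(b));
    }

    public static void main(String[] args) {
        long[] priorities = {1000L, -5L, 3L};
        for (long p : priorities) {
            System.out.println(p + " -> " + Arrays.toString(encode(p)));
        }
        // Byte order and numeric order agree for every pair.
        System.out.println(compareEncoded(-5L, 3L) < 0);
    }
}
```

Without the sign-bit flip, a plain big-endian encoding would sort negative values after positive ones, and the byte order would no longer match the comparator.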
-
create
public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates)
Description copied from interface: PriorityQueueSetFactory
Creates a KeyGroupedInternalPriorityQueue.
- Type Parameters:
T - type of the stored elements.
- Parameters:
stateName - unique name associated with this queue.
byteOrderedElementSerializer - a serializer whose format is lexicographically ordered in alignment with elementPriorityComparator.
allowFutureMetadataUpdates - whether to allow metadata updates in the future.
- Returns:
- the queue with the specified unique name.
-
getKeyGroupPrefixBytes
public int getKeyGroupPrefixBytes()
-
getWriteOptions
public org.rocksdb.WriteOptions getWriteOptions()
-
getReadOptions
public org.rocksdb.ReadOptions getReadOptions()
-
snapshot
@Nonnull public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception
Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and is also stopped when the backend is closed through dispose(). For each backend, this method must always be called by the same thread.
- Parameters:
checkpointId - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
streamFactory - The factory that we can use for writing our state to streams.
checkpointOptions - Options for how to perform this checkpoint.
- Returns:
- Future to the state handle of the snapshot data.
- Throws:
Exception - indicating a problem in the synchronous part of the checkpoint.
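The split between a synchronous part and a cancelable asynchronous part can be illustrated with the JDK's own RunnableFuture. In the sketch below, SnapshotFutureSketch and its string "handles" are hypothetical placeholders: the method call captures what it needs immediately, and the returned future does the remaining work only when something runs it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical sketch: synchronous capture, deferred and cancelable completion. */
final class SnapshotFutureSketch {

    /** Synchronous part: captures a view and returns a future that is not yet started. */
    static RunnableFuture<String> snapshot(long checkpointId) {
        final String captured = "state@" + checkpointId; // stands in for a consistent view
        // Asynchronous part: executes only when run() is called on the future.
        return new FutureTask<>(() -> "handle(" + captured + ")");
    }

    /** Runs the future on the calling thread and returns its result. */
    static String runAndGet(RunnableFuture<String> future) {
        future.run();
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndGet(snapshot(7L)));

        RunnableFuture<String> canceled = snapshot(8L);
        canceled.cancel(true); // canceled before it ever ran
        canceled.run();        // has no effect on a canceled task
        System.out.println(canceled.isCancelled());
    }
}
```

In a real deployment the future would typically be run on a separate thread; the sketch runs it inline to stay deterministic.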
-
savepoint
@Nonnull public SavepointResources<K> savepoint() throws Exception
Description copied from interface: CheckpointableKeyedStateBackend
Returns a SavepointResources that can be used by SavepointSnapshotStrategy to write out a savepoint in the common/unified format.
- Throws:
Exception
-
notifyCheckpointComplete
public void notifyCheckpointComplete(long completedCheckpointId) throws Exception
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Properly implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
- Parameters:
completedCheckpointId - The ID of the checkpoint that has been completed.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
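One way to tolerate skipped and overlapping notifications is to let each completion subsume every pending checkpoint with a lower or equal ID. The sketch below assumes that this is what the "Checkpoint Subsuming Contract" asks for. SubsumingListenerSketch and its methods are hypothetical and do not implement any Flink interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch: a completed checkpoint subsumes all pending ones up to its ID. */
final class SubsumingListenerSketch {
    private final TreeSet<Long> pending = new TreeSet<>();
    private long lastCompleted = -1L;

    /** Records that a snapshot was taken and is awaiting confirmation. */
    void registerPending(long checkpointId) {
        pending.add(checkpointId);
    }

    /** Handles a completion notification, which may arrive late or not at all. */
    void notifyCheckpointComplete(long completedCheckpointId) {
        if (completedCheckpointId <= lastCompleted) {
            return; // already subsumed by a later checkpoint
        }
        lastCompleted = completedCheckpointId;
        // headSet(..., true) is a live view; clearing it removes all IDs <= the completed one.
        pending.headSet(completedCheckpointId, true).clear();
    }

    List<Long> pendingIds() {
        return new ArrayList<>(pending);
    }

    long lastCompleted() {
        return lastCompleted;
    }

    public static void main(String[] args) {
        SubsumingListenerSketch listener = new SubsumingListenerSketch();
        listener.registerPending(1L);
        listener.registerPending(2L);
        listener.registerPending(3L);
        listener.notifyCheckpointComplete(2L); // notification for 1 was skipped
        System.out.println(listener.pendingIds());
    }
}
```

With this shape, a notification for checkpoint 1 that arrives after checkpoint 2 has completed is simply ignored.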
-
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId) throws Exception
Description copied from interface:CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources. One example is to pro-actively clear a local per-checkpoint state cache upon checkpoint failure.
- Parameters:
checkpointId - The ID of the checkpoint that has been aborted.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
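The cache-clearing example mentioned above might look like the following sketch. PerCheckpointCacheSketch is hypothetical. The important property is that an abort drops only the auxiliary cache entry for that checkpoint and leaves the actual data untouched, so the next successful checkpoint still covers it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: an abort clears auxiliary per-checkpoint caches, never the data. */
final class PerCheckpointCacheSketch {
    private final List<String> data = new ArrayList<>();
    private final Map<Long, List<String>> cacheByCheckpoint = new HashMap<>();

    void record(String element) {
        data.add(element);
    }

    /** Stores a local copy of the current data for the given checkpoint. */
    void snapshot(long checkpointId) {
        cacheByCheckpoint.put(checkpointId, new ArrayList<>(data));
    }

    /** Early cleanup only: the cache entry goes away, the recorded data stays. */
    void notifyCheckpointAborted(long checkpointId) {
        cacheByCheckpoint.remove(checkpointId);
    }

    boolean hasCacheFor(long checkpointId) {
        return cacheByCheckpoint.containsKey(checkpointId);
    }

    List<String> data() {
        return new ArrayList<>(data);
    }

    public static void main(String[] args) {
        PerCheckpointCacheSketch sketch = new PerCheckpointCacheSketch();
        sketch.record("a");
        sketch.snapshot(1L);
        sketch.record("b");
        sketch.snapshot(2L);
        sketch.notifyCheckpointAborted(2L);
        System.out.println(sketch.hasCacheFor(2L));
        System.out.println(sketch.data());
    }
}
```

Since abort notifications are best effort, a cache like this would also need another cleanup path, for example when a later checkpoint completes.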
-
createOrUpdateInternalState
@Nonnull public <N,SV,SEV,S extends State,IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S,SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception
Description copied from interface: KeyedStateFactory
Creates or updates internal state and returns a new InternalKvState.
- Type Parameters:
N - The type of the namespace.
SV - The type of the stored state value.
SEV - The type of the stored state value or entry for collection types (list or map).
S - The type of the public API state.
IS - The type of internal state.
- Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
snapshotTransformFactory - factory of state snapshot transformer.
- Throws:
Exception
-
createOrUpdateInternalState
@Nonnull public <N,SV,SEV,S extends State,IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S,SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory, boolean allowFutureMetadataUpdates) throws Exception
Description copied from interface: KeyedStateFactory
Creates or updates internal state and returns a new InternalKvState.
- Type Parameters:
N - The type of the namespace.
SV - The type of the stored state value.
SEV - The type of the stored state value or entry for collection types (list or map).
S - The type of the public API state.
IS - The type of internal state.
- Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
snapshotTransformFactory - factory of state snapshot transformer.
allowFutureMetadataUpdates - whether to allow metadata updates in the future.
- Throws:
Exception
-
numKeyValueStateEntries
@VisibleForTesting public int numKeyValueStateEntries()
Description copied from interface: TestableKeyedStateBackend
Returns the total number of state entries across all keys/namespaces.
-
requiresLegacySynchronousTimerSnapshots
public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType)
- Overrides:
requiresLegacySynchronousTimerSnapshots in class AbstractKeyedStateBackend<K>
-
isSafeToReuseKVState
public boolean isSafeToReuseKVState()
Description copied from interface: KeyedStateBackend
Whether it's safe to reuse key-values from the state-backend, e.g. for the purpose of optimization.
NOTE: this method should not be used to check for InternalPriorityQueue, as the priority queue could be stored in different locations, e.g. the RocksDB state-backend could store it on the JVM heap if HEAP is configured as the time-service factory.
- Returns:
- true if it is safe to reuse the key-values from the state-backend.
-
compactState
@VisibleForTesting public void compactState(StateDescriptor<?,?> stateDesc) throws org.rocksdb.RocksDBException
- Throws:
org.rocksdb.RocksDBException
-
getAsyncCompactAfterRestoreFuture
@VisibleForTesting public Optional<CompletableFuture<Void>> getAsyncCompactAfterRestoreFuture()
-
-