public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
An AbstractKeyedStateBackend that stores its state in RocksDB and serializes state to streams provided by a CheckpointStreamFactory upon checkpointing. This state backend can store very large state that exceeds memory and spills to disk. Except for the snapshotting, this class should be accessed as if it is not thread-safe.
This class follows the rules for closing/releasing native RocksDB resources as described in this document.
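The description above treats snapshotting as the one exception to single-threaded access. As a rough illustration of that split, the sketch below uses only the JDK: a hypothetical `SnapshotSketch.snapshot` copies state on the calling thread and returns a `RunnableFuture` whose body can then run on another thread. The class, its methods, and the map-based state are assumptions made for illustration; none of them are Flink APIs.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotSketch {
    // Hypothetical stand-in for keyed state; only the task thread touches it.
    private final Map<String, Long> state = new TreeMap<>();

    public void put(String key, long value) {
        state.put(key, value);
    }

    // Synchronous part: take a consistent copy on the calling thread.
    // Asynchronous part: the returned future renders that copy when it is run.
    public RunnableFuture<String> snapshot(long checkpointId) {
        final Map<String, Long> frozen = new TreeMap<>(state);
        return new FutureTask<>(() -> "chk-" + checkpointId + ":" + frozen);
    }

    public static void main(String[] args) throws Exception {
        SnapshotSketch backend = new SnapshotSketch();
        backend.put("a", 1L);
        RunnableFuture<String> future = backend.snapshot(7L);
        backend.put("b", 2L); // written after the copy, so not part of snapshot 7

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(future); // nothing is produced until the future is run
        System.out.println(future.get()); // prints chk-7:{a=1}
        pool.shutdown();
    }
}
```

The point of returning a `RunnableFuture` rather than a finished result is that the caller decides where the expensive part runs and can cancel it before completion.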
Modifier and Type | Class and Description |
---|---|
static class | RocksDBKeyedStateBackend.RocksDbKvStateInfo - RocksDB-specific information about the k/v states. |
KeyedStateBackend.KeySelectionListener<K>
Modifier and Type | Field and Description |
---|---|
protected org.rocksdb.RocksDB | db - Our RocksDB database, used by the actual subclasses of AbstractRocksDBState to store state. |
static String | MERGE_OPERATOR_NAME - The name of the merge operator in RocksDB. |
cancelStreamRegistry, keyContext, keyGroupCompressionDecorator, keyGroupRange, keySerializer, kvStateRegistry, numberOfKeyGroups, ttlTimeProvider, userCodeClassLoader
Constructor and Description |
---|
RocksDBKeyedStateBackend(ClassLoader userCodeClassLoader,
File instanceBasePath,
RocksDBResourceContainer optionsContainer,
java.util.function.Function<String,org.rocksdb.ColumnFamilyOptions> columnFamilyOptionsFactory,
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
org.rocksdb.RocksDB db,
LinkedHashMap<String,RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
StreamCompressionDecorator keyGroupCompressionDecorator,
ResourceGuard rocksDBResourceGuard,
RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy,
RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy,
RocksDBWriteBatchWrapper writeBatchWrapper,
org.rocksdb.ColumnFamilyHandle defaultColumnFamilyHandle,
RocksDBNativeMetricMonitor nativeMetricMonitor,
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder,
PriorityQueueSetFactory priorityQueueFactory,
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
InternalKeyContext<K> keyContext,
long writeBatchSize) |
Modifier and Type | Method and Description |
---|---|
void | compactState(StateDescriptor<?,?> stateDesc) |
<T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> | create(String stateName, TypeSerializer<T> byteOrderedElementSerializer) - Creates a KeyGroupedInternalPriorityQueue. |
<N,SV,SEV,S extends State,IS extends S> IS | createInternalState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S,SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) - Creates and returns a new InternalKvState. |
void | dispose() - Should only be called by one thread, and only after all accesses to the DB happened. |
int | getKeyGroupPrefixBytes() |
<N> java.util.stream.Stream<K> | getKeys(String state, N namespace) |
org.rocksdb.ReadOptions | getReadOptions() |
org.rocksdb.WriteOptions | getWriteOptions() |
void | notifyCheckpointAborted(long checkpointId) - This method is called as a notification once a distributed checkpoint has been aborted. |
void | notifyCheckpointComplete(long completedCheckpointId) - This method is called as a notification once a distributed checkpoint has been completed. |
int | numKeyValueStateEntries() - Returns the total number of state entries across all keys/namespaces. |
boolean | requiresLegacySynchronousTimerSnapshots() |
void | setCurrentKey(K newKey) - Sets the current key that is used for partitioned state. |
RunnableFuture<SnapshotResult<KeyedStateHandle>> | snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) - Triggers an asynchronous snapshot of the keyed state backend from RocksDB. |
boolean | supportsAsynchronousSnapshots() |
applyToAllKeys, close, deregisterKeySelectionListener, getCurrentKey, getCurrentKeyGroupIndex, getKeyGroupCompressionDecorator, getKeyGroupRange, getKeySerializer, getNumberOfKeyGroups, getOrCreateKeyedState, getPartitionedState, numKeyValueStatesByName, registerKeySelectionListener
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
createInternalState
public static final String MERGE_OPERATOR_NAME
The name of the merge operator in RocksDB.
protected final org.rocksdb.RocksDB db
Our RocksDB database, used by the actual subclasses of AbstractRocksDBState to store state. The different k/v states that we have don't each have their own RocksDB instance. They all write to this instance but to their own column family.
public RocksDBKeyedStateBackend(ClassLoader userCodeClassLoader, File instanceBasePath, RocksDBResourceContainer optionsContainer, java.util.function.Function<String,org.rocksdb.ColumnFamilyOptions> columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, org.rocksdb.RocksDB db, LinkedHashMap<String,RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, StreamCompressionDecorator keyGroupCompressionDecorator, ResourceGuard rocksDBResourceGuard, RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy, RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy, RocksDBWriteBatchWrapper writeBatchWrapper, org.rocksdb.ColumnFamilyHandle defaultColumnFamilyHandle, RocksDBNativeMetricMonitor nativeMetricMonitor, org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder, PriorityQueueSetFactory priorityQueueFactory, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext, @Nonnegative long writeBatchSize)
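To make the "one database instance, one column family per k/v state" layout of the db field concrete, here is a small JDK-only model. `SharedStoreSketch` and its method names are hypothetical and stand in for the real RocksDB handles. The `LinkedHashMap` keyed by state name loosely mirrors the `kvStateInformation` constructor parameter, which is an assumption about how that map is used.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class SharedStoreSketch {
    // One "database" instance; each registered state gets its own sorted keyspace,
    // playing the role of a column family in this model.
    private final Map<String, TreeMap<String, String>> columnFamilies = new LinkedHashMap<>();

    private TreeMap<String, String> family(String stateName) {
        return columnFamilies.computeIfAbsent(stateName, n -> new TreeMap<>());
    }

    public void put(String stateName, String key, String value) {
        family(stateName).put(key, value);
    }

    public String get(String stateName, String key) {
        TreeMap<String, String> cf = columnFamilies.get(stateName);
        return cf == null ? null : cf.get(key);
    }

    public int stateCount() {
        return columnFamilies.size();
    }

    public static void main(String[] args) {
        SharedStoreSketch db = new SharedStoreSketch();
        db.put("counts", "user-1", "3");
        db.put("lastSeen", "user-1", "2021-01-01");
        // Same key, different states: the entries do not collide.
        System.out.println(db.get("counts", "user-1"));
        System.out.println(db.get("lastSeen", "user-1"));
    }
}
```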
public <N> java.util.stream.Stream<K> getKeys(String state, N namespace)
Parameters:
state - State variable for which existing keys will be returned.
namespace - Namespace for which existing keys will be returned.
public void setCurrentKey(K newKey)
Description copied from interface: KeyedStateBackend
Sets the current key that is used for partitioned state.
setCurrentKey in interface KeyedStateBackend<K>
setCurrentKey in class AbstractKeyedStateBackend<K>
Parameters:
newKey - The new current key.
See Also: KeyedStateBackend
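How a "current key" scopes partitioned state can be pictured with a tiny JDK-only model. `KeyedValueSketch` is a hypothetical class, not part of Flink: it keeps one value per key, and every read or write goes through whichever key was set last.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedValueSketch<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    // Selects the key that all following value()/update() calls refer to.
    public void setCurrentKey(K newKey) {
        this.currentKey = newKey;
    }

    public V value() {
        requireKey();
        return valuesByKey.get(currentKey);
    }

    public void update(V newValue) {
        requireKey();
        valuesByKey.put(currentKey, newValue);
    }

    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key set");
        }
    }

    public static void main(String[] args) {
        KeyedValueSketch<String, Integer> state = new KeyedValueSketch<>();
        state.setCurrentKey("alice");
        state.update(1);
        state.setCurrentKey("bob");
        state.update(2);
        state.setCurrentKey("alice");
        System.out.println(state.value()); // prints 1
    }
}
```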
public void dispose()
Should only be called by one thread, and only after all accesses to the DB happened.
dispose in interface KeyedStateBackend<K>
dispose in interface Disposable
dispose in class AbstractKeyedStateBackend<K>
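The dispose contract (a single thread, after all DB accesses) goes together with the class-level note about releasing native RocksDB resources. The JDK-only sketch below shows one common way to get a deterministic release order: try-with-resources closes resources in reverse declaration order. `NativeHandle` is a hypothetical stand-in for an object that owns native memory, and the ordering shown (column family handle first, then the database, options last) is an assumption about what such rules typically require, not a quote from the referenced document.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {
    // Records the order in which handles were released.
    static final List<String> CLOSED = new ArrayList<>();

    // Hypothetical stand-in for an object that owns native memory.
    static final class NativeHandle implements AutoCloseable {
        private final String name;

        NativeHandle(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            CLOSED.add(name);
        }
    }

    static void useAndRelease() {
        try (NativeHandle options = new NativeHandle("options");
             NativeHandle db = new NativeHandle("db");
             NativeHandle columnFamily = new NativeHandle("columnFamily")) {
            // All reads and writes would happen here, on one thread.
        }
    }

    public static void main(String[] args) {
        useAndRelease();
        System.out.println(CLOSED); // prints [columnFamily, db, options]
    }
}
```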
@Nonnull public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer)
Description copied from interface: PriorityQueueSetFactory
Creates a KeyGroupedInternalPriorityQueue.
Type Parameters:
T - type of the stored elements.
Parameters:
stateName - unique name associated with this queue.
byteOrderedElementSerializer - a serializer with a format that is lexicographically ordered in alignment with elementPriorityComparator.
public int getKeyGroupPrefixBytes()
public org.rocksdb.WriteOptions getWriteOptions()
public org.rocksdb.ReadOptions getReadOptions()
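The byteOrderedElementSerializer parameter of create asks for a format whose lexicographic byte order agrees with the element comparator. A minimal JDK-only sketch of that idea, assuming elements ordered by a long priority: write the value big-endian with the sign bit flipped, so that unsigned byte-wise comparison matches numeric order. `ByteOrderedLongSketch` and its methods are hypothetical helpers, not Flink serializers, and `Arrays.compareUnsigned` requires Java 9 or later.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteOrderedLongSketch {
    public static byte[] serialize(long value) {
        // ByteBuffer writes big-endian by default. Flipping the sign bit maps
        // Long.MIN_VALUE..Long.MAX_VALUE onto 0x00..00 through 0xFF..FF.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    public static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Compares two values by their serialized form only, byte by byte, unsigned.
    public static int compareSerialized(long a, long b) {
        return Arrays.compareUnsigned(serialize(a), serialize(b));
    }

    public static void main(String[] args) {
        long[] samples = {Long.MIN_VALUE, -5L, 0L, 3L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < samples.length; i++) {
            // Each sample sorts before the next one in serialized form.
            System.out.println(compareSerialized(samples[i], samples[i + 1]) < 0);
        }
    }
}
```

Without the sign-bit flip, negative values would serialize with a leading 0xFF-style byte and sort after positive ones, which is the kind of mismatch the parameter description rules out.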
@Nonnull public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception
Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and is also stopped when the backend is closed through dispose(). For each backend, this method must always be called by the same thread.
Parameters:
checkpointId - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
streamFactory - The factory that we can use for writing our state to streams.
checkpointOptions - Options for how to perform this checkpoint.
Throws:
Exception - indicating a problem in the synchronous part of the checkpoint.
public void notifyCheckpointComplete(long completedCheckpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been completed.
Note that any exception during this method will not cause the checkpoint to fail any more.
Parameters:
completedCheckpointId - The ID of the checkpoint that has been completed.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
public void notifyCheckpointAborted(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Parameters:
checkpointId - The ID of the checkpoint that has been aborted.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task.
@Nonnull public <N,SV,SEV,S extends State,IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S,SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception
Description copied from interface: KeyedStateFactory
Creates and returns a new InternalKvState.
Type Parameters:
N - The type of the namespace.
SV - The type of the stored state value.
SEV - The type of the stored state value or entry for collection types (list or map).
S - The type of the public API state.
IS - The type of internal state.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
snapshotTransformFactory - factory of state snapshot transformer.
Throws:
Exception
public boolean supportsAsynchronousSnapshots()
supportsAsynchronousSnapshots in class AbstractKeyedStateBackend<K>
@VisibleForTesting public int numKeyValueStateEntries()
Description copied from class: AbstractKeyedStateBackend
Returns the total number of state entries across all keys/namespaces.
numKeyValueStateEntries in class AbstractKeyedStateBackend<K>
public boolean requiresLegacySynchronousTimerSnapshots()
requiresLegacySynchronousTimerSnapshots in class AbstractKeyedStateBackend<K>
@VisibleForTesting public void compactState(StateDescriptor<?,?> stateDesc) throws org.rocksdb.RocksDBException
Throws:
org.rocksdb.RocksDBException
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.