@PublicEvolving public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBackend implements ConfigurableStateBackend
A StateBackend that stores its state in an embedded RocksDB instance. This state backend can store very large state that exceeds memory and spills to local disk. All key/value state (including windows) is stored in the key/value index of RocksDB. For persistence against loss of machines, please configure a CheckpointStorage instance for the Job.
The behavior of the RocksDB instances can be parametrized by setting RocksDB Options using the methods setPredefinedOptions(PredefinedOptions) and setRocksDBOptions(RocksDBOptionsFactory).
Modifier and Type | Class and Description |
---|---|
static class | EmbeddedRocksDBStateBackend.PriorityQueueStateType The options to choose from for the type of priority queue state. |
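Nested classes/interfaces inherited from interface StateBackend: StateBackend.CustomInitializationMetrics, StateBackend.KeyedStateBackendParameters<K>, StateBackend.OperatorStateBackendParameters
Fields inherited from class AbstractStateBackend: latencyTrackingConfigBuilder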
Constructor and Description |
---|
EmbeddedRocksDBStateBackend() Creates a new EmbeddedRocksDBStateBackend for storing local state. |
EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing) Creates a new EmbeddedRocksDBStateBackend for storing local state. |
EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing) Creates a new EmbeddedRocksDBStateBackend for storing local state. |
Modifier and Type | Method and Description |
---|---|
EmbeddedRocksDBStateBackend | configure(ReadableConfig config, ClassLoader classLoader) Creates a copy of this state backend that uses the values defined in the configuration for fields that were not yet specified in this state backend. |
<K> AbstractKeyedStateBackend<K> | createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it. |
OperatorStateBackend | createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) Creates a new OperatorStateBackend that can be used for storing operator state. |
String[] | getDbStoragePaths() Gets the configured local DB storage paths, or null if none were configured. |
RocksDBMemoryConfiguration | getMemoryConfiguration() Gets the memory configuration object, which offers settings to control RocksDB's memory usage. |
int | getNumberOfTransferThreads() Gets the number of threads used to transfer files while snapshotting/restoring. |
PredefinedOptions | getPredefinedOptions() Gets the currently set predefined options for RocksDB. |
EmbeddedRocksDBStateBackend.PriorityQueueStateType | getPriorityQueueStateType() Gets the type of the priority queue state. |
RocksDBOptionsFactory | getRocksDBOptions() Gets Options for the RocksDB instances. |
long | getWriteBatchSize() Gets the maximum batch size used in RocksDBWriteBatchWrapper. |
boolean | isIncrementalCheckpointsEnabled() Gets whether incremental checkpoints are enabled for this state backend. |
void | setDbStoragePath(String path) Sets the path where the RocksDB local database files should be stored on the local file system. |
void | setDbStoragePaths(String... paths) Sets the directories in which the local RocksDB database puts its files (like SST and metadata files). |
void | setNumberOfTransferThreads(int numberOfTransferThreads) Sets the number of threads used to transfer files while snapshotting/restoring. |
void | setPredefinedOptions(PredefinedOptions options) Sets the predefined options for RocksDB. |
void | setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType) Sets the type of the priority queue state. |
void | setRocksDBMemoryFactory(RocksDBMemoryControllerUtils.RocksDBMemoryFactory rocksDBMemoryFactory) Sets the RocksDBMemoryFactory. |
void | setRocksDBOptions(RocksDBOptionsFactory optionsFactory) Sets Options for the RocksDB instances. |
void | setWriteBatchSize(long writeBatchSize) Sets the maximum batch size used in RocksDBWriteBatchWrapper. A non-positive value disables the memory size controller, so that only the item count controller is used. |
boolean | supportsNoClaimRestoreMode() Tells if a state backend supports the RestoreMode.NO_CLAIM mode. |
boolean | supportsSavepointFormat(SavepointFormatType formatType) |
String | toString() |
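Methods inherited from class AbstractManagedMemoryStateBackend: useManagedMemory
Methods inherited from class AbstractStateBackend: getCompressionDecorator
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface StateBackend: createAsyncKeyedStateBackend, getName, supportsAsyncKeyedStateBackend, useManagedMemory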
public EmbeddedRocksDBStateBackend()
Creates a new EmbeddedRocksDBStateBackend for storing local state.
public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend for storing local state.
Parameters:
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend for storing local state.
Parameters:
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
public EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)
Creates a copy of this state backend that uses the values defined in the configuration for fields that were not yet specified in this state backend.
Specified by:
configure in interface ConfigurableStateBackend
Parameters:
config - The configuration.
classLoader - The class loader.
public boolean supportsNoClaimRestoreMode()
Description copied from interface: StateBackend
Tells if a state backend supports the RestoreMode.NO_CLAIM mode.
If a state backend supports NO_CLAIM mode, it should create an independent snapshot when it receives CheckpointType.FULL_CHECKPOINT in Snapshotable.snapshot(long, long, CheckpointStreamFactory, CheckpointOptions).
Specified by:
supportsNoClaimRestoreMode in interface StateBackend
Returns:
Whether the state backend supports RestoreMode.NO_CLAIM mode.
public boolean supportsSavepointFormat(SavepointFormatType formatType)
Specified by:
supportsSavepointFormat in interface StateBackend
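The TernaryBoolean constructor and configure(ReadableConfig, ClassLoader) together suggest a three-state setting: explicitly true, explicitly false, or not yet specified, where only unspecified fields are taken from the configuration. The following self-contained sketch models that rule. The names `ConfigureSketch`, `Tri`, `Settings`, and the map key `incremental` are hypothetical stand-ins for illustration; they are not Flink classes or Flink configuration keys.

```java
import java.util.Map;

/** Simplified model of "fill in only what was left unspecified"; not Flink code. */
final class ConfigureSketch {

    /** Hypothetical three-valued flag standing in for a ternary boolean. */
    enum Tri { TRUE, FALSE, UNDEFINED }

    /** Hypothetical settings holder with a single three-valued field. */
    static final class Settings {
        final Tri incremental;

        Settings(Tri incremental) {
            this.incremental = incremental;
        }

        /** Returns a copy; the config value is used only if the field is still UNDEFINED. */
        Settings configure(Map<String, String> config) {
            if (incremental != Tri.UNDEFINED) {
                return new Settings(incremental); // an explicit value always wins
            }
            String raw = config.get("incremental"); // illustrative key, an assumption
            if (raw == null) {
                return new Settings(Tri.UNDEFINED); // nothing configured either
            }
            return new Settings(Boolean.parseBoolean(raw) ? Tri.TRUE : Tri.FALSE);
        }

        /** Resolves the flag, falling back to the given default when still UNDEFINED. */
        boolean isIncremental(boolean defaultValue) {
            return incremental == Tri.UNDEFINED ? defaultValue : incremental == Tri.TRUE;
        }
    }

    public static void main(String[] args) {
        Settings explicit = new Settings(Tri.FALSE).configure(Map.of("incremental", "true"));
        Settings filled = new Settings(Tri.UNDEFINED).configure(Map.of("incremental", "true"));
        System.out.println(explicit.isIncremental(true)); // explicit FALSE is kept
        System.out.println(filled.isIncremental(false));  // value taken from the config
    }
}
```

Returning a new object from `configure` rather than mutating the receiver mirrors the "creates a copy" wording in the method description.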
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws IOException
Description copied from interface: StateBackend
Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
Keyed State is state where each value is bound to a key.
Specified by:
createKeyedStateBackend in interface StateBackend
createKeyedStateBackend in class AbstractManagedMemoryStateBackend
Type Parameters:
K - The type of the keys by which the state is organized.
Parameters:
parameters - The arguments bundle for creating CheckpointableKeyedStateBackend.
Throws:
IOException
public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception
Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.
Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
Specified by:
createOperatorStateBackend in interface StateBackend
createOperatorStateBackend in class AbstractStateBackend
Parameters:
parameters - The arguments bundle for creating OperatorStateBackend.
Throws:
Exception - This method may forward all exceptions that occur while instantiating the backend.
public RocksDBMemoryConfiguration getMemoryConfiguration()
Gets the memory configuration object, which offers settings to control RocksDB's memory usage.
public void setDbStoragePath(String path)
Sets the path where the RocksDB local database files should be stored on the local file system.
Passing null to this function restores the default behavior, where the configured temp directories will be used.
Parameters:
path - The path where the local RocksDB database files are stored.
public void setDbStoragePaths(String... paths)
Sets the directories in which the local RocksDB database puts its files (like SST and metadata files).
If nothing is configured, these directories default to the TaskManager's local temporary file directories.
Each distinct state is stored under one path; when the state backend creates multiple states, they store their files on different paths.
Passing null to this function restores the default behavior, where the configured temp directories will be used.
Parameters:
paths - The paths across which the local RocksDB database files will be spread.
public String[] getDbStoragePaths()
Gets the configured local DB storage paths, or null if none were configured.
Under these directories on the TaskManager, RocksDB stores its SST files and metadata files. These directories do not need to be persistent; they can be ephemeral, meaning that they are lost on a machine failure, because state in RocksDB is persisted in checkpoints.
If nothing is configured, these directories default to the TaskManager's local temporary file directories.
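The storage path methods above describe two rules: a null setting falls back to the temp directories, and each state lives under a single path while several states end up on different paths. The sketch below models both rules with round-robin selection. Round-robin is an assumption chosen for illustration, since this page does not say which selection policy is used, and `StoragePathSketch` and its methods are hypothetical names, not Flink API.

```java
/** Simplified model of spreading state instances over storage directories; not Flink code. */
final class StoragePathSketch {
    private final String[] paths;
    private int next; // index of the directory handed to the next state instance

    /**
     * @param configuredPaths explicitly configured directories, or null for "not configured"
     * @param tempDirs        fallback directories used when nothing is configured
     */
    StoragePathSketch(String[] configuredPaths, String[] tempDirs) {
        String[] chosen = (configuredPaths == null) ? tempDirs : configuredPaths;
        if (chosen == null || chosen.length == 0) {
            // Rejecting an empty list is this sketch's own choice.
            throw new IllegalArgumentException("at least one directory is required");
        }
        this.paths = chosen.clone();
    }

    /** Returns the single directory for the next state instance, cycling through all directories. */
    String nextPath() {
        String path = paths[next];
        next = (next + 1) % paths.length;
        return path;
    }

    public static void main(String[] args) {
        StoragePathSketch picker =
                new StoragePathSketch(new String[] {"/data/disk1", "/data/disk2"}, new String[] {"/tmp"});
        for (int i = 0; i < 3; i++) {
            System.out.println("state " + i + " stores its files under " + picker.nextPath());
        }
    }
}
```

Because the directories may be ephemeral, a sketch like this needs no durability guarantees; recovery relies on checkpoints rather than on these local files.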
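public boolean isIncrementalCheckpointsEnabled()
Gets whether incremental checkpoints are enabled for this state backend.
public EmbeddedRocksDBStateBackend.PriorityQueueStateType getPriorityQueueStateType()
Gets the type of the priority queue state.
public void setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType)
Sets the type of the priority queue state.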
public void setPredefinedOptions(@Nonnull PredefinedOptions options)
Sets the predefined options for RocksDB.
If user-configured options within RocksDBConfigurableOptions are set (through config.yaml) or a user-defined options factory is set (via setRocksDBOptions(RocksDBOptionsFactory)), then the options from the factory are applied on top of the predefined options specified here and the customized options.
Parameters:
options - The options to set (must not be null).
@VisibleForTesting public PredefinedOptions getPredefinedOptions()
Gets the currently set predefined options for RocksDB. The default options (if nothing was set via setPredefinedOptions(PredefinedOptions)) are PredefinedOptions.DEFAULT.
If user-configured options within RocksDBConfigurableOptions are set (through config.yaml) or a user-defined options factory is set (via setRocksDBOptions(RocksDBOptionsFactory)), then the options from the factory are applied on top of the predefined and customized options.
public void setRocksDBOptions(RocksDBOptionsFactory optionsFactory)
Sets Options for the RocksDB instances. Because the options are not serializable and hold native code references, they must be specified through a factory.
The options created by the factory here are applied on top of the pre-defined options profile selected via setPredefinedOptions(PredefinedOptions) and the user-configured options from the configuration set by configure(ReadableConfig, ClassLoader) with keys in RocksDBConfigurableOptions.
Parameters:
optionsFactory - The options factory that lazily creates the RocksDB options.
@Nullable public RocksDBOptionsFactory getRocksDBOptions()
Gets Options for the RocksDB instances.
The options created by the factory here are applied on top of the pre-defined options profile selected via setPredefinedOptions(PredefinedOptions). If the pre-defined options profile is the default (PredefinedOptions.DEFAULT), then the factory fully controls the RocksDB options.
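The option methods above describe three layers applied in order: the predefined profile, then the user-configured options, then the options factory on top. A minimal sketch of that precedence follows, using plain string maps in place of native RocksDB option objects. `OptionLayeringSketch`, `resolve`, and the option names in `main` are hypothetical and chosen only for illustration; they do not reproduce the Flink or RocksDB API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Simplified model of layering option sources; not Flink code. */
final class OptionLayeringSketch {

    /**
     * Applies the layers in order: predefined profile, then configured values,
     * then the optional factory, which sees the result of the first two layers.
     */
    static Map<String, String> resolve(
            Map<String, String> predefined,
            Map<String, String> configured,
            UnaryOperator<Map<String, String>> factory) {
        Map<String, String> current = new LinkedHashMap<>(predefined);
        current.putAll(configured); // configured values override the profile
        // A null factory models "no factory set": the first two layers stand as they are.
        return factory == null ? current : factory.apply(current);
    }

    public static void main(String[] args) {
        Map<String, String> profile = Map.of("compaction", "level", "threads", "1");
        Map<String, String> configured = Map.of("threads", "4");
        Map<String, String> result = resolve(profile, configured, current -> {
            current.put("compaction", "universal"); // the factory has the last word
            return current;
        });
        System.out.println(result);
    }
}
```

Passing the already-layered options into the factory, rather than merging a separate factory result afterwards, lets the factory either tweak individual values or replace everything.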
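public int getNumberOfTransferThreads()
Gets the number of threads used to transfer files while snapshotting/restoring.
public void setNumberOfTransferThreads(int numberOfTransferThreads)
Sets the number of threads used to transfer files while snapshotting/restoring.
Parameters:
numberOfTransferThreads - The number of threads used to transfer files while snapshotting/restoring.
public long getWriteBatchSize()
Gets the maximum batch size used in RocksDBWriteBatchWrapper.
public void setWriteBatchSize(long writeBatchSize)
Sets the maximum batch size used in RocksDBWriteBatchWrapper. A non-positive value disables the memory size controller, so that only the item count controller is used.
Parameters:
writeBatchSize - The maximum batch size to be used in RocksDBWriteBatchWrapper.
public void setRocksDBMemoryFactory(RocksDBMemoryControllerUtils.RocksDBMemoryFactory rocksDBMemoryFactory)
Sets the RocksDBMemoryFactory.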
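The setWriteBatchSize description mentions two controllers for a write batch: one based on memory size and one based on item count, with a non-positive batch size switching the memory check off. The sketch below models such a flush decision. `BatchFlushSketch`, its fields, and the numbers in `main` are hypothetical, and the exact conditions used inside RocksDBWriteBatchWrapper are an assumption here.

```java
/** Simplified model of a two-controller flush decision; not Flink code. */
final class BatchFlushSketch {
    private final int capacity;        // maximum number of items per batch
    private final long batchSizeBytes; // memory limit in bytes; <= 0 disables the size check

    BatchFlushSketch(int capacity, long batchSizeBytes) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.batchSizeBytes = batchSizeBytes;
    }

    /** Returns true if the batch should be written out now. */
    boolean shouldFlush(int itemCount, long dataSizeBytes) {
        boolean countFull = itemCount >= capacity;
        // The size controller only participates when a positive limit was configured.
        boolean sizeFull = batchSizeBytes > 0 && dataSizeBytes >= batchSizeBytes;
        return countFull || sizeFull;
    }

    public static void main(String[] args) {
        BatchFlushSketch countOnly = new BatchFlushSketch(500, 0);
        BatchFlushSketch both = new BatchFlushSketch(500, 2048);
        System.out.println(countOnly.shouldFlush(10, 1_000_000)); // size is ignored here
        System.out.println(both.shouldFlush(10, 4096));           // size limit applies here
    }
}
```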
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.