Class EmbeddedRocksDBStateBackend
- java.lang.Object
-
- org.apache.flink.runtime.state.AbstractStateBackend
-
- org.apache.flink.runtime.state.AbstractManagedMemoryStateBackend
-
- org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend
-
- All Implemented Interfaces:
Serializable, ConfigurableStateBackend, StateBackend
- Direct Known Subclasses:
EmbeddedRocksDBStateBackend
@PublicEvolving public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBackend implements ConfigurableStateBackend
A StateBackend that stores its state in an embedded RocksDB instance. This state backend can store very large state that exceeds memory and spills to local disk. All key/value state (including windows) is stored in the key/value index of RocksDB. For persistence against loss of machines, please configure a CheckpointStorage instance for the Job.
The behavior of the RocksDB instances can be parametrized by setting RocksDB Options using the methods setPredefinedOptions(PredefinedOptions) and setRocksDBOptions(RocksDBOptionsFactory).
- See Also:
- Serialized Form
-
-
Nested Class Summary
Nested Classes (modifier and type, class, description):
static class EmbeddedRocksDBStateBackend.PriorityQueueStateType
The options to choose for the type of priority queue state.
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.state.StateBackend
StateBackend.CustomInitializationMetrics, StateBackend.KeyedStateBackendParameters<K>, StateBackend.OperatorStateBackendParameters
-
-
Field Summary
-
Fields inherited from class org.apache.flink.runtime.state.AbstractStateBackend
latencyTrackingConfigBuilder
-
-
Constructor Summary
Constructors (modifier, constructor, description):
EmbeddedRocksDBStateBackend()
Creates a new EmbeddedRocksDBStateBackend for storing local state.
EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend for storing local state.
protected EmbeddedRocksDBStateBackend(EmbeddedRocksDBStateBackend original, ReadableConfig config, ClassLoader classLoader)
Private constructor that creates a re-configured copy of the state backend.
EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend for storing local state.
-
Method Summary
Methods (modifier and type, method, description):
EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)
Creates a copy of this state backend that uses the values defined in the configuration for fields that were not yet specified in this state backend.
<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters)
Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters)
Creates a new OperatorStateBackend that can be used for storing operator state.
String[] getDbStoragePaths()
Gets the configured local DB storage paths, or null, if none were configured.
RocksDBMemoryConfiguration getMemoryConfiguration()
Gets the memory configuration object, which offers settings to control RocksDB's memory usage.
int getNumberOfTransferThreads()
Gets the number of threads used to transfer files while snapshotting/restoring.
PredefinedOptions getPredefinedOptions()
Gets the currently set predefined options for RocksDB.
EmbeddedRocksDBStateBackend.PriorityQueueStateType getPriorityQueueStateType()
Gets the type of the priority queue state.
RocksDBOptionsFactory getRocksDBOptions()
Gets Options for the RocksDB instances.
long getWriteBatchSize()
Gets the max batch size that will be used in RocksDBWriteBatchWrapper.
boolean isIncrementalCheckpointsEnabled()
Gets whether incremental checkpoints are enabled for this state backend.
void setDbStoragePath(String path)
Sets the path where the RocksDB local database files should be stored on the local file system.
void setDbStoragePaths(String... paths)
Sets the directories in which the local RocksDB database puts its files (like SST and metadata files).
void setNumberOfTransferThreads(int numberOfTransferThreads)
Sets the number of threads used to transfer files while snapshotting/restoring.
void setPredefinedOptions(PredefinedOptions options)
Sets the predefined options for RocksDB.
void setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType)
Sets the type of the priority queue state.
void setRocksDBMemoryFactory(RocksDBMemoryControllerUtils.RocksDBMemoryFactory rocksDBMemoryFactory)
Sets the RocksDBMemoryFactory.
void setRocksDBOptions(RocksDBOptionsFactory optionsFactory)
Sets Options for the RocksDB instances.
void setWriteBatchSize(long writeBatchSize)
Sets the max batch size that will be used in RocksDBWriteBatchWrapper. A non-positive value disables the memory size controller, so only the item count controller is used.
boolean supportsNoClaimRestoreMode()
Tells if a state backend supports the RecoveryClaimMode.NO_CLAIM mode.
boolean supportsSavepointFormat(SavepointFormatType formatType)
String toString()
-
Methods inherited from class org.apache.flink.runtime.state.AbstractManagedMemoryStateBackend
useManagedMemory
-
Methods inherited from class org.apache.flink.runtime.state.AbstractStateBackend
getCompressionDecorator
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.state.StateBackend
createAsyncKeyedStateBackend, getName, supportsAsyncKeyedStateBackend, useManagedMemory
-
-
-
-
Constructor Detail
-
EmbeddedRocksDBStateBackend
public EmbeddedRocksDBStateBackend()
Creates a new EmbeddedRocksDBStateBackend for storing local state.
-
EmbeddedRocksDBStateBackend
public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend for storing local state.
- Parameters:
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
-
EmbeddedRocksDBStateBackend
public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend for storing local state.
- Parameters:
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
-
EmbeddedRocksDBStateBackend
protected EmbeddedRocksDBStateBackend(EmbeddedRocksDBStateBackend original, ReadableConfig config, ClassLoader classLoader)
Private constructor that creates a re-configured copy of the state backend.
- Parameters:
original - The state backend to re-configure.
config - The configuration.
classLoader - The class loader.
-
-
Method Detail
-
configure
public EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)
Creates a copy of this state backend that uses the values defined in the configuration for fields that were not yet specified in this state backend.
- Specified by:
configure in interface ConfigurableStateBackend
- Parameters:
config - The configuration.
classLoader - The class loader.
- Returns:
- The re-configured variant of the state backend.
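As a rough illustration of the "use the configuration only for fields that were not yet specified" rule, the following stand-alone sketch models a three-valued setting that falls back to a configured value only when it was left undefined. The `Tri` enum and the `ConfigureSketch` class are hypothetical names invented for this illustration; they do not reproduce Flink's `TernaryBoolean` or the actual logic inside `configure`.

```java
/** Stand-alone model (not Flink code): a setting that may be left unspecified. */
enum Tri {
    TRUE, FALSE, UNDEFINED;

    /** Returns the explicit value if one was set, otherwise the configured fallback. */
    boolean resolve(boolean configured) {
        return this == UNDEFINED ? configured : this == TRUE;
    }
}

/** Hypothetical helper class for this sketch only. */
final class ConfigureSketch {
    private ConfigureSketch() {}

    /** Picks the effective incremental-checkpointing flag for a re-configured copy. */
    static boolean effectiveIncremental(Tri explicit, boolean fromConfig) {
        // An explicitly set value always wins; the configuration only fills gaps.
        return explicit.resolve(fromConfig);
    }
}
```

Under this model, a value passed explicitly to a constructor is never overridden by the configuration, which matches the wording of the description above.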
-
supportsNoClaimRestoreMode
public boolean supportsNoClaimRestoreMode()
Description copied from interface: StateBackend
Tells if a state backend supports the RecoveryClaimMode.NO_CLAIM mode.
If a state backend supports NO_CLAIM mode, it should create an independent snapshot when it receives CheckpointType.FULL_CHECKPOINT in Snapshotable.snapshot(long, long, CheckpointStreamFactory, CheckpointOptions).
- Specified by:
supportsNoClaimRestoreMode in interface StateBackend
- Returns:
- If the state backend supports RecoveryClaimMode.NO_CLAIM mode.
-
supportsSavepointFormat
public boolean supportsSavepointFormat(SavepointFormatType formatType)
- Specified by:
supportsSavepointFormat in interface StateBackend
-
createKeyedStateBackend
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws IOException
Description copied from interface: StateBackend
Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
Keyed State is state where each value is bound to a key.
- Specified by:
createKeyedStateBackend in interface StateBackend
- Specified by:
createKeyedStateBackend in class AbstractManagedMemoryStateBackend
- Type Parameters:
K - The type of the keys by which the state is organized.
- Parameters:
parameters - The arguments bundle for creating CheckpointableKeyedStateBackend.
- Returns:
- The Keyed State Backend for the given job, operator, and key group range.
- Throws:
IOException
-
createOperatorStateBackend
public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception
Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.
Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
- Specified by:
createOperatorStateBackend in interface StateBackend
- Specified by:
createOperatorStateBackend in class AbstractStateBackend
- Parameters:
parameters - The arguments bundle for creating OperatorStateBackend.
- Returns:
- The OperatorStateBackend for the operator identified by the job and operator identifier.
- Throws:
Exception - This method may forward all exceptions that occur while instantiating the backend.
-
getMemoryConfiguration
public RocksDBMemoryConfiguration getMemoryConfiguration()
Gets the memory configuration object, which offers settings to control RocksDB's memory usage.
-
setDbStoragePath
public void setDbStoragePath(String path)
Sets the path where the RocksDB local database files should be stored on the local file system. Setting this path overrides the default behavior, where the files are stored across the configured temp directories.
Passing null to this function restores the default behavior, where the configured temp directories will be used.
- Parameters:
path - The path where the local RocksDB database files are stored.
-
setDbStoragePaths
public void setDbStoragePaths(String... paths)
Sets the directories in which the local RocksDB database puts its files (like SST and metadata files). These directories do not need to be persistent; they can be ephemeral, meaning that they are lost on a machine failure, because state in RocksDB is persisted in checkpoints.
If nothing is configured, these directories default to the TaskManager's local temporary file directories.
Each distinct state will be stored in one path, but when the state backend creates multiple states, they will store their files on different paths.
Passing null to this function restores the default behavior, where the configured temp directories will be used.
- Parameters:
paths - The paths across which the local RocksDB database files will be spread.
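The fallback and spreading behavior described for the storage paths can be pictured with a small stand-alone model. This is a sketch only: the class `StoragePathSketch`, its methods, and in particular the round-robin assignment of states to paths are assumptions made for illustration. The text above says only that multiple states end up on different paths, not how the path is chosen.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-alone model (not Flink code) of choosing a local directory per state. */
final class StoragePathSketch {
    private StoragePathSketch() {}

    /** Falls back to the default temp directories when no paths were configured (null). */
    static List<String> effectivePaths(String[] configured, String[] tempDirs) {
        if (configured == null) {
            return Arrays.asList(tempDirs);
        }
        return Arrays.asList(configured);
    }

    /** Assumed round-robin policy: the i-th state is placed on path (i mod n). */
    static String pathForState(List<String> paths, int stateIndex) {
        if (paths.isEmpty()) {
            throw new IllegalArgumentException("at least one path is required");
        }
        return paths.get(Math.floorMod(stateIndex, paths.size()));
    }
}
```

Each state stays on a single directory in this model, while several states are distributed over all configured directories.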
-
getDbStoragePaths
public String[] getDbStoragePaths()
Gets the configured local DB storage paths, or null, if none were configured.
Under these directories on the TaskManager, RocksDB stores its SST files and metadata files. These directories do not need to be persistent; they can be ephemeral, meaning that they are lost on a machine failure, because state in RocksDB is persisted in checkpoints.
If nothing is configured, these directories default to the TaskManager's local temporary file directories.
-
isIncrementalCheckpointsEnabled
public boolean isIncrementalCheckpointsEnabled()
Gets whether incremental checkpoints are enabled for this state backend.
-
getPriorityQueueStateType
public EmbeddedRocksDBStateBackend.PriorityQueueStateType getPriorityQueueStateType()
Gets the type of the priority queue state. It falls back to the default value if it is not explicitly set.
- Returns:
- The type of the priority queue state.
-
setPriorityQueueStateType
public void setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType)
Sets the type of the priority queue state. It falls back to the default value if it is not explicitly set.
-
setPredefinedOptions
public void setPredefinedOptions(@Nonnull PredefinedOptions options)
Sets the predefined options for RocksDB.
If user-configured options within RocksDBConfigurableOptions are set (through config.yaml) or a user-defined options factory is set (via setRocksDBOptions(RocksDBOptionsFactory)), then the options from the factory are applied on top of the predefined options specified here and the customized options.
- Parameters:
options - The options to set (must not be null).
-
getPredefinedOptions
@VisibleForTesting public PredefinedOptions getPredefinedOptions()
Gets the currently set predefined options for RocksDB. The default options (if nothing was set via setPredefinedOptions(PredefinedOptions)) are PredefinedOptions.DEFAULT.
If user-configured options within RocksDBConfigurableOptions are set (through config.yaml) or a user-defined options factory is set (via setRocksDBOptions(RocksDBOptionsFactory)), then the options from the factory are applied on top of the predefined and customized options.
- Returns:
- The currently set predefined options for RocksDB.
-
setRocksDBOptions
public void setRocksDBOptions(RocksDBOptionsFactory optionsFactory)
Sets Options for the RocksDB instances. Because the options are not serializable and hold native code references, they must be specified through a factory.
The options created by the factory here are applied on top of the pre-defined options profile selected via setPredefinedOptions(PredefinedOptions) and user-configured options from configuration set by configure(ReadableConfig, ClassLoader) with keys in RocksDBConfigurableOptions.
- Parameters:
optionsFactory - The options factory that lazily creates the RocksDB options.
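The layering order described here (predefined profile first, then user-configured options, then the factory on top) can be modeled without any RocksDB types. The following is a minimal sketch under stated assumptions: options are reduced to string key/value pairs, the factory is a plain `UnaryOperator`, and the class name `OptionLayeringSketch` is hypothetical. It is not how the backend builds its native options objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Stand-alone model (not Flink code) of layering RocksDB option sources. */
final class OptionLayeringSketch {
    private OptionLayeringSketch() {}

    /**
     * Applies the three layers in order. Later layers override earlier ones.
     * A null factory models the case where no options factory was set.
     */
    static Map<String, String> resolve(
            Map<String, String> predefined,
            Map<String, String> configured,
            UnaryOperator<Map<String, String>> factory) {
        // Layer 1: start from the predefined profile.
        Map<String, String> merged = new LinkedHashMap<>(predefined);
        // Layer 2: user-configured options override the profile.
        merged.putAll(configured);
        // Layer 3: the factory sees the merged result and has the last word.
        return factory == null ? merged : factory.apply(merged);
    }
}
```

Because the factory runs last in this model, anything it sets overrides both the profile and the configured values, which is why it is described as being applied "on top".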
-
getRocksDBOptions
@Nullable public RocksDBOptionsFactory getRocksDBOptions()
Gets Options for the RocksDB instances.
The options created by the factory here are applied on top of the pre-defined options profile selected via setPredefinedOptions(PredefinedOptions). If the pre-defined options profile is the default (PredefinedOptions.DEFAULT), then the factory fully controls the RocksDB options.
-
getNumberOfTransferThreads
public int getNumberOfTransferThreads()
Gets the number of threads used to transfer files while snapshotting/restoring.
-
setNumberOfTransferThreads
public void setNumberOfTransferThreads(int numberOfTransferThreads)
Sets the number of threads used to transfer files while snapshotting/restoring.
- Parameters:
numberOfTransferThreads - The number of threads used to transfer files while snapshotting/restoring.
-
getWriteBatchSize
public long getWriteBatchSize()
Gets the max batch size that will be used in RocksDBWriteBatchWrapper.
-
setWriteBatchSize
public void setWriteBatchSize(long writeBatchSize)
Sets the max batch size that will be used in RocksDBWriteBatchWrapper. A non-positive value disables the memory size controller, so only the item count controller is used.
- Parameters:
writeBatchSize - The size that will be used in RocksDBWriteBatchWrapper.
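To make the two controllers concrete, here is a small stand-alone sketch of a flush decision that combines an item count limit with an optional memory size limit. The class `WriteBatchFlushSketch`, the method `needsFlush`, and the `capacity` parameter are hypothetical names for this illustration; the sketch does not claim to mirror the internals of RocksDBWriteBatchWrapper.

```java
/** Stand-alone model (not Flink code) of the two flush triggers for a write batch. */
final class WriteBatchFlushSketch {
    private WriteBatchFlushSketch() {}

    /**
     * Decides whether a batch should be flushed.
     *
     * @param itemCount number of entries currently buffered
     * @param capacity assumed item count limit (the "item count controller")
     * @param dataSizeBytes bytes currently buffered
     * @param batchSizeBytes configured write batch size; non-positive disables the size check
     */
    static boolean needsFlush(int itemCount, int capacity, long dataSizeBytes, long batchSizeBytes) {
        boolean countReached = itemCount >= capacity;
        // A non-positive batch size switches the memory size check off entirely.
        boolean sizeReached = batchSizeBytes > 0 && dataSizeBytes >= batchSizeBytes;
        return countReached || sizeReached;
    }
}
```

With `batchSizeBytes` set to zero or a negative value, only the item count can trigger a flush in this model, no matter how many bytes are buffered.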
-
setRocksDBMemoryFactory
public void setRocksDBMemoryFactory(RocksDBMemoryControllerUtils.RocksDBMemoryFactory rocksDBMemoryFactory)
Sets the RocksDBMemoryFactory.
-
-