@Deprecated public class RocksDBStateBackend extends AbstractManagedMemoryStateBackend implements CheckpointStorage, ConfigurableStateBackend
RocksDBStateBackend is deprecated in favor of EmbeddedRocksDBStateBackend and FileSystemCheckpointStorage. This change does not affect the runtime characteristics of your jobs; it is simply an API change to better communicate how Flink separates local state storage from fault tolerance. Jobs can be upgraded without loss of state. If you configure your state backend via the StreamExecutionEnvironment, make the following changes:
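StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Local state lives in RocksDB; fault tolerance (checkpoint storage) is configured separately.
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// "hdfs:///" (three slashes) takes the HDFS host from the Hadoop configuration.
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");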
If you are configuring your state backend via flink-conf.yaml, no changes are required.
A state backend that stores its state in RocksDB. This state backend can store very large state that exceeds memory and spills to disk.
All key/value state (including windows) is stored in the key/value index of RocksDB. For persistence against loss of machines, checkpoints take a snapshot of the RocksDB database, and persist that snapshot in a file system (by default) or another configurable state backend.
The behavior of the RocksDB instances can be parametrized by setting RocksDB options using the methods setPredefinedOptions(PredefinedOptions) and setRocksDBOptions(RocksDBOptionsFactory).
Modifier and Type | Class | Description |
---|---|---|
static class | RocksDBStateBackend.PriorityQueueStateType | Deprecated. |
Fields inherited from class AbstractStateBackend: latencyTrackingConfigBuilder
Constructor | Description |
---|---|
RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) | Deprecated. Use RocksDBStateBackend(StateBackend) instead. |
RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) | Deprecated. Use RocksDBStateBackend(StateBackend, TernaryBoolean) instead. |
RocksDBStateBackend(StateBackend checkpointStreamBackend) | Deprecated. Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. |
RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) | Deprecated. Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. |
RocksDBStateBackend(String checkpointDataUri) | Deprecated. Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) | Deprecated. Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(URI checkpointDataUri) | Deprecated. Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) | Deprecated. Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
Modifier and Type | Method | Description |
---|---|---|
RocksDBStateBackend | configure(ReadableConfig config, ClassLoader classLoader) | Deprecated. Creates a copy of this state backend that uses the values defined in the configuration for fields that were not yet specified in this state backend. |
CheckpointStorageAccess | createCheckpointStorage(JobID jobId) | Deprecated. Creates a storage for checkpoints for the given job. |
<K> AbstractKeyedStateBackend<K> | createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) | Deprecated. Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it. |
<K> AbstractKeyedStateBackend<K> | createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry, double managedMemoryFraction) | Deprecated. Creates a new CheckpointableKeyedStateBackend with the given managed memory fraction. |
OperatorStateBackend | createOperatorStateBackend(Environment env, String operatorIdentifier, Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) | Deprecated. Creates a new OperatorStateBackend that can be used for storing operator state. |
StateBackend | getCheckpointBackend() | Deprecated. Gets the state backend that this RocksDB state backend uses to persist its bytes to. |
String[] | getDbStoragePaths() | Deprecated. Gets the configured local DB storage paths, or null if none were configured. |
int | getNumberOfTransferingThreads() | Deprecated. Typo in method name. Use getNumberOfTransferThreads() instead. |
int | getNumberOfTransferThreads() | Deprecated. Gets the number of threads used to transfer files while snapshotting/restoring. |
PredefinedOptions | getPredefinedOptions() | Deprecated. Gets the currently set predefined options for RocksDB. |
RocksDBStateBackend.PriorityQueueStateType | getPriorityQueueStateType() | Deprecated. Gets the type of the priority queue state. |
RocksDBOptionsFactory | getRocksDBOptions() | Deprecated. Gets Options for the RocksDB instances. |
long | getWriteBatchSize() | Deprecated. Gets the maximum batch size to be used in RocksDBWriteBatchWrapper. |
boolean | isIncrementalCheckpointsEnabled() | Deprecated. Gets whether incremental checkpoints are enabled for this state backend. |
CompletedCheckpointStorageLocation | resolveCheckpoint(String pointer) | Deprecated. Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. |
void | setDbStoragePath(String path) | Deprecated. Sets the path where the RocksDB local database files should be stored on the local file system. |
void | setDbStoragePaths(String... paths) | Deprecated. Sets the directories in which the local RocksDB database puts its files (like SST and metadata files). |
void | setNumberOfTransferingThreads(int numberOfTransferingThreads) | Deprecated. Typo in method name. Use setNumberOfTransferThreads(int) instead. |
void | setNumberOfTransferThreads(int numberOfTransferThreads) | Deprecated. Sets the number of threads used to transfer files while snapshotting/restoring. |
void | setPredefinedOptions(PredefinedOptions options) | Deprecated. Sets the predefined options for RocksDB. |
void | setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType) | Deprecated. Sets the type of the priority queue state. |
void | setRocksDBOptions(RocksDBOptionsFactory optionsFactory) | Deprecated. Sets Options for the RocksDB instances. |
void | setWriteBatchSize(long writeBatchSize) | Deprecated. Sets the maximum batch size to be used in RocksDBWriteBatchWrapper. A non-positive value disables the memory-size controller, so only the item-count controller is used. |
boolean | supportsNoClaimRestoreMode() | Deprecated. Tells if a state backend supports the RestoreMode.NO_CLAIM mode. |
boolean | supportsSavepointFormat(SavepointFormatType formatType) | Deprecated. |
String | toString() | Deprecated. |
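Methods inherited from class AbstractManagedMemoryStateBackend: useManagedMemory
Methods inherited from class AbstractStateBackend: getCompressionDecorator
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface StateBackend: getName, useManagedMemory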
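The setWriteBatchSize(long) summary describes two flush triggers for RocksDBWriteBatchWrapper. An item-count controller is always active, and a memory-size controller is turned off by a non-positive size. The sketch below is a minimal, hypothetical model of that rule, not Flink's implementation. The class name, the item limit of 500, the 2 MiB size in main, and the use of `>=` comparisons are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the write-batch flush rule; not Flink's RocksDBWriteBatchWrapper.
// The thresholds and the ">=" comparisons are illustrative assumptions.
final class WriteBatchFlushModel {
    private final int maxItems;       // item-count controller, always active
    private final long maxBatchBytes; // memory-size controller; a value <= 0 disables it
    private final List<Long> flushedBatchBytes = new ArrayList<>();
    private int items;
    private long bytes;

    WriteBatchFlushModel(int maxItems, long maxBatchBytes) {
        this.maxItems = maxItems;
        this.maxBatchBytes = maxBatchBytes;
    }

    /** Buffers one write of the given serialized size and flushes if either limit is reached. */
    void put(long entryBytes) {
        items++;
        bytes += entryBytes;
        boolean itemLimitReached = items >= maxItems;
        boolean sizeLimitReached = maxBatchBytes > 0 && bytes >= maxBatchBytes;
        if (itemLimitReached || sizeLimitReached) {
            flush();
        }
    }

    /** Records the pending batch as written and starts an empty one. */
    void flush() {
        if (items > 0) {
            flushedBatchBytes.add(bytes);
            items = 0;
            bytes = 0;
        }
    }

    int flushCount() {
        return flushedBatchBytes.size();
    }

    public static void main(String[] args) {
        WriteBatchFlushModel model = new WriteBatchFlushModel(500, 2L * 1024 * 1024);
        for (int i = 0; i < 1_000; i++) {
            model.put(4 * 1024); // 4 KiB per entry
        }
        System.out.println("flushes: " + model.flushCount());
    }
}
```

With 4 KiB entries, the 500-item limit is reached before the 2 MiB size limit. As a result, main flushes twice for 1,000 writes.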

public RocksDBStateBackend(String checkpointDataUri) throws IOException
Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.
A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
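The sketch below shows which URIs carry an explicit host and port. It inspects a few checkpoint locations with java.net.URI only and does not call Flink. The helper name and the host namenode:8020 are assumptions chosen for the example. A URI with an empty authority, such as hdfs:///flink/checkpoints, names no host, so the file system must instead be described by the Hadoop configuration.

```java
import java.net.URI;

// Illustrative helper (hypothetical name); uses only java.net.URI, not Flink's file system classes.
final class CheckpointUriInspector {

    /** Returns true if the URI names a host itself rather than leaving it to the Hadoop configuration. */
    static boolean hasExplicitHost(String uri) {
        return URI.create(uri).getHost() != null;
    }

    public static void main(String[] args) {
        String[] examples = {
            "hdfs://namenode:8020/flink/checkpoints", // host and port given in the URI (example values)
            "hdfs:///flink/checkpoints",              // empty authority: host must come from Hadoop config
            "file:///tmp/flink/checkpoints"           // local file system, no host involved
        };
        for (String s : examples) {
            URI u = URI.create(s);
            System.out.printf("%s -> scheme=%s host=%s port=%d path=%s%n",
                    s, u.getScheme(), u.getHost(), u.getPort(), u.getPath());
        }
    }
}
```

java.net.URI parses a two-slash form such as hdfs://checkpoints with "checkpoints" as the host and an empty path, which is rarely what is intended.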
Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException
Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.
A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(URI checkpointDataUri) throws IOException
Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.
A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException
Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.
A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(StateBackend checkpointStreamBackend)
Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. Typically, one would supply a file system or database state backend here, where the snapshots from RocksDB would be stored.
The snapshots of the RocksDB state will be stored using the given backend's CheckpointStorage.createCheckpointStorage(JobID).
Parameters:
checkpointStreamBackend - The backend to write the checkpoint streams to.

public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing)
Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. Typically, one would supply a file system or database state backend here, where the snapshots from RocksDB would be stored.
The snapshots of the RocksDB state will be stored using the given backend's CheckpointStorage.createCheckpointStorage(JobID).
Parameters:
checkpointStreamBackend - The backend to write the checkpoint streams to.
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.

@Deprecated public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend)
Deprecated. Use RocksDBStateBackend(StateBackend) instead.

@Deprecated public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing)
Deprecated. Use RocksDBStateBackend(StateBackend, TernaryBoolean) instead.

public RocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)
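Creates a copy of this state backend that uses the values defined in the configuration for fields that were not yet specified in this state backend.
Specified by:
configure in interface ConfigurableStateBackend
Parameters:
config - The configuration.
classLoader - The class loader.

public StateBackend getCheckpointBackend()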
Gets the state backend that this RocksDB state backend uses to persist its bytes to.
This RocksDB state backend only implements the RocksDB-specific parts; it relies on the 'CheckpointBackend' to persist the checkpoint and savepoint byte streams.

public boolean supportsNoClaimRestoreMode()
Description copied from interface: StateBackend
Tells if a state backend supports the RestoreMode.NO_CLAIM mode.
If a state backend supports NO_CLAIM mode, it should create an independent snapshot when it receives CheckpointType.FULL_CHECKPOINT in Snapshotable.snapshot(long, long, CheckpointStreamFactory, CheckpointOptions).
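The difference between an incremental snapshot and the independent snapshot required for NO_CLAIM mode can be pictured with a hypothetical model. In this model, a RocksDB snapshot is a set of immutable SST file names. This is not Flink's implementation: the class, the method, and the tracking of previously uploaded files in a plain set are all simplifying assumptions.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of snapshot planning; not Flink code. Names and bookkeeping are assumptions.
final class SnapshotPlanner {
    /** Files uploaded by earlier checkpoints, which later incremental checkpoints may reference. */
    private final Set<String> previouslyUploaded = new LinkedHashSet<>();

    /**
     * Returns the files that must be uploaded. An incremental snapshot uploads only files not seen
     * before and references the rest; an independent snapshot uploads every current file, so it
     * shares nothing with earlier checkpoints.
     */
    Set<String> filesToUpload(Set<String> currentSstFiles, boolean independentSnapshot) {
        Set<String> upload = new LinkedHashSet<>(currentSstFiles);
        if (!independentSnapshot) {
            upload.removeAll(previouslyUploaded);
        }
        previouslyUploaded.addAll(currentSstFiles);
        return upload;
    }

    public static void main(String[] args) {
        SnapshotPlanner planner = new SnapshotPlanner();
        System.out.println(planner.filesToUpload(Set.of("000001.sst", "000002.sst"), false));
        System.out.println(planner.filesToUpload(Set.of("000001.sst", "000002.sst", "000003.sst"), false));
        System.out.println(planner.filesToUpload(Set.of("000002.sst", "000003.sst"), true));
    }
}
```

In this model, the independent snapshot references no earlier upload. It therefore stays valid even if the files of the snapshot it was restored from are later deleted, which is the property NO_CLAIM mode relies on.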
Specified by:
supportsNoClaimRestoreMode in interface StateBackend
Returns:
Whether the state backend supports the RestoreMode.NO_CLAIM mode.

public boolean supportsSavepointFormat(SavepointFormatType formatType)
Specified by:
supportsSavepointFormat in interface StateBackend

public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException
Description copied from interface: CheckpointStorage
Resolves the given pointer to a checkpoint/savepoint into a checkpoint location.
Specified by:
resolveCheckpoint in interface CheckpointStorage
Parameters:
pointer - The external checkpoint pointer to resolve.
Throws:
IOException - Thrown if the state backend does not understand the pointer, or if the pointer could not be resolved due to an I/O error.

public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException
Description copied from interface: CheckpointStorage
Creates a storage for checkpoints for the given job.
Specified by:
createCheckpointStorage in interface CheckpointStorage
Parameters:
jobId - The job to store checkpoint data for.
Throws:
IOException - Thrown if the checkpoint storage cannot be initialized.

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws IOException
Description copied from interface: StateBackend
Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
Keyed state is state where each value is bound to a key.
Specified by:
createKeyedStateBackend in interface StateBackend
createKeyedStateBackend in class AbstractStateBackend
Type Parameters:
K - The type of the keys by which the state is organized.
Parameters:
env - The environment of the task.
jobID - The ID of the job that the task belongs to.
operatorIdentifier - The identifier text of the operator.
keySerializer - The key serializer for the operator.
numberOfKeyGroups - The number of key groups (a.k.a. max parallelism).
keyGroupRange - Range of key groups for which the to-be-created backend is responsible.
kvStateRegistry - KvStateRegistry helper for this task.
ttlTimeProvider - Provider for TTL logic to judge about state expiration.
metricGroup - The parent metric group for all state backend metrics.
stateHandles - The state handles for restore.
cancelStreamRegistry - The registry to which created closeable objects will be registered during restore.
Throws:
IOException
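The sketch below makes numberOfKeyGroups and keyGroupRange concrete by splitting key groups across parallel operator instances. The range formulas follow, to our understanding, Flink's KeyGroupRangeAssignment. The class name is hypothetical. keyGroupFor uses a plain hashCode modulo as a stand-in for the murmur hash that Flink applies to the key's hash code.

```java
// Hypothetical sketch of key-group arithmetic; not Flink code.
// keyGroupFor is simplified: Flink additionally murmur-hashes key.hashCode().
final class KeyGroupMath {

    /** Maps a key to a key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** First key group (inclusive) owned by the given operator instance. */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given operator instance. */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Operator instance responsible for the given key group. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // plays the role of numberOfKeyGroups
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            System.out.printf("operator %d -> key groups [%d, %d]%n", i,
                    rangeStart(maxParallelism, parallelism, i), rangeEnd(maxParallelism, parallelism, i));
        }
        System.out.println("key \"user-42\" -> key group " + keyGroupFor("user-42", maxParallelism));
    }
}
```

With 128 key groups and a parallelism of 3, the instances own the ranges [0, 42], [43, 85] and [86, 127]. The keyed backend created for each instance would receive one such range as its keyGroupRange.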

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry, double managedMemoryFraction) throws IOException
Description copied from interface: StateBackend
Creates a new CheckpointableKeyedStateBackend with the given managed memory fraction.
Backends that use managed memory are required to implement this interface.
Specified by:
createKeyedStateBackend in interface StateBackend
createKeyedStateBackend in class AbstractManagedMemoryStateBackend
Throws:
IOException

public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception
Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.
Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
Specified by:
createOperatorStateBackend in interface StateBackend
createOperatorStateBackend in class AbstractStateBackend
Parameters:
env - The runtime environment of the executing task.
operatorIdentifier - The identifier of the operator whose state should be stored.
stateHandles - The state handles for restore.
cancelStreamRegistry - The registry to register streams to close if the task is canceled.
Throws:
Exception - This method may forward all exceptions that occur while instantiating the backend.

public void setDbStoragePath(String path)
Sets the path where the RocksDB local database files should be stored on the local file system.
Passing null to this function restores the default behavior, where the configured temp directories will be used.
Parameters:
path - The path where the local RocksDB database files are stored.

public void setDbStoragePaths(String... paths)
Sets the directories in which the local RocksDB database puts its files (like SST and metadata files).
If nothing is configured, these directories default to the TaskManager's local temporary file directories.
Each distinct state will be stored in one path, but when the state backend creates multiple states, they will store their files on different paths.
Passing null to this function restores the default behavior, where the configured temp directories will be used.
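One way to spread multiple RocksDB instances over the configured directories is round-robin assignment. The sketch below assumes that policy and, for determinism, starts from the first path. The class name and the per-instance subdirectory names are hypothetical, and Flink's actual selection logic may differ in detail.

```java
import java.io.File;
import java.util.List;

// Hypothetical round-robin placement of RocksDB instance directories; an assumed policy, not Flink code.
final class DbPathRotator {
    private final List<File> basePaths;
    private int next; // index of the base path for the next instance (starts at 0 here)

    DbPathRotator(List<File> basePaths) {
        if (basePaths.isEmpty()) {
            throw new IllegalArgumentException("at least one storage path is required");
        }
        this.basePaths = List.copyOf(basePaths);
    }

    /** Returns the directory for the next RocksDB instance, cycling through the base paths. */
    File nextInstanceDir(String instanceName) {
        File base = basePaths.get(next);
        next = (next + 1) % basePaths.size();
        return new File(base, instanceName);
    }

    public static void main(String[] args) {
        DbPathRotator rotator = new DbPathRotator(
                List.of(new File("/mnt/disk1/rocksdb"), new File("/mnt/disk2/rocksdb")));
        for (String op : List.of("window-op", "join-op", "dedup-op")) {
            System.out.println(op + " -> " + rotator.nextInstanceDir(op));
        }
    }
}
```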
Parameters:
paths - The paths across which the local RocksDB database files will be spread.

public String[] getDbStoragePaths()
Gets the configured local DB storage paths, or null if none were configured.
Under these directories on the TaskManager, RocksDB stores its SST files and metadata files. These directories do not need to be persistent; they can be ephemeral, meaning that they are lost on a machine failure, because state in RocksDB is persisted in checkpoints.
If nothing is configured, these directories default to the TaskManager's local temporary file directories.
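
public boolean isIncrementalCheckpointsEnabled()
Gets whether incremental checkpoints are enabled for this state backend.

public RocksDBStateBackend.PriorityQueueStateType getPriorityQueueStateType()
Gets the type of the priority queue state.

public void setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType)
Sets the type of the priority queue state.

public void setPredefinedOptions(@Nonnull PredefinedOptions options)
Sets the predefined options for RocksDB.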
If user-configured options within RocksDBConfigurableOptions are set (through flink-conf.yaml) or a user-defined options factory is set (via setRocksDBOptions(RocksDBOptionsFactory)), then the options from the factory are applied on top of the predefined options specified here and the customized options.
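The layering described above (predefined options, then options configured in flink-conf.yaml, then the options factory) behaves like successive per-key overrides. The sketch below models it with string maps purely as an illustration. Real RocksDB options are native objects, and the option names and values used here are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual model only: RocksDB options are native objects, not string maps.
final class OptionLayering {

    /** Applies the layers in order; a later layer wins for every key it sets. */
    @SafeVarargs
    static Map<String, String> resolve(Map<String, String>... layersInOrder) {
        Map<String, String> effective = new LinkedHashMap<>();
        for (Map<String, String> layer : layersInOrder) {
            effective.putAll(layer);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> predefined = Map.of("max_open_files", "-1", "compaction_style", "LEVEL");
        Map<String, String> fromConfig = Map.of("max_open_files", "5000");          // e.g. flink-conf.yaml
        Map<String, String> fromFactory = Map.of("compaction_style", "UNIVERSAL"); // e.g. options factory
        System.out.println(resolve(predefined, fromConfig, fromFactory));
    }
}
```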
Parameters:
options - The options to set (must not be null).

@VisibleForTesting public PredefinedOptions getPredefinedOptions()
Gets the currently set predefined options for RocksDB. The default options (if nothing was set via setPredefinedOptions(PredefinedOptions)) are PredefinedOptions.DEFAULT.
If user-configured options within RocksDBConfigurableOptions are set (through flink-conf.yaml) or a user-defined options factory is set (via setRocksDBOptions(RocksDBOptionsFactory)), then the options from the factory are applied on top of the predefined and customized options.

public void setRocksDBOptions(RocksDBOptionsFactory optionsFactory)
Sets Options for the RocksDB instances. Because the options are not serializable and hold native code references, they must be specified through a factory.
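The need for a factory can be shown without RocksDB. The factory holds only plain, serializable settings and can be shipped with the job. The native-backed object it produces is created where it is used and is never serialized. In the sketch below, NativeHandle and HandleFactory are hypothetical stand-ins for RocksDB's option objects and RocksDBOptionsFactory, and plain Java serialization simulates shipping the factory to a TaskManager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an object wrapping native memory; deliberately not Serializable.
final class NativeHandle implements AutoCloseable {
    final int writeBufferMb;

    NativeHandle(int writeBufferMb) {
        this.writeBufferMb = writeBufferMb;
    }

    @Override
    public void close() {
        // A real native-backed object would release its native resources here.
    }
}

// Hypothetical serializable factory: carries only plain settings and builds the handle on demand.
final class HandleFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int writeBufferMb;

    HandleFactory(int writeBufferMb) {
        this.writeBufferMb = writeBufferMb;
    }

    NativeHandle create() {
        return new NativeHandle(writeBufferMb);
    }

    /** Serializes and deserializes the factory, simulating shipping it to a worker. */
    static HandleFactory roundTrip(HandleFactory factory) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(factory);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (HandleFactory) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HandleFactory shipped = roundTrip(new HandleFactory(64));
        try (NativeHandle handle = shipped.create()) { // created on the "worker" side only
            System.out.println("write buffer: " + handle.writeBufferMb + " MB");
        }
    }
}
```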
The options created by the factory here are applied on top of the predefined options profile selected via setPredefinedOptions(PredefinedOptions). If the predefined options profile is the default (PredefinedOptions.DEFAULT), then the factory fully controls the RocksDB options.
Parameters:
optionsFactory - The options factory that lazily creates the RocksDB options.

@Nullable public RocksDBOptionsFactory getRocksDBOptions()
Gets Options for the RocksDB instances.
The options created by the factory here are applied on top of the predefined options profile selected via setPredefinedOptions(PredefinedOptions). If the predefined options profile is the default (PredefinedOptions.DEFAULT), then the factory fully controls the RocksDB options.

public int getNumberOfTransferThreads()
Gets the number of threads used to transfer files while snapshotting/restoring.
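Transferring snapshot files with several threads amounts to submitting one task per file to a fixed-size pool, whose size plays the role of numberOfTransferThreads. The sketch below shows that pattern with java.util.concurrent. The file names are placeholders, and the "transfer" only records which thread handled each file. It illustrates the threading model rather than Flink's transfer code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: models parallel snapshot file transfer; not Flink's implementation.
final class ParallelTransfer {

    /** "Transfers" each file on a pool of numberOfTransferThreads threads; returns file -> thread name. */
    static Map<String, String> transferAll(List<String> files, int numberOfTransferThreads)
            throws InterruptedException {
        if (numberOfTransferThreads < 1) {
            throw new IllegalArgumentException("numberOfTransferThreads must be positive");
        }
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(numberOfTransferThreads);
        try {
            for (String file : files) {
                // A real implementation would copy the file to or from checkpoint storage here.
                pool.execute(() -> handledBy.put(file, Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown();
        }
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("transfers did not finish in time");
        }
        return handledBy;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> result = transferAll(
                List.of("000011.sst", "000012.sst", "MANIFEST-000005", "OPTIONS-000007"), 2);
        result.forEach((file, thread) -> System.out.println(file + " via " + thread));
    }
}
```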

public void setNumberOfTransferThreads(int numberOfTransferThreads)
Sets the number of threads used to transfer files while snapshotting/restoring.
Parameters:
numberOfTransferThreads - The number of threads used to transfer files while snapshotting/restoring.

@Deprecated public int getNumberOfTransferingThreads()
Deprecated. Typo in method name. Use getNumberOfTransferThreads() instead.

@Deprecated public void setNumberOfTransferingThreads(int numberOfTransferingThreads)
Deprecated. Typo in method name. Use setNumberOfTransferThreads(int) instead.

public long getWriteBatchSize()
Gets the maximum batch size to be used in RocksDBWriteBatchWrapper.

public void setWriteBatchSize(long writeBatchSize)
Sets the maximum batch size to be used in RocksDBWriteBatchWrapper. A non-positive value disables the memory-size controller, so only the item-count controller is used.
Parameters:
writeBatchSize - The maximum batch size to be used in RocksDBWriteBatchWrapper.

Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.