public class RocksDBStateBackend extends AbstractStateBackend
A StateBackend that stores its state in RocksDB. This state backend can store very large state that exceeds memory and spills to disk.
All key/value state (including windows) is stored in the key/value index of RocksDB. For persistence against loss of machines, checkpoints take a snapshot of the RocksDB database, and persist that snapshot in a file system (by default) or another configurable state backend.
The behavior of the RocksDB instances can be parametrized by setting RocksDB Options using the methods setPredefinedOptions(PredefinedOptions) and setOptions(OptionsFactory).
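As a rough mental model of "stored in the key/value index of RocksDB", the sketch below flattens keyed state into one sorted key/value map whose lookup key combines the record key with a namespace (for window state, the window). The class name KeyValueIndexModel, the "|" separator, and the use of strings instead of serialized bytes are assumptions for illustration only; this is not the backend's actual storage layout.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a sorted key/value index.
// Lookup keys are "<key>|<namespace>" strings; a real backend would use serialized bytes.
class KeyValueIndexModel {
    private final NavigableMap<String, String> index = new TreeMap<>();

    private static String compositeKey(String key, String namespace) {
        return key + "|" + namespace;
    }

    void put(String key, String namespace, String value) {
        index.put(compositeKey(key, namespace), value);
    }

    String get(String key, String namespace) {
        return index.get(compositeKey(key, namespace));
    }

    // Entries for one key sort next to each other regardless of namespace,
    // so a prefix range scan returns every namespace (e.g. every window) of that key.
    Map<String, String> entriesForKey(String key) {
        return index.subMap(key + "|", true, key + "|\uffff", true);
    }
}
```

Because all namespaces of a key are adjacent in sort order, a sorted store can serve "all windows for this key" with a single range scan.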
Nested classes inherited from class AbstractStateBackend: AbstractStateBackend.CheckpointStateOutputStream, AbstractStateBackend.CheckpointStateOutputView
Modifier and Type | Field and Description |
---|---|
protected org.rocksdb.RocksDB | db: Our RocksDB database, used by the actual subclasses of AbstractRocksDBState to store state. |
Fields inherited from class AbstractStateBackend: currentKey, keySerializer, keyValueStatesByName, userCodeClassLoader
Constructor and Description |
---|
RocksDBStateBackend(String checkpointDataUri): Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) |
RocksDBStateBackend(URI checkpointDataUri): Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) |
Modifier and Type | Method and Description |
---|---|
<S extends Serializable> StateHandle<S> | checkpointStateSerializable(S state, long checkpointID, long timestamp): Writes the given state into the checkpoint, and returns a handle that can retrieve the state back. |
void | close(): Closes the state backend, dropping and aborting all I/O operations that are currently pending. |
AbstractStateBackend.CheckpointStateOutputStream | createCheckpointStateOutputStream(long checkpointID, long timestamp): Creates an output stream that writes into the state of the given checkpoint. |
protected <N,T,ACC> FoldingState<T,ACC> | createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc): Creates and returns a new FoldingState. |
protected <N,T> ListState<T> | createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc): Creates and returns a new ListState. |
protected <N,T> ReducingState<T> | createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc): Creates and returns a new ReducingState. |
protected <N,T> ValueState<T> | createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc): Creates and returns a new ValueState. |
Object | currentKey(): Used by k/v states to access the current key. |
void | disableFullyAsyncSnapshots(): Disables fully asynchronous snapshotting of the partitioned state held in RocksDB. |
void | dispose(): Releases all resources held by this state backend. |
void | disposeAllStateForCurrentJob(): Disposes all state associated with the current job. |
void | enableFullyAsyncSnapshots(): Enables fully asynchronous snapshotting of the partitioned state held in RocksDB. |
protected org.rocksdb.ColumnFamilyHandle | getColumnFamily(StateDescriptor descriptor): Creates a column family handle for use with a k/v state. |
org.rocksdb.ColumnFamilyOptions | getColumnOptions(): Gets the RocksDB ColumnFamilyOptions to be used for all RocksDB instances. |
org.rocksdb.DBOptions | getDbOptions(): Gets the RocksDB DBOptions to be used for all RocksDB instances. |
String[] | getDbStoragePaths() |
OptionsFactory | getOptions(): Gets the options factory that lazily creates the RocksDB options. |
PredefinedOptions | getPredefinedOptions(): Gets the currently set predefined options for RocksDB. |
File[] | getStoragePaths(): Visible for tests. |
void | initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer): This method is called by the task upon deployment to initialize the state backend for data for a specific job. |
void | injectKeyValueStateSnapshots(HashMap<String,KvStateSnapshot> keyValueStateSnapshots): Injects K/V state snapshots for lazy restore. |
TypeSerializer | keySerializer(): Used by k/v states to access the key serializer. |
void | setDbStoragePath(String path): Sets the path where the RocksDB local database files should be stored on the local file system. |
void | setDbStoragePaths(String... paths): Sets the paths across which the local RocksDB database files are distributed on the local file system. |
void | setOptions(OptionsFactory optionsFactory): Sets Options for the RocksDB instances. |
void | setPredefinedOptions(PredefinedOptions options): Sets the predefined options for RocksDB. |
HashMap<String,KvStateSnapshot<?,?,?,?,?>> | snapshotPartitionedState(long checkpointId, long timestamp) |
Methods inherited from class AbstractStateBackend: createCheckpointStateOutputView, getCurrentKey, getPartitionedState, mergePartitionedStates, notifyOfCompletedCheckpoint, setCurrentKey
protected transient volatile org.rocksdb.RocksDB db
Our RocksDB database, used by the actual subclasses of AbstractRocksDBState to store state. The different k/v states do not each have their own RocksDB instance; they all write to this instance, but each to its own column family.
public RocksDBStateBackend(String checkpointDataUri) throws IOException
Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.
A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem and path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public RocksDBStateBackend(URI checkpointDataUri) throws IOException
Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.
A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem and path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException
Throws:
IOException
public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException
Throws:
IOException
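The constructors above fail with an IOException when no file system is known for the URI's scheme, and an HDFS or S3 URI without a host has to rely on the Hadoop configuration. A minimal sketch of those two checks follows, assuming a fixed set of known schemes; CheckpointUriCheck, KNOWN_SCHEMES and needsHadoopConfig are hypothetical names, and Flink resolves schemes through its own file system layer rather than a hard-coded list.

```java
import java.io.IOException;
import java.net.URI;
import java.util.Set;

class CheckpointUriCheck {
    // Assumed set of schemes that have a registered file system; purely illustrative.
    static final Set<String> KNOWN_SCHEMES = Set.of("file", "hdfs", "s3");

    // Mirrors the documented failure mode: an unknown scheme results in an IOException.
    static URI validate(String checkpointDataUri) throws IOException {
        URI uri = URI.create(checkpointDataUri);
        String scheme = uri.getScheme();
        if (scheme == null || !KNOWN_SCHEMES.contains(scheme)) {
            throw new IOException("No file system found for scheme: " + scheme);
        }
        return uri;
    }

    // An HDFS or S3 URI without a host (authority) depends on the Hadoop
    // configuration to describe the file system.
    static boolean needsHadoopConfig(URI uri) {
        String scheme = uri.getScheme();
        boolean hadoopBacked = "hdfs".equals(scheme) || "s3".equals(scheme);
        return hadoopBacked && uri.getAuthority() == null;
    }
}
```

For example, "hdfs://namenode:8020/flink/checkpoints" names its host explicitly, while "hdfs:///flink/checkpoints" does not and would need the Hadoop configuration.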
public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception
Description copied from class: AbstractStateBackend
This method is called by the task upon deployment to initialize the state backend for data for a specific job.
Overrides or implements:
initializeForJob in class AbstractStateBackend
Parameters:
env - The Environment of the task that instantiated the state backend
operatorIdentifier - Unique identifier for naming states created by this backend
Throws:
Exception - Overriding versions of this method may throw exceptions, in which case the job that uses the state backend is considered failed during deployment.
public void disposeAllStateForCurrentJob() throws Exception
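Description copied from class: AbstractStateBackend
Disposes all state associated with the current job.
Overrides or implements:
disposeAllStateForCurrentJob in class AbstractStateBackend
Throws:
Exception - Exceptions may occur during disposal of the state and should be forwarded.
public void dispose() throws Exception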
Description copied from class: AbstractStateBackend
Releases all resources held by this state backend. This method must make sure that all resources are disposed, even if an exception happens on the way.
Overrides or implements:
dispose in class AbstractStateBackend
Throws:
Exception - This method should report exceptions that occur.
public void close() throws IOException
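Description copied from class: AbstractStateBackend
Closes the state backend, dropping and aborting all I/O operations that are currently pending.
Specified by:
close in interface Closeable
close in interface AutoCloseable
Overrides or implements:
close in class AbstractStateBackend
Throws:
IOException - Exceptions can be forwarded and will be logged by the system.
public File[] getStoragePaths()
Visible for tests.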
public HashMap<String,KvStateSnapshot<?,?,?,?,?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception
Overrides or implements:
snapshotPartitionedState in class AbstractStateBackend
Throws:
Exception
public final void injectKeyValueStateSnapshots(HashMap<String,KvStateSnapshot> keyValueStateSnapshots) throws Exception
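Description copied from class: AbstractStateBackend
Injects K/V state snapshots for lazy restore.
Overrides or implements:
injectKeyValueStateSnapshots in class AbstractStateBackend
Parameters:
keyValueStateSnapshots - The Map of snapshots
Throws:
Exception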
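snapshotPartitionedState returns one snapshot per state name, and injectKeyValueStateSnapshots hands such a map back for lazy restore. The toy below sketches that round trip on plain maps; LazyRestoreModel, snapshotAll and getState are hypothetical names, and restoring each state on its first access is an assumed reading of "lazy restore", not a description of the backend's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each named state is a plain map, and a "snapshot" is a copy of it.
class LazyRestoreModel {
    private final Map<String, Map<String, String>> liveStates = new HashMap<>();
    private final Map<String, Map<String, String>> pendingSnapshots = new HashMap<>();
    private int restoredCount = 0;

    // Snapshot every live state, keyed by state name.
    HashMap<String, Map<String, String>> snapshotAll() {
        HashMap<String, Map<String, String>> result = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> e : liveStates.entrySet()) {
            result.put(e.getKey(), new HashMap<>(e.getValue()));
        }
        return result;
    }

    // Only remember the snapshots; no state is rebuilt yet.
    void injectSnapshots(Map<String, Map<String, String>> snapshots) {
        pendingSnapshots.putAll(snapshots);
    }

    // Return the named state, restoring it from a pending snapshot on first access.
    Map<String, String> getState(String name) {
        Map<String, String> state = liveStates.get(name);
        if (state == null) {
            Map<String, String> snapshot = pendingSnapshots.remove(name);
            if (snapshot != null) {
                state = new HashMap<>(snapshot);
                restoredCount++;
            } else {
                state = new HashMap<>();
            }
            liveStates.put(name, state);
        }
        return state;
    }

    int restoredCount() {
        return restoredCount;
    }
}
```

The benefit of deferring the restore is that states which the restarted job never touches are never rebuilt.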
protected org.rocksdb.ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor)
Creates a column family handle for use with a k/v state. This also checks whether the StateDescriptor for a state matches the one that we checkpointed, i.e. the one already in the map of column families.
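Since all k/v states share one RocksDB instance but each writes to its own column family, getColumnFamily behaves like a registry keyed by state name. The sketch below models that with in-memory maps; ColumnFamilyModel and its Descriptor class are hypothetical stand-ins (not Flink's StateDescriptor or RocksDB's ColumnFamilyHandle), and throwing on a mismatched descriptor is an assumption about how the check reacts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class ColumnFamilyModel {
    // Stand-in for a state descriptor: a state name plus a tag for its kind of state.
    static final class Descriptor {
        final String name;
        final String kind;

        Descriptor(String name, String kind) {
            this.name = name;
            this.kind = kind;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Descriptor)) {
                return false;
            }
            Descriptor d = (Descriptor) o;
            return name.equals(d.name) && kind.equals(d.kind);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, kind);
        }
    }

    private final Map<String, Descriptor> descriptors = new HashMap<>();
    private final Map<String, Map<String, String>> columnFamilies = new HashMap<>();

    // Returns the "column family" for the descriptor's state name, creating it on first use.
    // An existing family (e.g. one restored from a checkpoint) must match the descriptor.
    Map<String, String> getColumnFamily(Descriptor descriptor) {
        Descriptor known = descriptors.get(descriptor.name);
        if (known == null) {
            descriptors.put(descriptor.name, descriptor);
            columnFamilies.put(descriptor.name, new HashMap<>());
        } else if (!known.equals(descriptor)) {
            throw new IllegalStateException(
                "State '" + descriptor.name + "' is registered with a different descriptor");
        }
        return columnFamilies.get(descriptor.name);
    }

    int columnFamilyCount() {
        return columnFamilies.size();
    }
}
```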
public Object currentKey()
Used by k/v states to access the current key.
public TypeSerializer keySerializer()
Used by k/v states to access the key serializer.
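Because k/v states read the backend's current key through currentKey(), a single state object transparently holds one value per key. The toy below sketches that scoping for a value state; CurrentKeyBackend and ToyValueState are hypothetical names, and a HashMap stands in for RocksDB.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy backend: the operator sets the current key before processing
// each record, and states read it back through currentKey().
class CurrentKeyBackend {
    private Object currentKey;

    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    Object currentKey() {
        return currentKey;
    }

    <T> ToyValueState<T> createValueState(T defaultValue) {
        return new ToyValueState<>(this, defaultValue);
    }
}

// A value state whose reads and writes are implicitly scoped to the backend's current key.
class ToyValueState<T> {
    private final CurrentKeyBackend backend;
    private final T defaultValue;
    private final Map<Object, T> values = new HashMap<>();

    ToyValueState(CurrentKeyBackend backend, T defaultValue) {
        this.backend = backend;
        this.defaultValue = defaultValue;
    }

    T value() {
        return values.getOrDefault(backend.currentKey(), defaultValue);
    }

    void update(T value) {
        values.put(backend.currentKey(), value);
    }
}
```

A per-key counter then reads naturally: set the key, call value(), and update() with the incremented result.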
protected <N,T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ValueState.
Overrides or implements:
createValueState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the value that the ValueState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
protected <N,T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ListState.
Overrides or implements:
createListState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the values that the ListState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
protected <N,T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ReducingState.
Overrides or implements:
createReducingState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the values that the ReducingState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
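A list state and a reducing state differ in what they keep per key: the list keeps every added element, while the reducing state combines each new element into one value with a reduce function. The in-memory sketch below shows only that difference; ToyListState and ToyReducingState are hypothetical classes, and keys are passed explicitly instead of coming from the backend's current key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical list state: every added element is kept, in insertion order.
class ToyListState<T> {
    private final Map<Object, List<T>> lists = new HashMap<>();

    void add(Object key, T value) {
        lists.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    List<T> get(Object key) {
        return lists.getOrDefault(key, List.of());
    }
}

// Hypothetical reducing state: only one value per key, combined with the reduce function.
class ToyReducingState<T> {
    private final Map<Object, T> reduced = new HashMap<>();
    private final BinaryOperator<T> reduceFunction;

    ToyReducingState(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    void add(Object key, T value) {
        // Stores the value if the key is new, otherwise reduces it into the existing value.
        reduced.merge(key, value, reduceFunction);
    }

    T get(Object key) {
        return reduced.get(key);
    }
}
```

The storage cost differs accordingly: the list grows with every element, the reduced value stays constant-size.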
protected <N,T,ACC> FoldingState<T,ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new FoldingState.
Overrides or implements:
createFoldingState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - Type of the values folded into the state.
ACC - Type of the value in the state.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
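Unlike a reducing state, a folding state's stored type ACC can differ from the element type T: each element is folded into an accumulator that starts from an initial value. The sketch below is a hypothetical per-key model (ToyFoldingState is not Flink's FoldingState); returning the initial value for untouched keys is an assumption of this sketch, and it assumes ACC is immutable because the initial value is shared.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical folding state: T elements are folded into an ACC accumulator per key.
class ToyFoldingState<T, ACC> {
    private final Map<Object, ACC> accumulators = new HashMap<>();
    private final ACC initialValue;  // assumed immutable, since it is shared across keys
    private final BiFunction<ACC, T, ACC> foldFunction;

    ToyFoldingState(ACC initialValue, BiFunction<ACC, T, ACC> foldFunction) {
        this.initialValue = initialValue;
        this.foldFunction = foldFunction;
    }

    void add(Object key, T value) {
        ACC current = accumulators.getOrDefault(key, initialValue);
        accumulators.put(key, foldFunction.apply(current, value));
    }

    ACC get(Object key) {
        return accumulators.getOrDefault(key, initialValue);
    }
}
```

For example, folding Integer elements into a String accumulator with (acc, v) -> acc + v + "," turns the inputs 1 and 2 into "1,2,".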
public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception
Description copied from class: AbstractStateBackend
Creates an output stream that writes into the state of the given checkpoint.
Overrides or implements:
createCheckpointStateOutputStream in class AbstractStateBackend
Parameters:
checkpointID - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
Throws:
Exception - Exceptions may occur while creating the stream and should be forwarded.
public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception
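Description copied from class: AbstractStateBackend
Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
Overrides or implements:
checkpointStateSerializable in class AbstractStateBackend
Type Parameters:
S - The type of the state.
Parameters:
state - The state to be checkpointed.
checkpointID - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
Throws:
Exception - Exceptions may occur during serialization or while storing the state and should be forwarded.
public void enableFullyAsyncSnapshots()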
Enables fully asynchronous snapshotting of the partitioned state held in RocksDB.
By default, this is disabled. This means that RocksDB state is copied in a synchronous step, during which normal processing of elements pauses, followed by an asynchronous step of copying the RocksDB backup to the final checkpoint location. Fully asynchronous snapshots take longer (their duration grows linearly with the number of unique keys), but normal processing of elements is not paused.
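The trade-off above can be illustrated with a toy multi-version store. VersionedStore is a hypothetical class that mimics the idea of a database snapshot, not RocksDB's implementation. synchronousCopy holds the same lock as put, so writers wait while it copies (loosely analogous to the default synchronous step). snapshotVersion only records a version number, and readSnapshot then walks every key on another thread: linear in the number of unique keys, but without blocking put. Old versions are never pruned in this toy.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

class VersionedStore {
    // key -> (version -> value); older versions are kept so a snapshot stays readable.
    private final ConcurrentHashMap<String, ConcurrentSkipListMap<Long, String>> data =
        new ConcurrentHashMap<>();
    private long version = 0;

    synchronized void put(String key, String value) {
        version++;
        data.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>()).put(version, value);
    }

    // Analogue of the synchronous step: writers are blocked while the copy runs.
    synchronized Map<String, String> synchronousCopy() {
        Map<String, String> result = new TreeMap<>();
        data.forEach((key, versions) -> result.put(key, versions.lastEntry().getValue()));
        return result;
    }

    // Taking a snapshot is cheap: it only records the current version.
    synchronized long snapshotVersion() {
        return version;
    }

    // Analogue of a fully asynchronous snapshot: iterate all keys off the writer path,
    // reading for each key the newest value written at or before the snapshot version.
    CompletableFuture<Map<String, String>> readSnapshot(long snapshot) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, String> result = new TreeMap<>();
            data.forEach((key, versions) -> {
                Map.Entry<Long, String> entry = versions.floorEntry(snapshot);
                if (entry != null) {
                    result.put(key, entry.getValue());
                }
            });
            return result;
        });
    }
}
```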
public void disableFullyAsyncSnapshots()
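Disables fully asynchronous snapshotting of the partitioned state held in RocksDB. By default, fully asynchronous snapshotting is disabled.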
public void setDbStoragePath(String path)
Sets the path where the RocksDB local database files should be stored on the local file system. Passing null to this function restores the default behavior, where the configured temp directories will be used.
Parameters:
path - The path where the local RocksDB database files are stored.
public void setDbStoragePaths(String... paths)
Sets the paths across which the local RocksDB database files are distributed on the local file system. Each distinct state will be stored in one path, but when the state backend creates multiple states, they will store their files on different paths.
Passing null to this function restores the default behavior, where the configured temp directories will be used.
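The text above only says that each state lives in one directory while different states use different directories, with a fallback to the temp directories when null is passed. One possible assignment policy, round-robin, is sketched below; the policy, the class name StoragePathChooser, and the use of the java.io.tmpdir system property as the fallback are assumptions, not the backend's documented behavior.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StoragePathChooser {
    private final List<File> paths;
    private int next = 0;

    StoragePathChooser(String... configured) {
        if (configured == null || configured.length == 0) {
            // Assumed fallback standing in for "the configured temp directories".
            paths = List.of(new File(System.getProperty("java.io.tmpdir")));
        } else {
            paths = Arrays.stream(configured).map(File::new).collect(Collectors.toList());
        }
    }

    // Each newly created state is placed in the next directory, wrapping around.
    File pathForNewState() {
        File chosen = paths.get(next);
        next = (next + 1) % paths.size();
        return chosen;
    }
}
```

Spreading states over directories on separate disks lets their I/O proceed in parallel, which is the usual reason to configure more than one path.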
Parameters:
paths - The paths across which the local RocksDB database files will be spread.
public String[] getDbStoragePaths()
public void setPredefinedOptions(PredefinedOptions options)
Sets the predefined options for RocksDB.
If a user-defined options factory is set (via setOptions(OptionsFactory)), then the options from the factory are applied on top of the predefined options specified here.
Parameters:
options - The options to set (must not be null).
public PredefinedOptions getPredefinedOptions()
Gets the currently set predefined options for RocksDB. The default options (if nothing was set via setPredefinedOptions(PredefinedOptions)) are PredefinedOptions.DEFAULT.
If a user-defined options factory is set (via setOptions(OptionsFactory)), then the options from the factory are applied on top of the predefined options.
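The layering rule (predefined profile first, factory on top) can be sketched with options modeled as plain string maps. OptionsLayering, its Profile enum, and the option keys are all made up for illustration; only the order of application mirrors the text. With the DEFAULT profile contributing nothing, the factory fully controls the result, matching the setOptions description.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class OptionsLayering {
    // Hypothetical profiles; DEFAULT adds nothing of its own.
    enum Profile { DEFAULT, TUNED }

    static Map<String, String> profileOptions(Profile profile) {
        Map<String, String> options = new HashMap<>();
        if (profile == Profile.TUNED) {
            options.put("compaction_style", "level");  // made-up key and value
            options.put("max_open_files", "-1");       // made-up key and value
        }
        return options;
    }

    // Start from the predefined profile, then let the optional factory adjust it.
    static Map<String, String> resolve(Profile profile, UnaryOperator<Map<String, String>> factory) {
        Map<String, String> options = profileOptions(profile);
        return factory == null ? options : factory.apply(options);
    }
}
```

A factory that sets max_open_files on top of TUNED overrides the profile's value for that key while keeping the profile's other settings.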
public void setOptions(OptionsFactory optionsFactory)
Sets Options for the RocksDB instances. Because the options are not serializable and hold native code references, they must be specified through a factory.
The options created by the factory here are applied on top of the predefined options profile selected via setPredefinedOptions(PredefinedOptions). If the predefined options profile is the default (PredefinedOptions.DEFAULT), then the factory fully controls the RocksDB options.
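Why a factory: the backend configuration is serialized and shipped to the workers, and an object holding native references cannot make that trip, but a small serializable factory can, and it creates the options lazily where they are used. The sketch below demonstrates this with Java serialization; NativeOptions, ToyOptionsFactory, MaxOpenFilesFactory and FactoryShipping are hypothetical names, not the Flink OptionsFactory interface or RocksDB classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a native-backed options object: deliberately not Serializable.
class NativeOptions {
    int maxOpenFiles = 1000;  // made-up field
}

// Hypothetical factory contract: serializable, and it adjusts options lazily on the worker.
interface ToyOptionsFactory extends Serializable {
    NativeOptions createOptions(NativeOptions currentOptions);
}

class MaxOpenFilesFactory implements ToyOptionsFactory {
    private final int maxOpenFiles;

    MaxOpenFilesFactory(int maxOpenFiles) {
        this.maxOpenFiles = maxOpenFiles;
    }

    @Override
    public NativeOptions createOptions(NativeOptions currentOptions) {
        currentOptions.maxOpenFiles = maxOpenFiles;
        return currentOptions;
    }
}

class FactoryShipping {
    // Serialize and deserialize an object, simulating shipping it to another JVM.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not ship object", e);
        }
    }
}
```

Shipping the factory succeeds and it can still build options on the receiving side, whereas shipping NativeOptions itself fails with a NotSerializableException.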
Parameters:
optionsFactory - The options factory that lazily creates the RocksDB options.
public OptionsFactory getOptions()
Gets the options factory that lazily creates the RocksDB options.
public org.rocksdb.DBOptions getDbOptions()
Gets the RocksDB DBOptions to be used for all RocksDB instances.
public org.rocksdb.ColumnFamilyOptions getColumnOptions()
Gets the RocksDB ColumnFamilyOptions to be used for all RocksDB instances.
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.