public class DbStateBackend extends AbstractStateBackend

An AbstractStateBackend for storing checkpoints in databases accessible through JDBC. Key-value state is stored out-of-core and is fetched lazily using the LazyDbValueState implementation. A different backend can also be provided in the constructor to store the non-partitioned states. A common use case is to store the key-value states in the database and the larger non-partitioned states on a distributed file system.
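The idea of lazily fetched, out-of-core key-value state can be illustrated with a small, self-contained sketch. It is not the LazyDbValueState code: the class names `FakeKvTable` and `LazyValueStateSketch` are hypothetical, and a `HashMap` stands in for the database table so the example runs without JDBC. A value is read from the "table" only on the first access to its key, and updates are written back only when a snapshot is taken.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a database table (key -> stored value).
// A HashMap replaces the JDBC table so the sketch runs without a database.
class FakeKvTable {
    final Map<String, String> rows = new HashMap<>();
    int lookups = 0; // number of simulated SELECT queries

    String select(String key) {
        lookups++;
        return rows.get(key);
    }

    void upsert(String key, String value) {
        rows.put(key, value);
    }
}

// Hypothetical, simplified lazy value state (not Flink's LazyDbValueState):
// a value is read from the table only on the first access to its key and is
// cached afterwards; updates stay in memory until snapshot() writes them back.
class LazyValueStateSketch {
    private final FakeKvTable table;
    private final String defaultValue;
    private final Map<String, String> cache = new HashMap<>();
    private final Map<String, String> dirty = new HashMap<>();
    private String currentKey;

    LazyValueStateSketch(FakeKvTable table, String defaultValue) {
        this.table = table;
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    String value() {
        // Cache miss: fetch from the table once (a missing row is cached as null too).
        if (!cache.containsKey(currentKey)) {
            cache.put(currentKey, table.select(currentKey));
        }
        String v = cache.get(currentKey);
        return v != null ? v : defaultValue;
    }

    void update(String newValue) {
        cache.put(currentKey, newValue);
        dirty.put(currentKey, newValue);
    }

    // Write modified entries back to the table, e.g. when a checkpoint is taken.
    void snapshot() {
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            table.upsert(e.getKey(), e.getValue());
        }
        dirty.clear();
    }
}
```

Repeated reads of the same key cost one lookup, which is the point of keeping state out-of-core while avoiding a database round trip per access.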
This backend implementation also supports sharding the checkpointed states across multiple database instances; sharding is enabled by passing multiple database URLs to the DbBackendConfig instance.
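How keys might be spread over several database URLs can be sketched as follows. The hash-modulo rule is an assumption made for illustration only; this page does not say how DbStateBackend or ShardedConnection assigns keys to shards, and `ShardRouterSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shard router: maps a key to one of several database URLs.
// The hash-modulo rule is an assumption made for this sketch only.
class ShardRouterSketch {
    private final List<String> urls;

    ShardRouterSketch(List<String> urls) {
        if (urls.isEmpty()) {
            throw new IllegalArgumentException("at least one database URL is required");
        }
        this.urls = new ArrayList<>(urls);
    }

    int shardIndex(Object key) {
        // Math.floorMod keeps the index in [0, n) even for negative hash codes.
        return Math.floorMod(key.hashCode(), urls.size());
    }

    String urlFor(Object key) {
        return urls.get(shardIndex(key));
    }
}
```

Whatever rule is used, it has to be deterministic, so that a key is always looked up on the same instance it was written to.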
By default, multiple tables are created in the given databases: one table for non-partitioned checkpoints and one table for each key-value state in the streaming program.
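The default table layout can be pictured as one DDL statement per table. The table names (`checkpoints_<jobId>`, `kvstate_<stateName>_<jobId>`) and the column definitions below are illustrative assumptions, not the schema the backend actually creates; `TableLayoutSketch` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical DDL generator for the default layout: one table for
// non-partitioned checkpoints plus one table per key-value state.
// Table names and columns are illustrative assumptions.
class TableLayoutSketch {
    static List<String> createStatements(String jobId, List<String> kvStateNames) {
        List<String> ddl = new ArrayList<>();
        // Table for non-partitioned checkpoint data (serialized blobs).
        ddl.add("CREATE TABLE IF NOT EXISTS checkpoints_" + jobId
                + " (checkpointId BIGINT, handleId BIGINT, checkpoint BLOB,"
                + " PRIMARY KEY (checkpointId, handleId))");
        // One table per key-value state declared in the program.
        for (String stateName : kvStateNames) {
            ddl.add("CREATE TABLE IF NOT EXISTS kvstate_" + stateName + "_" + jobId
                    + " (k VARBINARY(256), v BLOB, PRIMARY KEY (k))");
        }
        return ddl;
    }
}
```

A program with two key-value states therefore ends up with three tables.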
To control table creation and insert/lookup operations, and to provide compatibility with different SQL implementations, a custom MySqlAdapter can be supplied in the DbBackendConfig.
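Why a pluggable adapter helps with SQL compatibility can be seen from upserts, whose syntax differs between databases. The interface `KvSqlDialect` and both implementations are hypothetical and do not reproduce the MySqlAdapter API. They only show how per-dialect statements can be isolated behind a common interface. The MySQL and PostgreSQL clauses used are standard syntax for those databases.

```java
// Hypothetical dialect abstraction (not the MySqlAdapter API): each SQL
// implementation supplies its own statements for a key-value state table.
interface KvSqlDialect {
    // Statement that inserts a (k, v) row or overwrites v if k already exists.
    String upsertSql(String table);

    // Statement that looks up the value stored for a key (shared default).
    default String lookupSql(String table) {
        return "SELECT v FROM " + table + " WHERE k = ?";
    }
}

// MySQL syntax: INSERT ... ON DUPLICATE KEY UPDATE.
class MySqlDialectSketch implements KvSqlDialect {
    @Override
    public String upsertSql(String table) {
        return "INSERT INTO " + table + " (k, v) VALUES (?, ?)"
                + " ON DUPLICATE KEY UPDATE v = VALUES(v)";
    }
}

// PostgreSQL (9.5+) syntax: INSERT ... ON CONFLICT; requires a unique index on k.
class PostgresDialectSketch implements KvSqlDialect {
    @Override
    public String upsertSql(String table) {
        return "INSERT INTO " + table + " (k, v) VALUES (?, ?)"
                + " ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v";
    }
}
```

The backend logic can then stay dialect-agnostic and prepare whatever statement the configured adapter returns.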
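Nested classes inherited from class AbstractStateBackend: AbstractStateBackend.CheckpointStateOutputStream, AbstractStateBackend.CheckpointStateOutputView
Fields inherited from class AbstractStateBackend: currentKey, keySerializer, userCodeClassLoader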
Constructor | Description |
---|---|
`DbStateBackend(DbBackendConfig backendConfig)` | Create a new DbStateBackend using the provided DbBackendConfig configuration. |
`DbStateBackend(DbBackendConfig backendConfig, AbstractStateBackend backend)` | Create a new DbStateBackend using the provided DbBackendConfig configuration and a different backend for storing non-partitioned state snapshots. |
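The difference between the two constructors can be modeled with a small routing sketch. `SnapshotStore`, `InMemoryStore` and `RoutingBackendSketch` are hypothetical types, not Flink classes. With one argument, all state goes to the database store; with two, key-value state still goes to the database while non-partitioned snapshots go to the second store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal store abstraction used only for this sketch.
interface SnapshotStore {
    void put(String name, byte[] data);
    byte[] get(String name);
}

// In-memory store; copies arrays so callers cannot mutate stored snapshots.
class InMemoryStore implements SnapshotStore {
    final Map<String, byte[]> data = new HashMap<>();

    public void put(String name, byte[] bytes) {
        data.put(name, bytes.clone());
    }

    public byte[] get(String name) {
        byte[] b = data.get(name);
        return b == null ? null : b.clone();
    }
}

// Sketch of the two-constructor pattern: key-value state always goes to the
// database store; non-partitioned snapshots go to a separate store if given.
class RoutingBackendSketch {
    private final SnapshotStore database;
    private final SnapshotStore nonPartitioned;

    RoutingBackendSketch(SnapshotStore database) {
        this(database, database); // no separate backend: everything stays in the database
    }

    RoutingBackendSketch(SnapshotStore database, SnapshotStore nonPartitioned) {
        this.database = database;
        this.nonPartitioned = nonPartitioned;
    }

    void writeKeyValueState(String name, byte[] data) {
        database.put(name, data);
    }

    void writeNonPartitionedState(String name, byte[] data) {
        nonPartitioned.put(name, data);
    }
}
```

In the scenario described above, the second store would be a distributed file system backend holding the larger non-partitioned snapshots.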
Modifier and Type | Method and Description |
---|---|
`<S extends Serializable> StateHandle<S>` | `checkpointStateSerializable(S state, long checkpointID, long timestamp)` Writes the given state into the checkpoint, and returns a handle that can retrieve the state back. |
`void` | `close()` Closes the state backend, releasing all internal resources, but does not delete any persistent checkpoint data. |
`AbstractStateBackend.CheckpointStateOutputStream` | `createCheckpointStateOutputStream(long checkpointID, long timestamp)` Creates an output stream that writes into the state of the given checkpoint. |
`protected <N,T,ACC> FoldingState<T,ACC>` | `createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc)` Creates and returns a new FoldingState. |
`protected <N,T> ListState<T>` | `createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc)` Creates and returns a new ListState. |
`protected <N,T> ReducingState<T>` | `createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc)` Creates and returns a new ReducingState. |
`protected <N,T> ValueState<T>` | `createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc)` Creates and returns a new ValueState. |
`void` | `disposeAllStateForCurrentJob()` Disposes all state associated with the current job. |
`DbBackendConfig` | `getConfiguration()` Get the backend configuration object. |
`ShardedConnection` | `getConnections()` Get the database connections maintained by the backend. |
`Environment` | `getEnvironment()` |
`void` | `initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer)` Called by the task upon deployment to initialize the state backend for a specific job. |
`boolean` | `isInitialized()` Check whether the backend has been initialized. |
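The contract of checkpointStateSerializable (write the state, get back a handle that can retrieve it) can be sketched with plain Java serialization. `HandleSketch` and `CheckpointWriterSketch` are hypothetical; Flink's StateHandle interface differs, and a map stands in for the checkpoint table.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical handle: remembers where the bytes were stored and can
// deserialize them back on demand.
class HandleSketch<S extends Serializable> {
    private final long id;
    private final Map<Long, byte[]> storage;

    HandleSketch(long id, Map<Long, byte[]> storage) {
        this.id = id;
        this.storage = storage;
    }

    @SuppressWarnings("unchecked")
    S getState() throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(storage.get(id)))) {
            return (S) in.readObject();
        }
    }
}

class CheckpointWriterSketch {
    // Stands in for a checkpoint table, keyed by a generated handle id.
    final Map<Long, byte[]> storage = new HashMap<>();
    private long nextId = 0;

    // checkpointID and timestamp are ignored here; a real backend would
    // presumably persist them alongside the serialized bytes.
    <S extends Serializable> HandleSketch<S> checkpointStateSerializable(S state, long checkpointID, long timestamp)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        long id = nextId++;
        storage.put(id, bytes.toByteArray());
        return new HandleSketch<>(id, storage);
    }
}
```

Because the handle only references the stored bytes, it stays small even when the checkpointed state is large.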
Methods inherited from class AbstractStateBackend: createCheckpointStateOutputView, dispose, getCurrentKey, getPartitionedState, injectKeyValueStateSnapshots, notifyOfCompletedCheckpoint, setCurrentKey, snapshotPartitionedState
public DbStateBackend(DbBackendConfig backendConfig)
Create a new DbStateBackend using the provided DbBackendConfig configuration.

public DbStateBackend(DbBackendConfig backendConfig, AbstractStateBackend backend)
Create a new DbStateBackend using the provided DbBackendConfig configuration and a different backend for storing non-partitioned state snapshots.

public ShardedConnection getConnections()
Get the database connections maintained by the backend.
public boolean isInitialized()
Check whether the backend has been initialized.

public Environment getEnvironment()

public DbBackendConfig getConfiguration()
Get the backend configuration object.
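The lifecycle implied by isInitialized(), initializeForJob(), close() and disposeAllStateForCurrentJob() can be modeled as below. `LifecycleSketch` is hypothetical, and equating "initialized" with "initializeForJob() has opened connections" is an assumption. The sketch also mirrors the documented difference that close() keeps persistent checkpoint data while disposeAllStateForCurrentJob() removes it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle sketch (not DbStateBackend's code). A flag stands in
// for open database connections and a list for stored checkpoint rows.
class LifecycleSketch {
    private boolean connectionsOpen = false;
    private final List<String> storedCheckpoints = new ArrayList<>();

    // Assumption: "initialized" means this method has opened the connections.
    // The operator identifier is unused in this sketch.
    void initializeForJob(String operatorIdentifier) {
        connectionsOpen = true;
    }

    boolean isInitialized() {
        return connectionsOpen;
    }

    void storeCheckpoint(String data) {
        if (!isInitialized()) {
            throw new IllegalStateException("initializeForJob() must be called first");
        }
        storedCheckpoints.add(data);
    }

    // Releases resources but keeps persistent checkpoint data.
    void close() {
        connectionsOpen = false;
    }

    // Deletes all stored state of the current job.
    void disposeAllStateForCurrentJob() {
        storedCheckpoints.clear();
    }

    int storedCount() {
        return storedCheckpoints.size();
    }
}
```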
public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception
Description copied from class: AbstractStateBackend
Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
Specified by: checkpointStateSerializable in class AbstractStateBackend
Type Parameters:
S - The type of the state.
Parameters:
state - The state to be checkpointed.
checkpointID - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
Throws:
Exception - Exceptions may occur during serialization / storing the state and should be forwarded.

public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception
Description copied from class: AbstractStateBackend
Creates an output stream that writes into the state of the given checkpoint.
Specified by: createCheckpointStateOutputStream in class AbstractStateBackend
Parameters:
checkpointID - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
Throws:
Exception - Exceptions may occur while creating the stream and should be forwarded.

protected <N,T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ValueState.
Specified by: createValueState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the value that the ValueState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
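A ValueState is scoped both by the current key and by a namespace (for example, a window). The following in-memory analogue is a hypothetical sketch, not the database-backed implementation. `NamespacedValueStateSketch` is an invented name, and the constructor's default value stands in for what a ValueStateDescriptor would supply.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory analogue of a ValueState scoped by (key, namespace).
// The default value stands in for what a ValueStateDescriptor would provide.
class NamespacedValueStateSketch<K, N, T> {
    private final Map<Map.Entry<K, N>, T> values = new HashMap<>();
    private final T defaultValue;
    private K currentKey;
    private N currentNamespace;

    NamespacedValueStateSketch(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    private Map.Entry<K, N> slot() {
        // SimpleImmutableEntry implements equals/hashCode, so it works as a composite key.
        return new AbstractMap.SimpleImmutableEntry<>(currentKey, currentNamespace);
    }

    T value() {
        T v = values.get(slot());
        return v != null ? v : defaultValue;
    }

    void update(T newValue) {
        if (newValue == null) {
            values.remove(slot()); // this sketch treats null as "clear"
        } else {
            values.put(slot(), newValue);
        }
    }
}
```

The namespaceSerializer parameter exists because, in a persistent backend, the namespace has to be serialized into the stored key just like the key itself.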
protected <N,T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ListState.
Specified by: createListState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the values that the ListState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
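The semantics of a keyed ListState (append values, read them back in order, clear) can be shown with a hypothetical in-memory analogue. `ListStateSketch` is an invented name, and namespaces are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogue of a keyed ListState (namespaces omitted).
class ListStateSketch<K, T> {
    private final Map<K, List<T>> lists = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Appends a value to the list of the current key.
    void add(T value) {
        lists.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    // Returns a read-only view; a key without values yields an empty list.
    List<T> get() {
        List<T> list = lists.get(currentKey);
        return list == null ? Collections.<T>emptyList() : Collections.unmodifiableList(list);
    }

    void clear() {
        lists.remove(currentKey);
    }
}
```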
protected <N,T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ReducingState.
Specified by: createReducingState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the values that the ReducingState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
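A ReducingState keeps a single value per key and combines each added value into it. In the hypothetical sketch below, a `BinaryOperator` stands in for the reduce function carried by the ReducingStateDescriptor; `ReducingStateSketch` is an invented name and namespaces are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory analogue of a keyed ReducingState (namespaces omitted).
// The BinaryOperator stands in for the reduce function held by the descriptor.
class ReducingStateSketch<K, T> {
    private final Map<K, T> values = new HashMap<>();
    private final BinaryOperator<T> reducer;
    private K currentKey;

    ReducingStateSketch(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Stores the first value as-is, then combines each new value with the current result.
    void add(T value) {
        values.merge(currentKey, value, reducer);
    }

    // Returns null if nothing was added for the current key.
    T get() {
        return values.get(currentKey);
    }

    void clear() {
        values.remove(currentKey);
    }
}
```

Only the reduced result is stored, so the state size per key does not grow with the number of added values.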
protected <N,T,ACC> FoldingState<T,ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new FoldingState.
Specified by: createFoldingState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - Type of the values folded into the state.
ACC - Type of the value in the state.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
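Unlike reducing, folding lets the accumulator type ACC differ from the input type T and starts from an initial value. The hypothetical `FoldingStateSketch` below uses a `BiFunction` in place of the fold function and initial value a FoldingStateDescriptor would carry; namespaces are omitted. Returning null for a key with nothing folded is a choice of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory analogue of a keyed FoldingState (namespaces omitted).
// initialValue and folder stand in for what a FoldingStateDescriptor carries.
class FoldingStateSketch<K, T, ACC> {
    private final Map<K, ACC> accumulators = new HashMap<>();
    private final ACC initialValue;
    private final BiFunction<ACC, T, ACC> folder;
    private K currentKey;

    FoldingStateSketch(ACC initialValue, BiFunction<ACC, T, ACC> folder) {
        this.initialValue = initialValue;
        this.folder = folder;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Folds the value into the key's accumulator, starting from initialValue.
    void add(T value) {
        ACC current = accumulators.containsKey(currentKey) ? accumulators.get(currentKey) : initialValue;
        accumulators.put(currentKey, folder.apply(current, value));
    }

    // Returns null if nothing was folded for the current key (a choice of this sketch).
    ACC get() {
        return accumulators.get(currentKey);
    }

    void clear() {
        accumulators.remove(currentKey);
    }
}
```

Folding Integer values into a String accumulator, for example, turns the inputs 1 and 2 into "12".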
public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception
Description copied from class: AbstractStateBackend
This method is called by the task upon deployment to initialize the state backend for a specific job.
Overrides: initializeForJob in class AbstractStateBackend
Parameters:
env - The Environment of the task that instantiated the state backend.
operatorIdentifier - Unique identifier for naming states created by this backend.
Throws:
Exception - Overridden versions of this method may throw exceptions, in which case the job that uses the state backend is considered failed during deployment.

public void close() throws Exception
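Description copied from class: AbstractStateBackend
Closes the state backend, releasing all internal resources, but does not delete any persistent checkpoint data.
Specified by: close in class AbstractStateBackend
Throws:
Exception - Exceptions can be forwarded and will be logged by the system.

public void disposeAllStateForCurrentJob() throws Exception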
Description copied from class: AbstractStateBackend
Disposes all state associated with the current job.
Specified by: disposeAllStateForCurrentJob in class AbstractStateBackend
Throws:
Exception - Exceptions may occur during disposal of the state and should be forwarded.

Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.