public class FsStateBackend extends AbstractStateBackend
The state backend has one core directory into which it puts all checkpoint data. Inside that
directory, it creates a directory per job, inside which each checkpoint gets a directory, with
files for each state, for example:
hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
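The directory layout above can be illustrated with a small, stdlib-only Java sketch. The class `CheckpointLayout` and its methods are hypothetical helpers written for this page, not part of Flink. They assume what the example path shows: the job directory is named after the job ID, each checkpoint directory is named `chk-<checkpoint-id>`, and each state file gets a random UUID as its name.

```java
import java.net.URI;
import java.util.UUID;

// Hypothetical helper (not part of Flink) that mirrors the documented layout:
// <base>/<job-id>/chk-<checkpoint-id>/<random-uuid>
final class CheckpointLayout {

    private CheckpointLayout() {}

    /** Directory holding all checkpoints of one job. */
    static URI jobDirectory(URI base, String jobId) {
        return child(base, jobId);
    }

    /** Directory of a single checkpoint, e.g. .../chk-17. */
    static URI checkpointDirectory(URI base, String jobId, long checkpointId) {
        return child(jobDirectory(base, jobId), "chk-" + checkpointId);
    }

    /** A fresh, randomly named state file inside the checkpoint directory. */
    static URI newStateFile(URI base, String jobId, long checkpointId) {
        return child(checkpointDirectory(base, jobId, checkpointId), UUID.randomUUID().toString());
    }

    // Appends one path segment, whether or not the parent already ends with a slash.
    private static URI child(URI parent, String segment) {
        String s = parent.toString();
        return URI.create(s.endsWith("/") ? s + segment : s + "/" + segment);
    }
}
```

For example, `checkpointDirectory(URI.create("hdfs://namenode:8020/flink-checkpoints"), "job42", 17)` yields `hdfs://namenode:8020/flink-checkpoints/job42/chk-17`. The host, port and job ID here are placeholders.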
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_FILE_STATE_THRESHOLD: By default, state smaller than 1024 bytes will not be written to files, but will be stored directly with the metadata. |
Fields inherited from class AbstractStateBackend: FS_STATE_BACKEND_NAME, MEMORY_STATE_BACKEND_NAME, ROCKSDB_STATE_BACKEND_NAME
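The effect of `DEFAULT_FILE_STATE_THRESHOLD` (and of the `fileStateSizeThreshold` constructor argument) can be sketched as a placement rule plus a bounds check. This is a hypothetical illustration, not Flink's code. The page describes the boundary both as "smaller than 1024 bytes" and as "up to this size", so the inclusive comparison used here is an assumption. The page also does not state the valid range behind the constructor's `IllegalArgumentException`, so the lower bound of 0 and the caller-supplied `maxThreshold` are assumptions as well.

```java
// Hypothetical sketch of the small-state rule; not Flink's implementation.
final class SmallStateRule {

    /** Mirrors the documented default of 1024 bytes. */
    static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;

    /** Where a piece of serialized state ends up. */
    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    private SmallStateRule() {}

    /**
     * Rejects thresholds outside [0, maxThreshold]. Both bounds are assumptions;
     * the real limits behind FsStateBackend's IllegalArgumentException are not stated here.
     */
    static int validate(int threshold, int maxThreshold) {
        if (threshold < 0 || threshold > maxThreshold) {
            throw new IllegalArgumentException(
                    "fileStateSizeThreshold must be in [0, " + maxThreshold + "], but was " + threshold);
        }
        return threshold;
    }

    /** State up to the threshold (inclusive, an assumption) stays with the metadata. */
    static Placement place(long stateSizeBytes, int threshold) {
        return stateSizeBytes <= threshold ? Placement.INLINE_IN_METADATA : Placement.SEPARATE_FILE;
    }
}
```

The point of the rule is that tiny state does not turn into tiny files: a 100-byte state is kept inline, while a 4 KB state is written to its own file.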
Constructor and Description |
---|
FsStateBackend(Path checkpointDataUri): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(String checkpointDataUri): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots): Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
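Each constructor requires that a file system for the URI's scheme be accessible via `FileSystem.get(URI)`, and, for HDFS, that either the URI carry the authority (host and port) or the Hadoop configuration be on the classpath. A hypothetical pre-flight check of those conditions, using only `java.net.URI`, might look like the following. It inspects the URI syntactically and cannot tell whether Flink will actually find a file system for the scheme. The class name and the `hadoopConfigOnClasspath` flag, which stands in for that environment check, are assumptions.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check (not part of Flink) for a checkpoint data URI.
final class CheckpointUriCheck {

    private CheckpointUriCheck() {}

    /**
     * Returns the problems found in the URI; an empty list means it looks usable.
     * hadoopConfigOnClasspath stands in for "the Hadoop configuration is available".
     */
    static List<String> problems(String checkpointDataUri, boolean hadoopConfigOnClasspath) {
        List<String> found = new ArrayList<>();
        URI uri;
        try {
            uri = new URI(checkpointDataUri);
        } catch (URISyntaxException e) {
            found.add("not a valid URI: " + e.getMessage());
            return found;
        }
        String scheme = uri.getScheme();
        if (scheme == null) {
            // Without a scheme there is nothing to resolve a file system from.
            found.add("missing file system scheme, e.g. file://, hdfs:// or S3://");
        }
        String path = uri.getPath();
        if (path == null || path.isEmpty()) {
            found.add("missing path to the checkpoint data directory");
        }
        // HDFS needs host and port from the URI authority or from the Hadoop configuration.
        if ("hdfs".equalsIgnoreCase(scheme) && uri.getAuthority() == null && !hadoopConfigOnClasspath) {
            found.add("hdfs URI without authority and no Hadoop configuration on the classpath");
        }
        return found;
    }
}
```

For instance, `hdfs://namenode:8020/flink-checkpoints` passes, while `hdfs:///flink-checkpoints` is flagged unless `hadoopConfigOnClasspath` is `true`.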
Modifier and Type | Method and Description |
---|---|
<K> AbstractKeyedStateBackend<K> | createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry): Creates a new AbstractKeyedStateBackend that is responsible for holding keyed state and checkpointing it. |
OperatorStateBackend | createOperatorStateBackend(Environment env, String operatorIdentifier): Creates a new OperatorStateBackend that can be used for storing operator state. |
CheckpointStreamFactory | createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation): Creates a CheckpointStreamFactory that can be used to create streams that should end up in a savepoint. |
CheckpointStreamFactory | createStreamFactory(JobID jobId, String operatorIdentifier): Creates a CheckpointStreamFactory that can be used to create streams that should end up in a checkpoint. |
Path | getBasePath(): Gets the base directory where all state-containing files are stored. |
int | getMinFileSizeThreshold(): Gets the threshold below which state is stored as part of the metadata, rather than in files. |
String | toString() |
Methods inherited from class AbstractStateBackend: loadStateBackendFromConfig, loadStateBackendFromConfigOrCreateDefault
public static final int DEFAULT_FILE_STATE_THRESHOLD
public FsStateBackend(String checkpointDataUri) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - Switch to enable asynchronous snapshots.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public FsStateBackend(Path checkpointDataUri) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - Switch to enable asynchronous snapshots.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public FsStateBackend(URI checkpointDataUri) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - Switch to enable asynchronous snapshots.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
IllegalArgumentException - Thrown if the fileStateSizeThreshold is out of bounds.
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that either the URI must specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files.
asynchronousSnapshots - Switch to enable asynchronous snapshots.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.
public Path getBasePath()
public int getMinFileSizeThreshold()
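Gets the threshold below which state is stored as part of the metadata, rather than in files. By default, this threshold is 1024 bytes.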
public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException
Description copied from interface: StateBackend
Creates a CheckpointStreamFactory that can be used to create streams that should end up in a checkpoint.
Specified by:
createStreamFactory in interface StateBackend
createStreamFactory in class AbstractStateBackend
Parameters:
jobId - The JobID of the job for which we are creating checkpoint streams.
operatorIdentifier - An identifier of the operator for which we create streams.
Throws:
IOException
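A stream obtained from the factory returned by `createStreamFactory` ends up in a checkpoint, and the small-state threshold decides whether its bytes are kept with the metadata or written to a file. The hypothetical class below sketches one way to combine the two when the state size is not known up front: buffer the bytes in memory and spill them to a randomly named file in the checkpoint directory once the buffer grows past the threshold. It is not Flink's implementation. The class name, the `Result` type, the inclusive threshold and the use of `java.nio.file.Path` (rather than Flink's `Path`) are all assumptions made for the illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical checkpoint output stream (not Flink's): small state stays in memory,
// larger state is spilled to a randomly named file in the checkpoint directory.
final class SpillingCheckpointStream extends OutputStream {

    /** Outcome of closing the stream: exactly one of the two fields is non-null. */
    static final class Result {
        final byte[] inlineBytes; // set if the state stayed within the threshold
        final Path file;          // set if the state was spilled to a file

        Result(byte[] inlineBytes, Path file) {
            this.inlineBytes = inlineBytes;
            this.file = file;
        }
    }

    private final Path checkpointDir;
    private final int threshold;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private OutputStream fileOut; // opened lazily on the first spill
    private Path file;
    private boolean closed;

    SpillingCheckpointStream(Path checkpointDir, int threshold) {
        this.checkpointDir = checkpointDir;
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (closed) {
            throw new IOException("stream already closed");
        }
        if (fileOut != null) {
            fileOut.write(b, off, len); // already spilled: append directly to the file
            return;
        }
        buffer.write(b, off, len);
        if (buffer.size() > threshold) {
            // Too large to keep inline: create the state file and move the buffered bytes into it.
            Files.createDirectories(checkpointDir);
            file = checkpointDir.resolve(UUID.randomUUID().toString());
            fileOut = Files.newOutputStream(file);
            buffer.writeTo(fileOut);
            buffer.reset();
        }
    }

    /** Finishes the stream and reports where the state ended up. */
    Result closeAndGetResult() throws IOException {
        closed = true;
        if (fileOut != null) {
            fileOut.close();
            return new Result(null, file);
        }
        return new Result(buffer.toByteArray(), null);
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            closeAndGetResult();
        }
    }
}
```

Buffering first means no file is created for state that turns out to be small, which matches the intent of the threshold described for `DEFAULT_FILE_STATE_THRESHOLD`.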
public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException
Description copied from interface: StateBackend
Creates a CheckpointStreamFactory that can be used to create streams that should end up in a savepoint.
This is only called if the triggered checkpoint is a savepoint. Commonly this will return the same factory as for regular checkpoints, possibly slightly adjusted.
Specified by:
createSavepointStreamFactory in interface StateBackend
createSavepointStreamFactory in class AbstractStateBackend
Parameters:
jobId - The JobID of the job for which we are creating checkpoint streams.
operatorIdentifier - An identifier of the operator for which we create streams.
targetLocation - An optional custom location for the savepoint stream.
Throws:
IOException - Failures during stream creation are forwarded.
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws IOException
Description copied from interface: StateBackend
Creates a new AbstractKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
Keyed state is state where each value is bound to a key.
Specified by:
createKeyedStateBackend in interface StateBackend
createKeyedStateBackend in class AbstractStateBackend
Type Parameters:
K - The type of the keys by which the state is organized.
Throws:
IOException
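`createKeyedStateBackend` receives `numberOfKeyGroups` and a `keyGroupRange`. In Flink, keys are assigned to key groups, and each parallel backend instance is responsible for a range of those groups. The hypothetical class below illustrates that idea for a single value per key. It uses `Math.floorMod(key.hashCode(), numberOfKeyGroups)` as the assignment function, which is a simplification of what Flink actually does, and it treats the range as inclusive at both ends, an assumption modelled on the `keyGroupRange` parameter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified keyed value state partitioned into key groups; not Flink's implementation.
final class KeyGroupedValueState<K, V> {

    private final int numberOfKeyGroups;
    private final int startKeyGroup; // inclusive (assumption)
    private final int endKeyGroup;   // inclusive (assumption)
    private final Map<K, V> values = new HashMap<>();

    KeyGroupedValueState(int numberOfKeyGroups, int startKeyGroup, int endKeyGroup) {
        if (numberOfKeyGroups <= 0 || startKeyGroup < 0
                || startKeyGroup > endKeyGroup || endKeyGroup >= numberOfKeyGroups) {
            throw new IllegalArgumentException("invalid key group range");
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
    }

    /** Simplified assignment: a non-negative hash modulo the number of key groups. */
    static int keyGroupFor(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    /** True if this instance is responsible for the key's group. */
    boolean owns(K key) {
        int group = keyGroupFor(key, numberOfKeyGroups);
        return group >= startKeyGroup && group <= endKeyGroup;
    }

    /** Binds the value to the key; keys from key groups outside the range are rejected. */
    void update(K key, V value) {
        if (!owns(key)) {
            throw new IllegalArgumentException("key " + key + " is in key group "
                    + keyGroupFor(key, numberOfKeyGroups) + ", outside ["
                    + startKeyGroup + ", " + endKeyGroup + "]");
        }
        values.put(key, value);
    }

    /** Returns the value bound to the key, or null if none was stored. */
    V value(K key) {
        return values.get(key);
    }
}
```

Because state is grouped by key group rather than by individual key, a rescaled job can hand whole key groups to a different instance instead of re-hashing every key.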
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception
Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.
Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
Specified by:
createOperatorStateBackend in interface StateBackend
createOperatorStateBackend in class AbstractStateBackend
Parameters:
env - The runtime environment of the executing task.
operatorIdentifier - The identifier of the operator whose state should be stored.
Throws:
Exception - This method may forward all exceptions that occur while instantiating the backend.
Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.