public class FsStateBackend extends AbstractStateBackend
The state backend has one core directory into which it puts all checkpoint data. Inside that
directory, it creates a directory per job, inside which each checkpoint gets a directory, with
files for each state, for example:
hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
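The directory layout described above (base directory, then one directory per job, then one per checkpoint, then one file per state) can be sketched with plain string handling. This is a minimal sketch, not Flink code: the helper name checkpointFilePath is hypothetical, and using a random UUID as the file name is an assumption drawn only from the example path.

```java
import java.util.UUID;

final class CheckpointPathLayout {

    // Hypothetical helper (not part of Flink): builds
    // <baseDir>/<jobId>/chk-<checkpointId>/<random UUID>
    // following the example path in the class description.
    static String checkpointFilePath(String baseDir, String jobId, long checkpointId) {
        // Drop a trailing slash so the segments are joined with exactly one '/'.
        String base = baseDir.endsWith("/") ? baseDir.substring(0, baseDir.length() - 1) : baseDir;
        return base + "/" + jobId + "/chk-" + checkpointId + "/" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Prints something like hdfs://namenode:8020/flink-checkpoints/my-job/chk-17/<uuid>
        System.out.println(checkpointFilePath("hdfs://namenode:8020/flink-checkpoints/", "my-job", 17));
    }
}
```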
Modifier and Type | Class and Description |
---|---|
static class | FsStateBackend.FsCheckpointStateOutputStream - A CheckpointStateOutputStream that writes into a file and returns the path to that file upon closing. |
Nested classes/interfaces inherited from class AbstractStateBackend: AbstractStateBackend.CheckpointStateOutputStream, AbstractStateBackend.CheckpointStateOutputView
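FsCheckpointStateOutputStream is described as a stream that writes into a file and returns the path to that file upon closing. The sketch below shows that pattern with java.nio only. It is a hypothetical stand-in, not Flink's class: the name FileCheckpointStream and the method closeAndGetPath are assumptions for illustration, and the real stream additionally deals with Flink's FileSystem abstraction.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for FsCheckpointStateOutputStream: bytes go to a file,
// and closing the stream hands back the path of that file.
final class FileCheckpointStream extends OutputStream {
    private final Path file;
    private final OutputStream out;
    private boolean closed;

    FileCheckpointStream(Path checkpointDir) throws IOException {
        Files.createDirectories(checkpointDir);
        // One new, uniquely named file per stream inside the checkpoint directory.
        this.file = Files.createTempFile(checkpointDir, "state-", ".bin");
        this.out = Files.newOutputStream(file);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    /** Closes the file (flushing any buffered bytes) and returns where the state was written. */
    Path closeAndGetPath() throws IOException {
        close();
        return file;
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            out.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("flink-checkpoints").resolve("chk-17");
        FileCheckpointStream s = new FileCheckpointStream(dir);
        s.write("some state".getBytes(StandardCharsets.UTF_8));
        System.out.println("state written to " + s.closeAndGetPath());
    }
}
```

Returning the path only on close means a caller never sees a handle to a half-written file.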
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_FILE_STATE_THRESHOLD - By default, state smaller than 1024 bytes will not be written to files, but will be stored directly with the metadata. |
static int | MAX_FILE_STATE_THRESHOLD - Maximum size of state that is stored with the metadata, rather than in files. |
Fields inherited from class AbstractStateBackend: currentKey, keySerializer, userCodeClassLoader
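The two threshold fields describe a size cutoff: small state travels inline with the checkpoint metadata, larger state goes to a file. A minimal sketch of that decision follows. The class and method names are hypothetical; the 1 MiB cap is an assumed stand-in for MAX_FILE_STATE_THRESHOLD, whose value is not stated on this page; and treating state exactly at the threshold as inline follows the constructor's "state up to this size" wording, while the field description says "smaller than", so the boundary case is an assumption.

```java
final class FileStateThreshold {

    // From the field description: by default, state below 1024 bytes is kept with the metadata.
    static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;

    // ASSUMPTION: illustrative cap standing in for MAX_FILE_STATE_THRESHOLD;
    // the real value is not stated on this page.
    static final int ASSUMED_MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    // ASSUMPTION: rejects thresholds outside [0, cap], as a constructor taking
    // fileStateSizeThreshold plausibly would.
    static int checkThreshold(int threshold) {
        if (threshold < 0 || threshold > ASSUMED_MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException(
                "fileStateSizeThreshold must be between 0 and "
                    + ASSUMED_MAX_FILE_STATE_THRESHOLD + ", got " + threshold);
        }
        return threshold;
    }

    // "State up to this size will be stored as part of the metadata, rather than in files."
    // true = inline with the metadata, false = written to a file.
    static boolean storeWithMetadata(long stateSizeBytes, int threshold) {
        return stateSizeBytes <= threshold;
    }

    public static void main(String[] args) {
        int threshold = checkThreshold(DEFAULT_FILE_STATE_THRESHOLD);
        System.out.println(storeWithMetadata(200, threshold));    // true
        System.out.println(storeWithMetadata(64_000, threshold)); // false
    }
}
```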
Constructor and Description |
---|
FsStateBackend(Path checkpointDataUri) - Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(String checkpointDataUri) - Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri) - Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) - Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
Modifier and Type | Method and Description |
---|---|
<S extends Serializable> StateHandle<S> | checkpointStateSerializable(S state, long checkpointID, long timestamp) - Writes the given state into the checkpoint, and returns a handle that can retrieve the state back. |
void | close() - Closes the state backend, releasing all internal resources, but does not delete any persistent checkpoint data. |
FsStateBackend.FsCheckpointStateOutputStream | createCheckpointStateOutputStream(long checkpointID, long timestamp) - Creates an output stream that writes into the state of the given checkpoint. |
protected <N,T,ACC> FoldingState<T,ACC> | createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc) - Creates and returns a new FoldingState. |
<N,T> ListState<T> | createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) - Creates and returns a new ListState. |
<N,T> ReducingState<T> | createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) - Creates and returns a new ReducingState. |
<N,V> ValueState<V> | createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) - Creates and returns a new ValueState. |
void | disposeAllStateForCurrentJob() - Disposes all state associated with the current job. |
Path | getBasePath() - Gets the base directory where all state-containing files are stored. |
Path | getCheckpointDirectory() - Gets the directory where this state backend stores its checkpoint data. |
int | getFileStateSizeThreshold() - Gets the size (in bytes) above which the state will be written to files. |
FileSystem | getFileSystem() - Gets the file system handle for the file system that stores the state for this backend. |
void | initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) - This method is called by the task upon deployment to initialize the state backend for the data of a specific job. |
boolean | isInitialized() - Checks whether this state backend is initialized. |
String | toString() |
static Path | validateAndNormalizeUri(URI checkpointDataUri) - Checks and normalizes the checkpoint data URI. |
Methods inherited from class AbstractStateBackend: createCheckpointStateOutputView, dispose, getCurrentKey, getPartitionedState, injectKeyValueStateSnapshots, notifyOfCompletedCheckpoint, setCurrentKey, snapshotPartitionedState
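validateAndNormalizeUri is documented as rejecting a URI that misses its scheme or path, and as adding the authority of the configured file system when the URI has none. The sketch below models that contract with java.net.URI. It is not Flink's implementation: the defaultAuthority parameter is a hypothetical stand-in for the authority of the file system configured for the scheme (for HDFS, typically taken from the Hadoop configuration), and the file system lookup, which can throw IOException in the real method, is left out.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class CheckpointUriCheck {

    // Sketch of the documented contract, not Flink's code. "defaultAuthority" is a
    // hypothetical stand-in for the authority of the file system configured for the
    // URI's scheme (e.g. the HDFS namenode from the Hadoop configuration); may be null.
    static URI validateAndNormalize(URI uri, String defaultAuthority) throws URISyntaxException {
        String scheme = uri.getScheme();
        String path = uri.getPath();

        // A URI that misses scheme or path is rejected.
        if (scheme == null) {
            throw new IllegalArgumentException("No scheme in checkpoint URI: " + uri);
        }
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("No path in checkpoint URI: " + uri);
        }

        // No authority in the URI, but the configured file system has one: include it.
        if (uri.getAuthority() == null && defaultAuthority != null) {
            return new URI(scheme, defaultAuthority, path, null, null);
        }
        return uri;
    }

    public static void main(String[] args) throws URISyntaxException {
        URI normalized = validateAndNormalize(URI.create("hdfs:///flink-checkpoints"), "namenode:8020");
        System.out.println(normalized); // hdfs://namenode:8020/flink-checkpoints
    }
}
```

An authority that is already present in the URI is kept unchanged, so an explicit host and port always wins over the configured default.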
public static final int DEFAULT_FILE_STATE_THRESHOLD
public static final int MAX_FILE_STATE_THRESHOLD
public FsStateBackend(String checkpointDataUri) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
Throws:
IOException - Thrown, if no file system can be found for the scheme in the URI.
public FsStateBackend(Path checkpointDataUri) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
Throws:
IOException - Thrown, if no file system can be found for the scheme in the URI.
public FsStateBackend(URI checkpointDataUri) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
Throws:
IOException - Thrown, if no file system can be found for the scheme in the URI.
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files.
Throws:
IOException - Thrown, if no file system can be found for the scheme in the URI.
public Path getBasePath()
public Path getCheckpointDirectory()
public int getFileStateSizeThreshold()
public boolean isInitialized()
public FileSystem getFileSystem()
public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception
Description copied from class: AbstractStateBackend
This method is called by the task upon deployment to initialize the state backend for the data of a specific job.
initializeForJob in class AbstractStateBackend
Parameters:
env - The Environment of the task that instantiated the state backend.
operatorIdentifier - Unique identifier for naming states created by this backend.
Throws:
Exception - Overridden versions of this method may throw exceptions, in which case the job that uses the state backend is considered failed during deployment.
public void disposeAllStateForCurrentJob() throws Exception
Description copied from class: AbstractStateBackend
Disposes all state associated with the current job.
disposeAllStateForCurrentJob in class AbstractStateBackend
Throws:
Exception - Exceptions may occur during disposal of the state and should be forwarded.
public void close() throws Exception
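Description copied from class: AbstractStateBackend
Closes the state backend, releasing all internal resources, but does not delete any persistent checkpoint data.
close in class AbstractStateBackend
Throws:
Exception - Exceptions can be forwarded and will be logged by the system.
public <N,V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception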
Description copied from class: AbstractStateBackend
Creates and returns a new ValueState.
createValueState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
V - The type of the value that the ValueState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
public <N,T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ListState.
createListState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the values that the ListState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
public <N,T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new ReducingState.
createReducingState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - The type of the values that the ReducingState can store.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
protected <N,T,ACC> FoldingState<T,ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc) throws Exception
Description copied from class: AbstractStateBackend
Creates and returns a new FoldingState.
createFoldingState in class AbstractStateBackend
Type Parameters:
N - The type of the namespace.
T - Type of the values folded into the state.
ACC - Type of the value in the state.
Parameters:
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws:
Exception
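The create*State methods return partitioned state: each value is scoped by the backend's current key (the currentKey field and setCurrentKey) and by a namespace whose serializer is passed in as namespaceSerializer. The toy class below only illustrates that scoping for a ValueState-like cell with value(), update() and clear(). It is a hypothetical in-memory model, not how FsStateBackend stores or checkpoints state, and its setCurrentNamespace method is an assumption of the toy.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not Flink's implementation): one value per
// (current key, namespace) pair, with ValueState-style value/update/clear.
final class ToyKeyedValueState<K, N, V> {
    private final Map<K, Map<N, V>> table = new HashMap<>();
    private final V defaultValue;
    private K currentKey;
    private N currentNamespace;

    ToyKeyedValueState(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    // Plays the role of the backend's setCurrentKey(...).
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Assumed toy method for selecting the namespace (for example, a window).
    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    // Returns the value for the current (key, namespace), or the default if none is set.
    V value() {
        Map<N, V> byNamespace = table.get(currentKey);
        V v = (byNamespace == null) ? null : byNamespace.get(currentNamespace);
        return (v == null) ? defaultValue : v;
    }

    void update(V v) {
        table.computeIfAbsent(currentKey, k -> new HashMap<>()).put(currentNamespace, v);
    }

    void clear() {
        Map<N, V> byNamespace = table.get(currentKey);
        if (byNamespace != null) {
            byNamespace.remove(currentNamespace);
        }
    }
}
```

Switching either the key or the namespace makes value() see a different cell, which is why the same state object can serve many keys and windows.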
public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception
Description copied from class: AbstractStateBackend
Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
checkpointStateSerializable in class AbstractStateBackend
Type Parameters:
S - The type of the state.
Parameters:
state - The state to be checkpointed.
checkpointID - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
Throws:
Exception - Exceptions may occur while serializing or storing the state and should be forwarded.
public FsStateBackend.FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception
Description copied from class: AbstractStateBackend
Creates an output stream that writes into the state of the given checkpoint.
createCheckpointStateOutputStream in class AbstractStateBackend
Parameters:
checkpointID - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
Throws:
Exception - Exceptions may occur while creating the stream and should be forwarded.
public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException
Checks and normalizes the checkpoint data URI. If the URI does not include an authority, but the file system configured for the URI has an authority, then the normalized path will include this authority.
Parameters:
checkpointDataUri - The URI to check and normalize.
Throws:
IllegalArgumentException - Thrown, if the URI is missing a scheme or path.
IOException - Thrown, if no file system can be found for the URI's scheme.
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.