Class FsCheckpointStreamFactory
- java.lang.Object
  - org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory
- All Implemented Interfaces:
CheckpointStreamFactory
- Direct Known Subclasses:
FsCheckpointStorageLocation
public class FsCheckpointStreamFactory extends Object implements CheckpointStreamFactory
A CheckpointStreamFactory that produces streams that write to a FileSystem. The streams from the factory put their data into randomly named files within the given directory. If the state written to a stream is smaller than a configurable threshold, no file is written; instead, the state is returned inline in the state handle. This mitigates the problem of many small files that each hold only a few bytes.
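The inline-versus-file decision can be sketched with plain JDK classes. This is not Flink's implementation: `ThresholdStateStream` and `SketchHandle` are hypothetical names, and the sketch assumes that state of at most `threshold` bytes stays inline (matching the constructor's "up to this size" wording). Bytes are buffered in memory until the threshold would be exceeded; only then is a randomly named file created in the directory, which is assumed to exist already, and the buffered bytes are moved into it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/** Hypothetical result of closing a stream: inline bytes or a file path, never both. */
final class SketchHandle {
    final byte[] inlineData; // non-null if the state was kept inline
    final Path file;         // non-null if the state was spilled to a file

    SketchHandle(byte[] inlineData, Path file) {
        this.inlineData = inlineData;
        this.file = file;
    }

    boolean isInline() {
        return inlineData != null;
    }
}

/** Hypothetical stream that keeps small state in memory and spills larger state to a file. */
final class ThresholdStateStream extends OutputStream {
    private final Path directory; // assumed to exist; never checked or created here
    private final int threshold;  // states of at most this many bytes stay inline
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private OutputStream fileOut; // opened lazily, only once the threshold is exceeded
    private Path file;

    ThresholdStateStream(Path directory, int threshold) {
        this.directory = directory;
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (fileOut == null && buffer.size() + len > threshold) {
            // Threshold exceeded: create a randomly named file and move the buffered bytes into it.
            file = directory.resolve(UUID.randomUUID().toString());
            fileOut = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW);
            buffer.writeTo(fileOut);
            buffer.reset();
        }
        if (fileOut != null) {
            fileOut.write(b, off, len);
        } else {
            buffer.write(b, off, len);
        }
    }

    @Override
    public void close() throws IOException {
        if (fileOut != null) {
            fileOut.close();
        }
    }

    /** Closes the stream and returns a handle: inline bytes if no file was ever created. */
    SketchHandle closeAndGetHandle() throws IOException {
        close();
        return fileOut == null
                ? new SketchHandle(buffer.toByteArray(), null)
                : new SketchHandle(null, file);
    }
}
```

Because the file is created lazily, a stream that never crosses the threshold leaves nothing behind on the file system, which is what keeps tiny states from turning into tiny files.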
Note on directory creation
The given target directory must already exist; this factory does not create it. This matters because if the factory checked for directory existence, there would be many checks per checkpoint (from each TaskManager and operator), and such floods of directory-existence checks can be prohibitive for some file systems in larger setups.
For example, many S3 file systems (such as Hadoop's s3a) use HTTP HEAD requests to check whether a directory exists. S3 sometimes limits HTTP HEAD requests to only a few hundred per second, a rate that moderately large setups easily reach. Surprisingly (and fortunately), the actual state writes (POST) have much higher quotas.
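The division of labor described above, where directories are prepared once and streams never probe for them, might look like the following sketch. `DirectoryContractSketch`, `prepareCheckpointDirectory`, `writeStateFile`, and the `chk-<id>` naming are hypothetical. The point is that the per-stream path issues no existence check (the local analogue of a HEAD request) and simply lets the file creation fail if the directory is missing.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/** Hypothetical split of responsibilities: set up directories once, never probe on writes. */
final class DirectoryContractSketch {

    /** Called once per checkpoint (e.g. by a coordinator) before any stream is opened. */
    static Path prepareCheckpointDirectory(Path root, long checkpointId) throws IOException {
        return Files.createDirectories(root.resolve("chk-" + checkpointId));
    }

    /**
     * Called many times per checkpoint. It deliberately performs no existence check:
     * if the directory is missing, creating the file itself throws an IOException.
     */
    static Path writeStateFile(Path checkpointDir, byte[] data) throws IOException {
        Path file = checkpointDir.resolve(UUID.randomUUID().toString());
        try (OutputStream out = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
            out.write(data);
        }
        return file;
    }
}
```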
Nested Class Summary
Nested Classes
- static class FsCheckpointStreamFactory.FsCheckpointStateOutputStream
  A CheckpointStateOutputStream that writes into a file and returns a StreamStateHandle upon closing.
Field Summary
Fields
- static int MAX_FILE_STATE_THRESHOLD
  Maximum size of state that is stored with the metadata, rather than in files.
Constructor Summary
Constructors
- FsCheckpointStreamFactory(FileSystem fileSystem, Path checkpointDirectory, Path sharedStateDirectory, int fileStateSizeThreshold, int writeBufferSize)
  Creates a new stream factory that stores its checkpoint data in the file system and location defined by the given Path.
Method Summary
- boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope)
  Tells if we can duplicate the given StreamStateHandle into the path corresponding to the given CheckpointedStateScope.
- FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
  Creates a new CheckpointStateOutputStream.
- List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
  Duplicates the given StreamStateHandles into the path corresponding to the given CheckpointedStateScope.
- String toString()
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.apache.flink.runtime.state.CheckpointStreamFactory
couldReuseStateHandle, reusePreviousStateHandle
Field Detail
MAX_FILE_STATE_THRESHOLD
public static final int MAX_FILE_STATE_THRESHOLD
Maximum size of state that is stored with the metadata, rather than in files.
- See Also:
  Constant Field Values
Constructor Detail
FsCheckpointStreamFactory
public FsCheckpointStreamFactory(FileSystem fileSystem, Path checkpointDirectory, Path sharedStateDirectory, int fileStateSizeThreshold, int writeBufferSize)
Creates a new stream factory that stores its checkpoint data in the file system and location defined by the given Path. Important: The given checkpoint directory must already exist. Refer to the class-level JavaDocs for an explanation of why this factory must not try to create the directories.
- Parameters:
  fileSystem - The file system to write to.
  checkpointDirectory - The directory for checkpoint-exclusive state data.
  sharedStateDirectory - The directory for shared checkpoint data.
  fileStateSizeThreshold - State up to this size (in bytes) will be stored as part of the metadata, rather than in files.
  writeBufferSize - The write buffer size.
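A hypothetical sketch of how the two size parameters might interact. The validation rules, the `ASSUMED_MAX_THRESHOLD` placeholder (which stands in for `MAX_FILE_STATE_THRESHOLD` without assuming its actual value), and sizing the buffer as `max(writeBufferSize, fileStateSizeThreshold)` are all assumptions made for illustration. The reasoning behind the last one: a stream that buffers before deciding between inline and file storage needs room for a complete inline state.

```java
/** Hypothetical holder for the size-related constructor arguments, with illustrative checks. */
final class FactorySettingsSketch {
    // Placeholder cap standing in for MAX_FILE_STATE_THRESHOLD; not Flink's actual value.
    static final int ASSUMED_MAX_THRESHOLD = 1 << 20;

    final int fileStateSizeThreshold;
    final int effectiveBufferSize;

    FactorySettingsSketch(int fileStateSizeThreshold, int writeBufferSize) {
        if (fileStateSizeThreshold < 0 || fileStateSizeThreshold > ASSUMED_MAX_THRESHOLD) {
            throw new IllegalArgumentException(
                    "threshold must be in [0, " + ASSUMED_MAX_THRESHOLD + "]: " + fileStateSizeThreshold);
        }
        if (writeBufferSize < 0) {
            throw new IllegalArgumentException("write buffer size must be >= 0: " + writeBufferSize);
        }
        this.fileStateSizeThreshold = fileStateSizeThreshold;
        // The buffer must be able to hold a whole inline state before the inline/file decision.
        this.effectiveBufferSize = Math.max(writeBufferSize, fileStateSizeThreshold);
    }

    /** "State up to this size" is read as inclusive: exactly the threshold still stays inline. */
    boolean storedInline(long stateSizeBytes) {
        return stateSizeBytes <= fileStateSizeThreshold;
    }
}
```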
Method Detail
createCheckpointStateOutputStream
public FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException
Description copied from interface: CheckpointStreamFactory
Creates a new CheckpointStateOutputStream. When the stream is closed, it returns a state handle that can retrieve the state back.
- Specified by:
  createCheckpointStateOutputStream in interface CheckpointStreamFactory
- Parameters:
  scope - The state's scope, whether it is exclusive or shared.
- Returns:
  An output stream that writes state for the given checkpoint.
- Throws:
  IOException - Exceptions may occur while creating the stream and should be forwarded.
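The `scope` argument decides where a new stream's file goes. Going by the constructor parameters, exclusive state belongs in `checkpointDirectory` and shared state in `sharedStateDirectory`. The sketch below shows that routing with a hypothetical `ScopeSketch` enum standing in for `CheckpointedStateScope`; `ScopeRoutingSketch` is likewise hypothetical, and it only computes paths without performing any I/O.

```java
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical stand-in for CheckpointedStateScope. */
enum ScopeSketch { EXCLUSIVE, SHARED }

/** Hypothetical routing of new state files to a directory chosen by scope. */
final class ScopeRoutingSketch {
    private final Path checkpointDirectory;  // checkpoint-exclusive state data
    private final Path sharedStateDirectory; // shared checkpoint data

    ScopeRoutingSketch(Path checkpointDirectory, Path sharedStateDirectory) {
        this.checkpointDirectory = checkpointDirectory;
        this.sharedStateDirectory = sharedStateDirectory;
    }

    Path targetDirectory(ScopeSketch scope) {
        switch (scope) {
            case EXCLUSIVE:
                return checkpointDirectory;
            case SHARED:
                return sharedStateDirectory;
            default:
                throw new IllegalArgumentException("unknown scope: " + scope);
        }
    }

    /** Path for a new stream's file: a random name inside the scope's directory. */
    Path newStateFilePath(ScopeSketch scope) {
        return targetDirectory(scope).resolve(UUID.randomUUID().toString());
    }
}
```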
canFastDuplicate
public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) throws IOException
Description copied from interface: CheckpointStreamFactory
Tells if we can duplicate the given StreamStateHandle into the path corresponding to the given CheckpointedStateScope.
This should be a rather cheap operation, preferably not involving any remote accesses.
- Specified by:
  canFastDuplicate in interface CheckpointStreamFactory
- Parameters:
  stateHandle - The handle to duplicate.
  scope - Scope determining the location to duplicate into.
- Returns:
  true, if we can perform the duplication
- Throws:
  IOException
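One way to read "rather cheap, preferably not involving any remote accesses": the decision below uses only in-memory metadata, so nothing is opened, listed, or stat'ed. The handle classes and `FastDuplicateCheckSketch` are hypothetical, and the same-file-system criterion is an illustrative assumption, not the rule `FsCheckpointStreamFactory` applies.

```java
import java.nio.file.Path;

/** Hypothetical state handle types mirroring the inline-versus-file split of this factory. */
interface HandleSketch {}

final class InlineHandleSketch implements HandleSketch {
    final byte[] data;

    InlineHandleSketch(byte[] data) {
        this.data = data;
    }
}

final class FileHandleSketch implements HandleSketch {
    final Path path;

    FileHandleSketch(Path path) {
        this.path = path;
    }
}

final class FastDuplicateCheckSketch {
    /** Decides from metadata alone, so the check stays cheap even for remote storage. */
    static boolean canFastDuplicate(HandleSketch handle, Path targetDirectory) {
        if (!(handle instanceof FileHandleSketch)) {
            return false; // inline state has no file that could be duplicated
        }
        Path source = ((FileHandleSketch) handle).path;
        // Illustrative criterion: source and target belong to the same file system instance.
        return source.getFileSystem().equals(targetDirectory.getFileSystem());
    }
}
```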
duplicate
public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException
Description copied from interface: CheckpointStreamFactory
Duplicates the given StreamStateHandles into the path corresponding to the given CheckpointedStateScope.
You should first check whether you can duplicate with CheckpointStreamFactory.canFastDuplicate(StreamStateHandle, CheckpointedStateScope).
- Specified by:
  duplicate in interface CheckpointStreamFactory
- Parameters:
  stateHandles - The handles to duplicate.
  scope - Scope determining the location to duplicate into.
- Returns:
  The duplicated handles.
- Throws:
  IOException
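A sketch of the check-then-duplicate contract using local files. `DuplicateSketch` is hypothetical, and `Files.copy` stands in for whatever cheap, file-system-side duplication a real implementation might use; a plain copy still reads and writes every byte. All inputs are checked before any copy starts, so an unsupported handle fails fast, and the result keeps the input order.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

final class DuplicateSketch {
    /** Illustrative cheap precondition, standing in for canFastDuplicate. */
    static boolean canFastDuplicate(Path source, Path targetDirectory) {
        return source.getFileSystem().equals(targetDirectory.getFileSystem());
    }

    /** Duplicates every source file into targetDirectory under a fresh random name. */
    static List<Path> duplicate(List<Path> sources, Path targetDirectory) throws IOException {
        // Validate everything first so an unsupported input is rejected before any copying.
        for (Path source : sources) {
            if (!canFastDuplicate(source, targetDirectory)) {
                throw new IllegalArgumentException("cannot fast-duplicate " + source);
            }
        }
        List<Path> duplicates = new ArrayList<>(sources.size());
        for (Path source : sources) {
            Path target = targetDirectory.resolve(UUID.randomUUID().toString());
            Files.copy(source, target); // plain copy; throws if the target already exists
            duplicates.add(target);
        }
        return duplicates; // same order as the input list
    }
}
```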