public class FsCheckpointStreamFactory extends Object implements CheckpointStreamFactory
A CheckpointStreamFactory that produces streams that write to a FileSystem. The streams from the factory put their data into files with a random name, within the given directory.
If the state written to the stream is fewer bytes than a configurable threshold, no file is written; the state is instead returned inline in the state handle. This mitigates the problem of many small files that each hold only a few bytes.
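The inline-versus-file decision can be illustrated with a minimal, self-contained sketch that uses only the JDK, not Flink's classes. It assumes the stream buffers writes in memory and spills them to a randomly named file only once the buffered bytes exceed the threshold. The names ThresholdStream, InlineHandle, FileHandle, closeAndGetHandle, and describe are hypothetical stand-ins for the real output stream and its state handles.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class InlineThresholdSketch {

    /** Hypothetical handle types: state kept inline with the metadata, or a pointer to a file. */
    interface Handle {}

    record InlineHandle(byte[] data) implements Handle {}

    record FileHandle(Path path) implements Handle {}

    /** Buffers writes in memory and spills to a randomly named file once the threshold is exceeded. */
    static class ThresholdStream extends OutputStream {
        private final Path directory;
        private final int threshold;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private OutputStream fileOut; // stays null while the state still fits inline
        private Path filePath;

        ThresholdStream(Path directory, int threshold) {
            this.directory = directory;
            this.threshold = threshold;
        }

        @Override
        public void write(int b) throws IOException {
            if (fileOut != null) {
                fileOut.write(b);
                return;
            }
            buffer.write(b);
            if (buffer.size() > threshold) {
                // Threshold exceeded: create the file and move the buffered bytes into it.
                filePath = directory.resolve(UUID.randomUUID().toString());
                fileOut = Files.newOutputStream(filePath);
                buffer.writeTo(fileOut);
                buffer.reset();
            }
        }

        /** Returns an inline handle if no file was ever created, otherwise closes the file and points at it. */
        Handle closeAndGetHandle() throws IOException {
            if (fileOut == null) {
                return new InlineHandle(buffer.toByteArray());
            }
            fileOut.close();
            return new FileHandle(filePath);
        }
    }

    /** Writes stateBytes zero bytes into a fresh temp directory and reports where they ended up. */
    static String describe(int threshold, int stateBytes) {
        try {
            Path dir = Files.createTempDirectory("ckpt");
            ThresholdStream out = new ThresholdStream(dir, threshold);
            out.write(new byte[stateBytes]);
            Handle handle = out.closeAndGetHandle();
            if (handle instanceof InlineHandle inline) {
                return "inline:" + inline.data().length;
            }
            return "file:" + Files.size(((FileHandle) handle).path());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(4, 3)); // inline:3
        System.out.println(describe(4, 6)); // file:6
    }
}
```

With a threshold of 4 bytes, three or four bytes stay inline, while the fifth byte of a six-byte state triggers the spill, so all six bytes end up in the file.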
The given target directory must already exist; this factory does not ensure that the directory gets created. This is important because, if the factory checked for directory existence, there would be many checks per checkpoint (from each TaskManager and operator), and such floods of existence checks can be prohibitive for some file systems in larger setups.
For example, many S3 file systems (such as Hadoop's s3a) use HTTP HEAD requests to check whether a directory exists. S3 sometimes limits HTTP HEAD requests to only a few hundred per second, a rate that moderately large setups easily reach. Surprisingly (and fortunately), the actual state writes (POST requests) have much higher quotas.
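The trade-off described above can be sketched with java.nio.file instead of Flink's FileSystem abstraction: each stream opens a new, randomly named file directly in its target directory, with no existence check and no directory creation, so a missing directory only shows up as a NoSuchFileException from the open call itself. The Scope enum and the rule that exclusive state goes to the checkpoint directory while shared state goes to the shared directory are assumptions for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
import java.util.stream.Stream;

public class RandomFileNameSketch {

    /** Hypothetical stand-in for Flink's CheckpointedStateScope. */
    enum Scope { EXCLUSIVE, SHARED }

    private final Path checkpointDirectory;
    private final Path sharedStateDirectory;

    RandomFileNameSketch(Path checkpointDirectory, Path sharedStateDirectory) {
        this.checkpointDirectory = checkpointDirectory;
        this.sharedStateDirectory = sharedStateDirectory;
    }

    /** Assumption: exclusive state goes to the checkpoint directory, shared state to the shared directory. */
    Path targetDirectory(Scope scope) {
        return scope == Scope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
    }

    /**
     * Opens a new file with a random name. There is deliberately no Files.exists() or
     * createDirectories() call: the happy path issues no extra existence check, and a
     * missing directory surfaces as NoSuchFileException from the open itself.
     */
    OutputStream open(Scope scope) throws IOException {
        Path file = targetDirectory(scope).resolve(UUID.randomUUID().toString());
        return Files.newOutputStream(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    static String demo() {
        try {
            Path root = Files.createTempDirectory("ckpt");
            Path exclusive = Files.createDirectory(root.resolve("chk-1"));
            Path shared = Files.createDirectory(root.resolve("shared"));
            RandomFileNameSketch factory = new RandomFileNameSketch(exclusive, shared);
            try (OutputStream out = factory.open(Scope.SHARED)) {
                out.write(42);
            }
            long sharedFiles;
            try (Stream<Path> files = Files.list(shared)) {
                sharedFiles = files.count();
            }
            String missing;
            try {
                new RandomFileNameSketch(root.resolve("missing"), shared).open(Scope.EXCLUSIVE).close();
                missing = "created";
            } catch (NoSuchFileException e) {
                missing = "NoSuchFileException";
            }
            return "shared files=" + sharedFiles + ", missing dir -> " + missing;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // shared files=1, missing dir -> NoSuchFileException
    }
}
```

The design choice is to let the write itself fail rather than pay for a check on every stream; creating the directories once, up front, is left to whoever sets up the checkpoint location.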
Modifier and Type | Class and Description |
---|---|
static class | FsCheckpointStreamFactory.FsCheckpointStateOutputStream: A CheckpointStateOutputStream that writes into a file and returns a StreamStateHandle upon closing. |
Modifier and Type | Field and Description |
---|---|
static int | MAX_FILE_STATE_THRESHOLD: Maximum size of state that is stored with the metadata, rather than in files. |
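A plausible role for MAX_FILE_STATE_THRESHOLD is to bound the fileStateSizeThreshold that the constructor accepts. The sketch below assumes that behavior; both the validation rule and the 1 MiB value given to the constant are assumptions for illustration, so consult the Flink source for the actual rule and value.

```java
public class ThresholdValidationSketch {

    // Assumed value for illustration only; check the constant in the Flink version you use.
    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    /** Hypothetical constructor-style validation of fileStateSizeThreshold. */
    static int checkFileStateSizeThreshold(int fileStateSizeThreshold) {
        if (fileStateSizeThreshold < 0) {
            throw new IllegalArgumentException("fileStateSizeThreshold must be zero or larger");
        }
        if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException(
                    "fileStateSizeThreshold cannot exceed " + MAX_FILE_STATE_THRESHOLD);
        }
        return fileStateSizeThreshold;
    }

    /** Convenience wrapper that reports whether a threshold passes the check. */
    static boolean isAccepted(int threshold) {
        try {
            checkFileStateSizeThreshold(threshold);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(20 * 1024));                    // true
        System.out.println(isAccepted(-1));                           // false
        System.out.println(isAccepted(MAX_FILE_STATE_THRESHOLD + 1)); // false
    }
}
```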
Constructor and Description |
---|
FsCheckpointStreamFactory(FileSystem fileSystem, Path checkpointDirectory, Path sharedStateDirectory, int fileStateSizeThreshold, int writeBufferSize): Creates a new stream factory that stores its checkpoint data in the file system and location defined by the given Path. |
Modifier and Type | Method and Description |
---|---|
boolean | canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope): Tells whether the given StreamStateHandle can be duplicated into the path corresponding to the given CheckpointedStateScope. |
FsCheckpointStreamFactory.FsCheckpointStateOutputStream | createCheckpointStateOutputStream(CheckpointedStateScope scope): Creates a new CheckpointStateOutputStream. |
List<StreamStateHandle> | duplicate(List<StreamStateHandle> stateHandles, CheckpointedStateScope scope): Duplicates the given StreamStateHandles into the path corresponding to the given CheckpointedStateScope. |
String | toString() |
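Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface CheckpointStreamFactory: couldReuseStateHandle, reusePreviousStateHandle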
public static final int MAX_FILE_STATE_THRESHOLD
public FsCheckpointStreamFactory(FileSystem fileSystem, Path checkpointDirectory, Path sharedStateDirectory, int fileStateSizeThreshold, int writeBufferSize)
Important: The given checkpoint directory must already exist. Refer to the class-level JavaDocs for an explanation of why this factory must not try to create the directory.
Parameters:
fileSystem - The filesystem to write to.
checkpointDirectory - The directory for checkpoint exclusive state data.
sharedStateDirectory - The directory for shared checkpoint data.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files.
writeBufferSize - The write buffer size.
public FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException
Description copied from interface: CheckpointStreamFactory
Creates a new CheckpointStateOutputStream. When the stream is closed, it returns a state handle that can retrieve the state back.
Specified by: createCheckpointStateOutputStream in interface CheckpointStreamFactory
Parameters:
scope - The state's scope, whether it is exclusive or shared.
Throws:
IOException - Exceptions may occur while creating the stream and should be forwarded.
public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) throws IOException
Description copied from interface: CheckpointStreamFactory
Tells whether the given StreamStateHandle can be duplicated into the path corresponding to the given CheckpointedStateScope.
This should be a cheap operation, preferably one that involves no remote access.
Specified by: canFastDuplicate in interface CheckpointStreamFactory
Parameters:
stateHandle - The handle to duplicate
scope - Scope determining the location to duplicate into
Throws:
IOException
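Because canFastDuplicate should avoid remote access, one way to honor that is to decide purely from the paths involved. The sketch below uses java.net.URI in place of Flink's Path and assumes a deliberately conservative, hypothetical rule (same URI scheme and authority); it is not claimed to match the rule that FsCheckpointStreamFactory applies.

```java
import java.net.URI;
import java.util.Objects;

public class FastDuplicateCheckSketch {

    /**
     * Hypothetical rule: allow a fast (server-side) duplicate only when the state file and the
     * target directory share scheme and authority. Only strings are compared, so no request
     * leaves the process.
     */
    static boolean canFastDuplicate(URI stateFile, URI targetDirectory) {
        String scheme = stateFile.getScheme();
        return scheme != null
                && scheme.equalsIgnoreCase(targetDirectory.getScheme())
                && Objects.equals(stateFile.getAuthority(), targetDirectory.getAuthority());
    }

    public static void main(String[] args) {
        URI state = URI.create("s3://bucket-a/checkpoints/chk-7/3f2a");
        System.out.println(canFastDuplicate(state, URI.create("s3://bucket-a/checkpoints/shared"))); // true
        System.out.println(canFastDuplicate(state, URI.create("s3://bucket-b/checkpoints/shared"))); // false
        System.out.println(canFastDuplicate(state, URI.create("hdfs://namenode:8020/shared")));      // false
    }
}
```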
public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException
Description copied from interface: CheckpointStreamFactory
Duplicates the given StreamStateHandles into the path corresponding to the given CheckpointedStateScope.
You should first check whether the handles can be duplicated with CheckpointStreamFactory.canFastDuplicate(StreamStateHandle, CheckpointedStateScope).
Specified by: duplicate in interface CheckpointStreamFactory
Parameters:
stateHandles - The handles to duplicate
scope - Scope determining the location to duplicate into
Throws:
IOException
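The advice to call canFastDuplicate before duplicate can be sketched as a check-then-duplicate loop with a streaming fallback. In this hypothetical model, java.nio.file.Path stands in for StreamStateHandle, the Duplicator interface mirrors only the two method names, and duplicate() is assumed to return results in input order; the stream-copy fallback is an illustrative choice, not documented Flink behavior.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class DuplicateOrCopySketch {

    /** Hypothetical mirror of the two factory methods; a Path plays the role of a StreamStateHandle. */
    interface Duplicator {
        boolean canFastDuplicate(Path handle) throws IOException;

        List<Path> duplicate(List<Path> handles) throws IOException;
    }

    /** Checks each handle first, batches eligible ones into one duplicate() call, stream-copies the rest. */
    static List<Path> duplicateOrCopy(Duplicator d, List<Path> handles, Path targetDir) throws IOException {
        Path[] result = new Path[handles.size()];
        List<Integer> fastIndexes = new ArrayList<>();
        List<Path> fastHandles = new ArrayList<>();
        for (int i = 0; i < handles.size(); i++) {
            Path handle = handles.get(i);
            if (d.canFastDuplicate(handle)) {
                fastIndexes.add(i);
                fastHandles.add(handle);
            } else {
                result[i] = streamCopy(handle, targetDir);
            }
        }
        if (!fastHandles.isEmpty()) {
            // Assumption: duplicate() returns its results in the same order as its input.
            List<Path> duplicated = d.duplicate(fastHandles);
            for (int j = 0; j < fastIndexes.size(); j++) {
                result[fastIndexes.get(j)] = duplicated.get(j);
            }
        }
        return Arrays.asList(result);
    }

    /** Fallback: read the bytes and write them into a new, randomly named file. */
    static Path streamCopy(Path source, Path targetDir) throws IOException {
        Path target = targetDir.resolve(UUID.randomUUID().toString());
        try (InputStream in = Files.newInputStream(source);
                OutputStream out = Files.newOutputStream(target)) {
            in.transferTo(out);
        }
        return target;
    }

    static String demo() {
        try {
            Path root = Files.createTempDirectory("dup");
            Path fastRoot = Files.createDirectory(root.resolve("fast"));
            Path slowRoot = Files.createDirectory(root.resolve("slow"));
            Path target = Files.createDirectory(root.resolve("target"));
            Path a = Files.writeString(fastRoot.resolve("a"), "A");
            Path b = Files.writeString(slowRoot.resolve("b"), "B");
            int[] duplicateCalls = {0};
            Duplicator d = new Duplicator() {
                @Override
                public boolean canFastDuplicate(Path handle) {
                    return handle.startsWith(fastRoot); // pure path comparison, no I/O
                }

                @Override
                public List<Path> duplicate(List<Path> handles) throws IOException {
                    duplicateCalls[0]++;
                    List<Path> duplicates = new ArrayList<>();
                    for (Path h : handles) {
                        duplicates.add(Files.copy(h, target.resolve("dup-" + h.getFileName())));
                    }
                    return duplicates;
                }
            };
            List<Path> copies = duplicateOrCopy(d, List.of(a, b), target);
            return Files.readString(copies.get(0)) + Files.readString(copies.get(1))
                    + " calls=" + duplicateCalls[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // AB calls=1
    }
}
```

Only handle a passes the check here, so duplicate() is called once with a single handle while b is copied by streaming; batching the eligible handles keeps the number of duplicate() calls to one per request.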
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.