@PublicEvolving public class FileSystemCheckpointStorage extends Object implements CheckpointStorage, ConfigurableCheckpointStorage
FileSystemCheckpointStorage checkpoints state as files to a file system.
Each checkpoint stores all of its files in its own subdirectory whose name includes the checkpoint number, such as hdfs://namenode:port/flink-checkpoints/chk-17/.
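The naming scheme can be illustrated with a small, self-contained sketch that derives a per-checkpoint directory from a base checkpoint URI using java.net.URI. The class and method names are hypothetical and not part of Flink. Flink's checkpointing documentation also describes a job-ID directory between the base directory and chk-<n> (for example <checkpoint-dir>/<job-id>/chk-17/), so the sketch accepts an optional job ID. Treat the exact layout as version-dependent.

```java
import java.net.URI;

/** Hypothetical helper, not part of Flink: derives per-checkpoint directory URIs. */
final class CheckpointDirLayout {

    private CheckpointDirLayout() {}

    /**
     * Returns base/[jobId/]chk-<checkpointId>/.
     * Pass jobId == null to get the flat form used in the example path above.
     */
    static URI checkpointDirectory(URI base, String jobId, long checkpointId) {
        if (checkpointId < 0) {
            throw new IllegalArgumentException("checkpointId must be >= 0: " + checkpointId);
        }
        // URI.resolve() replaces the last path segment unless the base ends with '/'.
        String baseStr = base.toString();
        URI dirBase = baseStr.endsWith("/") ? base : URI.create(baseStr + "/");
        String child = (jobId == null ? "" : jobId + "/") + "chk-" + checkpointId + "/";
        return dirBase.resolve(child);
    }

    public static void main(String[] args) {
        URI base = URI.create("hdfs://namenode:8020/flink-checkpoints");
        // hdfs://namenode:8020/flink-checkpoints/chk-17/
        System.out.println(checkpointDirectory(base, null, 17));
        // hdfs://namenode:8020/flink-checkpoints/a1b2c3/chk-17/
        System.out.println(checkpointDirectory(base, "a1b2c3", 17));
    }
}
```

Normalizing the base to end with '/' matters because URI.resolve() replaces the last path segment of a base that lacks a trailing slash, which would silently drop "flink-checkpoints" from the result.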
This checkpoint storage stores small state chunks directly with the metadata to avoid creating many small files. The threshold for that is configurable. Raising this threshold increases the size of the checkpoint metadata, and the checkpoint metadata of all retained completed checkpoints must fit into the JobManager's heap memory. This is typically not a problem unless the threshold (see getMinFileSizeThreshold()) is increased significantly.
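A rough model of this trade-off, assuming the rule exactly as stated here: chunks smaller than the threshold are inlined into the metadata, everything else goes to a file. The names and the chunks-per-checkpoint parameter are hypothetical; Flink makes this decision inside its checkpoint output streams, and the exact boundary comparison there may differ. The bound simply multiplies retained checkpoints, inlined chunks per checkpoint, and the threshold, which shows why raising the threshold by a factor of 1024 raises the worst-case heap footprint by the same factor.

```java
/** Hypothetical model of the small-state inlining rule; not Flink API. */
final class InlineThresholdModel {

    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    private InlineThresholdModel() {}

    /** Documented rule: state below the threshold goes into the metadata, the rest into files. */
    static Placement placementFor(long stateSizeBytes, long thresholdBytes) {
        return stateSizeBytes < thresholdBytes ? Placement.INLINE_IN_METADATA : Placement.SEPARATE_FILE;
    }

    /**
     * Loose upper bound on inlined state bytes the JobManager holds across all retained
     * checkpoints, assuming (hypothetically) at most chunksPerCheckpoint inlined chunks each.
     */
    static long inlinedBytesUpperBound(long retainedCheckpoints, long chunksPerCheckpoint, long thresholdBytes) {
        return retainedCheckpoints * chunksPerCheckpoint * thresholdBytes;
    }

    public static void main(String[] args) {
        System.out.println(placementFor(300, 1024));       // INLINE_IN_METADATA
        System.out.println(placementFor(64 * 1024, 1024)); // SEPARATE_FILE
        System.out.println(inlinedBytesUpperBound(3, 1_000, 1024));        // 3072000 (about 3 MB)
        System.out.println(inlinedBytesUpperBound(3, 1_000, 1024 * 1024)); // 3145728000 (about 2.9 GiB)
    }
}
```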
Checkpoints from this checkpoint storage are as persistent and available as the file system they are written to. If the file system is a persistent distributed file system, this checkpoint storage supports highly available setups. It additionally supports savepoints and externalized checkpoints.
As with all checkpoint storage policies, this checkpoint storage can be configured either within the application (by creating it with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the checkpoint storage is specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the storage is configured in the application without a default savepoint directory, it picks up the default savepoint directory specified in the Flink configuration of the running job/cluster. This behavior is implemented via the configure(ReadableConfig, ClassLoader) method.
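The precedence rule can be sketched as follows. This is a hypothetical model, not Flink's configure(ReadableConfig, ClassLoader): a plain Map<String, String> stands in for ReadableConfig, and the key name is invented for the example. A savepoint directory set in the application wins; otherwise the value from the cluster configuration is used. The model also follows the ConfigurableCheckpointStorage contract described for configure(): it returns the original object when nothing is applied and a modified copy otherwise.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified model of configure() precedence; NOT the Flink API. */
final class ConfigurableStorageModel {

    // Hypothetical configuration key used only by this sketch.
    static final String SAVEPOINT_DIR_KEY = "example.savepoint-dir";

    final String checkpointDir; // required, as for the real constructors
    final String savepointDir;  // null if the application did not set one

    ConfigurableStorageModel(String checkpointDir, String savepointDir) {
        this.checkpointDir = Objects.requireNonNull(checkpointDir, "checkpointDir");
        this.savepointDir = savepointDir;
    }

    /** Application settings take precedence; this instance is never mutated. */
    ConfigurableStorageModel configure(Map<String, String> clusterConfig) {
        if (savepointDir != null) {
            return this; // set in the application: nothing to pick up
        }
        String fromConfig = clusterConfig.get(SAVEPOINT_DIR_KEY);
        if (fromConfig == null) {
            return this; // nothing configured either: keep the original object
        }
        return new ConfigurableStorageModel(checkpointDir, fromConfig); // modified copy
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new HashMap<>();
        cluster.put(SAVEPOINT_DIR_KEY, "hdfs:///cluster/savepoints");

        // The application sets only a checkpoint directory.
        ConfigurableStorageModel fromApp = new ConfigurableStorageModel("hdfs:///app/checkpoints", null);
        ConfigurableStorageModel effective = fromApp.configure(cluster);

        System.out.println(effective.checkpointDir); // hdfs:///app/checkpoints
        System.out.println(effective.savepointDir);  // hdfs:///cluster/savepoints
    }
}
```

Returning a copy instead of mutating keeps the object built in the application reusable, for example when the same program is submitted to clusters with different configurations.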
| Constructor | Description |
|---|---|
| FileSystemCheckpointStorage(Path checkpointDirectory) | Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. |
| FileSystemCheckpointStorage(Path checkpointDirectory, int fileStateSizeThreshold, int writeBufferSize) | Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. |
| FileSystemCheckpointStorage(String checkpointDirectory) | Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. |
| FileSystemCheckpointStorage(URI checkpointDirectory) | Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. |
| FileSystemCheckpointStorage(URI checkpointDirectory, int fileStateSizeThreshold) | Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. |
| Modifier and Type | Method | Description |
|---|---|---|
| FileSystemCheckpointStorage | configure(ReadableConfig config, ClassLoader classLoader) | Creates a variant of the checkpoint storage that applies additional configuration parameters. |
| CheckpointStorageAccess | createCheckpointStorage(JobID jobId) | Creates a storage for checkpoints for the given job. |
| static FileSystemCheckpointStorage | createFromConfig(ReadableConfig config, ClassLoader classLoader) | Creates a new FileSystemCheckpointStorage using the given configuration. |
| Path | getCheckpointPath() | Gets the base directory where all the checkpoints are stored. |
| int | getMinFileSizeThreshold() | Gets the threshold below which state is stored as part of the metadata, rather than in files. |
| Path | getSavepointPath() | |
| int | getWriteBufferSize() | Gets the write buffer size for created checkpoint streams. |
| CompletedCheckpointStorageLocation | resolveCheckpoint(String pointer) | Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. |
public FileSystemCheckpointStorage(String checkpointDirectory)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
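A pre-flight check along these lines can catch a malformed directory string before job submission. It is a hypothetical helper built only on java.net.URI; it does not call Flink's FileSystem.get(URI), so it cannot tell whether a file system implementation for the scheme is actually available. It flags a string with no scheme at all (without guessing which file system would be used instead) and an HDFS URI without an authority, the case described here that relies on the Hadoop configuration.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical pre-flight check for a checkpoint directory string; not Flink API. */
final class CheckpointUriCheck {

    enum Verdict { OK, MISSING_SCHEME, HDFS_NEEDS_HADOOP_CONFIG, INVALID }

    private CheckpointUriCheck() {}

    static Verdict check(String checkpointDirectory) {
        final URI uri;
        try {
            uri = new URI(checkpointDirectory);
        } catch (URISyntaxException e) {
            return Verdict.INVALID; // e.g. illegal characters such as spaces
        }
        if (uri.getScheme() == null) {
            return Verdict.MISSING_SCHEME; // sketch does not guess a fallback file system
        }
        // "hdfs:///path" parses with a null authority: host and port must then come
        // from the Hadoop configuration on the classpath.
        if ("hdfs".equalsIgnoreCase(uri.getScheme()) && uri.getAuthority() == null) {
            return Verdict.HDFS_NEEDS_HADOOP_CONFIG;
        }
        return Verdict.OK;
    }

    public static void main(String[] args) {
        System.out.println(check("hdfs://namenode:8020/flink-checkpoints")); // OK
        System.out.println(check("hdfs:///flink-checkpoints"));              // HDFS_NEEDS_HADOOP_CONFIG
        System.out.println(check("/tmp/flink-checkpoints"));                 // MISSING_SCHEME
    }
}
```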
Parameters:
checkpointDirectory - The path to write checkpoint metadata to.

public FileSystemCheckpointStorage(Path checkpointDirectory)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDirectory - The path to write checkpoint metadata to.

public FileSystemCheckpointStorage(URI checkpointDirectory)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDirectory - The path to write checkpoint metadata to.

public FileSystemCheckpointStorage(URI checkpointDirectory, int fileStateSizeThreshold)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.

public FileSystemCheckpointStorage(Path checkpointDirectory, int fileStateSizeThreshold, int writeBufferSize)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or the Hadoop configuration that describes that information must be on the classpath.
Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.
writeBufferSize - Write buffer size used to serialize state. If -1, the value configured in the runtime configuration will be used, or the default value (4KB) if nothing is configured.

public FileSystemCheckpointStorage configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException
Description copied from interface: ConfigurableCheckpointStorage
Creates a variant of the checkpoint storage that applies additional configuration parameters.
Settings that were set directly on the original checkpoint storage object in the application program typically take precedence over settings picked up from the configuration.
If no configuration is applied, or if the method applies configuration values directly to the (mutable) checkpoint storage object, this method may return the original checkpoint storage object. Otherwise it typically returns a modified copy.
Specified by:
configure in interface ConfigurableCheckpointStorage
Parameters:
config - The configuration to pick the values from.
classLoader - The class loader that should be used to load the checkpoint storage.
Throws:
IllegalConfigurationException - Thrown if the configuration contained invalid entries.

public static FileSystemCheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException
Creates a new FileSystemCheckpointStorage using the given configuration.
Parameters:
config - The Flink configuration (loaded by the TaskManager).
classLoader - The class loader that should be used to load the checkpoint storage.
Throws:
IllegalConfigurationException - If the configuration misses critical values, or specifies invalid values.

public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException
Description copied from interface: CheckpointStorage
Resolves the given pointer to a checkpoint/savepoint into a checkpoint location.
Specified by:
resolveCheckpoint in interface CheckpointStorage
Parameters:
pointer - The external checkpoint pointer to resolve.
Throws:
IOException - Thrown if the checkpoint storage does not understand the pointer, or if the pointer could not be resolved due to an I/O error.

public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException
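Description copied from interface: CheckpointStorage
Creates a storage for checkpoints for the given job.
Specified by:
createCheckpointStorage in interface CheckpointStorage
Parameters:
jobId - The job to store checkpoint data for.
Throws:
IOException - Thrown if the checkpoint storage cannot be initialized.

@Nonnull public Path getCheckpointPath()
Gets the base directory where all the checkpoints are stored.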
@Nullable public Path getSavepointPath()
public int getMinFileSizeThreshold()
Gets the threshold below which state is stored as part of the metadata, rather than in files. If not explicitly configured, this is the default value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.
public int getWriteBufferSize()
Gets the write buffer size for created checkpoint streams. If not explicitly configured, this is the default value of CheckpointingOptions.FS_WRITE_BUFFER_SIZE.
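The -1 convention used by the constructors for fileStateSizeThreshold and writeBufferSize amounts to a three-step fallback: explicit argument, then the runtime configuration, then a default. The sketch below models only that order; the class is hypothetical and not Flink API. Because the constructor documentation quotes 1KB and 4KB while these getters defer to the defaults of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD and CheckpointingOptions.FS_WRITE_BUFFER_SIZE, the default is passed in as a parameter rather than hard-coded; check the option documentation for your Flink version.

```java
/**
 * Hypothetical model of the -1 convention in the constructors: explicit value, else the
 * configured value, else a default. Not Flink API.
 */
final class SentinelResolution {

    /** Constructor argument meaning "use the runtime configuration". */
    static final int USE_CONFIGURED = -1;

    private SentinelResolution() {}

    /**
     * @param explicitValue   value passed to the constructor, or -1
     * @param configuredValue value from the runtime configuration, or null if not configured
     * @param defaultValue    fallback when neither is set
     */
    static int resolve(int explicitValue, Integer configuredValue, int defaultValue) {
        if (explicitValue == USE_CONFIGURED) {
            return configuredValue != null ? configuredValue : defaultValue;
        }
        if (explicitValue < 0) {
            throw new IllegalArgumentException("Expected -1 or a non-negative size, got " + explicitValue);
        }
        return explicitValue; // an explicit constructor argument always wins
    }

    public static void main(String[] args) {
        int assumedThresholdDefault = 1024;  // "1KB" as quoted in the constructor docs
        int assumedBufferDefault = 4 * 1024; // "4KB" as quoted in the constructor docs
        System.out.println(resolve(-1, null, assumedThresholdDefault)); // 1024: nothing configured
        System.out.println(resolve(-1, 2048, assumedThresholdDefault)); // 2048: configured value
        System.out.println(resolve(8192, 2048, assumedBufferDefault));  // 8192: explicit value wins
    }
}
```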
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.