Class FileSystemCheckpointStorage
java.lang.Object
  org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
All Implemented Interfaces:
Serializable, CheckpointStorage, ConfigurableCheckpointStorage
@PublicEvolving public class FileSystemCheckpointStorage extends Object implements CheckpointStorage, ConfigurableCheckpointStorage
FileSystemCheckpointStorage checkpoints state as files to a file system. Each checkpoint stores all of its files in its own subdirectory that includes the checkpoint number, such as hdfs://namenode:port/flink-checkpoints/chk-17/.
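The per-checkpoint directory naming can be illustrated with a small, Flink-independent sketch. It assumes, following the getCheckpointPath() description, that a job-specific directory sits between the base directory and the chk-<n> subdirectory; the CheckpointLayout class and its checkpointDir method are hypothetical and only manipulate strings.

```java
import java.util.Objects;

/** Hypothetical helper mirroring the documented directory layout; not part of Flink's API. */
final class CheckpointLayout {

    private CheckpointLayout() {}

    /** Returns "<baseDir>/<jobId>/chk-<checkpointId>/" with trailing slashes on baseDir removed. */
    static String checkpointDir(String baseDir, String jobId, long checkpointId) {
        Objects.requireNonNull(baseDir, "baseDir");
        Objects.requireNonNull(jobId, "jobId");
        if (checkpointId < 0) {
            throw new IllegalArgumentException("checkpointId must be non-negative: " + checkpointId);
        }
        // Drop trailing slashes so the result never contains "//" before the job ID.
        String base = baseDir.replaceAll("/+$", "");
        // Assumed layout: base directory, then job directory, then one "chk-<n>" directory per checkpoint.
        return base + "/" + jobId + "/chk-" + checkpointId + "/";
    }
}
```

For example, CheckpointLayout.checkpointDir("hdfs://namenode:9000/flink-checkpoints/", "a1b2c3", 17) returns "hdfs://namenode:9000/flink-checkpoints/a1b2c3/chk-17/".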
State Size Considerations
This checkpoint storage stores small state chunks directly in the metadata to avoid creating many small files. The threshold for that is configurable. When the threshold is increased, the size of the checkpoint metadata increases. The checkpoint metadata of all retained completed checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem, unless the threshold (getMinFileSizeThreshold()) is increased significantly.
Persistence Guarantees
Checkpoints from this checkpoint storage are as persistent and available as the file system they are written to. If the file system is a persistent distributed file system, this checkpoint storage supports highly available setups. It additionally supports savepoints and externalized checkpoints.
Configuration
As with all checkpoint storage policies, this checkpoint storage can be configured either within the application (by creating it with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the checkpoint storage was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the checkpoint storage is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure(ReadableConfig, ClassLoader) method.
See Also:
Serialized Form
Constructor Summary
- FileSystemCheckpointStorage(String checkpointDirectory): Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.
- FileSystemCheckpointStorage(URI checkpointDirectory): Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.
- FileSystemCheckpointStorage(URI checkpointDirectory, int fileStateSizeThreshold): Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.
- FileSystemCheckpointStorage(Path checkpointDirectory): Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.
- FileSystemCheckpointStorage(Path checkpointDirectory, int fileStateSizeThreshold, int writeBufferSize): Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.
Method Summary
- FileSystemCheckpointStorage configure(ReadableConfig config, ClassLoader classLoader): Creates a variant of the checkpoint storage that applies additional configuration parameters.
- CheckpointStorageAccess createCheckpointStorage(JobID jobId): Creates a storage for checkpoints for the given job.
- static FileSystemCheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader): Creates a new FileSystemCheckpointStorage using the given configuration.
- boolean equals(Object other)
- Path getCheckpointPath(): Gets the base directory where all the checkpoints are stored.
- int getMinFileSizeThreshold(): Gets the threshold below which state is stored as part of the metadata, rather than in files.
- Path getSavepointPath()
- int getWriteBufferSize(): Gets the write buffer size for created checkpoint streams.
- int hashCode()
- CompletedCheckpointStorageLocation resolveCheckpoint(String pointer): Resolves the given pointer to a checkpoint/savepoint into a checkpoint location.
Constructor Detail
FileSystemCheckpointStorage
public FileSystemCheckpointStorage(String checkpointDirectory)
Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be on the classpath.
- Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
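The scheme and authority requirements can be checked up front with java.net.URI. This is a minimal sketch that only inspects the string; it does not load a file system and is not a substitute for FileSystem.get(URI). The CheckpointUriCheck class and its Verdict values are hypothetical, and a missing HDFS authority is reported rather than rejected because the Hadoop configuration on the classpath may supply it.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical pre-flight check for a checkpoint directory string; it never touches a file system. */
final class CheckpointUriCheck {

    enum Verdict { OK, NO_SCHEME, HDFS_WITHOUT_AUTHORITY, INVALID }

    private CheckpointUriCheck() {}

    static Verdict check(String checkpointDirectory) {
        final URI uri;
        try {
            uri = new URI(checkpointDirectory);
        } catch (URISyntaxException e) {
            return Verdict.INVALID; // e.g. unescaped spaces
        }
        String scheme = uri.getScheme();
        if (scheme == null) {
            return Verdict.NO_SCHEME; // resolution is left to the runtime's defaults (assumption)
        }
        // "hdfs:///path" has no authority, so host and port must come from the Hadoop configuration.
        if (scheme.equalsIgnoreCase("hdfs") && uri.getAuthority() == null) {
            return Verdict.HDFS_WITHOUT_AUTHORITY;
        }
        return Verdict.OK;
    }
}
```

For example, "hdfs://namenode:9000/flink-checkpoints" yields OK, while "hdfs:///flink-checkpoints" yields HDFS_WITHOUT_AUTHORITY.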
FileSystemCheckpointStorage
public FileSystemCheckpointStorage(Path checkpointDirectory)
Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be on the classpath.
- Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
FileSystemCheckpointStorage
public FileSystemCheckpointStorage(URI checkpointDirectory)
Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be on the classpath.
- Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
FileSystemCheckpointStorage
public FileSystemCheckpointStorage(URI checkpointDirectory, int fileStateSizeThreshold)
Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be on the classpath.
- Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.
FileSystemCheckpointStorage
public FileSystemCheckpointStorage(Path checkpointDirectory, int fileStateSizeThreshold, int writeBufferSize)
Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI. A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).
For a job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be on the classpath.
- Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.
writeBufferSize - Write buffer size used to serialize state. If -1, the value configured in the runtime configuration will be used, or the default value (4KB) if nothing is configured.
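The -1 convention for fileStateSizeThreshold and writeBufferSize amounts to a three-step fallback: explicit argument, then runtime configuration, then default. The sketch below models that order in plain Java; the SizeParameterResolution class is hypothetical, and the 1KB and 4KB defaults are taken from the parameter descriptions above and may differ in other Flink versions.

```java
/** Hypothetical model of the "-1 means: configured value, else default" rule; not Flink's API. */
final class SizeParameterResolution {

    static final int UNDEFINED = -1;
    static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;  // 1KB, as stated in the constructor docs
    static final int DEFAULT_WRITE_BUFFER_SIZE = 4 * 1024; // 4KB, as stated in the constructor docs

    private SizeParameterResolution() {}

    /**
     * @param explicit   value passed to the constructor, or -1
     * @param configured value from the runtime configuration, or null if absent
     * @param fallback   default used when neither of the above is set
     */
    static int resolve(int explicit, Integer configured, int fallback) {
        if (explicit != UNDEFINED) {
            if (explicit < 0) {
                throw new IllegalArgumentException("size must be -1 or non-negative: " + explicit);
            }
            return explicit; // an explicit constructor argument wins
        }
        return configured != null ? configured : fallback;
    }
}
```

For example, resolve(-1, null, DEFAULT_WRITE_BUFFER_SIZE) yields 4096, while resolve(512, 2048, DEFAULT_FILE_STATE_THRESHOLD) yields 512 because the explicit argument takes precedence over the configured value.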
Method Detail
configure
public FileSystemCheckpointStorage configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException
Description copied from interface: ConfigurableCheckpointStorage
Creates a variant of the checkpoint storage that applies additional configuration parameters.
Settings that were applied directly to the original checkpoint storage object in the application program typically take precedence over settings picked up from the configuration.
If no configuration is applied, or if the method directly applies configuration values to the (mutable) checkpoint storage object, this method may return the original checkpoint storage object. Otherwise, it typically returns a modified copy.
- Specified by:
configure in interface ConfigurableCheckpointStorage
- Parameters:
config - The configuration to pick the values from.
classLoader - The class loader that should be used to load the checkpoint storage.
- Returns:
A reconfigured checkpoint storage.
- Throws:
IllegalConfigurationException - Thrown if the configuration contained invalid entries.
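The configure() contract (application settings win, the original object may be returned when nothing applies, otherwise a modified copy) can be sketched with a small immutable settings class. StorageSettings and the example.savepoint-dir key are hypothetical, and a plain Map stands in for ReadableConfig so the sketch has no Flink dependency.

```java
import java.util.Map;

/** Hypothetical immutable settings object illustrating the configure() contract; not Flink's API. */
final class StorageSettings {

    static final String SAVEPOINT_DIR_KEY = "example.savepoint-dir"; // hypothetical key

    final String checkpointDir;
    final String savepointDir; // null if not set in the application

    StorageSettings(String checkpointDir, String savepointDir) {
        this.checkpointDir = checkpointDir;
        this.savepointDir = savepointDir;
    }

    /** Values set in the application take precedence; returns this when there is nothing to apply. */
    StorageSettings configure(Map<String, String> config) {
        String configured = config.get(SAVEPOINT_DIR_KEY);
        if (savepointDir != null || configured == null) {
            return this; // application value wins, or the configuration has nothing to add
        }
        // Fill the gap from the configuration in a new object; this one stays unchanged.
        return new StorageSettings(checkpointDir, configured);
    }
}
```

Returning this when nothing changes keeps configure() cheap, while returning a copy otherwise leaves the object the user built in the application untouched.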
createFromConfig
public static FileSystemCheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException
Creates a new FileSystemCheckpointStorage using the given configuration.
- Parameters:
config - The Flink configuration (loaded by the TaskManager).
classLoader - The class loader that should be used to load the checkpoint storage.
- Returns:
The created checkpoint storage.
- Throws:
IllegalConfigurationException - If the configuration is missing critical values or specifies invalid values.
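createFromConfig fails fast when critical values are missing or invalid. The following sketch shows that pattern without depending on Flink: a Map replaces ReadableConfig, IllegalArgumentException stands in for IllegalConfigurationException, and the option keys and plain-integer threshold parsing are illustrative assumptions rather than Flink's actual options.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

/** Hypothetical fail-fast factory modeled on the createFromConfig contract; not Flink's API. */
final class ConfiguredStorage {

    static final String CHECKPOINT_DIR_KEY = "example.checkpoint-dir";        // hypothetical key
    static final String THRESHOLD_KEY = "example.small-file-threshold-bytes"; // hypothetical key

    final URI checkpointDir;
    final int fileStateThreshold; // -1 means "not configured"

    private ConfiguredStorage(URI checkpointDir, int fileStateThreshold) {
        this.checkpointDir = checkpointDir;
        this.fileStateThreshold = fileStateThreshold;
    }

    static ConfiguredStorage createFromConfig(Map<String, String> config) {
        String dir = config.get(CHECKPOINT_DIR_KEY);
        if (dir == null || dir.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required option " + CHECKPOINT_DIR_KEY);
        }
        final URI uri;
        try {
            uri = new URI(dir.trim());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid checkpoint directory: " + dir, e);
        }
        int threshold = -1;
        String raw = config.get(THRESHOLD_KEY);
        if (raw != null) {
            try {
                threshold = Integer.parseInt(raw.trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid " + THRESHOLD_KEY + ": " + raw, e);
            }
            if (threshold < 0) {
                throw new IllegalArgumentException(THRESHOLD_KEY + " must be non-negative: " + threshold);
            }
        }
        return new ConfiguredStorage(uri, threshold);
    }
}
```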
resolveCheckpoint
public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException
Description copied from interface: CheckpointStorage
Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location supports reading the checkpoint metadata or disposing of the checkpoint storage location.
- Specified by:
resolveCheckpoint in interface CheckpointStorage
- Parameters:
pointer - The external checkpoint pointer to resolve.
- Returns:
The checkpoint location handle.
- Throws:
IOException - Thrown if the checkpoint storage does not understand the pointer, or if the pointer could not be resolved due to an I/O error.
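Pointer resolution can be modeled on the local file system with java.nio.file (note that Path here is java.nio.file.Path, not Flink's Path). In this sketch a pointer may name either a checkpoint directory or the metadata file inside it; the PointerResolver class is hypothetical, and the metadata file name _metadata is an assumption about the on-disk layout that should be verified for your Flink version.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical local-file-system model of checkpoint pointer resolution; not Flink's API. */
final class PointerResolver {

    static final String METADATA_FILE_NAME = "_metadata"; // assumed file name

    private PointerResolver() {}

    /** Resolves a directory or file pointer to the metadata file, or throws if none exists. */
    static Path resolve(String pointer) throws IOException {
        if (pointer == null || pointer.isEmpty()) {
            throw new IllegalArgumentException("empty checkpoint pointer");
        }
        Path p = Paths.get(pointer);
        // A directory pointer is completed with the metadata file name; a file pointer is used as-is.
        Path metadata = Files.isDirectory(p) ? p.resolve(METADATA_FILE_NAME) : p;
        if (!Files.isRegularFile(metadata)) {
            throw new FileNotFoundException("Cannot find checkpoint metadata at " + metadata);
        }
        return metadata;
    }
}
```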
createCheckpointStorage
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException
Description copied from interface: CheckpointStorage
Creates a storage for checkpoints for the given job. The checkpoint storage is used to write checkpoint data and metadata.
- Specified by:
createCheckpointStorage in interface CheckpointStorage
- Parameters:
jobId - The job to store checkpoint data for.
- Returns:
A checkpoint storage for the given job.
- Throws:
IOException - Thrown if the checkpoint storage cannot be initialized.
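Creating storage for a job involves setting up a job-specific directory inside the base checkpoint directory, as getCheckpointPath() describes. This local-file-system sketch creates that directory together with two subdirectories, one for state shared across checkpoints and one for state owned by tasks; the JobCheckpointDirs class and the names shared and taskowned are assumptions, not a description of Flink's exact implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical model of per-job checkpoint directory initialization; not Flink's API. */
final class JobCheckpointDirs {

    final Path jobDir;
    final Path sharedDir;    // assumed name for state shared across checkpoints
    final Path taskOwnedDir; // assumed name for state owned by tasks

    private JobCheckpointDirs(Path jobDir, Path sharedDir, Path taskOwnedDir) {
        this.jobDir = jobDir;
        this.sharedDir = sharedDir;
        this.taskOwnedDir = taskOwnedDir;
    }

    static JobCheckpointDirs init(Path baseDir, String jobId) throws IOException {
        Path jobDir = baseDir.resolve(jobId); // job-specific directory inside the base directory
        Path shared = jobDir.resolve("shared");
        Path taskOwned = jobDir.resolve("taskowned");
        // createDirectories creates missing parents and does not fail if a directory already exists.
        Files.createDirectories(shared);
        Files.createDirectories(taskOwned);
        return new JobCheckpointDirs(jobDir, shared, taskOwned);
    }
}
```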
getCheckpointPath
@Nonnull public Path getCheckpointPath()
Gets the base directory where all the checkpoints are stored. The job-specific checkpoint directory is created inside this directory.
- Returns:
The base directory for checkpoints.
getSavepointPath
@Nullable public Path getSavepointPath()
- Returns:
The default location where savepoints will be externalized, if set.
getMinFileSizeThreshold
public int getMinFileSizeThreshold()
Gets the threshold below which state is stored as part of the metadata, rather than in files. This threshold ensures that the backend does not create a large number of very small files, where the file pointers could be larger than the state itself.
If not explicitly configured, this is the default value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.
- Returns:
The file size threshold, in bytes.
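The threshold rule and its effect on JobManager memory can be sketched as follows. SmallStateRule is hypothetical; it treats a chunk strictly below the threshold as inline (the exact boundary handling in Flink is an assumption here) and gives a loose upper bound on the inline bytes held in the metadata of retained checkpoints, since each inline chunk is smaller than the threshold.

```java
/** Hypothetical model of the small-state rule and its memory impact; not Flink's implementation. */
final class SmallStateRule {

    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    private SmallStateRule() {}

    /** Chunks strictly below the threshold stay inline (boundary handling is an assumption). */
    static Placement placementFor(long stateSizeBytes, int thresholdBytes) {
        if (stateSizeBytes < 0 || thresholdBytes < 0) {
            throw new IllegalArgumentException("sizes must be non-negative");
        }
        return stateSizeBytes < thresholdBytes ? Placement.INLINE_IN_METADATA : Placement.SEPARATE_FILE;
    }

    /** Loose upper bound on inline bytes across all retained checkpoints; throws on overflow. */
    static long maxInlineBytes(int retainedCheckpoints, long inlineChunksPerCheckpoint, int thresholdBytes) {
        long chunks = Math.multiplyExact((long) retainedCheckpoints, inlineChunksPerCheckpoint);
        return Math.multiplyExact(chunks, (long) thresholdBytes);
    }
}
```

For instance, 3 retained checkpoints with 1,000 inline chunks each and a hypothetical 20KB threshold bound the inline payload at 3 × 1,000 × 20,480 = 61,440,000 bytes, roughly 59 MiB of JobManager heap.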
getWriteBufferSize
public int getWriteBufferSize()
Gets the write buffer size for created checkpoint streams.
If not explicitly configured, this is the default value of CheckpointingOptions.FS_WRITE_BUFFER_SIZE.
- Returns:
The write buffer size, in bytes.
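One plausible way for the write buffer and the small-file threshold to interact is a stream that holds data in memory and only creates a file once the buffer overflows, so small state never touches the file system. BufferingCheckpointStream is a simplified, hypothetical model of that idea, not Flink's implementation; it sizes the buffer as max(writeBufferSize, threshold) so that anything below the threshold fits in memory, and an in-memory ByteArrayOutputStream stands in for the checkpoint file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

/** Hypothetical, simplified model of a buffered checkpoint stream; not Flink's implementation. */
final class BufferingCheckpointStream extends OutputStream {

    /** Where the bytes ended up: inline in the metadata, or in a (simulated) file. */
    static final class Result {
        final boolean inline;
        final byte[] data;

        Result(boolean inline, byte[] data) {
            this.inline = inline;
            this.data = data;
        }
    }

    private final byte[] buffer;
    private final int threshold;
    private int pos;
    private ByteArrayOutputStream file; // stands in for a file on the checkpoint file system
    private boolean closed;

    BufferingCheckpointStream(int writeBufferSize, int threshold) {
        if (writeBufferSize <= 0 || threshold < 0) {
            throw new IllegalArgumentException("writeBufferSize must be > 0 and threshold >= 0");
        }
        // Buffer at least `threshold` bytes so small state can be kept entirely in memory.
        this.buffer = new byte[Math.max(writeBufferSize, threshold)];
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        if (closed) {
            throw new IOException("stream already closed");
        }
        if (pos == buffer.length) {
            flushBuffer(); // buffer full: spill to the "file"
        }
        buffer[pos++] = (byte) b;
    }

    private void flushBuffer() {
        if (file == null) {
            file = new ByteArrayOutputStream(); // the "file" is only created on the first spill
        }
        file.write(buffer, 0, pos);
        pos = 0;
    }

    /** Finishes the stream and reports whether the data stays inline or goes to the file. */
    Result closeAndGetResult() throws IOException {
        if (closed) {
            throw new IOException("stream already closed");
        }
        closed = true;
        if (file == null && pos < threshold) {
            return new Result(true, Arrays.copyOf(buffer, pos)); // never spilled and below threshold
        }
        flushBuffer();
        return new Result(false, file.toByteArray());
    }
}
```

Sizing the buffer to at least the threshold is what makes the inline decision possible at close time: if the buffer was never flushed, the total size is known and can be compared with the threshold.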