Class FileSystemCheckpointStorage

  • All Implemented Interfaces:
    Serializable, CheckpointStorage, ConfigurableCheckpointStorage

    @PublicEvolving
    public class FileSystemCheckpointStorage
    extends Object
    implements CheckpointStorage, ConfigurableCheckpointStorage
    FileSystemCheckpointStorage checkpoints state as files to a file system.

    Each checkpoint stores all of its files in its own subdirectory, whose name includes the checkpoint number, such as hdfs://namenode:port/flink-checkpoints/chk-17/.
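
    The directory naming described above can be sketched with plain JDK classes. This is only an illustration of the layout, not Flink's implementation: the class name, the helper checkpointDir, and the port 9000 are assumptions made for the example.

```java
import java.net.URI;

public class CheckpointDirSketch {

    // Hypothetical helper (not part of Flink): appends "chk-<id>/" to the
    // directory in which a job's checkpoints are kept.
    static URI checkpointDir(URI jobCheckpointDir, long checkpointId) {
        String base = jobCheckpointDir.toString();
        if (!base.endsWith("/")) {
            base += "/";
        }
        return URI.create(base + "chk-" + checkpointId + "/");
    }

    public static void main(String[] args) {
        // Port 9000 is an assumed value standing in for "port" in the text.
        URI base = URI.create("hdfs://namenode:9000/flink-checkpoints");
        System.out.println(checkpointDir(base, 17));
    }
}
```

    Running main prints hdfs://namenode:9000/flink-checkpoints/chk-17/, which matches the shape of the example path above.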

    State Size Considerations

    This checkpoint storage stores small state chunks directly with the metadata, to avoid creating many small files. The threshold for that is configurable. Increasing this threshold increases the size of the checkpoint metadata. The checkpoint metadata of all retained completed checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem, unless the threshold (see getMinFileSizeThreshold()) is increased significantly.
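
    The trade-off can be made concrete with a small, self-contained sketch. It is a simplified model rather than Flink code: the class, the Placement enum, and the chunk sizes are hypothetical, and the only rule taken from this page is that state below the threshold is stored with the metadata.

```java
public class InlineThresholdSketch {

    /** Where a state chunk ends up in this simplified model. */
    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    // State strictly below the threshold is stored with the metadata;
    // everything else is written to its own file.
    static Placement place(int stateSizeBytes, int thresholdBytes) {
        return stateSizeBytes < thresholdBytes
                ? Placement.INLINE_IN_METADATA
                : Placement.SEPARATE_FILE;
    }

    // Total number of bytes that would be carried inside the metadata.
    static long inlinedBytes(int[] chunkSizes, int thresholdBytes) {
        long total = 0;
        for (int size : chunkSizes) {
            if (place(size, thresholdBytes) == Placement.INLINE_IN_METADATA) {
                total += size;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        int[] chunks = {200, 900, 5_000, 64_000}; // made-up chunk sizes in bytes
        System.out.println(inlinedBytes(chunks, 1_024)); // 200 + 900
        System.out.println(inlinedBytes(chunks, 8_192)); // 200 + 900 + 5000
    }
}
```

    With these made-up sizes, raising the threshold from 1024 to 8192 bytes grows the inlined portion from 1100 to 6100 bytes, which is the metadata growth the paragraph above warns about.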

    Persistence Guarantees

    Checkpoints from this checkpoint storage are as persistent and available as the file system that is written to. If the file system is a persistent distributed file system, this checkpoint storage supports highly available setups. This checkpoint storage additionally supports savepoints and externalized checkpoints.

    Configuration

    As with all checkpoint storage policies, this checkpoint storage can be configured either within the application (by creating it with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.

    If the checkpoint storage was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the checkpoint storage is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure(ReadableConfig, ClassLoader) method.
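
    The fallback rule in the savepoint directory example can be sketched as follows. This is a stand-alone illustration using a plain Map in place of the Flink configuration; the class, the method effectiveSavepointDir, and the use of "state.savepoints.dir" as the map key are assumptions for the example and do not show how configure(ReadableConfig, ClassLoader) is written.

```java
import java.util.Map;
import java.util.Optional;

public class SavepointDirFallbackSketch {

    // Assumed key, used here only to look up a value in the example map.
    static final String SAVEPOINT_DIR_KEY = "state.savepoints.dir";

    // A value set in the application wins; otherwise fall back to the
    // cluster configuration; otherwise there is no default savepoint directory.
    static Optional<String> effectiveSavepointDir(
            String applicationValue, Map<String, String> clusterConfig) {
        if (applicationValue != null) {
            return Optional.of(applicationValue);
        }
        return Optional.ofNullable(clusterConfig.get(SAVEPOINT_DIR_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> cluster = Map.of(SAVEPOINT_DIR_KEY, "hdfs:///savepoints");
        // Not set in the application: the cluster value is used.
        System.out.println(effectiveSavepointDir(null, cluster));
        // Set in the application: the application value takes precedence.
        System.out.println(effectiveSavepointDir("file:///tmp/sp", cluster));
    }
}
```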

    See Also:
    Serialized Form
    • Constructor Detail

      • FileSystemCheckpointStorage

        public FileSystemCheckpointStorage​(String checkpointDirectory)
        Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.

        A file system for the scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).

        For a Job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.

        Parameters:
        checkpointDirectory - The path to write checkpoint metadata to.
      • FileSystemCheckpointStorage

        public FileSystemCheckpointStorage​(Path checkpointDirectory)
        Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given path.

        A file system for the scheme in the path's URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).

        For a Job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.

        Parameters:
        checkpointDirectory - The path to write checkpoint metadata to.
      • FileSystemCheckpointStorage

        public FileSystemCheckpointStorage​(URI checkpointDirectory)
        Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.

        A file system for the scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).

        For a Job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.

        Parameters:
        checkpointDirectory - The path to write checkpoint metadata to.
      • FileSystemCheckpointStorage

        public FileSystemCheckpointStorage​(URI checkpointDirectory,
                                           int fileStateSizeThreshold)
        Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given URI.

        A file system for the scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).

        For a Job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.

        Parameters:
        checkpointDirectory - The path to write checkpoint metadata to.
        fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.
      • FileSystemCheckpointStorage

        public FileSystemCheckpointStorage​(Path checkpointDirectory,
                                           int fileStateSizeThreshold,
                                           int writeBufferSize)
        Creates a new checkpoint storage that stores its checkpoint data in the file system and location defined by the given path.

        A file system for the scheme in the path's URI (e.g., 'file://', 'hdfs://', or 's3://') must be accessible via FileSystem.get(URI).

        For a Job targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.

        Parameters:
        checkpointDirectory - The path to write checkpoint metadata to.
        fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.
        writeBufferSize - Write buffer size used to serialize state. If -1, the value configured in the runtime configuration will be used, or the default value (4KB) if nothing is configured.
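
        The meaning of -1 for fileStateSizeThreshold and writeBufferSize can be sketched as a three-step lookup. This is a minimal model of the rule stated in the parameter descriptions, not the class's actual code: the class, the resolve helper, and the constant names are hypothetical, and the defaults are the 1KB and 4KB values quoted above.

```java
public class SizeSentinelSketch {

    static final int UNSET = -1;                      // sentinel described in the text
    static final int DEFAULT_THRESHOLD_BYTES = 1024;  // 1KB, as quoted above
    static final int DEFAULT_WRITE_BUFFER_BYTES = 4096; // 4KB, as quoted above

    // constructorValue: value passed by the application, or -1 if unset.
    // configuredValue: value from the runtime configuration, or null if absent.
    static int resolve(int constructorValue, Integer configuredValue, int defaultValue) {
        if (constructorValue != UNSET) {
            return constructorValue;
        }
        return configuredValue != null ? configuredValue : defaultValue;
    }

    public static void main(String[] args) {
        // Nothing set anywhere: the default applies.
        System.out.println(resolve(UNSET, null, DEFAULT_THRESHOLD_BYTES));
        // Only the runtime configuration sets a value.
        System.out.println(resolve(UNSET, 2048, DEFAULT_THRESHOLD_BYTES));
        // The constructor argument takes precedence over the configuration.
        System.out.println(resolve(512, 2048, DEFAULT_THRESHOLD_BYTES));
    }
}
```

        In this sketch the three calls print 1024, 2048, and 512, in that order.
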
    • Method Detail

      • configure

        public FileSystemCheckpointStorage configure​(ReadableConfig config,
                                                     ClassLoader classLoader)
                                              throws IllegalConfigurationException
        Description copied from interface: ConfigurableCheckpointStorage
        Creates a variant of the checkpoint storage that applies additional configuration parameters.

        Settings made directly on the original checkpoint storage object in the application program typically take precedence over settings picked up from the configuration.

        If no configuration is applied, or if the method directly applies configuration values to the (mutable) checkpoint storage object, this method may return the original checkpoint storage object. Otherwise it typically returns a modified copy.

        Specified by:
        configure in interface ConfigurableCheckpointStorage
        Parameters:
        config - The configuration to pick the values from.
        classLoader - The class loader that should be used to load the checkpoint storage.
        Returns:
        A reconfigured checkpoint storage.
        Throws:
        IllegalConfigurationException - Thrown if the configuration contained invalid entries.
      • resolveCheckpoint

        public CompletedCheckpointStorageLocation resolveCheckpoint​(String pointer)
                                                             throws IOException
        Description copied from interface: CheckpointStorage
        Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location supports reading the checkpoint metadata, or disposing the checkpoint storage location.
        Specified by:
        resolveCheckpoint in interface CheckpointStorage
        Parameters:
        pointer - The external checkpoint pointer to resolve.
        Returns:
        The checkpoint location handle.
        Throws:
        IOException - Thrown if the checkpoint storage does not understand the pointer, or if the pointer could not be resolved due to an I/O error.
      • createCheckpointStorage

        public CheckpointStorageAccess createCheckpointStorage​(JobID jobId)
                                                        throws IOException
        Description copied from interface: CheckpointStorage
        Creates a storage for checkpoints for the given job. The checkpoint storage is used to write checkpoint data and metadata.
        Specified by:
        createCheckpointStorage in interface CheckpointStorage
        Parameters:
        jobId - The job to store checkpoint data for.
        Returns:
        A checkpoint storage for the given job.
        Throws:
        IOException - Thrown if the checkpoint storage cannot be initialized.
      • getCheckpointPath

        @Nonnull
        public Path getCheckpointPath()
        Gets the base directory where all the checkpoints are stored. The job-specific checkpoint directory is created inside this directory.
        Returns:
        The base directory for checkpoints.
      • getSavepointPath

        @Nullable
        public Path getSavepointPath()
        Returns:
        The default location where savepoints will be externalized, or null if none is set.
      • getMinFileSizeThreshold

        public int getMinFileSizeThreshold()
        Gets the threshold below which state is stored as part of the metadata, rather than in files. This threshold ensures that the checkpoint storage does not create a large number of very small files, where the file pointers are potentially larger than the state itself.

        If not explicitly configured, this is the default value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.

        Returns:
        The file size threshold, in bytes.
      • getWriteBufferSize

        public int getWriteBufferSize()
        Gets the write buffer size for the created checkpoint streams.

        If not explicitly configured, this is the default value of CheckpointingOptions.FS_WRITE_BUFFER_SIZE.

        Returns:
        The write buffer size, in bytes.
      • hashCode

        public int hashCode()
        Overrides:
        hashCode in class Object