Class AbstractFsCheckpointStorageAccess

  • All Implemented Interfaces:
    CheckpointStorageAccess, CheckpointStorageCoordinatorView, CheckpointStorageWorkerView
    Direct Known Subclasses:
    FsCheckpointStorageAccess, MemoryBackendCheckpointStorageAccess

    public abstract class AbstractFsCheckpointStorageAccess
    extends Object
    implements CheckpointStorageAccess
    An implementation of durable checkpoint storage to file systems.

    Checkpoint Layout

    The checkpoint storage is configured with a base directory and persists the data of each checkpoint in its own subdirectory. For example, if the base directory is set to hdfs://namenode:port/flink-checkpoints/, the state backend creates a subdirectory named after the job's ID that contains the actual checkpoints (hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b).

    Each checkpoint individually will store all its files in a subdirectory that includes the checkpoint number, such as hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/.
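The layout above can be sketched with plain string handling. This is a minimal illustration, not Flink's code: it assumes `/` as the path separator, takes the `chk-` prefix from the example path, and the names `CheckpointLayoutSketch`, `jobCheckpointDir` and `checkpointDir` are hypothetical.

```java
// Hypothetical sketch of the checkpoint directory layout; not Flink's implementation.
final class CheckpointLayoutSketch {

    // Prefix of the per-checkpoint directory, as seen in the example path ".../chk-17/".
    static final String CHECKPOINT_DIR_PREFIX = "chk-";

    // Appends a child name to a parent path without doubling the '/' separator.
    static String child(String parent, String name) {
        return parent.endsWith("/") ? parent + name : parent + "/" + name;
    }

    // <base>/<jobId>: the directory inside which a job creates its checkpoint subdirectories.
    static String jobCheckpointDir(String baseDir, String jobId) {
        return child(baseDir, jobId);
    }

    // <base>/<jobId>/chk-<checkpointId>: the data exclusive to one checkpoint.
    static String checkpointDir(String jobDir, long checkpointId) {
        return child(jobDir, CHECKPOINT_DIR_PREFIX + checkpointId);
    }

    public static void main(String[] args) {
        String jobDir = jobCheckpointDir(
                "hdfs://namenode:port/flink-checkpoints/", "1b080b6e710aabbef8993ab18c6de98b");
        // Prints hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17
        System.out.println(checkpointDir(jobDir, 17));
    }
}
```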

    Savepoint Layout

    A savepoint targeted at hdfs://namenode:port/flink-savepoints/ is stored in a subdirectory named savepoint-jobId(0, 6)-randomDigits, where jobId(0, 6) denotes the first six characters of the job ID. The random digits are added as "entropy" to avoid directory collisions.
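The naming scheme can be sketched as follows. The suffix length (12) and the hexadecimal alphabet used for the random part are assumptions made for illustration, as are the names `SavepointNameSketch` and `savepointDirName`.

```java
import java.security.SecureRandom;

// Hypothetical sketch of savepoint directory naming; suffix length and alphabet are assumptions.
final class SavepointNameSketch {

    private static final char[] HEX = "0123456789abcdef".toCharArray();
    private static final int RANDOM_SUFFIX_LENGTH = 12; // assumed, for illustration only
    private static final SecureRandom RANDOM = new SecureRandom();

    // Builds "savepoint-<first 6 chars of jobId>-<random hex suffix>".
    static String savepointDirName(String jobId) {
        StringBuilder sb = new StringBuilder("savepoint-")
                .append(jobId, 0, 6)
                .append('-');
        for (int i = 0; i < RANDOM_SUFFIX_LENGTH; i++) {
            sb.append(HEX[RANDOM.nextInt(HEX.length)]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints a name starting with "savepoint-1b080b-"; the suffix differs on every call.
        System.out.println(savepointDirName("1b080b6e710aabbef8993ab18c6de98b"));
    }
}
```

The random suffix is what lets repeated savepoints of the same job into the same target directory land in distinct subdirectories.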

    Metadata File

    A completed checkpoint writes its metadata into a file named "_metadata".

    • Field Detail

      • CHECKPOINT_DIR_PREFIX

        public static final String CHECKPOINT_DIR_PREFIX
        The prefix of the directory containing the data exclusive to a checkpoint.
        See Also:
        Constant Field Values
      • CHECKPOINT_SHARED_STATE_DIR

        public static final String CHECKPOINT_SHARED_STATE_DIR
        The name of the directory for shared checkpoint state.
        See Also:
        Constant Field Values
      • CHECKPOINT_TASK_OWNED_STATE_DIR

        public static final String CHECKPOINT_TASK_OWNED_STATE_DIR
        The name of the directory for state not owned/released by the master, but by the TaskManagers.
        See Also:
        Constant Field Values
      • METADATA_FILE_NAME

        public static final String METADATA_FILE_NAME
        The name of the metadata files in checkpoints / savepoints.
        See Also:
        Constant Field Values
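The sketch below shows where the four constants plausibly appear under a job's checkpoint directory. Only "chk-" and "_metadata" are confirmed by this page; the values "shared" and "taskowned", and placing those directories next to the chk-N directories, are assumptions (check Constant Field Values for the actual strings). `CheckpointTreeSketch` and `layout` are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch: where each documented constant appears under a job's checkpoint directory.
final class CheckpointTreeSketch {

    static final String CHECKPOINT_DIR_PREFIX = "chk-";                  // from the example path
    static final String CHECKPOINT_SHARED_STATE_DIR = "shared";          // assumed value
    static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";   // assumed value
    static final String METADATA_FILE_NAME = "_metadata";                // from "Metadata File"

    // Lists the notable paths for one completed checkpoint of a job.
    static List<String> layout(String jobDir, long checkpointId) {
        String chkDir = jobDir + "/" + CHECKPOINT_DIR_PREFIX + checkpointId;
        return List.of(
                chkDir,                                          // data exclusive to this checkpoint
                chkDir + "/" + METADATA_FILE_NAME,               // written when the checkpoint completes
                jobDir + "/" + CHECKPOINT_SHARED_STATE_DIR,      // state shared across checkpoints
                jobDir + "/" + CHECKPOINT_TASK_OWNED_STATE_DIR); // state owned by the TaskManagers
    }

    public static void main(String[] args) {
        layout("/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b", 17)
                .forEach(System.out::println);
    }
}
```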
    • Constructor Detail

      • AbstractFsCheckpointStorageAccess

        protected AbstractFsCheckpointStorageAccess(JobID jobId,
                                                    @Nullable
                                                    Path defaultSavepointDirectory)
        Creates a new checkpoint storage.
        Parameters:
        jobId - The ID of the job that writes the checkpoints.
        defaultSavepointDirectory - The default location for savepoints, or null, if none is set.
    • Method Detail

      • getDefaultSavepointDirectory

        @Nullable
        public Path getDefaultSavepointDirectory()
        Gets the default directory for savepoints. Returns null, if no default savepoint directory is configured.
      • resolveCheckpoint

        public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer)
                                                             throws IOException
        Description copied from interface: CheckpointStorageCoordinatorView
        Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location supports reading the checkpoint metadata, or disposing the checkpoint storage location.

        If the state backend cannot understand the format of the pointer (for example, because it was created by a different state backend), this method should throw an IOException.

        Specified by:
        resolveCheckpoint in interface CheckpointStorageCoordinatorView
        Parameters:
        checkpointPointer - The external checkpoint pointer to resolve.
        Returns:
        The checkpoint location handle.
        Throws:
        IOException - Thrown, if the state backend does not understand the pointer, or if the pointer could not be resolved due to an I/O error.
      • initializeLocationForSavepoint

        public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId,
                                                                        @Nullable
                                                                        String externalLocationPointer)
                                                                 throws IOException
        Creates a file system based storage location for a savepoint.

        This method implements the logic that decides which location to use (given an optional configured default location and an optional location passed for this specific savepoint) and how to name and initialize the savepoint directory.

        Specified by:
        initializeLocationForSavepoint in interface CheckpointStorageCoordinatorView
        Parameters:
        checkpointId - The checkpoint ID of the savepoint.
        externalLocationPointer - The target location pointer for the savepoint. Must be a valid URI. Null, if not supplied.
        Returns:
        The checkpoint storage location for the savepoint.
        Throws:
        IOException - Thrown if the target directory could not be created.
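The location decision can be sketched as a precedence rule: a pointer passed for this specific savepoint wins over the configured default, and having neither is an error. This is an assumed reading of the logic, not a copy of it; the exception type, the example URIs and the names `SavepointTargetSketch` and `chooseTarget` are illustrative, and directory naming and creation are omitted.

```java
import java.net.URI;

// Hypothetical sketch of choosing a savepoint target directory.
final class SavepointTargetSketch {

    // Returns the base directory for a savepoint: the explicit pointer if given, else the default.
    static URI chooseTarget(String externalLocationPointer, URI defaultSavepointDirectory) {
        if (externalLocationPointer != null) {
            // The pointer must be a valid URI; URI.create throws IllegalArgumentException otherwise.
            return URI.create(externalLocationPointer);
        }
        if (defaultSavepointDirectory != null) {
            return defaultSavepointDirectory;
        }
        throw new IllegalArgumentException(
                "No savepoint location given and no default savepoint directory configured.");
    }

    public static void main(String[] args) {
        URI dflt = URI.create("hdfs://namenode:8020/flink-savepoints/");
        System.out.println(chooseTarget(null, dflt));                     // falls back to the default
        System.out.println(chooseTarget("file:///tmp/savepoints", dflt)); // explicit pointer wins
    }
}
```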
      • getCheckpointDirectoryForJob

        protected static Path getCheckpointDirectoryForJob(Path baseCheckpointPath,
                                                           JobID jobId)
        Builds the directory into which a specific job writes its checkpoints, meaning the directory inside which it creates the checkpoint-specific subdirectories.

        This method only succeeds if a base checkpoint directory has been set; otherwise the method fails with an exception.

        Parameters:
        baseCheckpointPath - The base directory for checkpoints.
        jobId - The ID of the job.
        Returns:
        The job's checkpoint directory.
        Throws:
        UnsupportedOperationException - Thrown, if no base checkpoint directory has been set.
      • createCheckpointDirectory

        protected static Path createCheckpointDirectory(Path baseDirectory,
                                                        long checkpointId)
        Creates the directory path for the data exclusive to a specific checkpoint.
        Parameters:
        baseDirectory - The base directory into which the job checkpoints.
        checkpointId - The ID (logical timestamp) of the checkpoint.
      • resolveCheckpointPointer

        @Internal
        public static FsCompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer)
                                                                             throws IOException
        Takes the given string (representing a pointer to a checkpoint) and resolves it to a file status for the checkpoint's metadata file.
        Parameters:
        checkpointPointer - The pointer to resolve.
        Returns:
        A state handle to checkpoint/savepoint's metadata.
        Throws:
        IOException - Thrown, if the pointer cannot be resolved, the file system cannot be accessed, or the pointer points to a location that does not seem to be a checkpoint/savepoint.
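The resolution step can be sketched against the local file system with java.nio. It assumes that a pointer may name either the checkpoint directory (in which case the "_metadata" file inside it is used) or the metadata file itself; the names `PointerResolverSketch` and `resolveMetadataFile` are hypothetical, and the real method works on Flink's file system abstraction rather than java.nio.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical local-filesystem sketch of resolving a checkpoint pointer to its metadata file.
final class PointerResolverSketch {

    static final String METADATA_FILE_NAME = "_metadata";

    // A pointer may name the checkpoint directory or the metadata file itself.
    static Path resolveMetadataFile(String checkpointPointer) throws IOException {
        if (checkpointPointer == null || checkpointPointer.isEmpty()) {
            throw new IllegalArgumentException("Checkpoint pointer must not be null or empty.");
        }
        Path path = Paths.get(checkpointPointer);
        if (!Files.exists(path)) {
            throw new FileNotFoundException("Cannot find checkpoint or savepoint: " + checkpointPointer);
        }
        if (Files.isDirectory(path)) {
            // Directory pointer: the metadata file must exist inside it.
            Path metadata = path.resolve(METADATA_FILE_NAME);
            if (!Files.isRegularFile(metadata)) {
                throw new FileNotFoundException("No " + METADATA_FILE_NAME + " file in " + path);
            }
            return metadata;
        }
        // File pointer: treat it as the metadata file.
        return path;
    }

    public static void main(String[] args) throws IOException {
        Path chk = Files.createTempDirectory("chk-17");
        Files.createFile(chk.resolve(METADATA_FILE_NAME));
        // Prints the path of the _metadata file inside the temporary directory.
        System.out.println(resolveMetadataFile(chk.toString()));
    }
}
```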
      • encodePathAsReference

        public static CheckpointStorageLocationReference encodePathAsReference(Path path)
        Encodes the given path as a reference in bytes. The path is encoded as a UTF-8 string and prefixed with a magic number.
        Parameters:
        path - The path to encode.
        Returns:
        The location reference.
      • decodePathFromReference

        public static Path decodePathFromReference(CheckpointStorageLocationReference reference)
        Decodes the given reference into a path. This method validates that the reference bytes start with the correct magic number (as written by encodePathAsReference(Path)) and converts the remaining bytes back to a proper path.
        Parameters:
        reference - The bytes representing the reference.
        Returns:
        The path decoded from the reference.
        Throws:
        IllegalArgumentException - Thrown, if the bytes do not represent a proper reference.
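The encode/decode pair can be sketched as a fixed magic prefix followed by the UTF-8 bytes of the path string. The four magic bytes below are hypothetical (the actual value is not given on this page), the sketch works on raw byte[] and String instead of CheckpointStorageLocationReference and Path, and `PathReferenceSketch`, `encode` and `decode` are illustrative names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of encoding a path as "magic number + UTF-8 bytes" and decoding it back.
final class PathReferenceSketch {

    // Hypothetical magic number; the real value is not documented here.
    private static final byte[] MAGIC = {0x05, 0x5F, 0x3F, 0x18};

    // Magic number followed by the path as UTF-8.
    static byte[] encode(String path) {
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        byte[] ref = Arrays.copyOf(MAGIC, MAGIC.length + pathBytes.length);
        System.arraycopy(pathBytes, 0, ref, MAGIC.length, pathBytes.length);
        return ref;
    }

    // Validates the magic prefix (and that some path bytes follow), then decodes the rest as UTF-8.
    static String decode(byte[] ref) {
        if (ref.length <= MAGIC.length
                || !Arrays.equals(Arrays.copyOfRange(ref, 0, MAGIC.length), MAGIC)) {
            throw new IllegalArgumentException("Reference does not start with the expected magic number.");
        }
        return new String(ref, MAGIC.length, ref.length - MAGIC.length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] ref = encode("hdfs://namenode:8020/flink-checkpoints/job/chk-17");
        System.out.println(decode(ref)); // prints the original path string
    }
}
```

Checking the prefix before decoding is what lets the decoder reject byte arrays that were never produced by the encoder, rather than silently turning arbitrary bytes into a path.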