Class AbstractFsCheckpointStorageAccess
- java.lang.Object
-
- org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess
-
- All Implemented Interfaces: CheckpointStorageAccess, CheckpointStorageCoordinatorView, CheckpointStorageWorkerView
- Direct Known Subclasses: FsCheckpointStorageAccess, MemoryBackendCheckpointStorageAccess
public abstract class AbstractFsCheckpointStorageAccess extends Object implements CheckpointStorageAccess
An implementation of durable checkpoint storage to file systems.
Checkpoint Layout
The checkpoint storage is configured with a base directory and persists the data of each checkpoint in its own subdirectory. For example, if the base directory is set to hdfs://namenode:port/flink-checkpoints/, the state backend creates a subdirectory named after the job's ID that contains the actual checkpoints (hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b). Each checkpoint stores all its files in a subdirectory that includes the checkpoint number, such as hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/.
Savepoint Layout
A savepoint that is set to be stored in the path hdfs://namenode:port/flink-savepoints/ creates a subdirectory savepoint-jobId(0, 6)-randomDigits in which it stores all savepoint data. The random digits are added as "entropy" to avoid directory collisions.
Metadata File
A completed checkpoint writes its metadata into a file named "_metadata".
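The layout described above can be sketched with plain string handling. This is an illustration only, not the Flink implementation: the class and method names are hypothetical, and the "chk-" prefix is inferred from the chk-17 example rather than read from the constant's documented value.

```java
// Sketch only: reproduces the documented directory layout with plain strings.
// CheckpointLayoutSketch and its methods are hypothetical names, not Flink API.
final class CheckpointLayoutSketch {

    // Assumed from the "chk-17" example on this page.
    static final String CHECKPOINT_DIR_PREFIX = "chk-";
    // Taken from the "Metadata File" paragraph on this page.
    static final String METADATA_FILE_NAME = "_metadata";

    /** Base directory + job ID: the directory that holds all checkpoints of one job. */
    static String jobDirectory(String baseDirectory, String jobId) {
        return stripTrailingSlash(baseDirectory) + "/" + jobId;
    }

    /** Job directory + checkpoint number: the directory exclusive to one checkpoint. */
    static String checkpointDirectory(String jobDirectory, long checkpointId) {
        return stripTrailingSlash(jobDirectory) + "/" + CHECKPOINT_DIR_PREFIX + checkpointId;
    }

    /** Location of the metadata file inside a checkpoint directory. */
    static String metadataFile(String checkpointDirectory) {
        return stripTrailingSlash(checkpointDirectory) + "/" + METADATA_FILE_NAME;
    }

    private static String stripTrailingSlash(String s) {
        return s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
    }

    public static void main(String[] args) {
        String jobDir = jobDirectory(
                "hdfs://namenode:port/flink-checkpoints/",
                "1b080b6e710aabbef8993ab18c6de98b");
        String chkDir = checkpointDirectory(jobDir, 17L);
        System.out.println(jobDir);
        System.out.println(chkDir);
        System.out.println(metadataFile(chkDir));
    }
}
```

In the real class, the corresponding path construction is exposed through getCheckpointDirectoryForJob and createCheckpointDirectory, which operate on Path objects rather than strings.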
-
-
Field Summary
Fields
Modifier and Type | Field | Description
static String | CHECKPOINT_DIR_PREFIX | The prefix of the directory containing the data exclusive to a checkpoint.
static String | CHECKPOINT_SHARED_STATE_DIR | The name of the directory for shared checkpoint state.
static String | CHECKPOINT_TASK_OWNED_STATE_DIR | The name of the directory for state not owned/released by the master, but by the TaskManagers.
static String | METADATA_FILE_NAME | The name of the metadata files in checkpoints / savepoints.
-
Constructor Summary
Constructors
Modifier | Constructor | Description
protected | AbstractFsCheckpointStorageAccess(JobID jobId, Path defaultSavepointDirectory) | Creates a new checkpoint storage.
-
Method Summary
Modifier and Type | Method | Description
protected static Path | createCheckpointDirectory(Path baseDirectory, long checkpointId) | Creates the directory path for the data exclusive to a specific checkpoint.
protected abstract CheckpointStorageLocation | createSavepointLocation(FileSystem fs, Path location) |
static Path | decodePathFromReference(CheckpointStorageLocationReference reference) | Decodes the given reference into a path.
static CheckpointStorageLocationReference | encodePathAsReference(Path path) | Encodes the given path as a reference in bytes.
protected static Path | getCheckpointDirectoryForJob(Path baseCheckpointPath, JobID jobId) | Builds the directory into which a specific job checkpoints, meaning the directory inside which it creates the checkpoint-specific subdirectories.
Path | getDefaultSavepointDirectory() | Gets the default directory for savepoints.
boolean | hasDefaultSavepointLocation() | Checks whether the storage has a default savepoint location configured.
CheckpointStorageLocation | initializeLocationForSavepoint(long checkpointId, String externalLocationPointer) | Creates a file system based storage location for a savepoint.
CompletedCheckpointStorageLocation | resolveCheckpoint(String checkpointPointer) | Resolves the given pointer to a checkpoint/savepoint into a checkpoint location.
static FsCompletedCheckpointStorageLocation | resolveCheckpointPointer(String checkpointPointer) | Takes the given string (representing a pointer to a checkpoint) and resolves it to a file status for the checkpoint's metadata file.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.state.CheckpointStorageCoordinatorView
initializeBaseLocationsForCheckpoint, initializeLocationForCheckpoint, supportsHighlyAvailableStorage
-
Methods inherited from interface org.apache.flink.runtime.state.CheckpointStorageWorkerView
createTaskOwnedCheckpointStateToolset, createTaskOwnedStateStream, resolveCheckpointStorageLocation, toFileMergingStorage
-
-
-
-
Field Detail
-
CHECKPOINT_DIR_PREFIX
public static final String CHECKPOINT_DIR_PREFIX
The prefix of the directory containing the data exclusive to a checkpoint.
- See Also:
- Constant Field Values
-
CHECKPOINT_SHARED_STATE_DIR
public static final String CHECKPOINT_SHARED_STATE_DIR
The name of the directory for shared checkpoint state.
- See Also:
- Constant Field Values
-
CHECKPOINT_TASK_OWNED_STATE_DIR
public static final String CHECKPOINT_TASK_OWNED_STATE_DIR
The name of the directory for state not owned/released by the master, but by the TaskManagers.
- See Also:
- Constant Field Values
-
METADATA_FILE_NAME
public static final String METADATA_FILE_NAME
The name of the metadata files in checkpoints / savepoints.
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
AbstractFsCheckpointStorageAccess
protected AbstractFsCheckpointStorageAccess(JobID jobId, @Nullable Path defaultSavepointDirectory)
Creates a new checkpoint storage.
- Parameters:
jobId - The ID of the job that writes the checkpoints.
defaultSavepointDirectory - The default location for savepoints, or null if none is set.
-
-
Method Detail
-
getDefaultSavepointDirectory
@Nullable public Path getDefaultSavepointDirectory()
Gets the default directory for savepoints. Returns null, if no default savepoint directory is configured.
-
hasDefaultSavepointLocation
public boolean hasDefaultSavepointLocation()
Description copied from interface: CheckpointStorageCoordinatorView
Checks whether the storage has a default savepoint location configured.
- Specified by:
hasDefaultSavepointLocation in interface CheckpointStorageCoordinatorView
-
resolveCheckpoint
public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException
Description copied from interface: CheckpointStorageCoordinatorView
Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location supports reading the checkpoint metadata, or disposing the checkpoint storage location.
If the state backend cannot understand the format of the pointer (for example because it was created by a different state backend) this method should throw an IOException.
- Specified by:
resolveCheckpoint in interface CheckpointStorageCoordinatorView
- Parameters:
checkpointPointer - The external checkpoint pointer to resolve.
- Returns:
- The checkpoint location handle.
- Throws:
IOException - Thrown if the state backend does not understand the pointer, or if the pointer could not be resolved due to an I/O error.
-
initializeLocationForSavepoint
public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) throws IOException
Creates a file system based storage location for a savepoint.
This method implements the logic that decides which location to use (given optional parameters for a configured location and a location passed for this specific savepoint) and how to name and initialize the savepoint directory.
- Specified by:
initializeLocationForSavepoint in interface CheckpointStorageCoordinatorView
- Parameters:
externalLocationPointer - The target location pointer for the savepoint. Must be a valid URI. Null if not supplied.
checkpointId - The checkpoint ID of the savepoint.
- Returns:
- The checkpoint storage location for the savepoint.
- Throws:
IOException - Thrown if the target directory could not be created.
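The decision logic and directory naming described for this method can be sketched as follows. This is a guess at the shape of the logic, not the actual implementation. It assumes that the per-savepoint pointer takes precedence over the configured default, that an IllegalArgumentException is raised when neither is available, and that twelve random digits are appended. None of these details is stated on this page, and all names are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch only: SavepointLocationSketch and its methods are hypothetical names.
final class SavepointLocationSketch {

    /**
     * Picks the base location for a savepoint.
     * Assumption: an explicitly passed pointer wins over the configured default.
     */
    static String chooseBaseLocation(String externalLocationPointer,
                                     String defaultSavepointDirectory) {
        if (externalLocationPointer != null) {
            return externalLocationPointer;
        }
        if (defaultSavepointDirectory != null) {
            return defaultSavepointDirectory;
        }
        // Assumption: the exception type is illustrative only.
        throw new IllegalArgumentException(
                "No savepoint location was passed and no default is configured.");
    }

    /**
     * Builds a name of the form savepoint-jobId(0, 6)-randomDigits.
     * Assumption: the job ID has at least six characters and twelve digits are used.
     */
    static String savepointDirectoryName(String jobId) {
        String prefix = jobId.substring(0, 6);
        long entropy = ThreadLocalRandom.current().nextLong(1_000_000_000_000L);
        return String.format("savepoint-%s-%012d", prefix, entropy);
    }

    public static void main(String[] args) {
        String base = chooseBaseLocation(null, "hdfs://namenode:port/flink-savepoints/");
        String name = savepointDirectoryName("1b080b6e710aabbef8993ab18c6de98b");
        System.out.println(base + name);
    }
}
```

The random suffix is what keeps two savepoints of the same job from colliding in one base directory.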
-
createSavepointLocation
protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException
- Throws:
IOException
-
getCheckpointDirectoryForJob
protected static Path getCheckpointDirectoryForJob(Path baseCheckpointPath, JobID jobId)
Builds the directory into which a specific job checkpoints, meaning the directory inside which it creates the checkpoint-specific subdirectories.
This method only succeeds if a base checkpoint directory has been set; otherwise the method fails with an exception.
- Parameters:
jobId - The ID of the job
- Returns:
- The job's checkpoint directory, re
- Throws:
UnsupportedOperationException
- Thrown, if no base checkpoint directory has been set.
-
createCheckpointDirectory
protected static Path createCheckpointDirectory(Path baseDirectory, long checkpointId)
Creates the directory path for the data exclusive to a specific checkpoint.
- Parameters:
baseDirectory - The base directory into which the job checkpoints.
checkpointId - The ID (logical timestamp) of the checkpoint.
-
resolveCheckpointPointer
@Internal public static FsCompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException
Takes the given string (representing a pointer to a checkpoint) and resolves it to a file status for the checkpoint's metadata file.
- Parameters:
checkpointPointer - The pointer to resolve.
- Returns:
- A state handle to the checkpoint's/savepoint's metadata.
- Throws:
IOException - Thrown if the pointer cannot be resolved, the file system cannot be accessed, or the pointer points to a location that does not seem to be a checkpoint/savepoint.
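One way to picture pointer resolution is the sketch below. It works on the local file system through java.nio.file instead of Flink's FileSystem abstraction. It also assumes that a pointer may name either a checkpoint directory or the metadata file itself, which this page does not state. The class and method names are hypothetical.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch only: local-file-system analogue, not the Flink implementation.
final class PointerResolutionSketch {

    static final String METADATA_FILE_NAME = "_metadata";

    /**
     * Resolves a pointer to the metadata file it refers to.
     * Assumption: a directory pointer means "look for _metadata inside",
     * a file pointer is taken to be the metadata file itself.
     */
    static Path resolveMetadataFile(String checkpointPointer) throws IOException {
        if (checkpointPointer == null || checkpointPointer.isEmpty()) {
            throw new IllegalArgumentException("Empty checkpoint pointer");
        }
        Path path = Paths.get(checkpointPointer);
        if (!Files.exists(path)) {
            throw new FileNotFoundException(
                    "Cannot find checkpoint or savepoint at " + path);
        }
        if (Files.isDirectory(path)) {
            Path metadata = path.resolve(METADATA_FILE_NAME);
            if (!Files.isRegularFile(metadata)) {
                // The directory exists but does not look like a checkpoint/savepoint.
                throw new FileNotFoundException(
                        "No " + METADATA_FILE_NAME + " file found in " + path);
            }
            return metadata;
        }
        return path;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chk-sketch");
        Files.createFile(dir.resolve(METADATA_FILE_NAME));
        System.out.println(resolveMetadataFile(dir.toString()));
    }
}
```

Both failure cases surface as IOException subclasses, mirroring the contract in the Throws clause.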
-
encodePathAsReference
public static CheckpointStorageLocationReference encodePathAsReference(Path path)
Encodes the given path as a reference in bytes. The path is encoded as a UTF-8 string and prefixed with a magic number.
- Parameters:
path - The path to encode.
- Returns:
- The location reference.
-
decodePathFromReference
public static Path decodePathFromReference(CheckpointStorageLocationReference reference)
Decodes the given reference into a path. This method validates that the reference bytes start with the correct magic number (as written by encodePathAsReference(Path)) and converts the remaining bytes back to a proper path.
- Parameters:
reference - The bytes representing the reference.
- Returns:
- The path decoded from the reference.
- Throws:
IllegalArgumentException - Thrown if the bytes do not represent a proper reference.
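The encode/decode round trip described for encodePathAsReference and decodePathFromReference can be illustrated with plain byte arrays. This is a sketch under assumptions: the magic number below is arbitrary (the real value is not given on this page), paths are represented as strings, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Sketch only: byte-level analogue of the documented encoding, not Flink code.
final class PathReferenceSketch {

    // Arbitrary placeholder; the actual magic number is not stated on this page.
    private static final byte[] MAGIC = {0x01, 0x02, 0x03, 0x04};

    /** Magic number followed by the UTF-8 bytes of the path. */
    static byte[] encode(String path) {
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        byte[] reference = new byte[MAGIC.length + pathBytes.length];
        System.arraycopy(MAGIC, 0, reference, 0, MAGIC.length);
        System.arraycopy(pathBytes, 0, reference, MAGIC.length, pathBytes.length);
        return reference;
    }

    /** Validates the magic number, then decodes the remaining bytes as UTF-8. */
    static String decode(byte[] reference) {
        if (reference.length <= MAGIC.length) {
            throw new IllegalArgumentException("Reference is too short.");
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (reference[i] != MAGIC[i]) {
                throw new IllegalArgumentException(
                        "Reference starts with the wrong magic number.");
            }
        }
        return new String(reference, MAGIC.length,
                reference.length - MAGIC.length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] reference = encode("hdfs://namenode:port/flink-checkpoints/chk-17");
        System.out.println(decode(reference));
    }
}
```

The leading magic number lets the decoder reject byte sequences that were never produced by the matching encoder, instead of silently turning them into a bogus path.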
-
-