public abstract class FileMergingSnapshotManagerBase extends Object implements FileMergingSnapshotManager
Nested classes/interfaces inherited from interface FileMergingSnapshotManager:
FileMergingSnapshotManager.SpaceStat, FileMergingSnapshotManager.SubtaskKey
Modifier and Type | Field and Description |
---|---|
protected Path | checkpointDir |
protected PhysicalFilePool.Type | filePoolType - Type of physical file pool. |
protected FileSystem | fs - The FileSystem that this manager works on. |
protected Executor | ioExecutor - The executor for I/O operations in this manager. |
protected Object | lock - Guard for uploadedStates. |
protected Path | managedExclusiveStateDir - The private state files are merged across subtasks; there is only one directory for merged files within one TM per job. |
protected DirectoryStreamStateHandle | managedExclusiveStateDirHandle - The DirectoryStreamStateHandle for the private state directory, one for each task manager. |
protected long | maxPhysicalFileSize - Max size for a physical file. |
protected PhysicalFile.PhysicalFileDeleter | physicalFileDeleter |
protected Path | sharedStateDir |
protected boolean | shouldSyncAfterClosingLogicalFile - File-system dependent value. |
protected FileMergingSnapshotManager.SpaceStat | spaceStat - The current space statistic, updated on file creation/deletion. |
protected Path | taskOwnedStateDir |
protected TreeMap<Long,Set<LogicalFile>> | uploadedStates |
protected int | writeBufferSize - The buffer size for writing files to the file system. |
Constructor and Description |
---|
FileMergingSnapshotManagerBase(String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) |
Modifier and Type | Method and Description |
---|---|
void | close() |
FileMergingCheckpointStateOutputStream | createCheckpointStateOutputStream(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) - Create a new FileMergingCheckpointStateOutputStream. |
protected LogicalFile | createLogicalFile(PhysicalFile physicalFile, long startOffset, long length, FileMergingSnapshotManager.SubtaskKey subtaskKey) - Create a logical file on a physical file. |
protected PhysicalFile | createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) - Create a physical file in the right location (managed directory), which is determined by the scope of this checkpoint and the current subtask. |
protected PhysicalFilePool | createPhysicalPool() - Create a physical file pool according to filePoolType. |
protected void | deletePhysicalFile(Path filePath, long size) - Delete a physical file at the given file path. |
protected abstract void | discardCheckpoint(long checkpointId) - The callback which will be triggered when all subtasks have discarded the checkpoint (aborted or subsumed). |
void | discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId) |
protected Path | generatePhysicalFilePath(Path dirPath) - Generate a file path for a physical file. |
LogicalFile | getLogicalFile(LogicalFile.LogicalFileId fileId) |
Path | getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) - Get the managed directory of the file-merging snapshot manager, created in FileMergingSnapshotManager.initFileSystem(FileSystem, Path, Path, Path, int) or FileMergingSnapshotManager.registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey). |
DirectoryStreamStateHandle | getManagedDirStateHandle(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) - Get the DirectoryStreamStateHandle of the managed directory, created in FileMergingSnapshotManager.initFileSystem(FileSystem, Path, Path, Path, int) or FileMergingSnapshotManager.registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey). |
protected abstract PhysicalFile | getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) - Get a reused physical file or create one. |
void | initFileSystem(FileSystem fileSystem, Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir, int writeBufferSize) - Initialize the file system, recording the checkpoint path the manager should work with. |
void | notifyCheckpointAborted(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) - This method is called as a notification once a distributed checkpoint has been aborted. |
void | notifyCheckpointComplete(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) - Notifies the manager that the checkpoint with the given checkpointId completed and was committed. |
void | notifyCheckpointSubsumed(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) - This method is called as a notification once a distributed checkpoint has been subsumed. |
void | registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey) - Register a subtask and create the managed directory for shared states. |
void | restoreStateHandles(long checkpointId, FileMergingSnapshotManager.SubtaskKey subtaskKey, java.util.stream.Stream<SegmentFileStateHandle> stateHandles) - Restore and re-register the SegmentFileStateHandles into FileMergingSnapshotManager. |
protected abstract void | returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) - Try to return an existing physical file to the manager for next reuse. |
void | reusePreviousStateHandle(long checkpointId, Collection<? extends StreamStateHandle> stateHandles) - A callback method which is called when previous state handles are reused by the following checkpoint(s). |
protected final Executor ioExecutor
protected final Object lock
protected TreeMap<Long,Set<LogicalFile>> uploadedStates
protected FileSystem fs
The FileSystem that this manager works on.
protected Path checkpointDir
protected Path sharedStateDir
protected Path taskOwnedStateDir
protected int writeBufferSize
protected boolean shouldSyncAfterClosingLogicalFile
protected long maxPhysicalFileSize
protected PhysicalFilePool.Type filePoolType
protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter
protected Path managedExclusiveStateDir
protected DirectoryStreamStateHandle managedExclusiveStateDirHandle
The DirectoryStreamStateHandle for the private state directory, one for each task manager.
protected FileMergingSnapshotManager.SpaceStat spaceStat
public FileMergingSnapshotManagerBase(String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor)
public void initFileSystem(FileSystem fileSystem, Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir, int writeBufferSize) throws IllegalArgumentException
Description copied from interface: FileMergingSnapshotManager
Initialize the file system, recording the checkpoint path the manager should work with.
The layout of the checkpoint directory:
 /user-defined-checkpoint-dir
     /{job-id} (checkpointBaseDir)
         |
         + --shared/
             |
             + --subtask-1/
                 + -- merged shared state files
             + --subtask-2/
                 + -- merged shared state files
         + --taskowned/
             + -- merged private state files
         + --chk-1/
         + --chk-2/
         + --chk-3/
Directories are initialized in this method rather than in the constructor because the FileMergingSnapshotManager itself belongs to the TaskStateManager, which is initialized when a task is received, while the base directories for checkpoints are created by FsCheckpointStorageAccess when the state backend initializes per subtask. After the checkpoint directories are initialized, the managed subdirectories are initialized here.
Note: This method may be called several times. The implementation should ensure idempotency and throw IllegalArgumentException when any of the paths in the parameters changes across calls.
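The idempotency contract in the note above can be sketched as follows. This is a minimal illustration under stated assumptions, not Flink's implementation: the class IdempotentInitSketch, its fields, and the initCount() helper are hypothetical, and plain strings stand in for Flink's Path.

```java
import java.util.Objects;

/** Hypothetical stand-in for a manager whose init method may be called repeatedly. */
final class IdempotentInitSketch {
    private String checkpointBaseDir;
    private String sharedStateDir;
    private String taskOwnedStateDir;
    private int initCount; // number of calls that actually performed initialization

    /** Records the three paths on the first call; later calls must pass the same paths. */
    synchronized void initFileSystem(String baseDir, String sharedDir, String taskOwnedDir) {
        Objects.requireNonNull(baseDir, "baseDir");
        Objects.requireNonNull(sharedDir, "sharedDir");
        Objects.requireNonNull(taskOwnedDir, "taskOwnedDir");
        if (checkpointBaseDir != null) {
            // Already initialized: accept only identical paths.
            boolean same = checkpointBaseDir.equals(baseDir)
                    && sharedStateDir.equals(sharedDir)
                    && taskOwnedStateDir.equals(taskOwnedDir);
            if (!same) {
                throw new IllegalArgumentException("Checkpoint paths changed across calls");
            }
            return; // repeated call with the same paths is a no-op
        }
        checkpointBaseDir = baseDir;
        sharedStateDir = sharedDir;
        taskOwnedStateDir = taskOwnedDir;
        initCount++;
    }

    /** Hypothetical helper exposing how often initialization really ran. */
    synchronized int initCount() {
        return initCount;
    }
}
```

Calling initFileSystem twice with the same three paths leaves initCount() at 1, while a later call with a different path fails fast instead of silently redirecting writes.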
Specified by:
initFileSystem in interface FileMergingSnapshotManager
Parameters:
fileSystem - The filesystem to write to.
checkpointBaseDir - The base directory for checkpoints.
sharedStateDir - The directory for shared checkpoint data.
taskOwnedStateDir - The name of the directory for state not owned/released by the master, but by the TaskManagers.
writeBufferSize - The buffer size for writing files to the file system.
Throws:
IllegalArgumentException - thrown if these three paths are not deterministic across calls.
public void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey)
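Description copied from interface: FileMergingSnapshotManager
Register a subtask and create the managed directory for shared states.
Specified by:
registerSubtaskForSharedStates in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying a subtask.
See Also:
initFileSystem for layout information.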
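To make the directory layout concrete, here is a minimal sketch of how a managed directory could be resolved per subtask and scope. It is an illustration under stated assumptions, not Flink's implementation: ManagedDirLayoutSketch, its Scope enum (mirroring the EXCLUSIVE and SHARED values of CheckpointedStateScope), and the use of plain strings as subtask names are hypothetical, and java.nio.file.Path stands in for Flink's Path. The sketch only computes paths and never touches the file system.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical path resolver following the shared/ and taskowned/ layout. */
final class ManagedDirLayoutSketch {
    /** Mirrors the two values of CheckpointedStateScope. */
    enum Scope { EXCLUSIVE, SHARED }

    private final Path sharedStateDir;
    private final Path taskOwnedStateDir;
    private final Map<String, Path> sharedDirBySubtask = new HashMap<>();

    ManagedDirLayoutSketch(Path checkpointBaseDir) {
        this.sharedStateDir = checkpointBaseDir.resolve("shared");
        this.taskOwnedStateDir = checkpointBaseDir.resolve("taskowned");
    }

    /** Registers a subtask; registering twice returns the same directory. */
    synchronized Path registerSubtaskForSharedStates(String subtaskName) {
        return sharedDirBySubtask.computeIfAbsent(
                subtaskName, name -> sharedStateDir.resolve(name));
    }

    /** Shared state lives in a per-subtask directory, exclusive state in one common directory. */
    synchronized Path getManagedDir(String subtaskName, Scope scope) {
        if (scope == Scope.SHARED) {
            Path dir = sharedDirBySubtask.get(subtaskName);
            if (dir == null) {
                throw new IllegalStateException("Subtask not registered: " + subtaskName);
            }
            return dir;
        }
        // Simplification (assumption): exclusive files go directly under taskowned/.
        return taskOwnedStateDir;
    }
}
```

The split reflects the layout above: shared files are grouped per subtask, while private (exclusive) files from all subtasks are merged into a single directory.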
protected LogicalFile createLogicalFile(@Nonnull PhysicalFile physicalFile, long startOffset, long length, @Nonnull FileMergingSnapshotManager.SubtaskKey subtaskKey)
Create a logical file on a physical file.
Parameters:
physicalFile - the underlying physical file.
startOffset - the offset in the physical file that the logical file starts from.
length - the length of the logical file.
subtaskKey - the id of the subtask that the logical file belongs to.
@Nonnull protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) throws IOException
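Create a physical file in the right location (managed directory), which is determined by the scope of this checkpoint and the current subtask.
Parameters:
subtaskKey - the SubtaskKey of the current subtask.
scope - the scope of the checkpoint.
Throws:
IOException - if anything goes wrong with the file system.
public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope)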
Description copied from interface: FileMergingSnapshotManager
Create a new FileMergingCheckpointStateOutputStream. According to the file merging strategy, the streams returned by multiple calls to this function may share the same underlying physical file, and each stream writes to a segment of the physical file.
Specified by:
createCheckpointStateOutputStream in interface FileMergingSnapshotManager
Parameters:
subtaskKey - The subtask key identifying the subtask.
checkpointId - ID of the checkpoint.
scope - The state's scope, whether it is exclusive or shared.
protected Path generatePhysicalFilePath(Path dirPath)
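Generate a file path for a physical file.
Parameters:
dirPath - the parent directory path for the physical file.
protected final void deletePhysicalFile(Path filePath, long size)
Delete a physical file by given file path.
Parameters:
filePath - the given file path to delete.
protected final PhysicalFilePool createPhysicalPool()
Create physical pool by filePoolType.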
@Nonnull protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) throws IOException
Get a reused physical file or create one.
Basic logic of file reusing: whenever a physical file is needed, this method is called with the information necessary for acquiring a file. The file will not be reused until it has been written and returned to the reuse pool by calling returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey, long, PhysicalFile).
Parameters:
subtaskKey - the subtask key for the caller.
checkpointId - the checkpoint id.
scope - the checkpoint scope.
Throws:
IOException - thrown if anything goes wrong with the file system.
protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException
Try to return an existing physical file to the manager for next reuse. For the basic logic of file reusing, see getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey, long, CheckpointedStateScope).
Parameters:
subtaskKey - the subtask key for the caller.
checkpointId - the checkpoint in which this physical file was requested.
physicalFile - the physical file being returned.
Throws:
IOException - thrown if anything goes wrong with the file system.
See Also:
getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope)
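The acquire/return cycle of getOrCreatePhysicalFileForCheckpoint and returnPhysicalFileForNextReuse, together with the idea that each logical file is a segment of a shared physical file, can be sketched roughly as below. This is a hedged illustration, not Flink's implementation: PhysicalFileReuseSketch and its nested types are hypothetical, files are modeled purely as byte counters, and the rule that a file at or above the size limit is not pooled again is an assumption suggested by the maxPhysicalFileSize field.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of physical file reuse; no real I/O is performed. */
final class PhysicalFileReuseSketch {

    /** Stand-in for a physical file: tracks only how many bytes were appended. */
    static final class SketchPhysicalFile {
        final int id;
        long size;
        SketchPhysicalFile(int id) { this.id = id; }
    }

    /** Stand-in for a logical file: the segment [startOffset, startOffset + length). */
    static final class SketchLogicalFile {
        final SketchPhysicalFile physicalFile;
        final long startOffset;
        final long length;
        SketchLogicalFile(SketchPhysicalFile physicalFile, long startOffset, long length) {
            this.physicalFile = physicalFile;
            this.startOffset = startOffset;
            this.length = length;
        }
    }

    private final long maxPhysicalFileSize;
    private final Deque<SketchPhysicalFile> reusable = new ArrayDeque<>();
    private int nextId;

    PhysicalFileReuseSketch(long maxPhysicalFileSize) {
        this.maxPhysicalFileSize = maxPhysicalFileSize;
    }

    /** Takes a pooled file if one exists, otherwise creates a new one. */
    synchronized SketchPhysicalFile getOrCreatePhysicalFile() {
        SketchPhysicalFile pooled = reusable.poll();
        return pooled != null ? pooled : new SketchPhysicalFile(nextId++);
    }

    /** Pools the file again unless it reached the size limit; returns whether it was pooled. */
    synchronized boolean returnPhysicalFileForNextReuse(SketchPhysicalFile file) {
        if (file.size >= maxPhysicalFileSize) {
            return false; // assumption: full files are retired from reuse
        }
        reusable.offer(file);
        return true;
    }

    /** Appends one segment: acquire a file, record the segment, hand the file back. */
    synchronized SketchLogicalFile writeSegment(long length) {
        SketchPhysicalFile file = getOrCreatePhysicalFile();
        SketchLogicalFile logical = new SketchLogicalFile(file, file.size, length);
        file.size += length;
        returnPhysicalFileForNextReuse(file);
        return logical;
    }
}
```

Because a file leaves the pool while it is being written, two writers never append to it at once, and consecutive segments land back to back in the same physical file until the limit is reached.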
protected abstract void discardCheckpoint(long checkpointId) throws IOException
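The callback which will be triggered when all subtasks have discarded the checkpoint (aborted or subsumed).
Parameters:
checkpointId - the discarded checkpoint id.
Throws:
IOException - if anything goes wrong with the file system.
public void notifyCheckpointComplete(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) throws Exception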
Description copied from interface: FileMergingSnapshotManager
Notifies the manager that the checkpoint with the given checkpointId completed and was committed.
Specified by:
notifyCheckpointComplete in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying the subtask.
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception - thrown if anything goes wrong with the listener.
public void notifyCheckpointAborted(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) throws Exception
Description copied from interface: FileMergingSnapshotManager
This method is called as a notification once a distributed checkpoint has been aborted.
Specified by:
notifyCheckpointAborted in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying the subtask.
checkpointId - The ID of the checkpoint that has been aborted.
Throws:
Exception - thrown if anything goes wrong with the listener.
public void notifyCheckpointSubsumed(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId) throws Exception
Description copied from interface: FileMergingSnapshotManager
This method is called as a notification once a distributed checkpoint has been subsumed.
Specified by:
notifyCheckpointSubsumed in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying the subtask.
checkpointId - The ID of the checkpoint that has been subsumed.
Throws:
Exception - thrown if anything goes wrong with the listener.
public void reusePreviousStateHandle(long checkpointId, Collection<? extends StreamStateHandle> stateHandles)
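Description copied from interface: FileMergingSnapshotManager
A callback method which is called when previous state handles are reused by following checkpoint(s).
Specified by:
reusePreviousStateHandle in interface FileMergingSnapshotManager
Parameters:
checkpointId - the checkpoint that reuses the handles.
stateHandles - the handles to be reused.
public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId) throws IOException
Throws:
IOException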
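The discardCheckpoint callback documented earlier fires only once every subtask has discarded a checkpoint, whether through an abort or a subsume notification. One way such bookkeeping could look is sketched below. This is a hypothetical illustration, not Flink's implementation: DiscardTrackerSketch, the string subtask names, and the LongConsumer callback are all assumptions introduced for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

/** Hypothetical tracker that fires a callback when all subtasks discarded a checkpoint. */
final class DiscardTrackerSketch {
    private final Set<String> registeredSubtasks;
    private final Map<Long, Set<String>> pendingByCheckpoint = new HashMap<>();
    private final LongConsumer discardCheckpoint;

    DiscardTrackerSketch(Set<String> registeredSubtasks, LongConsumer discardCheckpoint) {
        this.registeredSubtasks = new HashSet<>(registeredSubtasks);
        this.discardCheckpoint = discardCheckpoint;
    }

    /** Called when one subtask reports that a checkpoint was aborted or subsumed. */
    synchronized void notifyDiscarded(String subtask, long checkpointId) {
        // On the first notification for a checkpoint, every registered subtask is pending.
        Set<String> pending = pendingByCheckpoint.computeIfAbsent(
                checkpointId, id -> new HashSet<>(registeredSubtasks));
        pending.remove(subtask);
        if (pending.isEmpty()) {
            pendingByCheckpoint.remove(checkpointId);
            discardCheckpoint.accept(checkpointId); // the last subtask triggers the callback
        }
    }
}
```

With two registered subtasks, the callback stays silent after the first notification for a checkpoint and runs exactly once when the second subtask reports the same checkpoint.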
public Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
Description copied from interface: FileMergingSnapshotManager
Get the managed directory of the file-merging snapshot manager, created in FileMergingSnapshotManager.initFileSystem(FileSystem, Path, Path, Path, int) or FileMergingSnapshotManager.registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey).
Specified by:
getManagedDir in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying the subtask.
scope - the checkpoint scope.
public DirectoryStreamStateHandle getManagedDirStateHandle(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
Description copied from interface: FileMergingSnapshotManager
Get the DirectoryStreamStateHandle of the managed directory, created in FileMergingSnapshotManager.initFileSystem(FileSystem, Path, Path, Path, int) or FileMergingSnapshotManager.registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey).
Specified by:
getManagedDirStateHandle in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying the subtask.
scope - the checkpoint scope.
Returns:
the DirectoryStreamStateHandle for one subtask in the specified checkpoint scope.
public void close() throws IOException
Specified by:
close in interface Closeable
Specified by:
close in interface AutoCloseable
Throws:
IOException
public void restoreStateHandles(long checkpointId, FileMergingSnapshotManager.SubtaskKey subtaskKey, java.util.stream.Stream<SegmentFileStateHandle> stateHandles)
Description copied from interface: FileMergingSnapshotManager
Restore and re-register the SegmentFileStateHandles into FileMergingSnapshotManager.
Specified by:
restoreStateHandles in interface FileMergingSnapshotManager
Parameters:
checkpointId - the restored checkpoint id.
subtaskKey - the subtask key identifying the subtask.
stateHandles - the restored segment file handles.
@VisibleForTesting public LogicalFile getLogicalFile(LogicalFile.LogicalFileId fileId)
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.