Class CheckpointCoordinator
java.lang.Object
    org.apache.flink.runtime.checkpoint.CheckpointCoordinator
public class CheckpointCoordinator extends Object
The checkpoint coordinator coordinates the distributed snapshots of operators and state. It triggers the checkpoint by sending the messages to the relevant tasks and collects the checkpoint acknowledgements. It also collects and maintains the overview of the state handles reported by the tasks that acknowledge the checkpoint.

Constructor Summary

Method Summary

void abortPendingCheckpoints(CheckpointException exception)
    Aborts all the pending checkpoints due to an exception.
boolean addMasterHook(MasterTriggerRestoreHook<?> hook)
    Adds the given master hook to the checkpoint coordinator.
JobStatusListener createActivatorDeactivator(boolean allTasksOutputNonBlocking)
CheckpointStorageCoordinatorView getCheckpointStorage()
CompletedCheckpointStore getCheckpointStore()
long getCheckpointTimeout()
int getNumberOfPendingCheckpoints()
int getNumberOfRegisteredMasterHooks()
    Gets the number of currently registered master hooks.
int getNumberOfRetainedSuccessfulCheckpoints()
Map<Long,PendingCheckpoint> getPendingCheckpoints()
ArrayDeque<Long> getRecentExpiredCheckpoints()
List<CompletedCheckpoint> getSuccessfulCheckpoints()
boolean isPeriodicCheckpointingConfigured()
    Returns whether periodic checkpointing has been configured.
boolean isPeriodicCheckpointingStarted()
boolean isShutdown()
boolean isTriggering()
boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo)
    Receives an AcknowledgeCheckpoint message and returns whether the message was associated with a pending checkpoint.
void receiveDeclineMessage(DeclineCheckpoint message, String taskManagerLocationInfo)
    Receives a DeclineCheckpoint message for a pending checkpoint.
void reportCheckpointMetrics(long id, ExecutionAttemptID attemptId, CheckpointMetrics metrics)
void reportInitializationMetrics(ExecutionAttemptID executionAttemptID, SubTaskInitializationMetrics initializationMetrics)
boolean restoreInitialCheckpointIfPresent(Set<ExecutionJobVertex> tasks)
    Restores the latest checkpointed state at the beginning of the job execution.
boolean restoreLatestCheckpointedStateToAll(Set<ExecutionJobVertex> tasks, boolean allowNonRestoredState)
    Restores the latest checkpointed state to all tasks and all coordinators.
OptionalLong restoreLatestCheckpointedStateToSubtasks(Set<ExecutionJobVertex> tasks)
    Restores the latest checkpointed state to a set of subtasks.
boolean restoreSavepoint(SavepointRestoreSettings restoreSettings, Map<JobVertexID,ExecutionJobVertex> tasks, ClassLoader userClassLoader)
    Restores the state from the given savepoint.
void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog)
    Reports whether a source operator is currently processing backlog.
void shutdown()
    Shuts down the checkpoint coordinator.
void startCheckpointScheduler()
void stopCheckpointScheduler()
CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic)
    Triggers a new standard checkpoint.
CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType)
    Triggers one new checkpoint with the given checkpointType.
CompletableFuture<CompletedCheckpoint> triggerSavepoint(String targetLocation, SavepointFormatType formatType)
    Triggers a savepoint with the given savepoint directory as a target.
CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(boolean terminate, String targetLocation, SavepointFormatType formatType)
    Triggers a synchronous savepoint with the given savepoint directory as a target.

Constructor Detail

CheckpointCoordinator
public CheckpointCoordinator(JobID job, CheckpointCoordinatorConfiguration chkConfig, Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, CheckpointStorage checkpointStorage, Executor executor, CheckpointsCleaner checkpointsCleaner, ScheduledExecutor timer, CheckpointFailureManager failureManager, CheckpointPlanCalculator checkpointPlanCalculator, CheckpointStatsTracker statsTracker)

CheckpointCoordinator
@VisibleForTesting public CheckpointCoordinator(JobID job, CheckpointCoordinatorConfiguration chkConfig, Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, CheckpointStorage checkpointStorage, Executor executor, CheckpointsCleaner checkpointsCleaner, ScheduledExecutor timer, CheckpointFailureManager failureManager, CheckpointPlanCalculator checkpointPlanCalculator, Clock clock, CheckpointStatsTracker statsTracker, BiFunction<Set<ExecutionJobVertex>,Map<OperatorID,OperatorState>,VertexFinishedStateChecker> vertexFinishedStateCheckerFactory)

Method Detail

addMasterHook
public boolean addMasterHook(MasterTriggerRestoreHook<?> hook)
Adds the given master hook to the checkpoint coordinator. This method does nothing if the checkpoint coordinator already contains a hook with the same ID (as defined via MasterTriggerRestoreHook.getIdentifier()).
Parameters:
hook - The hook to add.
Returns:
True if the hook was added, false if the checkpoint coordinator already contained a hook with the same ID.
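The add-only-if-absent rule above can be sketched with a map keyed by hook identifier. This is a minimal sketch and not Flink's implementation. `Hook`, `HookRegistry`, and `HookRegistryDemo` are hypothetical names. `Hook` stands in for `MasterTriggerRestoreHook` and models only its `getIdentifier()` method.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for MasterTriggerRestoreHook; only the identifier matters here.
interface Hook {
    String getIdentifier();
}

// Hypothetical registry illustrating "add only if no hook with the same ID exists".
class HookRegistry {
    // Keyed by hook identifier; insertion order is kept for predictable iteration.
    private final Map<String, Hook> hooksById = new LinkedHashMap<>();

    // Returns true if the hook was added, false if a hook with the same ID was already registered.
    boolean addMasterHook(Hook hook) {
        return hooksById.putIfAbsent(hook.getIdentifier(), hook) == null;
    }

    int getNumberOfRegisteredMasterHooks() {
        return hooksById.size();
    }
}

class HookRegistryDemo {
    public static void main(String[] args) {
        HookRegistry registry = new HookRegistry();
        boolean first = registry.addMasterHook(() -> "kafka-offsets");
        boolean duplicate = registry.addMasterHook(() -> "kafka-offsets");
        // prints "true false 1"
        System.out.println(first + " " + duplicate + " " + registry.getNumberOfRegisteredMasterHooks());
    }
}
```

Using `putIfAbsent` keeps the check and the insertion in one call, so the first hook registered under an ID always wins.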

getNumberOfRegisteredMasterHooks
public int getNumberOfRegisteredMasterHooks()
Gets the number of currently registered master hooks.

shutdown
public void shutdown() throws Exception
Shuts down the checkpoint coordinator. After this method has been called, the coordinator does not accept any further messages and cannot trigger any further checkpoints.
Throws:
Exception

isShutdown
public boolean isShutdown()
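One common way to honor the shutdown contract above (no further messages, no further checkpoints) is a one-way flag checked at every entry point. This sketch assumes that design. `ShutdownAwareCoordinator` and its methods are hypothetical, and the real cleanup work (timers, pending checkpoints) is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical coordinator skeleton illustrating a one-way shutdown flag.
class ShutdownAwareCoordinator {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final AtomicLong nextCheckpointId = new AtomicLong(1);

    // Sets the flag; calling it again is harmless. Resource cleanup is omitted in this sketch.
    void shutdown() {
        shutdown.set(true);
    }

    boolean isShutdown() {
        return shutdown.get();
    }

    // After shutdown, triggering fails the returned future instead of starting a checkpoint.
    CompletableFuture<Long> triggerCheckpoint() {
        if (shutdown.get()) {
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Checkpoint coordinator is shut down"));
            return failed;
        }
        return CompletableFuture.completedFuture(nextCheckpointId.getAndIncrement());
    }

    // Incoming task messages are rejected once shut down; returns whether the message was accepted.
    boolean receiveMessage(String message) {
        return !shutdown.get();
    }
}

class ShutdownDemo {
    public static void main(String[] args) {
        ShutdownAwareCoordinator coordinator = new ShutdownAwareCoordinator();
        coordinator.shutdown();
        // prints "true true"
        System.out.println(coordinator.isShutdown() + " " + coordinator.triggerCheckpoint().isCompletedExceptionally());
    }
}
```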

setIsProcessingBacklog
public void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog)
Reports whether a source operator is currently processing backlog.
If any source operator is processing backlog, the checkpoint interval is determined by execution.checkpointing.interval-during-backlog instead of execution.checkpointing.interval.
If a source has not invoked this method, it is considered to have isProcessingBacklog=false. If a source operator has invoked this method multiple times, the last reported value is used.
Parameters:
operatorID - the operator ID of the source operator.
isProcessingBacklog - whether the source operator is processing backlog.
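The interval-selection rule above has three parts: any source in backlog selects the backlog interval, unreported sources count as false, and the last report wins. It can be sketched as a per-source map. This is a minimal sketch under stated assumptions. `BacklogAwareInterval` is a hypothetical class, operator IDs are plain `String`s instead of `OperatorID`, and the two constructor arguments only play the roles of the two configuration options.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the interval rule; operator IDs are plain Strings instead of OperatorID.
class BacklogAwareInterval {
    private final long regularIntervalMs; // plays the role of execution.checkpointing.interval
    private final long backlogIntervalMs; // plays the role of execution.checkpointing.interval-during-backlog
    // Last reported flag per source; a source that never reported counts as false.
    private final Map<String, Boolean> backlogBySource = new HashMap<>();

    BacklogAwareInterval(long regularIntervalMs, long backlogIntervalMs) {
        this.regularIntervalMs = regularIntervalMs;
        this.backlogIntervalMs = backlogIntervalMs;
    }

    // Later reports overwrite earlier ones, so the last reported value wins.
    void setIsProcessingBacklog(String operatorId, boolean isProcessingBacklog) {
        backlogBySource.put(operatorId, isProcessingBacklog);
    }

    // If any source is processing backlog, the backlog interval applies.
    long currentIntervalMs() {
        return backlogBySource.containsValue(Boolean.TRUE) ? backlogIntervalMs : regularIntervalMs;
    }
}

class BacklogIntervalDemo {
    public static void main(String[] args) {
        BacklogAwareInterval interval = new BacklogAwareInterval(30_000L, 300_000L);
        interval.setIsProcessingBacklog("file-source", true);
        interval.setIsProcessingBacklog("kafka-source", false);
        System.out.println(interval.currentIntervalMs()); // prints 300000
        interval.setIsProcessingBacklog("file-source", false);
        System.out.println(interval.currentIntervalMs()); // prints 30000
    }
}
```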

triggerSavepoint
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(@Nullable String targetLocation, SavepointFormatType formatType)
Triggers a savepoint with the given savepoint directory as a target.
Parameters:
targetLocation - Target location for the savepoint, optional. If null, the state backend's configured default will be used.
Returns:
A future to the completed checkpoint.
Throws:
IllegalStateException - If no savepoint directory has been specified and no default savepoint directory has been configured.
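The target-location rule above works as follows: use the explicit location if given, otherwise the configured default, otherwise fail. It can be sketched as a small resolver. `SavepointTargetResolver` is a hypothetical helper, and the configured default is passed in explicitly for illustration rather than read from a state backend. The paths are made up.

```java
// Hypothetical helper; the configured default is passed in explicitly for illustration.
final class SavepointTargetResolver {
    private SavepointTargetResolver() {}

    // Prefers the explicit target, falls back to the configured default, and fails if neither exists.
    static String resolve(String targetLocation, String configuredDefault) {
        if (targetLocation != null) {
            return targetLocation;
        }
        if (configuredDefault != null) {
            return configuredDefault;
        }
        throw new IllegalStateException(
                "No savepoint directory specified and no default savepoint directory configured");
    }
}

class SavepointTargetDemo {
    public static void main(String[] args) {
        // prints s3://bucket/savepoints
        System.out.println(SavepointTargetResolver.resolve(null, "s3://bucket/savepoints"));
    }
}
```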

triggerSynchronousSavepoint
public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(boolean terminate, @Nullable String targetLocation, SavepointFormatType formatType)
Triggers a synchronous savepoint with the given savepoint directory as a target.
Parameters:
terminate - flag indicating if the job should terminate or just suspend.
targetLocation - Target location for the savepoint, optional. If null, the state backend's configured default will be used.
Returns:
A future to the completed checkpoint.
Throws:
IllegalStateException - If no savepoint directory has been specified and no default savepoint directory has been configured.

triggerCheckpoint
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic)
Triggers a new standard checkpoint. The returned future completes when the triggered checkpoint finishes or an error occurs.
Parameters:
isPeriodic - Flag indicating whether this triggered checkpoint is periodic.
Returns:
A future to the completed checkpoint.

triggerCheckpoint
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType)
Triggers one new checkpoint with the given checkpointType. The returned future completes when the triggered checkpoint finishes or an error occurs.
Parameters:
checkpointType - specifies the backup type of the checkpoint to trigger.
Returns:
A future to the completed checkpoint.
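Both trigger methods return a future that completes with the finished checkpoint or with an error. Callers therefore typically attach a handler instead of blocking. The following is a minimal caller-side sketch. `CheckpointResult` is a hypothetical stand-in for `CompletedCheckpoint` that carries only an ID, and the futures are created by hand rather than by a real coordinator.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for CompletedCheckpoint that only carries the checkpoint ID.
class CheckpointResult {
    final long checkpointId;

    CheckpointResult(long checkpointId) {
        this.checkpointId = checkpointId;
    }
}

final class CheckpointOutcome {
    private CheckpointOutcome() {}

    // Maps either outcome of the trigger future to a status line without blocking.
    static CompletableFuture<String> describe(CompletableFuture<CheckpointResult> triggered) {
        return triggered.handle((result, error) -> {
            if (error == null) {
                return "checkpoint " + result.checkpointId + " completed";
            }
            // Dependent stages wrap failures in CompletionException; unwrap for a readable message.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
            return "checkpoint failed: " + cause.getMessage();
        });
    }
}

class CheckpointOutcomeDemo {
    public static void main(String[] args) {
        CompletableFuture<CheckpointResult> ok = CompletableFuture.completedFuture(new CheckpointResult(42L));
        CompletableFuture<CheckpointResult> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("declined by a task"));
        System.out.println(CheckpointOutcome.describe(ok).join());     // prints "checkpoint 42 completed"
        System.out.println(CheckpointOutcome.describe(failed).join()); // prints "checkpoint failed: declined by a task"
    }
}
```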

receiveDeclineMessage
public void receiveDeclineMessage(DeclineCheckpoint message, String taskManagerLocationInfo)
Receives a DeclineCheckpoint message for a pending checkpoint.
Parameters:
message - Checkpoint decline from the task manager.
taskManagerLocationInfo - The location info of the decline checkpoint message's sender.

receiveAcknowledgeMessage
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException
Receives an AcknowledgeCheckpoint message and returns whether the message was associated with a pending checkpoint.
Parameters:
message - Checkpoint ack from the task manager.
taskManagerLocationInfo - The location of the acknowledge checkpoint message's sender.
Returns:
Flag indicating whether the ack'd checkpoint was associated with a pending checkpoint.
Throws:
CheckpointException - If the checkpoint cannot be added to the completed checkpoint store.
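The acknowledge/decline handling above can be modeled as a map from checkpoint ID to the set of tasks that have not yet acknowledged. This simplified sketch rests on several assumptions. All names are hypothetical, and task and checkpoint identifiers are plain `String`/`long` values. Treating a decline as an immediate abort of the pending checkpoint is also an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified bookkeeping for pending checkpoints (not Flink's PendingCheckpoint).
class PendingCheckpointTracker {
    private static final class Pending {
        final Set<String> awaitingAck;
        final CompletableFuture<Long> completion = new CompletableFuture<>();

        Pending(Set<String> tasksToAck) {
            this.awaitingAck = new HashSet<>(tasksToAck);
        }
    }

    private final Map<Long, Pending> pending = new HashMap<>();

    // Registers a checkpoint that needs an ack from every task in tasksToAck.
    CompletableFuture<Long> trigger(long checkpointId, Set<String> tasksToAck) {
        Pending p = new Pending(tasksToAck);
        pending.put(checkpointId, p);
        return p.completion;
    }

    // Returns whether the ack belonged to a checkpoint that is still pending.
    boolean receiveAcknowledge(long checkpointId, String taskId) {
        Pending p = pending.get(checkpointId);
        if (p == null) {
            return false; // unknown, already completed, or already aborted
        }
        p.awaitingAck.remove(taskId); // duplicate acks are harmless no-ops
        if (p.awaitingAck.isEmpty()) {
            pending.remove(checkpointId);
            p.completion.complete(checkpointId); // fully acknowledged
        }
        return true;
    }

    // Assumed behaviour: a decline aborts the pending checkpoint and fails its future.
    void receiveDecline(long checkpointId, String taskId, String reason) {
        Pending p = pending.remove(checkpointId);
        if (p != null) {
            p.completion.completeExceptionally(new IllegalStateException(
                    "Checkpoint " + checkpointId + " declined by " + taskId + ": " + reason));
        }
    }

    int getNumberOfPendingCheckpoints() {
        return pending.size();
    }
}

class PendingCheckpointDemo {
    public static void main(String[] args) {
        PendingCheckpointTracker tracker = new PendingCheckpointTracker();
        Set<String> tasks = new HashSet<>();
        tasks.add("map-0");
        tasks.add("sink-0");
        CompletableFuture<Long> done = tracker.trigger(1L, tasks);
        tracker.receiveAcknowledge(1L, "map-0");
        tracker.receiveAcknowledge(1L, "sink-0");
        // prints "1 false": checkpoint 1 completed, so a late ack is no longer associated with it
        System.out.println(done.join() + " " + tracker.receiveAcknowledge(1L, "map-0"));
    }
}
```

A duplicate ack for a still-pending checkpoint returns true here, matching the description's wording that the return value reports whether the message was associated with a pending checkpoint.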

restoreLatestCheckpointedStateToSubtasks
public OptionalLong restoreLatestCheckpointedStateToSubtasks(Set<ExecutionJobVertex> tasks) throws Exception
Restores the latest checkpointed state to a set of subtasks. This method represents a "local" or "regional" failover and does not restore state to coordinators. Note that a regional failover might still include all tasks.
Parameters:
tasks - Set of job vertices to restore. State for these vertices is restored via Execution.setInitialState(JobManagerTaskRestore).
Returns:
An OptionalLong with the checkpoint ID if state was restored, an empty OptionalLong otherwise.
Throws:
IllegalStateException - If the CheckpointCoordinator is shut down.
IllegalStateException - If no completed checkpoint is available and the failIfNoCheckpoint flag has been set.
IllegalStateException - If the checkpoint contains state that cannot be mapped to any job vertex in tasks and the allowNonRestoredState flag has not been set.
IllegalStateException - If the max parallelism changed for an operator that restores state from this checkpoint.
IllegalStateException - If the parallelism changed for an operator that restores non-partitioned state from this checkpoint.
Exception
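The regional variant reports the restored checkpoint ID only when a restore actually happened. Here is a small sketch of consuming the `OptionalLong` result. `RegionalRestoreReport` is a hypothetical helper, and the values are made up rather than obtained from a real coordinator.

```java
import java.util.OptionalLong;

// Hypothetical caller-side helper; the OptionalLong would come from restoreLatestCheckpointedStateToSubtasks.
final class RegionalRestoreReport {
    private RegionalRestoreReport() {}

    static String describe(OptionalLong restoredCheckpointId) {
        if (restoredCheckpointId.isPresent()) {
            return "restored subtasks from checkpoint " + restoredCheckpointId.getAsLong();
        }
        return "no checkpoint state restored";
    }
}

class RegionalRestoreDemo {
    public static void main(String[] args) {
        System.out.println(RegionalRestoreReport.describe(OptionalLong.of(17L)));  // prints "restored subtasks from checkpoint 17"
        System.out.println(RegionalRestoreReport.describe(OptionalLong.empty())); // prints "no checkpoint state restored"
    }
}
```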

restoreLatestCheckpointedStateToAll
public boolean restoreLatestCheckpointedStateToAll(Set<ExecutionJobVertex> tasks, boolean allowNonRestoredState) throws Exception
Restores the latest checkpointed state to all tasks and all coordinators. This method represents a "global restore"-style operation where all stateful tasks and coordinators from the given set of job vertices are restored to their latest checkpointed state.
Parameters:
tasks - Set of job vertices to restore. State for these vertices is restored via Execution.setInitialState(JobManagerTaskRestore).
allowNonRestoredState - Allow checkpoint state that cannot be mapped to any job vertex in tasks.
Returns:
true if state was restored, false otherwise.
Throws:
IllegalStateException - If the CheckpointCoordinator is shut down.
IllegalStateException - If no completed checkpoint is available and the failIfNoCheckpoint flag has been set.
IllegalStateException - If the checkpoint contains state that cannot be mapped to any job vertex in tasks and the allowNonRestoredState flag has not been set.
IllegalStateException - If the max parallelism changed for an operator that restores state from this checkpoint.
IllegalStateException - If the parallelism changed for an operator that restores non-partitioned state from this checkpoint.
Exception
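The allowNonRestoredState rule above says that checkpointed state with no matching job vertex is either skipped or rejected. The check can be sketched as a set difference between the operators that have state in the checkpoint and the operators present in the job. This is a minimal sketch under stated assumptions. `NonRestoredStateCheck` is hypothetical, and operator IDs are plain `String`s instead of Flink's `OperatorID`.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the "state that cannot be mapped to any job vertex" check.
// Operator IDs are plain Strings here instead of Flink's OperatorID.
final class NonRestoredStateCheck {
    private NonRestoredStateCheck() {}

    // Returns the operators whose checkpointed state would be skipped, or throws if skipping is not allowed.
    static Set<String> check(Set<String> operatorsInCheckpoint,
                             Set<String> operatorsInJob,
                             boolean allowNonRestoredState) {
        Set<String> unmapped = new TreeSet<>(operatorsInCheckpoint);
        unmapped.removeAll(operatorsInJob);
        if (!unmapped.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("Checkpoint contains state for operators " + unmapped
                    + " that cannot be mapped to the job; set allowNonRestoredState to skip it");
        }
        return unmapped;
    }
}

class NonRestoredStateDemo {
    public static void main(String[] args) {
        Set<String> inCheckpoint = new TreeSet<>();
        inCheckpoint.add("source");
        inCheckpoint.add("removed-window");
        inCheckpoint.add("sink");
        Set<String> inJob = new TreeSet<>();
        inJob.add("source");
        inJob.add("sink");
        System.out.println(NonRestoredStateCheck.check(inCheckpoint, inJob, true)); // prints [removed-window]
    }
}
```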

restoreInitialCheckpointIfPresent
public boolean restoreInitialCheckpointIfPresent(Set<ExecutionJobVertex> tasks) throws Exception
Restores the latest checkpointed state at the beginning of the job execution. If there is a checkpoint, this method acts like a "global restore"-style operation where all stateful tasks and coordinators from the given set of job vertices are restored.
Parameters:
tasks - Set of job vertices to restore. State for these vertices is restored via Execution.setInitialState(JobManagerTaskRestore).
Returns:
True if a checkpoint was found and its state was restored, false otherwise.
Throws:
Exception

restoreSavepoint
public boolean restoreSavepoint(SavepointRestoreSettings restoreSettings, Map<JobVertexID,ExecutionJobVertex> tasks, ClassLoader userClassLoader) throws Exception
Restores the state from the given savepoint.
Parameters:
restoreSettings - Settings for a snapshot to restore from. Includes the path and parameters for the restore process.
tasks - Map of job vertices to restore. State for these vertices is restored via Execution.setInitialState(JobManagerTaskRestore).
userClassLoader - The class loader to resolve serialized classes in legacy savepoint versions.
Throws:
Exception

getNumberOfPendingCheckpoints
public int getNumberOfPendingCheckpoints()

getNumberOfRetainedSuccessfulCheckpoints
public int getNumberOfRetainedSuccessfulCheckpoints()

getPendingCheckpoints
public Map<Long,PendingCheckpoint> getPendingCheckpoints()

getSuccessfulCheckpoints
public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception
Throws:
Exception

getRecentExpiredCheckpoints
@VisibleForTesting public ArrayDeque<Long> getRecentExpiredCheckpoints()

getCheckpointStorage
public CheckpointStorageCoordinatorView getCheckpointStorage()

getCheckpointStore
public CompletedCheckpointStore getCheckpointStore()

getCheckpointTimeout
public long getCheckpointTimeout()

isTriggering
public boolean isTriggering()

isPeriodicCheckpointingConfigured
public boolean isPeriodicCheckpointingConfigured()
Returns whether periodic checkpointing has been configured.
Returns:
true if periodic checkpoints have been configured.

startCheckpointScheduler
public void startCheckpointScheduler()

stopCheckpointScheduler
public void stopCheckpointScheduler()

isPeriodicCheckpointingStarted
public boolean isPeriodicCheckpointingStarted()
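The configured/started/stopped lifecycle of the periodic scheduler can be sketched on top of `ScheduledExecutorService`. This is a minimal sketch under several assumptions. `PeriodicTrigger` is a hypothetical class, and `Long.MAX_VALUE` is used as the "periodic checkpointing not configured" marker. A counter increment stands in for triggering a checkpoint. The first trigger fires after one full interval, whereas a real coordinator may choose the initial delay differently.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical periodic trigger; Long.MAX_VALUE marks "periodic checkpointing not configured".
class PeriodicTrigger implements AutoCloseable {
    private final long intervalMs;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger triggeredCount = new AtomicInteger();
    private ScheduledFuture<?> scheduled; // null while stopped

    PeriodicTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    boolean isPeriodicCheckpointingConfigured() {
        return intervalMs != Long.MAX_VALUE;
    }

    synchronized void startCheckpointScheduler() {
        if (!isPeriodicCheckpointingConfigured()) {
            throw new IllegalStateException("Periodic checkpointing is not configured");
        }
        stopCheckpointScheduler(); // restarting replaces any previous schedule
        // The counter increment stands in for "trigger a periodic checkpoint".
        scheduled = timer.scheduleAtFixedRate(
                triggeredCount::incrementAndGet, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void stopCheckpointScheduler() {
        if (scheduled != null) {
            scheduled.cancel(false);
            scheduled = null;
        }
    }

    synchronized boolean isPeriodicCheckpointingStarted() {
        return scheduled != null;
    }

    int triggeredCount() {
        return triggeredCount.get();
    }

    @Override
    public void close() {
        stopCheckpointScheduler();
        timer.shutdownNow();
    }
}

class PeriodicTriggerDemo {
    public static void main(String[] args) throws InterruptedException {
        try (PeriodicTrigger trigger = new PeriodicTrigger(100L)) {
            trigger.startCheckpointScheduler();
            Thread.sleep(350L);
            trigger.stopCheckpointScheduler();
            System.out.println("triggered " + trigger.triggeredCount() + " times");
        }
    }
}
```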

abortPendingCheckpoints
public void abortPendingCheckpoints(CheckpointException exception)
Aborts all the pending checkpoints due to an exception.
Parameters:
exception - The exception.
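Aborting all pending checkpoints with one cause can be sketched as failing every pending future and forgetting it. This is a minimal sketch under stated assumptions. `PendingCheckpointRegistry` is hypothetical, and a plain `Exception` stands in for Flink's `CheckpointException`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: every pending checkpoint's future is failed with the same cause.
// A plain Exception stands in for Flink's CheckpointException.
class PendingCheckpointRegistry {
    private final Map<Long, CompletableFuture<Long>> pending = new LinkedHashMap<>();

    CompletableFuture<Long> register(long checkpointId) {
        return pending.computeIfAbsent(checkpointId, id -> new CompletableFuture<>());
    }

    // Fails all pending futures and forgets them; returns the aborted IDs in registration order.
    List<Long> abortPendingCheckpoints(Exception cause) {
        List<Long> aborted = new ArrayList<>(pending.keySet());
        List<CompletableFuture<Long>> futures = new ArrayList<>(pending.values());
        // Clear first so callbacks that run during completion see an empty registry.
        pending.clear();
        for (CompletableFuture<Long> future : futures) {
            future.completeExceptionally(cause);
        }
        return aborted;
    }

    int getNumberOfPendingCheckpoints() {
        return pending.size();
    }
}

class AbortPendingDemo {
    public static void main(String[] args) {
        PendingCheckpointRegistry registry = new PendingCheckpointRegistry();
        CompletableFuture<Long> first = registry.register(5L);
        registry.register(6L);
        System.out.println(registry.abortPendingCheckpoints(new Exception("job failover"))); // prints [5, 6]
        System.out.println(first.isCompletedExceptionally() + " " + registry.getNumberOfPendingCheckpoints()); // prints "true 0"
    }
}
```

The registry is cleared before the futures are failed. This way, a completion callback that registers a new checkpoint is not disturbed by the abort loop.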

createActivatorDeactivator
public JobStatusListener createActivatorDeactivator(boolean allTasksOutputNonBlocking)

reportCheckpointMetrics
public void reportCheckpointMetrics(long id, ExecutionAttemptID attemptId, CheckpointMetrics metrics)

reportInitializationMetrics
public void reportInitializationMetrics(ExecutionAttemptID executionAttemptID, SubTaskInitializationMetrics initializationMetrics)