Class CheckpointCoordinator


  • public class CheckpointCoordinator
    extends Object
    The checkpoint coordinator coordinates the distributed snapshots of operators and state. It triggers the checkpoint by sending the messages to the relevant tasks and collects the checkpoint acknowledgements. It also collects and maintains the overview of the state handles reported by the tasks that acknowledge the checkpoint.
    • Method Detail

      • addMasterHook

        public boolean addMasterHook(MasterTriggerRestoreHook<?> hook)
        Adds the given master hook to the checkpoint coordinator. This method does nothing if the checkpoint coordinator already contains a hook with the same ID (as defined via MasterTriggerRestoreHook.getIdentifier()).
        Parameters:
        hook - The hook to add.
        Returns:
        True if the hook was added; false if the checkpoint coordinator already contained a hook with the same ID.
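
        The add-if-absent contract above can be modeled with a map keyed by hook identifier. This is a minimal sketch under stated assumptions, not Flink's implementation: Hook and HookRegistry are hypothetical stand-ins for MasterTriggerRestoreHook and the coordinator's internal hook storage.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for MasterTriggerRestoreHook: only the identifier matters here.
interface Hook {
    String getIdentifier();
}

// Hypothetical registry mirroring the documented addMasterHook contract.
class HookRegistry {
    private final Map<String, Hook> hooks = new LinkedHashMap<>();

    // Returns true if the hook was added, false if a hook with the same ID already exists.
    boolean addMasterHook(Hook hook) {
        if (hook == null) {
            throw new NullPointerException("hook");
        }
        // putIfAbsent leaves an existing entry untouched and returns it (non-null).
        return hooks.putIfAbsent(hook.getIdentifier(), hook) == null;
    }

    int getNumberOfRegisteredMasterHooks() {
        return hooks.size();
    }
}
```

        Using putIfAbsent keeps the first registered hook, which matches the documented "does nothing" behavior for a duplicate ID.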
      • getNumberOfRegisteredMasterHooks

        public int getNumberOfRegisteredMasterHooks()
        Gets the number of currently registered master hooks.
      • shutdown

        public void shutdown()
                      throws Exception
        Shuts down the checkpoint coordinator.

        After this method has been called, the coordinator does not accept any further messages and cannot trigger any further checkpoints.

        Throws:
        Exception
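
        A minimal sketch of the shutdown contract, assuming a single volatile flag guards each entry point. ShutdownAwareCoordinator and its triggerCheckpoint and receiveMessage methods are hypothetical; the real coordinator also has to deal with pending checkpoints and shared resources, which this sketch omits.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical coordinator that only models the shutdown flag.
class ShutdownAwareCoordinator {
    private volatile boolean shutdown;

    void shutdown() {
        shutdown = true;
    }

    boolean isShutdown() {
        return shutdown;
    }

    // After shutdown, new checkpoints are refused with an exceptionally completed future.
    CompletableFuture<Long> triggerCheckpoint(long checkpointId) {
        if (shutdown) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Checkpoint coordinator is shut down"));
        }
        return CompletableFuture.completedFuture(checkpointId);
    }

    // After shutdown, incoming messages are ignored; returns whether the message was accepted.
    boolean receiveMessage(Object message) {
        return !shutdown;
    }
}
```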
      • isShutdown

        public boolean isShutdown()
      • setIsProcessingBacklog

        public void setIsProcessingBacklog(OperatorID operatorID,
                                           boolean isProcessingBacklog)
        Reports whether a source operator is currently processing backlog.

        If any source operator is processing backlog, the checkpoint interval is determined by execution.checkpointing.interval-during-backlog instead of execution.checkpointing.interval.

        If a source has not invoked this method, the source is considered to have isProcessingBacklog=false. If a source operator has invoked this method multiple times, the last reported value is used.

        Parameters:
        operatorID - the operator ID of the source operator.
        isProcessingBacklog - whether the source operator is processing backlog.
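
        The selection rule above can be sketched as follows. BacklogIntervalSelector is a hypothetical class, and plain strings stand in for OperatorID; the sketch only models which configured interval applies, not how the next checkpoint is rescheduled.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the backlog rule; String stands in for OperatorID.
class BacklogIntervalSelector {
    private final Duration interval;              // execution.checkpointing.interval
    private final Duration intervalDuringBacklog; // execution.checkpointing.interval-during-backlog
    private final Map<String, Boolean> backlogStatus = new HashMap<>();

    BacklogIntervalSelector(Duration interval, Duration intervalDuringBacklog) {
        this.interval = interval;
        this.intervalDuringBacklog = intervalDuringBacklog;
    }

    // The last reported value per operator wins; sources that never reported count as false.
    void setIsProcessingBacklog(String operatorId, boolean isProcessingBacklog) {
        backlogStatus.put(operatorId, isProcessingBacklog);
    }

    // If any source currently reports backlog, the backlog interval applies.
    Duration currentInterval() {
        boolean anyBacklog = backlogStatus.containsValue(Boolean.TRUE);
        return anyBacklog ? intervalDuringBacklog : interval;
    }
}
```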
      • triggerSavepoint

        public CompletableFuture<CompletedCheckpoint> triggerSavepoint(@Nullable
                                                                       String targetLocation,
                                                                       SavepointFormatType formatType)
        Triggers a savepoint with the given savepoint directory as a target.
        Parameters:
        targetLocation - Target location for the savepoint, optional. If null, the state backend's configured default will be used.
        Returns:
        A future to the completed checkpoint
        Throws:
        IllegalStateException - If no savepoint directory has been specified and no default savepoint directory has been configured
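
        The target-location rule for triggerSavepoint can be sketched as a small resolution step. SavepointTargets.resolve is a hypothetical helper, and the configured default is passed in explicitly as an Optional rather than read from a state backend.

```java
import java.util.Optional;

// Hypothetical helper modeling how a savepoint target could be resolved.
final class SavepointTargets {
    private SavepointTargets() {}

    // Uses the explicit target if given, else the configured default; fails if neither exists.
    static String resolve(String targetLocation, Optional<String> configuredDefault) {
        if (targetLocation != null) {
            return targetLocation;
        }
        return configuredDefault.orElseThrow(() -> new IllegalStateException(
                "No savepoint directory specified and no default savepoint directory configured"));
    }
}
```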
      • triggerSynchronousSavepoint

        public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(boolean terminate,
                                                                                  @Nullable
                                                                                  String targetLocation,
                                                                                  SavepointFormatType formatType)
        Triggers a synchronous savepoint with the given savepoint directory as a target.
        Parameters:
        terminate - flag indicating if the job should terminate or just suspend
        targetLocation - Target location for the savepoint, optional. If null, the state backend's configured default will be used.
        Returns:
        A future to the completed checkpoint
        Throws:
        IllegalStateException - If no savepoint directory has been specified and no default savepoint directory has been configured
      • triggerCheckpoint

        public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic)
        Triggers a new standard checkpoint. The returned future completes when the triggered checkpoint finishes or an error occurs.
        Parameters:
        isPeriodic - Flag indicating whether this triggered checkpoint is periodic.
        Returns:
        a future to the completed checkpoint.
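
        Because the returned future completes either normally or exceptionally, a caller can attach one non-blocking handler that covers both outcomes. In this sketch, CompletedCheckpointStub is a hypothetical stand-in for CompletedCheckpoint, and the futures are completed by hand instead of by a coordinator.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for CompletedCheckpoint.
final class CompletedCheckpointStub {
    private final long checkpointId;

    CompletedCheckpointStub(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
        return checkpointId;
    }
}

final class CheckpointCallbacks {
    private CheckpointCallbacks() {}

    // Maps the outcome of a checkpoint future to a message without blocking the caller.
    static CompletableFuture<String> describe(CompletableFuture<CompletedCheckpointStub> future) {
        return future.handle((checkpoint, error) -> {
            if (error != null) {
                // Failures from dependent stages may arrive wrapped in a CompletionException.
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause() : error;
                return "checkpoint failed: " + cause.getMessage();
            }
            return "checkpoint " + checkpoint.getCheckpointId() + " completed";
        });
    }
}
```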
      • triggerCheckpoint

        public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType)
        Triggers one new checkpoint of the given checkpointType. The returned future completes when the triggered checkpoint finishes or an error occurs.
        Parameters:
        checkpointType - specifies the backup type of the checkpoint to trigger.
        Returns:
        a future to the completed checkpoint.
      • receiveDeclineMessage

        public void receiveDeclineMessage(DeclineCheckpoint message,
                                          String taskManagerLocationInfo)
        Receives a DeclineCheckpoint message for a pending checkpoint.
        Parameters:
        message - Checkpoint decline from the task manager
        taskManagerLocationInfo - The location info of the decline checkpoint message's sender
      • receiveAcknowledgeMessage

        public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message,
                                                 String taskManagerLocationInfo)
                                          throws CheckpointException
        Receives an AcknowledgeCheckpoint message and returns whether the message was associated with a pending checkpoint.
        Parameters:
        message - Checkpoint ack from the task manager
        taskManagerLocationInfo - The location of the acknowledge checkpoint message's sender
        Returns:
        Flag indicating whether the ack'd checkpoint was associated with a pending checkpoint.
        Throws:
        CheckpointException - If the checkpoint cannot be added to the completed checkpoint store.
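
        The acknowledge/decline bookkeeping can be sketched with a map from checkpoint ID to the tasks that have not yet acknowledged. PendingCheckpointTracker is hypothetical and simplified: task IDs are strings, a checkpoint counts as complete once every expected task has acknowledged, and a single decline aborts the pending checkpoint.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of pending-checkpoint bookkeeping; task IDs are plain strings.
class PendingCheckpointTracker {
    private final Map<Long, Set<String>> notYetAcknowledged = new HashMap<>();
    private final Set<Long> completed = new HashSet<>();

    void startCheckpoint(long checkpointId, Set<String> tasksToAck) {
        notYetAcknowledged.put(checkpointId, new HashSet<>(tasksToAck));
    }

    // Returns whether the ack belonged to a pending checkpoint.
    boolean receiveAcknowledge(long checkpointId, String taskId) {
        Set<String> remaining = notYetAcknowledged.get(checkpointId);
        if (remaining == null) {
            return false; // unknown, already completed, or aborted checkpoint
        }
        remaining.remove(taskId);
        if (remaining.isEmpty()) {
            // Every expected task has acknowledged: mark the checkpoint as completed.
            notYetAcknowledged.remove(checkpointId);
            completed.add(checkpointId);
        }
        return true;
    }

    // Simplification: one decline aborts the whole pending checkpoint.
    void receiveDecline(long checkpointId) {
        notYetAcknowledged.remove(checkpointId);
    }

    boolean isPending(long checkpointId) {
        return notYetAcknowledged.containsKey(checkpointId);
    }

    boolean isCompleted(long checkpointId) {
        return completed.contains(checkpointId);
    }
}
```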
      • restoreLatestCheckpointedStateToSubtasks

        public OptionalLong restoreLatestCheckpointedStateToSubtasks(Set<ExecutionJobVertex> tasks)
                                                              throws Exception
        Restores the latest checkpointed state to a set of subtasks. This method represents a "local" or "regional" failover and does not restore the state of operator coordinators. Note that a regional failover might still include all tasks.
        Parameters:
        tasks - Set of job vertices to restore. State for these vertices is restored via Execution.setInitialState(JobManagerTaskRestore).
        Returns:
        An OptionalLong containing the checkpoint ID if state was restored; an empty OptionalLong otherwise.
        Throws:
        IllegalStateException - If the CheckpointCoordinator is shut down.
        IllegalStateException - If no completed checkpoint is available and the failIfNoCheckpoint flag has been set.
        IllegalStateException - If the checkpoint contains state that cannot be mapped to any job vertex in tasks and the allowNonRestoredState flag has not been set.
        IllegalStateException - If the max parallelism changed for an operator that restores state from this checkpoint.
        IllegalStateException - If the parallelism changed for an operator that restores non-partitioned state from this checkpoint.
        Exception
      • restoreLatestCheckpointedStateToAll

        public boolean restoreLatestCheckpointedStateToAll(Set<ExecutionJobVertex> tasks,
                                                           boolean allowNonRestoredState)
                                                    throws Exception
        Restores the latest checkpointed state to all tasks and all coordinators. This method represents a "global restore"-style operation where all stateful tasks and coordinators from the given set of job vertices are restored to their latest checkpointed state.
        Parameters:
        tasks - Set of job vertices to restore. State for these vertices is restored via Execution.setInitialState(JobManagerTaskRestore).
        allowNonRestoredState - Allow checkpoint state that cannot be mapped to any job vertex in tasks.
        Returns:
        true if state was restored, false otherwise.
        Throws:
        IllegalStateException - If the CheckpointCoordinator is shut down.
        IllegalStateException - If no completed checkpoint is available and the failIfNoCheckpoint flag has been set.
        IllegalStateException - If the checkpoint contains state that cannot be mapped to any job vertex in tasks and the allowNonRestoredState flag has not been set.
        IllegalStateException - If the max parallelism changed for an operator that restores state from this checkpoint.
        IllegalStateException - If the parallelism changed for an operator that restores non-partitioned state from this checkpoint.
        Exception
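
        The allowNonRestoredState check can be sketched as a set difference between the operators that have state in the checkpoint and the operators present in tasks. NonRestoredStateCheck is a hypothetical helper that uses strings for operator IDs and omits the parallelism checks listed above.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper modeling the unmatched-state check.
final class NonRestoredStateCheck {
    private NonRestoredStateCheck() {}

    // Throws if the checkpoint holds state for operators outside the restored vertices,
    // unless allowNonRestoredState is set.
    static void check(Set<String> operatorsInCheckpoint, Set<String> operatorsInTasks,
                      boolean allowNonRestoredState) {
        Set<String> unmatched = new TreeSet<>(operatorsInCheckpoint);
        unmatched.removeAll(operatorsInTasks);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Checkpoint contains state for operators not in the job: " + unmatched);
        }
    }
}
```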
      • restoreInitialCheckpointIfPresent

        public boolean restoreInitialCheckpointIfPresent(Set<ExecutionJobVertex> tasks)
                                                  throws Exception
        Restores the latest checkpointed state at the beginning of the job execution. If there is a checkpoint, this method acts like a "global restore"-style operation where all stateful tasks and coordinators from the given set of job vertices are restored.
        Parameters:
        tasks - Set of job vertices to restore. State for these vertices is restored via Execution.setInitialState(JobManagerTaskRestore).
        Returns:
        True, if a checkpoint was found and its state was restored, false otherwise.
        Throws:
        Exception
      • getNumberOfPendingCheckpoints

        public int getNumberOfPendingCheckpoints()
      • getNumberOfRetainedSuccessfulCheckpoints

        public int getNumberOfRetainedSuccessfulCheckpoints()
      • getCheckpointTimeout

        public long getCheckpointTimeout()
      • isTriggering

        public boolean isTriggering()
      • isPeriodicCheckpointingConfigured

        public boolean isPeriodicCheckpointingConfigured()
        Returns whether periodic checkpointing has been configured.
        Returns:
        true if periodic checkpoints have been configured.
      • startCheckpointScheduler

        public void startCheckpointScheduler()
      • stopCheckpointScheduler

        public void stopCheckpointScheduler()
      • isPeriodicCheckpointingStarted

        public boolean isPeriodicCheckpointingStarted()
      • abortPendingCheckpoints

        public void abortPendingCheckpoints(CheckpointException exception)
        Aborts all pending checkpoints due to an exception.
        Parameters:
        exception - The exception.
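
        A minimal sketch of aborting every pending checkpoint with the same cause, assuming each pending checkpoint is represented only by its completion future. PendingCheckpoints is hypothetical, and a plain Exception stands in for CheckpointException.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model: each pending checkpoint is only its completion future.
class PendingCheckpoints {
    private final Map<Long, CompletableFuture<Long>> pending = new LinkedHashMap<>();

    CompletableFuture<Long> add(long checkpointId) {
        return pending.computeIfAbsent(checkpointId, id -> new CompletableFuture<>());
    }

    // Fails every pending future with the same cause and forgets the checkpoints.
    void abortPendingCheckpoints(Exception cause) {
        // Copy first so completion callbacks cannot modify the map while it is iterated.
        List<CompletableFuture<Long>> toAbort = new ArrayList<>(pending.values());
        pending.clear();
        for (CompletableFuture<Long> future : toAbort) {
            future.completeExceptionally(cause);
        }
    }

    int size() {
        return pending.size();
    }
}
```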
      • createActivatorDeactivator

        public JobStatusListener createActivatorDeactivator(boolean allTasksOutputNonBlocking)