Class OperatorCoordinatorHolder
- java.lang.Object
  - org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder
All Implemented Interfaces: AutoCloseable, CheckpointListener, OperatorCoordinatorCheckpointContext, OperatorInfo
public class OperatorCoordinatorHolder extends Object implements OperatorCoordinatorCheckpointContext, AutoCloseable
The OperatorCoordinatorHolder holds the OperatorCoordinator and manages all its interactions with the remaining components. It provides the context and is responsible for checkpointing and exactly-once semantics.

Exactly-once Semantics
The semantics are described under OperatorCoordinator.checkpointCoordinator(long, CompletableFuture).

Exactly-once Mechanism
The mechanism for exactly-once semantics is as follows:
- Events pass through a special channel, the SubtaskGatewayImpl. If we are not currently triggering a checkpoint, then events simply pass through.
- With the completion of the checkpoint future for the coordinator, this subtask gateway is closed. Events coming after that are held back (buffered), because they belong to the epoch after the checkpoint.
- Once all coordinators in the job have completed the checkpoint, the barriers to the sources are injected. If a coordinator receives an AcknowledgeCheckpointEvent from one of its subtasks, which denotes that the subtask has received the checkpoint barrier and completed the checkpoint, the coordinator reopens the corresponding subtask gateway and sends out the buffered events.
- If a task fails in the meantime, the events are dropped from the gateways. From the coordinator's perspective, these events are lost, because they were sent to a failed subtask after its latest complete checkpoint.
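To make the four steps above concrete, here is a minimal, single-threaded sketch of one subtask's gateway. It is not Flink's SubtaskGatewayImpl: the class BufferingGateway, its methods, and the use of String as the event type are hypothetical, and a plain list stands in for the RPC channel to the task. It models only pass-through, closing on checkpoint completion, reopening and flushing on acknowledgement, and dropping held-back events on failure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded model of one subtask's gateway (not Flink's SubtaskGatewayImpl).
class BufferingGateway {
    // Stands in for the RPC channel to the task: events in this list count as "sent".
    private final List<String> delivered = new ArrayList<>();
    // Events held back because they belong to the epoch after the checkpoint.
    private final Queue<String> buffered = new ArrayDeque<>();
    private boolean closed = false;

    // Events pass straight through unless the gateway is closed for a checkpoint.
    void send(String event) {
        if (closed) {
            buffered.add(event);
        } else {
            delivered.add(event);
        }
    }

    // Called when the coordinator's checkpoint future completes.
    void closeForCheckpoint() {
        closed = true;
    }

    // Called when the subtask acknowledges the checkpoint: reopen and flush in FIFO order.
    void onCheckpointAcknowledged() {
        closed = false;
        while (!buffered.isEmpty()) {
            delivered.add(buffered.poll());
        }
    }

    // Called when the subtask fails: held-back events are dropped.
    // Reopening models the fresh gateway a restarted attempt would get (an assumption).
    void onSubtaskFailed() {
        buffered.clear();
        closed = false;
    }

    List<String> delivered() {
        return delivered;
    }
}
```

A FIFO queue is used so that held-back events are flushed in the order they were sent.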
IMPORTANT: A critical assumption is that all events from the scheduler to the Tasks are transported strictly in order. Events being sent from the coordinator after the checkpoint barrier was injected must not overtake the checkpoint barrier. This is currently guaranteed by Flink's RPC mechanism.
Concurrency and Threading Model
This component runs strictly in the Scheduler's main-thread-executor. All calls "from the outside" are either already in the main-thread-executor (when coming from Scheduler) or put into the main-thread-executor (when coming from the CheckpointCoordinator). We rely on the executor to preserve strict order of the calls.
Actions from the coordinator to the "outside world" (like completing a checkpoint and sending an event) are also enqueued back into the scheduler main-thread executor, strictly in order.
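The threading model amounts to confining all state to one serial executor. The sketch below assumes a plain single-thread ExecutorService as a stand-in for Flink's ComponentMainThreadExecutor; the class MainThreadConfinedComponent and its methods are hypothetical. Calls from other threads are enqueued rather than applied directly, and the executor runs them one at a time in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical component whose state is confined to one serial "main thread" executor.
// A single-thread ExecutorService stands in for Flink's ComponentMainThreadExecutor.
class MainThreadConfinedComponent implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Only touched by tasks running on mainThread, so it needs no locking.
    private final List<String> callLog = new ArrayList<>();

    // Entry point for callers on other threads: enqueue instead of mutating state directly.
    void callFromOutside(String call) {
        mainThread.execute(() -> callLog.add(call));
    }

    // Runs after all previously enqueued calls and returns a copy of the log.
    List<String> drainAndGetLog() {
        try {
            return mainThread.submit(() -> new ArrayList<String>(callLog)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Stops accepting new calls; calls that are already enqueued still run.
    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

Calls submitted from one thread run in submission order. Calls racing in from different threads are still serialized, but their relative order is the order in which they reach the queue.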
Method Summary
- void abortCurrentTriggering()
- void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
- void close()
- OperatorCoordinator coordinator()
- static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader, CoordinatorStore coordinatorStore, boolean supportsConcurrentExecutionAttempts, TaskInformation taskInformation, JobManagerJobMetricGroup metricGroup)
- int currentParallelism()
- void executionAttemptFailed(int subtask, int attemptNumber, Throwable reason)
- void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
- void lazyInitialize(GlobalFailureHandler globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor, CheckpointCoordinator checkpointCoordinator)
- void lazyInitialize(GlobalFailureHandler globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor, CheckpointCoordinator checkpointCoordinator, int operatorParallelism)
- int maxParallelism()
- void notifyCheckpointAborted(long checkpointId): We override the method here to remove the checked exception.
- void notifyCheckpointComplete(long checkpointId): We override the method here to remove the checked exception.
- OperatorID operatorId()
- void resetToCheckpoint(long checkpointId, byte[] checkpointData): Resets the coordinator to the checkpoint with the given state.
- void setupSubtaskGatewayForAttempts(int subtask, Set<Integer> attemptNumbers)
- void start()
- void subtaskReset(int subtask, long checkpointId): Called if a task is recovered as part of a partial failover, meaning a failover handled by the scheduler's failover strategy (by default recovering a pipelined region).
Method Detail
lazyInitialize
public void lazyInitialize(GlobalFailureHandler globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor, @Nullable CheckpointCoordinator checkpointCoordinator)
lazyInitialize
public void lazyInitialize(GlobalFailureHandler globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor, @Nullable CheckpointCoordinator checkpointCoordinator, int operatorParallelism)
coordinator
public OperatorCoordinator coordinator()
operatorId
public OperatorID operatorId()
- Specified by: operatorId in interface OperatorInfo
maxParallelism
public int maxParallelism()
- Specified by: maxParallelism in interface OperatorInfo
currentParallelism
public int currentParallelism()
- Specified by: currentParallelism in interface OperatorInfo
close
public void close() throws Exception
- Specified by: close in interface AutoCloseable
- Throws: Exception
handleEventFromOperator
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception
- Throws: Exception
executionAttemptFailed
public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason)
subtaskReset
public void subtaskReset(int subtask, long checkpointId)
Description copied from interface: OperatorCoordinatorCheckpointContext
Called if a task is recovered as part of a partial failover, meaning a failover handled by the scheduler's failover strategy (by default recovering a pipelined region). The method is invoked for each subtask involved in that partial failover.
In contrast to this method, the OperatorCoordinatorCheckpointContext.resetToCheckpoint(long, byte[]) method is called in the case of a global failover, which is the case when the coordinator (JobManager) is recovered.
- Specified by: subtaskReset in interface OperatorCoordinatorCheckpointContext
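The difference between a partial and a global failover can be illustrated with a hypothetical coordinator that hands out work items to subtasks. In this sketch, AssignmentTracker and all of its methods are invented for illustration (not a Flink API), and the checkpointId parameter is omitted. A subtask reset only returns the items that the failed subtask received after the last completed checkpoint, while a global reset replaces the whole state with the checkpointed one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical coordinator bookkeeping (not a Flink API) that contrasts a
// partial failover (subtaskReset) with a global failover (resetToCheckpoint).
class AssignmentTracker {
    // Work items not yet handed to any subtask.
    private final List<String> unassigned = new ArrayList<>();
    // Work items handed to each subtask since the last completed checkpoint.
    private final Map<Integer, List<String>> assignedSinceCheckpoint = new HashMap<>();

    AssignmentTracker(List<String> initialItems) {
        unassigned.addAll(initialItems);
    }

    void assign(int subtask, String item) {
        if (unassigned.remove(item)) {
            assignedSinceCheckpoint.computeIfAbsent(subtask, k -> new ArrayList<>()).add(item);
        }
    }

    // In this model, assignments made before a completed checkpoint are covered by it.
    void checkpointComplete() {
        assignedSinceCheckpoint.clear();
    }

    // Partial failover: only this subtask rolls back, so only its recent items come back.
    void subtaskReset(int subtask) {
        List<String> lost = assignedSinceCheckpoint.remove(subtask);
        if (lost != null) {
            unassigned.addAll(lost);
        }
    }

    // Global failover: the whole state is replaced by the checkpointed state.
    void resetToCheckpoint(List<String> checkpointedUnassigned) {
        assignedSinceCheckpoint.clear();
        unassigned.clear();
        unassigned.addAll(checkpointedUnassigned);
    }

    List<String> unassigned() {
        return unassigned;
    }
}
```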
checkpointCoordinator
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
- Specified by: checkpointCoordinator in interface OperatorCoordinatorCheckpointContext
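The result parameter is a future that the coordinator completes with its serialized state. A minimal sketch, assuming a coordinator whose only state is a counter encoded as one 8-byte long; the class CounterCoordinator, the encoding, and the extra Executor parameter are hypothetical and not part of the signature above. Capturing the value synchronously when the call arrives is a design choice in this sketch, so that later updates cannot leak into this checkpoint; the future itself may be completed later on another executor.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical coordinator (not a Flink class) whose only state is a counter.
class CounterCoordinator {
    private long counter;

    void increment() {
        counter++;
    }

    // checkpointId is unused in this sketch; ioExecutor is a hypothetical extra parameter.
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result, Executor ioExecutor) {
        // Capture the state now, so increments after this call belong to the next checkpoint.
        final long snapshot = counter;
        ioExecutor.execute(() -> {
            try {
                // Assumed encoding: a single 8-byte big-endian long.
                result.complete(ByteBuffer.allocate(Long.BYTES).putLong(snapshot).array());
            } catch (RuntimeException e) {
                // Signal a failed checkpoint through the future rather than by throwing.
                result.completeExceptionally(e);
            }
        });
    }
}
```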
notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId)
Description copied from interface: OperatorCoordinatorCheckpointContext
We override the method here to remove the checked exception. Please check the Javadoc of CheckpointListener.notifyCheckpointComplete(long) for the detailed semantics of the method.
- Specified by: notifyCheckpointComplete in interface CheckpointListener
- Specified by: notifyCheckpointComplete in interface OperatorCoordinatorCheckpointContext
- Parameters: checkpointId - The ID of the checkpoint that has been completed.
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId)
Description copied from interface: OperatorCoordinatorCheckpointContext
We override the method here to remove the checked exception. Please check the Javadoc of CheckpointListener.notifyCheckpointAborted(long) for the detailed semantics of the method.
- Specified by: notifyCheckpointAborted in interface CheckpointListener
- Specified by: notifyCheckpointAborted in interface OperatorCoordinatorCheckpointContext
- Parameters: checkpointId - The ID of the checkpoint that has been aborted.
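Removing a checked exception works because Java allows an overriding method to declare fewer checked exceptions than the method it overrides. The sketch below uses hypothetical stand-in interfaces (CheckedListener, UncheckedListener) rather than Flink's CheckpointListener and OperatorCoordinatorCheckpointContext; only notifyCheckpointComplete is modeled, and notifyCheckpointAborted would be narrowed the same way.

```java
// Hypothetical stand-in for a listener whose method declares a checked exception.
interface CheckedListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// A sub-interface may redeclare the method with a narrower (here: empty) throws clause.
interface UncheckedListener extends CheckedListener {
    @Override
    void notifyCheckpointComplete(long checkpointId);
}

// Implementation that records the last completed checkpoint ID.
class RecordingListener implements UncheckedListener {
    long lastCompleted = -1;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastCompleted = checkpointId;
    }
}
```

Callers holding a reference typed as UncheckedListener can invoke the method without a try/catch; callers holding a CheckedListener reference still have to handle Exception.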
resetToCheckpoint
public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception
Description copied from interface: OperatorCoordinatorCheckpointContext
Resets the coordinator to the checkpoint with the given state.
This method is called with a null state argument in the following situations:
- There is a recovery and there was no completed checkpoint yet.
- There is a recovery from a completed checkpoint/savepoint but it contained no state for the coordinator.
In both cases, the coordinator should reset to an empty (new) state.
- Specified by: resetToCheckpoint in interface OperatorCoordinatorCheckpointContext
- Throws: Exception
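A minimal sketch of a coordinator honoring the null-state contract above, assuming its state is a counter encoded as a single 8-byte long. RestorableCounterCoordinator is a hypothetical class, not part of Flink, and the throws Exception clause is dropped because this sketch cannot fail. A null argument resets to the initial (empty) state; otherwise the state is decoded from the bytes.

```java
import java.nio.ByteBuffer;

// Hypothetical coordinator (not a Flink class) illustrating the null-state reset contract.
class RestorableCounterCoordinator {
    private long counter;

    void increment() {
        counter++;
    }

    long counter() {
        return counter;
    }

    void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        if (checkpointData == null) {
            // No completed checkpoint yet, or the checkpoint held no state for this coordinator.
            counter = 0L;
            return;
        }
        // Assumed encoding: a single 8-byte big-endian long.
        counter = ByteBuffer.wrap(checkpointData).getLong();
    }
}
```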
abortCurrentTriggering
public void abortCurrentTriggering()
- Specified by: abortCurrentTriggering in interface OperatorCoordinatorCheckpointContext
setupSubtaskGatewayForAttempts
public void setupSubtaskGatewayForAttempts(int subtask, Set<Integer> attemptNumbers)
create
public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader, CoordinatorStore coordinatorStore, boolean supportsConcurrentExecutionAttempts, TaskInformation taskInformation, JobManagerJobMetricGroup metricGroup) throws Exception
- Throws: Exception