public class OperatorCoordinatorHolder extends Object implements OperatorCoordinatorCheckpointContext, AutoCloseable
OperatorCoordinatorHolder
holds the OperatorCoordinator
and manages all its
interactions with the remaining components. It provides the context and is responsible for
checkpointing and exactly once semantics.
The semantics are described under OperatorCoordinator.checkpointCoordinator(long,
CompletableFuture)
.
The mechanism for exactly once semantics is as follows:
SubtaskGatewayImpl
. If we are not
currently triggering a checkpoint, then events simply pass through.
AcknowledgeCheckpointEvent
from one of
its subtasks, which denotes that the subtask has received the checkpoint barrier and
completed checkpoint, the coordinator reopens the corresponding subtask gateway and sends
out buffered events.
IMPORTANT: A critical assumption is that all events from the scheduler to the Tasks are transported strictly in order. Events being sent from the coordinator after the checkpoint barrier was injected must not overtake the checkpoint barrier. This is currently guaranteed by Flink's RPC mechanism.
This component runs strictly in the Scheduler's main-thread-executor. All calls "from the outside" are either already in the main-thread-executor (when coming from Scheduler) or put into the main-thread-executor (when coming from the CheckpointCoordinator). We rely on the executor to preserve strict order of the calls.
Actions from the coordinator to the "outside world" (like completing a checkpoint and sending an event) are also enqueued back into the scheduler main-thread executor, strictly in order.
Modifier and Type | Method and Description |
---|---|
void |
abortCurrentTriggering() |
void |
checkpointCoordinator(long checkpointId,
CompletableFuture<byte[]> result) |
void |
close() |
OperatorCoordinator |
coordinator() |
static OperatorCoordinatorHolder |
create(SerializedValue<OperatorCoordinator.Provider> serializedProvider,
ExecutionJobVertex jobVertex,
ClassLoader classLoader,
CoordinatorStore coordinatorStore,
boolean supportsConcurrentExecutionAttempts,
TaskInformation taskInformation,
JobManagerJobMetricGroup metricGroup) |
int |
currentParallelism() |
void |
executionAttemptFailed(int subtask,
int attemptNumber,
Throwable reason) |
void |
handleEventFromOperator(int subtask,
int attemptNumber,
OperatorEvent event) |
void |
lazyInitialize(GlobalFailureHandler globalFailureHandler,
ComponentMainThreadExecutor mainThreadExecutor,
CheckpointCoordinator checkpointCoordinator) |
void |
lazyInitialize(GlobalFailureHandler globalFailureHandler,
ComponentMainThreadExecutor mainThreadExecutor,
CheckpointCoordinator checkpointCoordinator,
int operatorParallelism) |
int |
maxParallelism() |
void |
notifyCheckpointAborted(long checkpointId)
We override the method here to remove the checked exception.
|
void |
notifyCheckpointComplete(long checkpointId)
We override the method here to remove the checked exception.
|
OperatorID |
operatorId() |
void |
resetToCheckpoint(long checkpointId,
byte[] checkpointData)
Resets the coordinator to the checkpoint with the given state.
|
void |
setupSubtaskGatewayForAttempts(int subtask,
Set<Integer> attemptNumbers) |
void |
start() |
void |
subtaskReset(int subtask,
long checkpointId)
Called if a task is recovered as part of a partial failover, meaning a failover
handled by the scheduler's failover strategy (by default recovering a pipelined region).
|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
getIds
public void lazyInitialize(GlobalFailureHandler globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor, @Nullable CheckpointCoordinator checkpointCoordinator)
public void lazyInitialize(GlobalFailureHandler globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor, @Nullable CheckpointCoordinator checkpointCoordinator, int operatorParallelism)
public OperatorCoordinator coordinator()
public OperatorID operatorId()
operatorId
in interface OperatorInfo
public int maxParallelism()
maxParallelism
in interface OperatorInfo
public int currentParallelism()
currentParallelism
in interface OperatorInfo
public void close() throws Exception
close
in interface AutoCloseable
Exception
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception
Exception
public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason)
public void subtaskReset(int subtask, long checkpointId)
OperatorCoordinatorCheckpointContext
In contrast to this method, the OperatorCoordinatorCheckpointContext.resetToCheckpoint(long, byte[])
method is called
in the case of a global failover, which is the case when the coordinator (JobManager) is
recovered.
subtaskReset
in interface OperatorCoordinatorCheckpointContext
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
checkpointCoordinator
in interface OperatorCoordinatorCheckpointContext
public void notifyCheckpointComplete(long checkpointId)
OperatorCoordinatorCheckpointContext
CheckpointListener.notifyCheckpointComplete(long)
for more detail semantic of the
method.notifyCheckpointComplete
in interface CheckpointListener
notifyCheckpointComplete
in interface OperatorCoordinatorCheckpointContext
checkpointId
- The ID of the checkpoint that has been completed.public void notifyCheckpointAborted(long checkpointId)
OperatorCoordinatorCheckpointContext
CheckpointListener.notifyCheckpointAborted(long)
for more detail semantic of the
method.notifyCheckpointAborted
in interface CheckpointListener
notifyCheckpointAborted
in interface OperatorCoordinatorCheckpointContext
checkpointId
- The ID of the checkpoint that has been aborted.public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception
OperatorCoordinatorCheckpointContext
This method is called with a null state argument in the following situations:
In both cases, the coordinator should reset to an empty (new) state.
resetToCheckpoint
in interface OperatorCoordinatorCheckpointContext
Exception
public void abortCurrentTriggering()
abortCurrentTriggering
in interface OperatorCoordinatorCheckpointContext
public void setupSubtaskGatewayForAttempts(int subtask, Set<Integer> attemptNumbers)
public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader, CoordinatorStore coordinatorStore, boolean supportsConcurrentExecutionAttempts, TaskInformation taskInformation, JobManagerJobMetricGroup metricGroup) throws Exception
Exception
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.