@Internal public interface OperatorCoordinator extends CheckpointListener, AutoCloseable
Operator coordinators are, for example, source and sink coordinators that discover and assign work, or aggregate and commit metadata.
All coordinator methods are called by the JobManager's main thread (mailbox thread). These methods must therefore never perform blocking operations (such as I/O or waiting on locks or futures); doing so runs a high risk of bringing down the entire JobManager.
Coordinators that involve more complex operations should therefore spawn threads to handle the I/O work. The methods on the OperatorCoordinator.Context are safe to call from a thread other than the one that calls the coordinator's methods.
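The threading rule above can be illustrated with a minimal sketch. It does not use Flink classes: `NonBlockingCoordinatorSketch`, `discoverWorkBlocking`, `startDiscovery`, and `shutdown` are hypothetical names, and the sleep stands in for real I/O. The point is only that the method called on the main thread returns immediately while the slow call runs on a thread the coordinator owns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch only: none of these names come from Flink. */
final class NonBlockingCoordinatorSketch {

    // Dedicated worker thread owned by the coordinator for slow work.
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

    /** Hypothetical stand-in for a slow, blocking call (for example listing partitions). */
    private static String discoverWorkBlocking() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(50);
        return "split-0";
    }

    /**
     * Would be invoked from a coordinator method such as start(). It returns
     * immediately; the blocking call runs on ioExecutor, not on the caller's thread.
     */
    CompletableFuture<String> startDiscovery() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return discoverWorkBlocking();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("discovery interrupted", e);
            }
        }, ioExecutor);
    }

    /** Would be invoked from close(); stops the worker thread. */
    void shutdown() {
        ioExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        NonBlockingCoordinatorSketch sketch = new NonBlockingCoordinatorSketch();
        CompletableFuture<String> pending = sketch.startDiscovery();
        // Only this demo waits for the result; a coordinator would chain a callback instead.
        System.out.println(pending.join());
        sketch.shutdown();
    }
}
```

In a real coordinator the callback chained onto the future would typically hand the result back through the Context or a gateway rather than waiting for it.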
The coordinator's view of the task execution is highly simplified, compared to the Scheduler's view, but allows for consistent interaction with the operators running on the parallel subtasks. In particular, the following methods are guaranteed to be called strictly in order:
1. executionAttemptReady(int, int, SubtaskGateway): Called once you can send events to the subtask execution attempt. The provided gateway is bound to that specific execution attempt. This is the start of interaction with the operator subtask attempt.
2. executionAttemptFailed(int, int, Throwable): Called for each subtask execution attempt as soon as the attempt failed or was cancelled. At this point, interaction with the subtask attempt should stop.
3. subtaskReset(int, long) or resetToCheckpoint(long, byte[]): Once the scheduler has determined which checkpoint to restore, these methods notify the coordinator of that. The former method is called in case of a regional failure/recovery (possibly affecting a subset of subtasks), the latter method in case of a global failure/recovery. This method should be used to determine which actions to recover, because it tells you which checkpoint to fall back to. The coordinator implementation needs to recover the interactions with the relevant tasks since the checkpoint that is restored. It will be called only after executionAttemptFailed(int, int, Throwable) has been called on all the attempts of the subtask.
4. executionAttemptReady(int, int, SubtaskGateway): Called again, once the recovered tasks (new attempts) are ready to go. This is later than subtaskReset(int, long), because between those methods, the new attempts are scheduled and deployed.
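The ordering guarantee can be traced with a small bookkeeping sketch. It is not Flink code: `AttemptLifecycleSketch` and its nested `Gateway` interface are hypothetical stand-ins that only mirror the method names and parameter shapes listed above, with events reduced to strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: mirrors the callback order described above, not Flink code. */
final class AttemptLifecycleSketch {

    /** Hypothetical stand-in for OperatorCoordinator.SubtaskGateway. */
    interface Gateway {
        void send(String event);
    }

    // subtask index -> (attempt number -> gateway); touched only by the calling thread.
    private final Map<Integer, Map<Integer, Gateway>> gateways = new HashMap<>();
    // subtask index -> checkpoint ID the subtask was last reset to.
    private final Map<Integer, Long> lastReset = new HashMap<>();

    void executionAttemptReady(int subtask, int attemptNumber, Gateway gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attemptNumber, gateway);
    }

    void executionAttemptFailed(int subtask, int attemptNumber, Throwable reason) {
        Map<Integer, Gateway> attempts = gateways.get(subtask);
        if (attempts != null) {
            attempts.remove(attemptNumber); // stop interacting with this attempt
        }
    }

    void subtaskReset(int subtask, long checkpointId) {
        // By the ordering guarantee, every attempt of this subtask has already failed.
        Map<Integer, Gateway> attempts = gateways.get(subtask);
        if (attempts != null && !attempts.isEmpty()) {
            throw new IllegalStateException("subtask " + subtask + " still has live attempts");
        }
        lastReset.put(subtask, checkpointId);
    }

    /** Sends to every live attempt of the subtask; returns how many attempts were addressed. */
    int sendToSubtask(int subtask, String event) {
        Map<Integer, Gateway> attempts = gateways.getOrDefault(subtask, new HashMap<>());
        attempts.values().forEach(g -> g.send(event));
        return attempts.size();
    }

    Long lastResetCheckpoint(int subtask) {
        return lastReset.get(subtask);
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        AttemptLifecycleSketch c = new AttemptLifecycleSketch();
        c.executionAttemptReady(0, 0, e -> delivered.add("attempt0:" + e));
        c.sendToSubtask(0, "a");
        c.executionAttemptFailed(0, 0, new RuntimeException("task lost"));
        c.subtaskReset(0, 7L);
        c.executionAttemptReady(0, 1, e -> delivered.add("attempt1:" + e));
        c.sendToSubtask(0, "b");
        System.out.println(delivered); // prints [attempt0:a, attempt1:b]
    }
}
```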
Modifier and Type | Interface and Description |
---|---|
static interface | OperatorCoordinator.Context: The context gives the OperatorCoordinator access to contextual information and provides a gateway to interact with other components, such as sending operator events. |
static interface | OperatorCoordinator.Provider: The provider creates an OperatorCoordinator and takes an OperatorCoordinator.Context to pass to the OperatorCoordinator. |
static interface | OperatorCoordinator.SubtaskGateway: The SubtaskGateway is the way to interact with a specific parallel instance of the Operator (an Operator subtask), like sending events to the operator. |
Modifier and Type | Field and Description |
---|---|
static long | BATCH_CHECKPOINT_ID: The checkpoint ID passed to the restore methods in batch scenarios. |
static long | NO_CHECKPOINT: The checkpoint ID passed to the restore methods when no completed checkpoint exists yet. |
Modifier and Type | Method and Description |
---|---|
void | checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture): Takes a checkpoint of the coordinator. |
void | close(): This method is called when the coordinator is disposed. |
void | executionAttemptFailed(int subtask, int attemptNumber, Throwable reason): Called when any subtask execution attempt of the task running the coordinated operator is failed/canceled. |
void | executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway): This is called when a subtask execution attempt of the Operator becomes ready to receive events. |
void | handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event): Hands an OperatorEvent coming from a parallel Operator instance (one attempt of the parallel subtasks). |
default void | notifyCheckpointAborted(long checkpointId): We override the method here to remove the checked exception. |
void | notifyCheckpointComplete(long checkpointId): We override the method here to remove the checked exception. |
void | resetToCheckpoint(long checkpointId, byte[] checkpointData): Resets the coordinator to the given checkpoint. |
void | start(): Starts the coordinator. |
void | subtaskReset(int subtask, long checkpointId): Called if a subtask is recovered as part of a partial failover, meaning a failover handled by the scheduler's failover strategy (by default recovering a pipelined region). |
default boolean | supportsBatchSnapshot(): Whether the operator coordinator supports taking snapshots in no-checkpoint/batch scenarios. |
static final long NO_CHECKPOINT
static final long BATCH_CHECKPOINT_ID
void start() throws Exception
Throws: Exception - Any exception thrown from this method causes a full job failure.
void close() throws Exception
Specified by: close in interface AutoCloseable
Throws: Exception
void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception
Throws: Exception - Any exception thrown by this method results in a full job failure and recovery.
void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception
To confirm the checkpoint and store state in it, the given CompletableFuture must be completed with the state. To abort or dis-confirm the checkpoint, the given CompletableFuture must be completed exceptionally. In any case, the given CompletableFuture must be completed in some way, otherwise the checkpoint will not progress.
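A minimal sketch of that contract follows. `CheckpointFutureSketch` is a hypothetical class, not Flink code, and its state is a plain string chosen for illustration; only the `checkpointCoordinator(long, CompletableFuture<byte[]>)` shape is taken from this page. Every path through the method completes the future, either with the serialized state or exceptionally.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/** Sketch only: shows that every code path completes the result future. */
final class CheckpointFutureSketch {

    // Hypothetical coordinator state; null models "cannot snapshot right now".
    private final String pendingAssignments;

    CheckpointFutureSketch(String pendingAssignments) {
        this.pendingAssignments = pendingAssignments;
    }

    private byte[] serializeState() {
        if (pendingAssignments == null) {
            throw new IllegalStateException("state is not available");
        }
        return pendingAssignments.getBytes(StandardCharsets.UTF_8);
    }

    /** Mirrors the shape of checkpointCoordinator(long, CompletableFuture<byte[]>). */
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        try {
            // Confirm the checkpoint by completing the future with the state.
            resultFuture.complete(serializeState());
        } catch (RuntimeException e) {
            // Abort the checkpoint instead of leaving the future pending.
            resultFuture.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> confirmed = new CompletableFuture<>();
        new CheckpointFutureSketch("split-3,split-4").checkpointCoordinator(1L, confirmed);
        System.out.println(new String(confirmed.join(), StandardCharsets.UTF_8));

        CompletableFuture<byte[]> aborted = new CompletableFuture<>();
        new CheckpointFutureSketch(null).checkpointCoordinator(2L, aborted);
        System.out.println(aborted.isCompletedExceptionally());
    }
}
```

The sketch catches the failure and aborts only the checkpoint, because an exception thrown out of the method itself is documented to fail the whole job.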
The semantics are defined as follows:
Throws: Exception - Any exception thrown by this method results in a full job failure and recovery.
void notifyCheckpointComplete(long checkpointId)
See CheckpointListener.notifyCheckpointComplete(long) for the detailed semantics of this method.
Specified by: notifyCheckpointComplete in interface CheckpointListener
Parameters: checkpointId - The ID of the checkpoint that has been completed.
default void notifyCheckpointAborted(long checkpointId)
See CheckpointListener.notifyCheckpointAborted(long) for the detailed semantics of this method.
Specified by: notifyCheckpointAborted in interface CheckpointListener
Parameters: checkpointId - The ID of the checkpoint that has been aborted.
void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception
This method is called in the case of a global failover of the system, which means a failover of the coordinator (JobManager). This method is not invoked on a partial failover; partial failovers call the subtaskReset(int, long) method for the involved subtasks.
This method is expected to behave synchronously with respect to other method calls and calls to Context methods. For example, events sent by the coordinator after this method returns are assumed to take place after the checkpoint that was restored.
This method is called with a null state argument in the following situations:
In both cases, the coordinator should reset to an empty (new) state.
Restoring to a checkpoint is a way of confirming that the checkpoint is complete. It is safe to commit side-effects that are predicated on checkpoint completion after this call.
Even if no call to notifyCheckpointComplete(long) happened, the checkpoint can still be complete (for example when a system failure happened directly after committing the checkpoint, before calling the notifyCheckpointComplete(long) method).
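The restore behaviour described above can be sketched as follows. `ResetToCheckpointSketch` is a hypothetical class, not Flink code; the comma-separated string state and the `commitOnce` helper are assumptions made for illustration. It resets to an empty state when the state argument is null, and treats a restore as confirmation that the checkpoint completed, so a later or missing notifyCheckpointComplete(long) does not change the outcome.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch only: null state resets to empty, and a restore confirms the checkpoint. */
final class ResetToCheckpointSketch {

    private final List<String> pending = new ArrayList<>(); // hypothetical restored state
    private long lastCommitted = Long.MIN_VALUE;
    private int commitCount = 0;

    /** Mirrors the shape of resetToCheckpoint(long, byte[]). */
    void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        pending.clear();
        if (checkpointData == null) {
            return; // nothing to restore: continue from an empty (new) state
        }
        String decoded = new String(checkpointData, StandardCharsets.UTF_8);
        if (!decoded.isEmpty()) {
            pending.addAll(Arrays.asList(decoded.split(",")));
        }
        // Restoring implies the checkpoint completed, even without a prior notification.
        commitOnce(checkpointId);
    }

    void notifyCheckpointComplete(long checkpointId) {
        commitOnce(checkpointId);
    }

    /** Hypothetical idempotent commit: each checkpoint's side effects run at most once. */
    private void commitOnce(long checkpointId) {
        if (checkpointId > lastCommitted) {
            lastCommitted = checkpointId;
            commitCount++;
        }
    }

    List<String> pending() {
        return new ArrayList<>(pending);
    }

    int commitCount() {
        return commitCount;
    }

    public static void main(String[] args) {
        ResetToCheckpointSketch c = new ResetToCheckpointSketch();
        c.resetToCheckpoint(5L, "a,b".getBytes(StandardCharsets.UTF_8));
        c.notifyCheckpointComplete(5L); // late notification, no second commit
        System.out.println(c.pending() + " commits=" + c.commitCount()); // prints [a, b] commits=1
    }
}
```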
Throws: Exception
void subtaskReset(int subtask, long checkpointId)
In contrast to this method, the resetToCheckpoint(long, byte[]) method is called in the case of a global failover, which is the case when the coordinator (JobManager) is recovered.
Note that this method is not called when an execution attempt of a subtask fails while the subtask as a whole has not failed, i.e. while the subtask still has other execution attempts that are not failed/canceled.
void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason)
This method is called every time an execution attempt is failed/canceled, regardless of whether it is caused by a partial failover or a global failover.
void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway)
The given SubtaskGateway can be used to send events to the execution attempt.
The given SubtaskGateway is bound to the specific execution attempt that became ready. All events sent through the gateway target that execution attempt; if the attempt is no longer running by the time the event is sent, then the events are failed.
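How a sender might react to a failed event can be sketched as below. This is an assumption-laden illustration, not Flink code: the nested `Gateway` interface, its string events, the `CompletableFuture<Void>` result, and the `forAttempt` and `sendAndReport` helpers are all hypothetical. It only shows a send whose outcome is inspected asynchronously instead of being waited on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch only: a gateway bound to one attempt fails sends once that attempt is gone. */
final class GatewaySendSketch {

    /** Hypothetical stand-in for OperatorCoordinator.SubtaskGateway. */
    interface Gateway {
        CompletableFuture<Void> sendEvent(String event);
    }

    /** Builds a gateway bound to a single attempt, modelled by a "running" flag. */
    static Gateway forAttempt(AtomicBoolean running) {
        return event -> {
            CompletableFuture<Void> result = new CompletableFuture<>();
            if (running.get()) {
                result.complete(null);
            } else {
                result.completeExceptionally(
                        new IllegalStateException("attempt no longer running"));
            }
            return result;
        };
    }

    /** Reacts to the outcome in a callback, without blocking the calling thread. */
    static CompletableFuture<String> sendAndReport(Gateway gateway, String event) {
        return gateway.sendEvent(event).handle((ack, failure) ->
                failure == null ? "delivered" : "failed: " + failure.getMessage());
    }

    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(true);
        Gateway gateway = forAttempt(running);
        System.out.println(sendAndReport(gateway, "assign split-0").join());
        running.set(false); // the attempt has failed or was cancelled
        System.out.println(sendAndReport(gateway, "assign split-1").join());
    }
}
```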
default boolean supportsBatchSnapshot()
If this method returns true, the checkpointCoordinator(long, java.util.concurrent.CompletableFuture<byte[]>) and resetToCheckpoint(long, byte[]) methods support taking a snapshot and restoring from a snapshot in batch processing scenarios. In such scenarios, the checkpointId will always be -1.
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.