@Internal public class SourceCoordinator<SplitT extends SourceSplit,EnumChkT> extends Object implements OperatorCoordinator
OperatorCoordinator
for the Source
.
The SourceCoordinator
provides an event loop style thread model to interact with
the Flink runtime. The coordinator ensures that all the state manipulations are made by its event
loop thread. It also helps keep track of the necessary split assignments history per subtask to
simplify the SplitEnumerator
implementation.
The coordinator maintains a SplitEnumeratorContxt
and shares it
with the enumerator. When the coordinator receives an action request from the Flink runtime, it
sets up the context, and calls corresponding method of the SplitEnumerator to take actions.
Modifier and Type | Class and Description |
---|---|
static class |
SourceCoordinator.WatermarkElement
The watermark element for
HeapPriorityQueue . |
OperatorCoordinator.Context, OperatorCoordinator.Provider, OperatorCoordinator.SubtaskGateway
NO_CHECKPOINT
Constructor and Description |
---|
SourceCoordinator(String operatorName,
Source<?,SplitT,EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore) |
SourceCoordinator(String operatorName,
Source<?,SplitT,EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore,
WatermarkAlignmentParams watermarkAlignmentParams,
String coordinatorListeningID) |
Modifier and Type | Method and Description |
---|---|
void |
checkpointCoordinator(long checkpointId,
CompletableFuture<byte[]> result)
Takes a checkpoint of the coordinator.
|
void |
close()
This method is called when the coordinator is disposed.
|
void |
executionAttemptFailed(int subtaskId,
int attemptNumber,
Throwable reason)
Called when any subtask execution attempt of the task running the coordinated operator is
failed/canceled.
|
void |
executionAttemptReady(int subtask,
int attemptNumber,
OperatorCoordinator.SubtaskGateway gateway)
This is called when a subtask execution attempt of the Operator becomes ready to receive
events.
|
void |
handleEventFromOperator(int subtask,
int attemptNumber,
OperatorEvent event)
Hands an OperatorEvent coming from a parallel Operator instance (one attempt of the parallel
subtasks).
|
void |
notifyCheckpointAborted(long checkpointId)
We override the method here to remove the checked exception.
|
void |
notifyCheckpointComplete(long checkpointId)
We override the method here to remove the checked exception.
|
void |
resetToCheckpoint(long checkpointId,
byte[] checkpointData)
Resets the coordinator to the given checkpoint.
|
void |
start()
Starts the coordinator.
|
void |
subtaskReset(int subtaskId,
long checkpointId)
Called if a subtask is recovered as part of a partial failover, meaning a failover
handled by the scheduler's failover strategy (by default recovering a pipelined region).
|
public SourceCoordinator(String operatorName, Source<?,SplitT,EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore)
public SourceCoordinator(String operatorName, Source<?,SplitT,EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore, WatermarkAlignmentParams watermarkAlignmentParams, @Nullable String coordinatorListeningID)
public void start() throws Exception
OperatorCoordinator
start
in interface OperatorCoordinator
Exception
- Any exception thrown from this method causes a full job failure.public void close() throws Exception
OperatorCoordinator
close
in interface AutoCloseable
close
in interface OperatorCoordinator
Exception
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
OperatorCoordinator
handleEventFromOperator
in interface OperatorCoordinator
public void executionAttemptFailed(int subtaskId, int attemptNumber, @Nullable Throwable reason)
OperatorCoordinator
This method is called every time an execution attempt is failed/canceled, regardless of whether there it is caused by a partial failover or a global failover.
executionAttemptFailed
in interface OperatorCoordinator
public void subtaskReset(int subtaskId, long checkpointId)
OperatorCoordinator
In contrast to this method, the OperatorCoordinator.resetToCheckpoint(long, byte[])
method is called
in the case of a global failover, which is the case when the coordinator (JobManager) is
recovered.
Note that this method will not be called if an execution attempt of a subtask failed, if the subtask is not entirely failed, i.e. if the subtask has other execution attempts that are not failed/canceled.
subtaskReset
in interface OperatorCoordinator
public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway)
OperatorCoordinator
SubtaskGateway
can be used to send events to the execution attempt.
The given SubtaskGateway
is bound to that specific execution attempt that became
ready. All events sent through the gateway target that execution attempt; if the attempt is
no longer running by the time the event is sent, then the events are failed.
executionAttemptReady
in interface OperatorCoordinator
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
OperatorCoordinator
To confirm the checkpoint and store state in it, the given CompletableFuture
must
be completed with the state. To abort or dis-confirm the checkpoint, the given CompletableFuture
must be completed exceptionally. In any case, the given CompletableFuture
must be completed in some way, otherwise the checkpoint will not progress.
The semantics are defined as follows:
checkpointCoordinator
in interface OperatorCoordinator
public void notifyCheckpointComplete(long checkpointId)
OperatorCoordinator
CheckpointListener.notifyCheckpointComplete(long)
for more detail semantic of the
method.notifyCheckpointComplete
in interface CheckpointListener
notifyCheckpointComplete
in interface OperatorCoordinator
checkpointId
- The ID of the checkpoint that has been completed.public void notifyCheckpointAborted(long checkpointId)
OperatorCoordinator
CheckpointListener.notifyCheckpointAborted(long)
for more detail semantic of the
method.notifyCheckpointAborted
in interface CheckpointListener
notifyCheckpointAborted
in interface OperatorCoordinator
checkpointId
- The ID of the checkpoint that has been aborted.public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception
OperatorCoordinator
This method is called in the case of a global failover of the system, which means a
failover of the coordinator (JobManager). This method is not invoked on a partial
failover; partial failovers call the OperatorCoordinator.subtaskReset(int, long)
method for the
involved subtasks.
This method is expected to behave synchronously with respect to other method calls and
calls to Context
methods. For example, Events being sent by the Coordinator after
this method returns are assumed to take place after the checkpoint that was restored.
This method is called with a null state argument in the following situations:
In both cases, the coordinator should reset to an empty (new) state.
Restoring to a checkpoint is a way of confirming that the checkpoint is complete. It is safe to commit side-effects that are predicated on checkpoint completion after this call.
Even if no call to OperatorCoordinator.notifyCheckpointComplete(long)
happened, the checkpoint can
still be complete (for example when a system failure happened directly after committing the
checkpoint, before calling the OperatorCoordinator.notifyCheckpointComplete(long)
method).
resetToCheckpoint
in interface OperatorCoordinator
Exception
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.