Class SourceCoordinator<SplitT extends SourceSplit,EnumChkT>
java.lang.Object
    org.apache.flink.runtime.source.coordinator.SourceCoordinator<SplitT,EnumChkT>

All Implemented Interfaces:
AutoCloseable, CheckpointListener, OperatorCoordinator
@Internal public class SourceCoordinator<SplitT extends SourceSplit,EnumChkT> extends Object implements OperatorCoordinator
The default implementation of the OperatorCoordinator for the Source.

The SourceCoordinator provides an event loop style thread model to interact with the Flink runtime. The coordinator ensures that all state manipulations are made by its event loop thread. It also keeps track of the necessary split assignment history per subtask to simplify the SplitEnumerator implementation.

The coordinator maintains a SplitEnumeratorContext and shares it with the enumerator. When the coordinator receives an action request from the Flink runtime, it sets up the context and calls the corresponding method of the SplitEnumerator to take action.
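The event loop thread model described above can be illustrated with a small, self-contained sketch. It is not Flink code: the class EventLoopCoordinator, its methods handleSplitRequest and assignmentsFor, and the thread name are hypothetical. The sketch assumes a single-thread ExecutorService plays the role of the event loop, so every read and write of the split queue and of the per-subtask assignment history happens on that one thread and needs no locks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified event-loop coordinator; not Flink's SourceCoordinator API. */
final class EventLoopCoordinator implements AutoCloseable {
    private static final String LOOP_THREAD = "coordinator-event-loop";

    // One thread owns all mutable state below, so no locking is needed.
    private final ExecutorService loop =
            Executors.newSingleThreadExecutor(r -> new Thread(r, LOOP_THREAD));

    // Stand-in for enumerator state, plus the per-subtask assignment history.
    private final Deque<String> unassigned = new ArrayDeque<>();
    private final Map<Integer, List<String>> history = new HashMap<>();

    EventLoopCoordinator(List<String> splits) {
        unassigned.addAll(splits); // happens before any task is submitted to the loop
    }

    /** May be called from any runtime thread; the work itself runs on the event loop. */
    CompletableFuture<String> handleSplitRequest(int subtask) {
        return CompletableFuture.supplyAsync(() -> {
            assertLoopThread();
            String split = unassigned.poll(); // null once every split is assigned
            if (split != null) {
                history.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
            }
            return split;
        }, loop);
    }

    /** Reads the assignment history, also on the event loop thread. */
    CompletableFuture<List<String>> assignmentsFor(int subtask) {
        return CompletableFuture.supplyAsync(() -> {
            assertLoopThread();
            List<String> copy = new ArrayList<>(history.getOrDefault(subtask, List.of()));
            return copy;
        }, loop);
    }

    private static void assertLoopThread() {
        if (!LOOP_THREAD.equals(Thread.currentThread().getName())) {
            throw new IllegalStateException("state accessed outside the event loop");
        }
    }

    @Override
    public void close() {
        loop.shutdown();
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Callers on any thread only submit work and wait on the returned futures; the guard in assertLoopThread makes an accidental access from another thread fail loudly instead of racing.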
Nested Class Summary
Nested Classes
static class SourceCoordinator.WatermarkElement
    The watermark element for HeapPriorityQueue.

Nested classes/interfaces inherited from interface org.apache.flink.runtime.operators.coordination.OperatorCoordinator
OperatorCoordinator.Context, OperatorCoordinator.Provider, OperatorCoordinator.SubtaskGateway

Field Summary
Fields inherited from interface org.apache.flink.runtime.operators.coordination.OperatorCoordinator
BATCH_CHECKPOINT_ID, NO_CHECKPOINT

Constructor Summary
Constructors
SourceCoordinator(String operatorName, Source<?,SplitT,EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore)
SourceCoordinator(String operatorName, Source<?,SplitT,EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore, WatermarkAlignmentParams watermarkAlignmentParams, String coordinatorListeningID)

Method Summary
All Methods, Instance Methods, Concrete Methods

void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
    Takes a checkpoint of the coordinator.
void close()
    This method is called when the coordinator is disposed.
void executionAttemptFailed(int subtaskId, int attemptNumber, Throwable reason)
    Called when any subtask execution attempt of the task running the coordinated operator is failed/canceled.
void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway)
    This is called when a subtask execution attempt of the Operator becomes ready to receive events.
ExecutorService getCoordinatorExecutor()
SplitEnumerator<SplitT,EnumChkT> getEnumerator()
void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
    Hands an OperatorEvent coming from a parallel Operator instance (one attempt of the parallel subtasks).
CompletableFuture<Integer> inferSourceParallelismAsync(int parallelismInferenceUpperBound, long dataVolumePerTask)
void notifyCheckpointAborted(long checkpointId)
    We override the method here to remove the checked exception.
void notifyCheckpointComplete(long checkpointId)
    We override the method here to remove the checked exception.
void resetToCheckpoint(long checkpointId, byte[] checkpointData)
    Resets the coordinator to the given checkpoint.
void start()
    Starts the coordinator.
void subtaskReset(int subtaskId, long checkpointId)
    Called if a subtask is recovered as part of a partial failover, meaning a failover handled by the scheduler's failover strategy (by default recovering a pipelined region).
boolean supportsBatchSnapshot()
    Whether the enumerator supports batch snapshot.

Constructor Detail

SourceCoordinator
public SourceCoordinator(String operatorName, Source<?,SplitT,EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore)

SourceCoordinator
public SourceCoordinator(String operatorName, Source<?,SplitT,EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore, WatermarkAlignmentParams watermarkAlignmentParams, @Nullable String coordinatorListeningID)

Method Detail

start
public void start() throws Exception
Description copied from interface: OperatorCoordinator
Starts the coordinator. This method is called once at the beginning, before any other methods.
Specified by:
start in interface OperatorCoordinator
Throws:
Exception - Any exception thrown from this method causes a full job failure.

close
public void close() throws Exception
Description copied from interface: OperatorCoordinator
This method is called when the coordinator is disposed. This method should release currently held resources. Exceptions in this method do not cause the job to fail.
Specified by:
close in interface AutoCloseable
Specified by:
close in interface OperatorCoordinator
Throws:
Exception
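The start/close contract above (start runs once, before any other method; exceptions from close do not fail the job) can be sketched as follows. This is an illustration under assumptions, not Flink code: GuardedCoordinator, handleEvent, and Disposer are hypothetical names, and the "do not fail the job" rule is modeled simply as a caller that catches and swallows the exception from close().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical coordinator enforcing the documented start()/close() contract. */
final class GuardedCoordinator implements AutoCloseable {
    private boolean started;
    private boolean closed;
    final List<String> log = new ArrayList<>();

    /** Must run exactly once, before any other method. */
    void start() {
        if (started) {
            throw new IllegalStateException("start() may only be called once");
        }
        started = true;
        log.add("started");
    }

    void handleEvent(String event) {
        if (!started || closed) {
            throw new IllegalStateException("coordinator is not running");
        }
        log.add("event:" + event);
    }

    /** Releases resources; here it also simulates a failure while doing so. */
    @Override
    public void close() throws Exception {
        closed = true;
        log.add("closed");
        throw new Exception("simulated failure while releasing resources");
    }
}

/** Hypothetical caller: a failing close() is reported, not escalated to a job failure. */
final class Disposer {
    static boolean disposeQuietly(AutoCloseable coordinator) {
        try {
            coordinator.close();
            return true;
        } catch (Exception e) {
            // A real runtime would typically log the exception here.
            return false;
        }
    }
}
```

Enforcing the ordering inside the coordinator turns a lifecycle bug (an event arriving before start()) into an immediate IllegalStateException rather than silently corrupted state.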

handleEventFromOperator
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
Description copied from interface: OperatorCoordinator
Hands an OperatorEvent coming from a parallel Operator instance (one attempt of the parallel subtasks).
Specified by:
handleEventFromOperator in interface OperatorCoordinator
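Because each event arrives together with the subtask index and the attempt number, a coordinator typically dispatches on the event type and keeps the (subtask, attempt) pair for its reply. The sketch below uses made-up event classes (Event, SplitRequest, ReaderRegistration) and a hypothetical EventDispatcher; it does not reproduce the event types SourceCoordinator actually handles.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker interface standing in for operator events; not a Flink class. */
interface Event {}

/** Hypothetical request for more splits, carrying the reader's host name. */
final class SplitRequest implements Event {
    final String hostname;

    SplitRequest(String hostname) {
        this.hostname = hostname;
    }
}

/** Hypothetical notification that a reader attempt has registered. */
final class ReaderRegistration implements Event {}

/** Hypothetical dispatcher mirroring handleEventFromOperator(subtask, attemptNumber, event). */
final class EventDispatcher {
    final List<String> actions = new ArrayList<>();

    void handleEventFromOperator(int subtask, int attemptNumber, Event event) {
        String attempt = subtask + "#" + attemptNumber;
        if (event instanceof ReaderRegistration) {
            actions.add("register reader " + attempt);
        } else if (event instanceof SplitRequest) {
            String host = ((SplitRequest) event).hostname;
            actions.add("split request from " + attempt + " on " + host);
        } else {
            // Unknown events are rejected instead of being silently dropped.
            throw new IllegalArgumentException("unrecognized event: " + event.getClass().getName());
        }
    }
}
```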

executionAttemptFailed
public void executionAttemptFailed(int subtaskId, int attemptNumber, @Nullable Throwable reason)
Description copied from interface: OperatorCoordinator
Called when any subtask execution attempt of the task running the coordinated operator is failed/canceled.
This method is called every time an execution attempt is failed/canceled, regardless of whether it is caused by a partial failover or a global failover.
Specified by:
executionAttemptFailed in interface OperatorCoordinator

subtaskReset
public void subtaskReset(int subtaskId, long checkpointId)
Description copied from interface: OperatorCoordinator
Called if a subtask is recovered as part of a partial failover, meaning a failover handled by the scheduler's failover strategy (by default recovering a pipelined region). The method is invoked for each subtask involved in that partial failover.
In contrast to this method, the OperatorCoordinator.resetToCheckpoint(long, byte[]) method is called in the case of a global failover, which is the case when the coordinator (JobManager) is recovered.
Note that this method is not called when an execution attempt of a subtask fails but the subtask as a whole has not failed, i.e. when the subtask still has other execution attempts that are not failed/canceled.
Specified by:
subtaskReset in interface OperatorCoordinator
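On a partial failover, the reset subtask loses every split assigned to it after the checkpoint it is restored from, so those splits must be handed back to the enumerator. The class description mentions that the coordinator keeps a per-subtask assignment history for this purpose; the sketch below is one hypothetical way to organize it (AssignmentTracker and all its methods are invented for illustration). It assumes assignments made before checkpoint N are covered by checkpoint N, as the exactly-once ordering rules of checkpointCoordinator describe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical tracker: splits per subtask, bucketed by the checkpoint that covers them. */
final class AssignmentTracker {
    // Assignments made since the last coordinator checkpoint, per subtask.
    private Map<Integer, List<String>> uncheckpointed = new HashMap<>();
    // checkpointId -> assignments covered by that checkpoint (not yet confirmed complete).
    private final SortedMap<Long, Map<Integer, List<String>>> byCheckpoint = new TreeMap<>();

    void recordAssignment(int subtask, String split) {
        uncheckpointed.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    /** On checkpointCoordinator(id, ...): everything assigned so far belongs to checkpoint id. */
    void onCheckpoint(long checkpointId) {
        byCheckpoint.put(checkpointId, uncheckpointed);
        uncheckpointed = new HashMap<>();
    }

    /** On notifyCheckpointComplete(id): buckets up to id are durable and can be dropped. */
    void onCheckpointComplete(long checkpointId) {
        byCheckpoint.headMap(checkpointId + 1).clear();
    }

    /** On subtaskReset(subtask, id): returns (and forgets) splits assigned after checkpoint id. */
    List<String> takeUncheckpointed(int subtask, long restoredCheckpointId) {
        List<String> lost = new ArrayList<>();
        for (Map<Integer, List<String>> bucket : byCheckpoint.tailMap(restoredCheckpointId + 1).values()) {
            List<String> splits = bucket.remove(subtask);
            if (splits != null) {
                lost.addAll(splits);
            }
        }
        List<String> pending = uncheckpointed.remove(subtask);
        if (pending != null) {
            lost.addAll(pending);
        }
        return lost;
    }
}
```

The returned list is what a coordinator would give back to the enumerator for reassignment; a TreeMap keeps the per-checkpoint buckets ordered so both "newer than the restored checkpoint" and "up to the completed checkpoint" are simple range views.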

executionAttemptReady
public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway)
Description copied from interface: OperatorCoordinator
This is called when a subtask execution attempt of the Operator becomes ready to receive events. The given SubtaskGateway can be used to send events to the execution attempt.
The given SubtaskGateway is bound to that specific execution attempt that became ready. All events sent through the gateway target that execution attempt; if the attempt is no longer running by the time the event is sent, then the events are failed.
Specified by:
executionAttemptReady in interface OperatorCoordinator
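Since each gateway is bound to one execution attempt, a coordinator needs to track gateways per (subtask, attempt) and drop them when executionAttemptFailed reports that attempt as failed/canceled. The sketch below is a hypothetical registry: AttemptGateway is a stand-in for OperatorCoordinator.SubtaskGateway (its send method is invented and only acknowledges the event), and GatewayRegistry is not a Flink class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical per-attempt gateway; a stand-in for OperatorCoordinator.SubtaskGateway. */
final class AttemptGateway {
    private final String target;
    private volatile boolean running = true;

    AttemptGateway(int subtask, int attemptNumber) {
        this.target = subtask + "#" + attemptNumber;
    }

    void markStopped() {
        running = false;
    }

    /** Events sent to an attempt that is no longer running are failed. */
    CompletableFuture<Void> send(String event) {
        if (!running) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException(event + " not delivered: " + target + " is not running"));
        }
        // A real gateway would transmit the event; this sketch only acknowledges it.
        return CompletableFuture.completedFuture(null);
    }
}

/** Hypothetical registry keyed by subtask index, then by attempt number. */
final class GatewayRegistry {
    private final Map<Integer, Map<Integer, AttemptGateway>> gateways = new HashMap<>();

    void executionAttemptReady(int subtask, int attemptNumber, AttemptGateway gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attemptNumber, gateway);
    }

    void executionAttemptFailed(int subtask, int attemptNumber) {
        Map<Integer, AttemptGateway> attempts = gateways.get(subtask);
        if (attempts != null) {
            AttemptGateway gateway = attempts.remove(attemptNumber);
            if (gateway != null) {
                gateway.markStopped();
            }
        }
    }

    int liveAttempts(int subtask) {
        return gateways.getOrDefault(subtask, Map.of()).size();
    }
}
```

Keying by attempt number (not just subtask) matters when several attempts of the same subtask run concurrently: failing one attempt leaves the others reachable.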

supportsBatchSnapshot
public boolean supportsBatchSnapshot()
Whether the enumerator supports batch snapshot. Note that the enumerator is created either when the coordinator is reset to a checkpoint or when the coordinator is started.
Specified by:
supportsBatchSnapshot in interface OperatorCoordinator

checkpointCoordinator
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
Description copied from interface: OperatorCoordinator
Takes a checkpoint of the coordinator. The checkpoint is identified by the given ID.
To confirm the checkpoint and store state in it, the given CompletableFuture must be completed with the state. To abort or dis-confirm the checkpoint, the given CompletableFuture must be completed exceptionally. In any case, the given CompletableFuture must be completed in some way, otherwise the checkpoint will not progress.

Exactly-once Semantics
The semantics are defined as follows:
- The point in time when the checkpoint future is completed is considered the point in time when the coordinator's checkpoint takes place.
- The OperatorCoordinator implementation must have a way of strictly ordering the sending of events and the completion of the checkpoint future (for example the same thread does both actions, or both actions are guarded by a mutex).
- Every event sent before the checkpoint future is completed is considered before the checkpoint.
- Every event sent after the checkpoint future is completed is considered to be after the checkpoint.
Specified by:
checkpointCoordinator in interface OperatorCoordinator
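The ordering requirement above ("the same thread does both actions") can be demonstrated with a single-thread executor that both sends events and completes the checkpoint future. In this hypothetical sketch (OrderedCheckpointing is not a Flink class), the "state" is just the number of events sent so far, and the timeline records where the checkpoint falls relative to the events. Completing the future exceptionally stands for aborting the checkpoint.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical coordinator illustrating the ordering rule; not Flink's implementation. */
final class OrderedCheckpointing {
    // The same single thread sends events and completes checkpoint futures.
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    // Loop-thread-only state: a timeline of events and checkpoints, plus a counter as "state".
    private final List<String> timeline = new ArrayList<>();
    private int eventsSent;

    void sendEvent(String event) {
        loop.execute(() -> {
            eventsSent++;
            timeline.add("event:" + event);
        });
    }

    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        loop.execute(() -> {
            try {
                byte[] state = Integer.toString(eventsSent).getBytes(StandardCharsets.UTF_8);
                timeline.add("checkpoint:" + checkpointId);
                result.complete(state); // this instant is the coordinator's checkpoint
            } catch (RuntimeException e) {
                result.completeExceptionally(e); // aborts (dis-confirms) the checkpoint
            }
        });
    }

    List<String> timeline() {
        return CompletableFuture.supplyAsync(() -> {
            List<String> copy = new ArrayList<>(timeline);
            return copy;
        }, loop).join();
    }

    void shutdown() {
        loop.shutdown();
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the executor runs tasks in submission order on one thread, an event submitted before checkpointCoordinator is always counted in the snapshot, and one submitted after it never is.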

notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId)
Description copied from interface: OperatorCoordinator
We override the method here to remove the checked exception. Please check the Javadoc of CheckpointListener.notifyCheckpointComplete(long) for more details on the semantics of the method.
Specified by:
notifyCheckpointComplete in interface CheckpointListener
Specified by:
notifyCheckpointComplete in interface OperatorCoordinator
Parameters:
checkpointId - The ID of the checkpoint that has been completed.

notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId)
Description copied from interface: OperatorCoordinator
We override the method here to remove the checked exception. Please check the Javadoc of CheckpointListener.notifyCheckpointAborted(long) for more details on the semantics of the method.
Specified by:
notifyCheckpointAborted in interface CheckpointListener
Specified by:
notifyCheckpointAborted in interface OperatorCoordinator
Parameters:
checkpointId - The ID of the checkpoint that has been aborted.
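"Overriding to remove the checked exception" relies on a Java rule: an overriding method may declare fewer checked exceptions than the method it overrides, and callers that use the narrower type then need no try/catch. The sketch uses hypothetical interfaces (CheckpointCallbacks, CoordinatorCallbacks) that only mimic the shape of CheckpointListener and OperatorCoordinator; they are not the Flink types.

```java
/** Hypothetical listener whose callbacks may throw checked exceptions. */
interface CheckpointCallbacks {
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    default void notifyCheckpointAborted(long checkpointId) throws Exception {}
}

/** Re-declaring both methods without "throws" removes the checked exception for callers. */
interface CoordinatorCallbacks extends CheckpointCallbacks {
    @Override
    void notifyCheckpointComplete(long checkpointId);

    @Override
    default void notifyCheckpointAborted(long checkpointId) {}
}

/** Hypothetical implementation that records what it was told. */
final class RecordingCoordinator implements CoordinatorCallbacks {
    long highestCompleted = -1;
    int aborted;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        highestCompleted = Math.max(highestCompleted, checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        aborted++;
    }
}

final class CallbackDemo {
    // Compiles without try/catch: through CoordinatorCallbacks, neither call declares a checked exception.
    static void deliver(CoordinatorCallbacks target, long completedId, long abortedId) {
        target.notifyCheckpointComplete(completedId);
        target.notifyCheckpointAborted(abortedId);
    }
}
```

Calling the same methods through a CheckpointCallbacks reference would, by contrast, force the caller to handle or declare Exception.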

resetToCheckpoint
public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception
Description copied from interface: OperatorCoordinator
Resets the coordinator to the given checkpoint. When this method is called, the coordinator can discard all other in-flight working state. All subtasks will also have been reset to the same checkpoint.
This method is called in the case of a global failover of the system, which means a failover of the coordinator (JobManager). This method is not invoked on a partial failover; partial failovers call the OperatorCoordinator.subtaskReset(int, long) method for the involved subtasks.
This method is expected to behave synchronously with respect to other method calls and calls to Context methods. For example, events sent by the coordinator after this method returns are assumed to take place after the checkpoint that was restored.
This method is called with a null state argument in the following situations:
- There is a recovery and there was no completed checkpoint yet.
- There is a recovery from a completed checkpoint/savepoint but it contained no state for the coordinator.
In both cases, the coordinator should reset to an empty (new) state.
Restoring implicitly notifies of Checkpoint Completion
Restoring to a checkpoint is a way of confirming that the checkpoint is complete. It is safe to commit side-effects that are predicated on checkpoint completion after this call.
Even if no call to OperatorCoordinator.notifyCheckpointComplete(long) happened, the checkpoint can still be complete (for example, when a system failure happened directly after committing the checkpoint, before calling the OperatorCoordinator.notifyCheckpointComplete(long) method).
Specified by:
resetToCheckpoint in interface OperatorCoordinator
Throws:
Exception
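The null-state rule and the "restoring confirms the checkpoint" rule can be combined in a small restore routine. This is a hypothetical sketch: RestorableState and its comma-separated encoding are invented, and it assumes the no-checkpoint case is signaled with a sentinel id of -1 (the constant name mirrors OperatorCoordinator.NO_CHECKPOINT, but its value here is an assumption).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical coordinator state: ids of splits not yet assigned (ids must not contain commas). */
final class RestorableState {
    // Assumed sentinel for "no checkpoint"; mirrors the name of OperatorCoordinator.NO_CHECKPOINT.
    static final long NO_CHECKPOINT = -1L;

    private List<String> pending = new ArrayList<>();
    private long lastConfirmedCheckpoint = NO_CHECKPOINT;

    byte[] snapshot() {
        return String.join(",", pending).getBytes(StandardCharsets.UTF_8);
    }

    void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        if (checkpointData == null) {
            // No completed checkpoint yet, or it held no coordinator state: start empty.
            pending = new ArrayList<>();
        } else {
            String encoded = new String(checkpointData, StandardCharsets.UTF_8);
            pending = new ArrayList<>();
            if (!encoded.isEmpty()) { // "".split(",") would yield one empty id
                pending.addAll(Arrays.asList(encoded.split(",")));
            }
        }
        // Restoring implies the checkpoint completed, even without notifyCheckpointComplete.
        if (checkpointId != NO_CHECKPOINT) {
            lastConfirmedCheckpoint = checkpointId;
        }
    }

    List<String> pending() {
        return new ArrayList<>(pending);
    }

    long lastConfirmedCheckpoint() {
        return lastConfirmedCheckpoint;
    }
}
```

Treating the restored id as confirmed is what would let a coordinator safely commit side effects tied to that checkpoint, matching the "Restoring implicitly notifies of Checkpoint Completion" note.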

getEnumerator
@VisibleForTesting public SplitEnumerator<SplitT,EnumChkT> getEnumerator()

getCoordinatorExecutor
@VisibleForTesting public ExecutorService getCoordinatorExecutor()

inferSourceParallelismAsync
public CompletableFuture<Integer> inferSourceParallelismAsync(int parallelismInferenceUpperBound, long dataVolumePerTask)