public class OperatorCoordinatorHolder extends Object implements OperatorCoordinator, OperatorCoordinatorCheckpointContext
The OperatorCoordinatorHolder holds the OperatorCoordinator and manages all its interactions with the remaining components. It provides the context and is responsible for checkpointing and exactly once semantics.
The semantics are described under OperatorCoordinator.checkpointCoordinator(long, CompletableFuture).
This implementation can handle one checkpoint being triggered at a time. If another checkpoint is triggered while the triggering of the first one has not yet completed or been aborted, this class throws an exception. That is in line with the capabilities of the Checkpoint Coordinator, which can handle multiple concurrent checkpoints on the TaskManagers, but only one concurrent triggering phase.
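The one-triggering-phase-at-a-time rule can be pictured with a small guard. This is only a sketch under stated assumptions: `SingleTriggerGuard`, its methods, and the `NONE` sentinel are hypothetical names made up for illustration, and the exception type is a guess, since the text only says that an exception is thrown.

```java
/** Hypothetical sketch: permits only one checkpoint triggering phase at a time. */
final class SingleTriggerGuard {

    // Illustrative sentinel for "no triggering in progress"; not taken from Flink.
    private static final long NONE = -1L;

    private long currentlyTriggering = NONE;

    /** Marks the start of a triggering phase; fails if another one is still in progress. */
    synchronized void begin(long checkpointId) {
        if (currentlyTriggering != NONE) {
            throw new IllegalStateException(
                    "Cannot trigger checkpoint " + checkpointId
                            + " while checkpoint " + currentlyTriggering
                            + " is still being triggered");
        }
        currentlyTriggering = checkpointId;
    }

    /** Called once the triggering phase has completed or was aborted. */
    synchronized void end() {
        currentlyTriggering = NONE;
    }
}
```

With this guard, a second `begin` call fails until `end` has been called for the first triggering phase.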
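The mechanism for exactly once semantics is as follows:
- Events pass through a special channel, the OperatorEventValve. If we are not currently triggering a checkpoint, then events simply pass through.
- With the completion of the checkpoint future for the coordinator, this operator event valve is closed. Events coming after that are held back (buffered), because they belong to the epoch after the checkpoint.
- Once all coordinators in the job have completed the checkpoint, the barriers to the sources are injected. After that (see afterSourceBarrierInjection(long)) the valves are opened again and the events are sent.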
IMPORTANT: A critical assumption is that all events from the scheduler to the Tasks are transported strictly in order. Events being sent from the coordinator after the checkpoint barrier was injected must not overtake the checkpoint barrier. This is currently guaranteed by Flink's RPC mechanism.
Consider this example:
Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
Two coordinators trigger checkpoints at the same time. 'Coordinator Two' takes longer to complete, and in the meantime 'Coordinator One' sends more events.
'Coordinator One' emits events 'c' and 'd' after it finished its checkpoint, meaning the events must take place after the checkpoint. But they are emitted before the barrier injection, so the runtime task would see them before the checkpoint if they were transported immediately.
'Coordinator One' closes its valve as soon as the checkpoint future completes. Events 'c' and 'd' get held back in the valve. Once 'Coordinator Two' completes its checkpoint, the barriers are sent to the sources. Then the valves are opened, and events 'c' and 'd' can flow to the tasks where they are received after the barrier.
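The behaviour of 'Coordinator One' in this example can be sketched with a simplified valve. This is not the actual OperatorEventValve; `SketchEventValve` and its method names are assumptions made for illustration, and the sketch leaves out failure handling and threading concerns beyond simple locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified valve: forwards events while open, buffers them while shut. */
final class SketchEventValve<E> {

    private final Consumer<E> downstream;
    private final List<E> blocked = new ArrayList<>();
    private boolean shut;

    SketchEventValve(Consumer<E> downstream) {
        this.downstream = downstream;
    }

    /** Forwards the event immediately, or holds it back if the valve is shut. */
    synchronized void send(E event) {
        if (shut) {
            blocked.add(event);
        } else {
            downstream.accept(event);
        }
    }

    /** Closes the valve; corresponds to the completion of the checkpoint future. */
    synchronized void shut() {
        shut = true;
    }

    /** Reopens the valve and releases the buffered events in their original order. */
    synchronized void open() {
        shut = false;
        for (E event : blocked) {
            downstream.accept(event);
        }
        blocked.clear();
    }
}
```

Replaying the example: sending 'a' and 'b', shutting the valve, sending 'c' and 'd', injecting the barrier, and then opening the valve delivers 'c' and 'd' only after the barrier.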
This component runs mainly in a main-thread-executor, like RPC endpoints. However, some actions need to be triggered synchronously by other threads. Most notably, when the checkpoint future is completed by the OperatorCoordinator implementation, we need to synchronously suspend event-sending.
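One way to obtain such a synchronous reaction is a non-async callback on the checkpoint future: if it is registered before the future completes, it runs in the thread that completes the future. The sketch below demonstrates only that property of CompletableFuture. Whether the holder is wired exactly this way is an assumption, and `SynchronousCallbackSketch` and the thread name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: a non-async callback runs in the thread that completes the future. */
final class SynchronousCallbackSketch {

    /** Returns the name of the thread that executed the completion callback. */
    static String threadRunningCallback() {
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();

        // Registered before completion, so it runs as part of complete() in the completing thread.
        // A real implementation would suspend event-sending here.
        checkpointFuture.whenComplete(
                (state, failure) -> callbackThread.set(Thread.currentThread().getName()));

        // Stand-in for the coordinator's own thread completing its checkpoint.
        Thread coordinatorThread =
                new Thread(() -> checkpointFuture.complete(new byte[0]), "coordinator-thread");
        coordinatorThread.start();
        try {
            coordinatorThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return callbackThread.get();
    }
}
```

Because the callback runs inside `complete()`, no event sent by the coordinator thread after completing the future can slip through before event-sending has been suspended.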
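Nested classes/interfaces inherited from interface OperatorCoordinator: OperatorCoordinator.Context, OperatorCoordinator.Provider
Fields inherited from interface OperatorCoordinator: NO_CHECKPOINT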
Modifier and Type | Method | Description |
---|---|---|
void | abortCurrentTriggering() | |
void | afterSourceBarrierInjection(long checkpointId) | |
void | checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) | Takes a checkpoint of the coordinator. |
void | close() | This method is called when the coordinator is disposed. |
OperatorCoordinator | coordinator() | |
static OperatorCoordinatorHolder | create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader) | |
int | currentParallelism() | |
void | handleEventFromOperator(int subtask, OperatorEvent event) | Hands an OperatorEvent from a task (on the Task Manager) to this coordinator. |
void | lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) | |
int | maxParallelism() | |
void | notifyCheckpointAborted(long checkpointId) | We override the method here to remove the checked exception. |
void | notifyCheckpointComplete(long checkpointId) | We override the method here to remove the checked exception. |
OperatorID | operatorId() | |
void | resetToCheckpoint(long checkpointId, byte[] checkpointData) | Resets the coordinator to the given checkpoint. |
void | start() | Starts the coordinator. |
void | subtaskFailed(int subtask, Throwable reason) | Called when one of the subtasks of the task running the coordinated operator goes through a failover (failure / recovery cycle). |
void | subtaskReset(int subtask, long checkpointId) | Called if a task is recovered as part of a partial failover, meaning a failover handled by the scheduler's failover strategy (by default recovering a pipelined region). |
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface OperatorInfo: getIds
public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor)
public OperatorCoordinator coordinator()
public OperatorID operatorId()
Specified by: operatorId in interface OperatorInfo
public int maxParallelism()
Specified by: maxParallelism in interface OperatorInfo
public int currentParallelism()
Specified by: currentParallelism in interface OperatorInfo
public void start() throws Exception
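Description copied from interface: OperatorCoordinator
Starts the coordinator.
Specified by: start in interface OperatorCoordinator
Throws: Exception - Any exception thrown from this method causes a full job failure.
public void close() throws Exception
Description copied from interface: OperatorCoordinator
This method is called when the coordinator is disposed.
Specified by: close in interface AutoCloseable
Specified by: close in interface OperatorCoordinator
Throws: Exception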
public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception
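Description copied from interface: OperatorCoordinator
Hands an OperatorEvent from a task (on the Task Manager) to this coordinator.
Specified by: handleEventFromOperator in interface OperatorCoordinator
Throws: Exception - Any exception thrown by this method results in a full job failure and recovery.
public void subtaskFailed(int subtask, @Nullable Throwable reason)
Description copied from interface: OperatorCoordinator
Called when one of the subtasks of the task running the coordinated operator goes through a failover (failure / recovery cycle).
This method is called every time there is a failover of a subtask, regardless of whether it is a partial failover or a global failover.
Specified by: subtaskFailed in interface OperatorCoordinator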
public void subtaskReset(int subtask, long checkpointId)
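Description copied from interface: OperatorCoordinator
Called if a task is recovered as part of a partial failover, meaning a failover handled by the scheduler's failover strategy (by default recovering a pipelined region).
In contrast to this method, the OperatorCoordinator.resetToCheckpoint(long, byte[]) method is called in the case of a global failover, which is the case when the coordinator (JobManager) is recovered.
Specified by: subtaskReset in interface OperatorCoordinatorCheckpointContext
Specified by: subtaskReset in interface OperatorCoordinator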
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
Description copied from interface: OperatorCoordinator
Takes a checkpoint of the coordinator.
To confirm the checkpoint and store state in it, the given CompletableFuture must be completed with the state. To abort or dis-confirm the checkpoint, the given CompletableFuture must be completed exceptionally. In any case, the given CompletableFuture must be completed in some way, otherwise the checkpoint will not progress.
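The semantics are defined as follows:
- The point in time when the checkpoint future is completed is considered the point in time when the coordinator's checkpoint takes place.
- The OperatorCoordinator implementation must have a way of strictly ordering the sending of events and the completion of the checkpoint future (for example the same thread does both actions, or both actions are guarded by a mutex).
- Every event sent before the checkpoint future is completed is considered to be before the checkpoint.
- Every event sent after the checkpoint future is completed is considered to be after the checkpoint.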
Specified by: checkpointCoordinator in interface OperatorCoordinatorCheckpointContext
Specified by: checkpointCoordinator in interface OperatorCoordinator
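The completion contract of checkpointCoordinator(long, CompletableFuture) can be illustrated with a toy coordinator that either completes the future with its serialized state or completes it exceptionally. This is a sketch only: `CounterCoordinatorSketch` is hypothetical, it does not implement the real OperatorCoordinator interface, and the string-based serialization is an arbitrary choice for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/** Hypothetical coordinator-like class whose only state is a counter. */
final class CounterCoordinatorSketch {

    private long counter;

    void increment() {
        counter++;
    }

    /** Mirrors the shape of checkpointCoordinator(long, CompletableFuture<byte[]>). */
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        try {
            // Serialize the current state; the format is an assumption of this sketch.
            byte[] state = Long.toString(counter).getBytes(StandardCharsets.UTF_8);
            // Completing with the state confirms the checkpoint.
            result.complete(state);
        } catch (Throwable t) {
            // Completing exceptionally aborts the checkpoint. The future is completed on every path.
            result.completeExceptionally(t);
        }
    }
}
```

The key point is that every code path completes the future, because an uncompleted future would keep the checkpoint from progressing.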
public void notifyCheckpointComplete(long checkpointId)
Description copied from interface: OperatorCoordinator
We override the method here to remove the checked exception. See CheckpointListener.notifyCheckpointComplete(long) for the detailed semantics of the method.
Specified by: notifyCheckpointComplete in interface OperatorCoordinatorCheckpointContext
Specified by: notifyCheckpointComplete in interface OperatorCoordinator
Specified by: notifyCheckpointComplete in interface CheckpointListener
Parameters: checkpointId - The ID of the checkpoint that has been completed.
public void notifyCheckpointAborted(long checkpointId)
Description copied from interface: OperatorCoordinator
We override the method here to remove the checked exception. See CheckpointListener.notifyCheckpointAborted(long) for the detailed semantics of the method.
Specified by: notifyCheckpointAborted in interface OperatorCoordinatorCheckpointContext
Specified by: notifyCheckpointAborted in interface OperatorCoordinator
Specified by: notifyCheckpointAborted in interface CheckpointListener
Parameters: checkpointId - The ID of the checkpoint that has been aborted.
public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception
Description copied from interface: OperatorCoordinator
Resets the coordinator to the given checkpoint.
This method is called in the case of a global failover of the system, which means a failover of the coordinator (JobManager). This method is not invoked on a partial failover; partial failovers call the OperatorCoordinator.subtaskReset(int, long) method for the involved subtasks.
This method is expected to behave synchronously with respect to other method calls and calls to Context methods. For example, events being sent by the coordinator after this method returns are assumed to take place after the checkpoint that was restored.
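This method is called with a null state argument in the following situations:
- There is a recovery and there was no completed checkpoint yet.
- There is a recovery from a completed checkpoint/savepoint but it contained no state for the coordinator.
In both cases, the coordinator should reset to an empty (new) state.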
Restoring to a checkpoint is a way of confirming that the checkpoint is complete. It is safe to commit side-effects that are predicated on checkpoint completion after this call.
Even if no call to OperatorCoordinator.notifyCheckpointComplete(long) happened, the checkpoint can still be complete (for example when a system failure happened directly after committing the checkpoint, before calling the OperatorCoordinator.notifyCheckpointComplete(long) method).
Specified by: resetToCheckpoint in interface OperatorCoordinatorCheckpointContext
Specified by: resetToCheckpoint in interface OperatorCoordinator
Throws: Exception
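Since resetToCheckpoint may be called with a null state argument, an implementation has to handle that case explicitly. The sketch below assumes that a null argument means "start from an empty state". `ResettableCounterSketch` is a hypothetical class with a string-encoded counter as its only state, not a real OperatorCoordinator implementation.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical coordinator-like class that restores a counter from checkpoint data. */
final class ResettableCounterSketch {

    private long counter;

    /** Mirrors the shape of resetToCheckpoint(long, byte[]); checkpointData may be null. */
    void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        if (checkpointData == null) {
            // Assumption of this sketch: no state available means an empty (new) state.
            counter = 0L;
        } else {
            // The string encoding is an arbitrary choice for this example.
            counter = Long.parseLong(new String(checkpointData, StandardCharsets.UTF_8));
        }
    }

    long counter() {
        return counter;
    }
}
```

Restoring synchronously inside the method, as done here, also matches the expectation that events sent after the method returns take place after the restored checkpoint.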
public void afterSourceBarrierInjection(long checkpointId)
Specified by: afterSourceBarrierInjection in interface OperatorCoordinatorCheckpointContext
public void abortCurrentTriggering()
Specified by: abortCurrentTriggering in interface OperatorCoordinatorCheckpointContext
public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader) throws Exception
Throws: Exception
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.