Class OperatorCoordinatorHolder

  • All Implemented Interfaces:
    AutoCloseable, CheckpointListener, OperatorCoordinatorCheckpointContext, OperatorInfo

    public class OperatorCoordinatorHolder
    extends Object
    implements OperatorCoordinatorCheckpointContext, AutoCloseable
    The OperatorCoordinatorHolder holds the OperatorCoordinator and manages all its interactions with the remaining components. It provides the coordinator's context and is responsible for checkpointing and for exactly-once semantics.

    Exactly-once Semantics

    The semantics are described under OperatorCoordinator.checkpointCoordinator(long, CompletableFuture).

    Exactly-once Mechanism

    The mechanism for exactly-once semantics is as follows:

    • Events pass through a special channel, the SubtaskGatewayImpl. If we are not currently triggering a checkpoint, then events simply pass through.
    • When the coordinator's checkpoint future completes, the subtask gateway is closed. Events sent after that are held back (buffered), because they belong to the epoch after the checkpoint.
    • Once all coordinators in the job have completed the checkpoint, the checkpoint barriers are injected into the sources. When a coordinator receives an AcknowledgeCheckpointEvent from one of its subtasks, which denotes that the subtask has received the checkpoint barrier and completed the checkpoint, the coordinator reopens the corresponding subtask gateway and sends out the buffered events.
    • If a task fails in the meantime, the buffered events are dropped from the gateways. From the coordinator's perspective, these events are lost, because they were sent to a failed subtask after its latest completed checkpoint.
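    The four rules above amount to a small per-subtask state machine. The sketch below is a minimal illustration under stated assumptions, not Flink's SubtaskGatewayImpl: the class BufferingGateway, its methods, the String events and the Consumer standing in for the RPC connection are all hypothetical, and reopening the gateway after a failure (for the restarted subtask) is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, NOT Flink's SubtaskGatewayImpl: models only pass-through,
// close-and-buffer, reopen-and-flush, and drop-on-failure for one subtask.
final class BufferingGateway {
    private static final long OPEN = -1L;

    private final Consumer<String> rpc;              // assumed stand-in for the RPC to one subtask
    private final List<String> buffered = new ArrayList<>();
    private long closedForCheckpoint = OPEN;

    BufferingGateway(Consumer<String> rpc) {
        this.rpc = rpc;
    }

    void sendEvent(String event) {
        if (closedForCheckpoint == OPEN) {
            rpc.accept(event);                       // no checkpoint pending: pass through
        } else {
            buffered.add(event);                     // belongs to the epoch after the checkpoint
        }
    }

    // Called once the coordinator's checkpoint future has completed.
    void closeForCheckpoint(long checkpointId) {
        closedForCheckpoint = checkpointId;
    }

    // Called when the subtask acknowledges that it completed this checkpoint.
    void reopenAfterAck(long checkpointId) {
        if (checkpointId != closedForCheckpoint) {
            return;                                  // ignore acks for other checkpoints
        }
        closedForCheckpoint = OPEN;
        buffered.forEach(rpc);                       // flush in the original order
        buffered.clear();
    }

    // Called when the subtask fails before acknowledging: buffered events are lost.
    // Assumption of this sketch: the gateway is then open for the restarted subtask.
    void dropOnFailure() {
        buffered.clear();
        closedForCheckpoint = OPEN;
    }
}

public class GatewayDemo {
    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        BufferingGateway gateway = new BufferingGateway(delivered::add);

        gateway.sendEvent("split-1");                // delivered immediately
        gateway.closeForCheckpoint(7L);
        gateway.sendEvent("split-2");                // buffered
        gateway.reopenAfterAck(7L);                  // split-2 is flushed
        gateway.closeForCheckpoint(8L);
        gateway.sendEvent("split-3");                // buffered
        gateway.dropOnFailure();                     // split-3 is dropped

        System.out.println(delivered);               // prints [split-1, split-2]
    }
}
```

    Keying the closed state on a checkpoint ID, rather than a plain boolean, lets the sketch ignore acknowledgements that belong to a different checkpoint.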

    IMPORTANT: A critical assumption is that all events from the scheduler to the Tasks are transported strictly in order. Events sent by the coordinator after the checkpoint barrier has been injected must not overtake that barrier. This is currently guaranteed by Flink's RPC mechanism.
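    Why ordering matters can be shown with a toy receiver that assigns each event to the epoch before or after the barrier purely by arrival position. This is an illustrative sketch, not Flink code: the Message record, the splitAtBarrier method and the string payloads are hypothetical, and a List in arrival order stands in for the ordered RPC connection.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: a task classifies events by their position relative to the barrier.
public class OrderingDemo {
    // Hypothetical message type: either an operator event or a checkpoint barrier.
    record Message(String payload, boolean isBarrier) {}

    // Splits messages, given in arrival order, into [before barrier, after barrier].
    static List<List<String>> splitAtBarrier(List<Message> arrivals) {
        List<String> before = new ArrayList<>();
        List<String> after = new ArrayList<>();
        boolean barrierSeen = false;
        for (Message m : arrivals) {
            if (m.isBarrier()) {
                barrierSeen = true;
            } else if (barrierSeen) {
                after.add(m.payload());
            } else {
                before.add(m.payload());
            }
        }
        return List.of(before, after);
    }

    public static void main(String[] args) {
        Message a = new Message("event-A", false);        // sent before the barrier
        Message barrier = new Message("barrier-7", true);
        Message b = new Message("event-B", false);        // sent after the barrier

        // In-order transport: event-B lands in the epoch after checkpoint 7.
        System.out.println(splitAtBarrier(List.of(a, barrier, b)));  // prints [[event-A], [event-B]]

        // If event-B overtook the barrier, the task's snapshot for checkpoint 7 would
        // include it while the coordinator's snapshot (taken before sending it) would not.
        System.out.println(splitAtBarrier(List.of(a, b, barrier)));  // prints [[event-A, event-B], []]
    }
}
```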

    Concurrency and Threading Model

    This component runs strictly in the Scheduler's main-thread-executor. All calls "from the outside" are either already in the main-thread-executor (when coming from Scheduler) or put into the main-thread-executor (when coming from the CheckpointCoordinator). We rely on the executor to preserve strict order of the calls.
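    The threading model can be approximated with a single-threaded executor from java.util.concurrent. In this minimal sketch, Executors.newSingleThreadExecutor() stands in for the Scheduler's main-thread-executor and a plain Thread stands in for the CheckpointCoordinator; the class MainThreadDemo, the runDemo method and the logged call names are hypothetical. Because the state is only ever touched from that one thread, it needs no locks, and the executor's FIFO queue preserves the order of the calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadDemo {
    static List<String> runDemo() {
        // Stand-in for the main-thread-executor: one thread, tasks run in submission order.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        // Component state; only touched from mainThread, so no locking is needed.
        List<String> callLog = new ArrayList<>();
        try {
            // A call from the scheduler (already on the main thread in the real system).
            mainThread.execute(() -> callLog.add("scheduler call #1"));

            // A call from another thread (standing in for the CheckpointCoordinator)
            // is not run on that thread; it is put into the main-thread executor.
            Thread checkpointThread = new Thread(
                    () -> mainThread.execute(() -> callLog.add("checkpoint call")));
            checkpointThread.start();
            checkpointThread.join();

            mainThread.execute(() -> callLog.add("scheduler call #2"));

            // Read the state through the executor too; Future.get() makes the
            // writes above visible to the calling thread.
            return mainThread.submit(() -> List.copyOf(callLog)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
        // prints [scheduler call #1, checkpoint call, scheduler call #2]
    }
}
```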

    Actions from the coordinator to the "outside world" (like completing a checkpoint and sending an event) are also enqueued back into the Scheduler's main-thread-executor, strictly in order.
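    The hand-back in the opposite direction can be sketched with the standard CompletableFuture API: the coordinator completes its checkpoint future on its own thread, and the follow-up work is scheduled onto the main thread via thenApplyAsync(fn, executor). This is a sketch under assumptions, not the holder's actual code: the class HandBackDemo, the thread names and the byte[] checkpoint payload are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandBackDemo {
    static String runDemo() {
        // Hypothetical stand-ins for the main-thread-executor and a coordinator-owned thread.
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        ExecutorService coordinatorThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator-thread"));
        try {
            CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();

            // Holder side: react to completion on the main thread, regardless of
            // which thread completes the future.
            CompletableFuture<String> handled = checkpointFuture.thenApplyAsync(
                    state -> Thread.currentThread().getName() + " got " + state.length + " bytes",
                    mainThread);

            // Coordinator side: completes its checkpoint on its own thread.
            coordinatorThread.execute(() -> checkpointFuture.complete(new byte[] {1, 2, 3}));

            return handled.join();
        } finally {
            mainThread.shutdown();
            coordinatorThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints main-thread got 3 bytes
    }
}
```

    Routing the continuation through the executor, rather than running it on whichever thread completes the future, keeps the single-threaded invariant from the previous paragraph intact.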