Class GlobalCommitterOperator<CommT, GlobalCommT>

  • All Implemented Interfaces:
    Serializable, CheckpointListener, Input<CommittableMessage<CommT>>, KeyContext, KeyContextHandler, OneInputStreamOperator<CommittableMessage<CommT>, Void>, StreamOperator<Void>, StreamOperatorStateHandler.CheckpointedStreamOperator, YieldingOperator<Void>

    @Internal
    public class GlobalCommitterOperator<CommT, GlobalCommT>
    extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<CommittableMessage<CommT>, Void>
    Implements the GlobalCommitter.

    This operator usually trails behind a CommitterOperator. In this case, the global committer will receive committables from the committer operator through processElement(StreamRecord). Once all committables from all subtasks have been received, the global committer will commit them. This approach also works for any number of intermediate custom operators between the committer and the global committer in a custom post-commit topology.

    This means that the global committer does not wait for notifyCheckpointComplete(long). In many cases, it receives that callback before the actual committables arrive, so waiting for the callback would effectively delay the global commit by one checkpoint.

    However, we can leverage the following observation: the global committer receives committables if and only if the respective checkpoint was completed and the upstream committers received notifyCheckpointComplete(long). So by waiting for all committables of a given checkpoint, we implicitly know that the checkpoint was successful and that the global committer is supposed to globally commit.
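
    The "wait for all committables of a given checkpoint" idea can be sketched as follows. This is a minimal, self-contained illustration and not the operator's actual implementation: CheckpointCommittableTracker and its methods are hypothetical names, and it assumes that each upstream subtask announces exactly once per checkpoint how many committables it will send.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, NOT a Flink class: tracks committables per checkpoint.
final class CheckpointCommittableTracker<T> {
    private final int numberOfSubtasks;
    // checkpoint id -> number of upstream subtasks that have announced their count
    private final Map<Long, Integer> announcedSubtasks = new HashMap<>();
    // checkpoint id -> total number of committables announced so far
    private final Map<Long, Integer> expectedCommittables = new HashMap<>();
    // checkpoint id -> committables received so far
    private final Map<Long, List<T>> received = new HashMap<>();

    CheckpointCommittableTracker(int numberOfSubtasks) {
        this.numberOfSubtasks = numberOfSubtasks;
    }

    // Assumption: called exactly once per upstream subtask and checkpoint.
    void announce(long checkpointId, int numberOfCommittables) {
        announcedSubtasks.merge(checkpointId, 1, Integer::sum);
        expectedCommittables.merge(checkpointId, numberOfCommittables, Integer::sum);
    }

    // Records one committable that arrived as input for the given checkpoint.
    void add(long checkpointId, T committable) {
        received.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    // True once every subtask has announced and every announced committable arrived.
    boolean isComplete(long checkpointId) {
        int announced = announcedSubtasks.getOrDefault(checkpointId, 0);
        int expected = expectedCommittables.getOrDefault(checkpointId, 0);
        int got = received.getOrDefault(checkpointId, Collections.<T>emptyList()).size();
        return announced == numberOfSubtasks && got == expected;
    }

    // Hands out the committables of a complete checkpoint and forgets the checkpoint.
    List<T> drain(long checkpointId) {
        if (!isComplete(checkpointId)) {
            throw new IllegalStateException("Checkpoint " + checkpointId + " is incomplete");
        }
        announcedSubtasks.remove(checkpointId);
        expectedCommittables.remove(checkpointId);
        List<T> result = received.remove(checkpointId);
        return result == null ? new ArrayList<T>() : result;
    }
}
```

    In this sketch, a caller would invoke add for every incoming committable and trigger the global commit with the result of drain as soon as isComplete returns true, without waiting for any checkpoint callback.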

    Note that committables of checkpoint X are not checkpointed in X because the global committer trails behind the checkpoint. In case of an error, they are replayed from the committer state. The state only includes incomplete checkpoints, that is, those for which upstream committers have not received notifyCheckpointComplete(long). All committables received are successful.

    In rare cases, the GlobalCommitterOperator is not connected (directly or indirectly) to a committer but is instead connected (directly or indirectly) to a writer. In this case, the global committer needs to perform the two-phase commit (2PC) protocol in place of the committer and therefore must rely on notifyCheckpointComplete(long), similarly to the CommitterOperator. Hence, commitOnInput is set to false in this case. Specifically, all three of the following prerequisites must be met:

    • No committer is upstream from which notifyCheckpointComplete(long) could be implicitly inferred, as described above.
    • The application runs in streaming mode.
    • Checkpointing is enabled.

    In all other cases (batch mode, an upstream committer, or disabled checkpointing), the global committer commits on input.
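
    The decision described above can be condensed into a single boolean expression. The following is only a sketch: CommitOnInputDecision and its parameter names are hypothetical and merely mirror the three prerequisites listed above; how the operator actually derives commitOnInput is not shown here.

```java
// Hypothetical helper, NOT part of Flink: mirrors the three prerequisites listed above.
final class CommitOnInputDecision {
    private CommitOnInputDecision() {}

    /**
     * Returns true when the global committer should commit on input, and false when
     * it has to wait for notifyCheckpointComplete(long) before committing.
     */
    static boolean commitOnInput(
            boolean committerUpstream, boolean streamingMode, boolean checkpointingEnabled) {
        // Waiting for the checkpoint callback is required only if all three conditions hold.
        boolean mustWaitForCheckpoint =
                !committerUpstream && streamingMode && checkpointingEnabled;
        return !mustWaitForCheckpoint;
    }
}
```

    Under these assumptions, only the combination "no upstream committer, streaming mode, checkpointing enabled" yields false; every other combination yields true.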

    See Also:
    Serialized Form