Class GlobalCommitterOperator<CommT,GlobalCommT>
- java.lang.Object
  - org.apache.flink.streaming.api.operators.AbstractStreamOperator<Void>
    - org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator<CommT,GlobalCommT>
All Implemented Interfaces:
Serializable, CheckpointListener, Input<CommittableMessage<CommT>>, KeyContext, KeyContextHandler, OneInputStreamOperator<CommittableMessage<CommT>,Void>, StreamOperator<Void>, StreamOperatorStateHandler.CheckpointedStreamOperator, YieldingOperator<Void>
@Internal public class GlobalCommitterOperator<CommT,GlobalCommT> extends AbstractStreamOperator<Void> implements OneInputStreamOperator<CommittableMessage<CommT>,Void>
Implements the GlobalCommitter.

This operator usually trails behind a CommitterOperator. In this case, the global committer receives committables from the committer operator through processElement(StreamRecord). Once all committables from all subtasks have been received, the global committer commits them. This approach also works for any number of intermediate custom operators between the committer and the global committer in a custom post-commit topology.

That means that the global committer does not wait for notifyCheckpointComplete(long). In many cases, it receives that callback before the actual committables anyway, so waiting for it would effectively delay the global commit by one checkpoint.

However, we can leverage the following observation: the global committer receives the committables of a checkpoint if and only if that checkpoint was completed and the upstream committers received notifyCheckpointComplete(long). So by waiting for all committables of a given checkpoint, we implicitly know that the checkpoint was successful and that the global committer is supposed to globally commit.

Note that the committables of checkpoint X are not checkpointed in X because the global committer trails behind the checkpoint. In case of an error, they are replayed from the committer state. That state only includes incomplete checkpoints, i.e., checkpoints for which the upstream committers have not received notifyCheckpointComplete(long). Every committable that the global committer receives belongs to a successful checkpoint.

In rare cases, the GlobalCommitterOperator is not connected (in)directly to a committer but (in)directly to a writer. In this case, the global committer needs to perform the two-phase commit (2PC) protocol instead of the committer. Thus, it must rely on notifyCheckpointComplete(long) in the same way as the CommitterOperator. Hence, commitOnInput is set to false in this case. In particular, all three of the following prerequisites must be met:
- No committer is upstream from which notifyCheckpointComplete(long) could be implicitly inferred as sketched above.
- The application runs in streaming mode.
- Checkpointing is enabled.
In all other cases (batch mode, an upstream committer, or disabled checkpointing), the global committer commits on input.
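The three prerequisites above can be condensed into a single predicate. This is a minimal sketch only: CommitModeSketch and its parameter names are hypothetical and not part of Flink, and this page does not document where the real commitOnInput value is computed.

```java
// Hypothetical helper, not a Flink class: restates the documented rule for
// choosing the commitOnInput constructor argument.
final class CommitModeSketch {
    private CommitModeSketch() {}

    /**
     * Returns false only when all three prerequisites hold: no upstream committer,
     * streaming mode, and checkpointing enabled. Only then must the global committer
     * run the two-phase commit itself, driven by notifyCheckpointComplete.
     */
    static boolean commitOnInput(
            boolean hasUpstreamCommitter, boolean streamingMode, boolean checkpointingEnabled) {
        boolean mustRunTwoPhaseCommit =
                !hasUpstreamCommitter && streamingMode && checkpointingEnabled;
        return !mustRunTwoPhaseCommit;
    }
}
```

For example, commitOnInput(false, true, true) returns false, while every other combination of the three flags returns true.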
See Also:
- Serialized Form
Field Summary
Fields inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator
combinedWatermark, config, lastRecordAttributes1, lastRecordAttributes2, latencyStats, LOG, metrics, output, processingTimeService, stateHandler, stateKeySelector1, stateKeySelector2, timeServiceManager
Constructor Summary
GlobalCommitterOperator(SerializableSupplier<Committer<CommT>> committerFactory, SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory, boolean commitOnInput)
Method Summary
- void initializeState(StateInitializationContext context)
  Stream operators with state that can be restored need to override this hook method.
- void notifyCheckpointComplete(long checkpointId)
  Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
- void processElement(StreamRecord<CommittableMessage<CommT>> element)
  Processes one element that arrived on this input of the MultipleInputStreamOperator.
- protected void setup(StreamTask<?,?> containingTask, StreamConfig config, Output<StreamRecord<Void>> output)
- void snapshotState(StateSnapshotContext context)
  Stream operators with state that want to participate in a snapshot need to override this hook method.
Methods inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator
close, finish, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getStateKeySelector1, getStateKeySelector2, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, isAsyncStateProcessingEnabled, isUsingCustomRawKeyedState, notifyCheckpointAborted, open, prepareSnapshotPreBarrier, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processRecordAttributes, processRecordAttributes1, processRecordAttributes2, processWatermark, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setMailboxExecutor, setProcessingTimeService, snapshotState, useSplittableTimers
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener
notifyCheckpointAborted
Methods inherited from interface org.apache.flink.streaming.api.operators.Input
processLatencyMarker, processRecordAttributes, processWatermark, processWatermarkStatus
Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContext
getCurrentKey, setCurrentKey
Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContextHandler
hasKeyContext
Methods inherited from interface org.apache.flink.streaming.api.operators.OneInputStreamOperator
setKeyContextElement
Methods inherited from interface org.apache.flink.streaming.api.operators.StreamOperator
close, finish, getMetricGroup, getOperatorAttributes, getOperatorID, initializeState, open, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
Constructor Detail
GlobalCommitterOperator
public GlobalCommitterOperator(SerializableSupplier<Committer<CommT>> committerFactory, SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory, boolean commitOnInput)
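The constructor takes factories (SerializableSupplier) instead of a ready-made Committer and serializer. A plausible reading, which this page does not state explicitly, is that the operator is Serializable (see Serialized Form) and is shipped to tasks in serialized form, so only the factories need to be serializable, while the committer itself can be created later on the task. The sketch below illustrates that pattern with plain Java serialization. SerializableSupplierSketch, NonSerializableCommitter, and OperatorShellSketch are hypothetical stand-ins, not Flink classes, and setup() only mimics a setup-like hook.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in for a serializable supplier, defined locally so the sketch compiles without Flink.
interface SerializableSupplierSketch<T> extends Supplier<T>, Serializable {}

// Hypothetical committer that is deliberately NOT Serializable.
final class NonSerializableCommitter {
    private final StringBuilder log = new StringBuilder();

    void commit(String committable) {
        log.append(committable).append(';');
    }

    String log() {
        return log.toString();
    }
}

// Hypothetical operator shell: only the factory is serialized; the committer is
// transient and created after deserialization.
final class OperatorShellSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final SerializableSupplierSketch<NonSerializableCommitter> committerFactory;
    private transient NonSerializableCommitter committer;

    OperatorShellSketch(SerializableSupplierSketch<NonSerializableCommitter> committerFactory) {
        this.committerFactory = committerFactory;
    }

    // Mimics a setup-like hook that runs on the task after deserialization.
    void setup() {
        committer = committerFactory.get();
    }

    NonSerializableCommitter committer() {
        return committer;
    }

    // Simulates shipping the operator to a task with plain Java serialization.
    static OperatorShellSketch roundTrip(OperatorShellSketch operator) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(operator);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (OperatorShellSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

A method reference such as NonSerializableCommitter::new serves as the factory: it is serializable because its target interface is, even though the objects it produces are not.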
Method Detail
setup
protected void setup(StreamTask<?,?> containingTask, StreamConfig config, Output<StreamRecord<Void>> output)
Overrides:
setup in class AbstractStreamOperator<Void>
snapshotState
public void snapshotState(StateSnapshotContext context) throws Exception
Description copied from class: AbstractStreamOperator
Stream operators with state that want to participate in a snapshot need to override this hook method.
Specified by:
snapshotState in interface StreamOperatorStateHandler.CheckpointedStreamOperator
Overrides:
snapshotState in class AbstractStreamOperator<Void>
Parameters:
context - context that provides information and means required for taking a snapshot
Throws:
Exception
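This page does not describe what snapshotState writes. Since the constructor receives a committableSerializerFactory for SimpleVersionedSerializer<CommT>, one reasonable assumption is that pending committables are stored as entries of serializer version plus bytes. The sketch below shows that pattern under this assumption. VersionedSerializerSketch only mirrors the general shape of a versioned serializer (a version number plus byte-array serialize/deserialize methods); it, StringSerializerSketch, StateEntrySketch, and PendingStateSketch are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a versioned serializer; not the Flink interface itself.
interface VersionedSerializerSketch<T> {
    int getVersion();

    byte[] serialize(T obj) throws IOException;

    T deserialize(int version, byte[] serialized) throws IOException;
}

// Example serializer for String committables; rejects versions it does not know.
final class StringSerializerSketch implements VersionedSerializerSketch<String> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(String obj) {
        return obj.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unsupported serializer version: " + version);
        }
        return new String(serialized, StandardCharsets.UTF_8);
    }
}

// One state entry: the version is stored next to the bytes so a restore can pick matching logic.
final class StateEntrySketch {
    final int version;
    final byte[] bytes;

    StateEntrySketch(int version, byte[] bytes) {
        this.version = version;
        this.bytes = bytes;
    }
}

final class PendingStateSketch {
    private PendingStateSketch() {}

    // Snapshot: serialize every pending committable with the current serializer version.
    static <T> List<StateEntrySketch> snapshot(List<T> pending, VersionedSerializerSketch<T> serializer) {
        List<StateEntrySketch> state = new ArrayList<>();
        try {
            for (T committable : pending) {
                state.add(new StateEntrySketch(serializer.getVersion(), serializer.serialize(committable)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return state;
    }

    // Restore: deserialize each entry with the version it was written with.
    static <T> List<T> restore(List<StateEntrySketch> state, VersionedSerializerSketch<T> serializer) {
        List<T> pending = new ArrayList<>();
        try {
            for (StateEntrySketch entry : state) {
                pending.add(serializer.deserialize(entry.version, entry.bytes));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pending;
    }
}
```

Storing the version per entry lets a newer serializer recognize, and convert or reject, data written by an older one.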
initializeState
public void initializeState(StateInitializationContext context) throws Exception
Description copied from class: AbstractStreamOperator
Stream operators with state that can be restored need to override this hook method.
Specified by:
initializeState in interface StreamOperatorStateHandler.CheckpointedStreamOperator
Overrides:
initializeState in class AbstractStreamOperator<Void>
Parameters:
context - context that allows registering different states.
Throws:
Exception
notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.

These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level Javadoc of CheckpointListener for details.

Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Properly implementing the "Checkpoint Subsuming Contract" (see above) handles this situation as well.

Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
Specified by:
notifyCheckpointComplete in interface CheckpointListener
Overrides:
notifyCheckpointComplete in class AbstractStreamOperator<Void>
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
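When commitOnInput is false, the class documentation says the global committer must act on notifyCheckpointComplete(long) itself. The sketch below shows one way to honor the subsuming contract described above: committables are buffered per checkpoint, and a notification for checkpoint N commits everything buffered for checkpoints up to and including N, so a skipped notification or a late one for an earlier checkpoint is harmless. SubsumingCommitSketch is hypothetical; the real operator's internal data structures are not documented on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of notification-driven committing under the subsuming contract.
final class SubsumingCommitSketch<C> {
    private final NavigableMap<Long, List<C>> pendingByCheckpoint = new TreeMap<>();
    private final List<C> committed = new ArrayList<>();

    // Buffer a committable until its checkpoint (or a later one) is confirmed.
    void add(long checkpointId, C committable) {
        pendingByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // headMap(id, true) is a live view of all entries with key <= checkpointId,
        // iterated in ascending checkpoint order.
        NavigableMap<Long, List<C>> subsumed = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<C> batch : subsumed.values()) {
            committed.addAll(batch); // stand-in for the actual commit call
        }
        subsumed.clear(); // clearing the view removes these entries from the backing map
    }

    List<C> committed() {
        return committed;
    }

    int pendingCheckpoints() {
        return pendingByCheckpoint.size();
    }
}
```

If the notification for checkpoint 1 is skipped, the notification for checkpoint 2 still commits checkpoint 1's committables; a late notification for checkpoint 1 then finds nothing left to commit.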
processElement
public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception
Description copied from interface: Input
Processes one element that arrived on this input of the MultipleInputStreamOperator. This method is guaranteed to not be called concurrently with other methods of the operator.
Specified by:
processElement in interface Input<CommittableMessage<CommT>>
Throws:
Exception
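The commit-on-input behavior from the class documentation (commit once all committables of a checkpoint from all subtasks have arrived) can be modeled as follows. This is a simplified sketch under an explicit assumption: each upstream subtask reports, per checkpoint, how many committables it will send (the hypothetical onSummary callback), and individual committables arrive through the hypothetical onCommittable callback. It does not reproduce Flink's CommittableMessage types, and CommitOnInputSketch is not a Flink class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of committing on input.
final class CommitOnInputSketch<C> {
    private final int numberOfSubtasks;
    private final Map<Long, Progress<C>> progressByCheckpoint = new HashMap<>();
    private final List<List<C>> globalCommits = new ArrayList<>();

    CommitOnInputSketch(int numberOfSubtasks) {
        this.numberOfSubtasks = numberOfSubtasks;
    }

    // Assumed message: subtask `subtaskId` will send `expectedCommittables` for this checkpoint.
    void onSummary(long checkpointId, int subtaskId, int expectedCommittables) {
        Progress<C> progress = progressFor(checkpointId);
        progress.expectedBySubtask.put(subtaskId, expectedCommittables);
        commitIfComplete(checkpointId, progress);
    }

    // Assumed message: one committable belonging to the given checkpoint.
    void onCommittable(long checkpointId, C committable) {
        Progress<C> progress = progressFor(checkpointId);
        progress.received.add(committable);
        commitIfComplete(checkpointId, progress);
    }

    List<List<C>> globalCommits() {
        return globalCommits;
    }

    private Progress<C> progressFor(long checkpointId) {
        return progressByCheckpoint.computeIfAbsent(checkpointId, id -> new Progress<>());
    }

    private void commitIfComplete(long checkpointId, Progress<C> progress) {
        if (progress.expectedBySubtask.size() < numberOfSubtasks) {
            return; // some subtask has not reported its count yet
        }
        int expected = 0;
        for (int count : progress.expectedBySubtask.values()) {
            expected += count;
        }
        if (progress.received.size() < expected) {
            return; // some committables are still in flight
        }
        // Having everything implies the checkpoint completed upstream, so commit globally.
        globalCommits.add(new ArrayList<>(progress.received));
        progressByCheckpoint.remove(checkpointId);
    }

    private static final class Progress<T> {
        final Map<Integer, Integer> expectedBySubtask = new HashMap<>();
        final List<T> received = new ArrayList<>();
    }
}
```

Committables may arrive before the corresponding counts; the sketch simply waits until every subtask has reported and the announced number of committables is present, and only then commits the checkpoint as one batch.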