Interface CheckpointListener
- All Known Subinterfaces:
AsyncKeyedStateBackend<K>, ExternallyInducedSourceReader<T,SplitT>, InternalCheckpointListener, MultipleInputStreamOperator<OUT>, OneInputStreamOperator<IN,OUT>, OperatorCoordinator, OperatorCoordinatorCheckpointContext, SourceReader<T,SplitT>, SplitEnumerator<SplitT,CheckpointT>, StreamOperator<OUT>, TaskStateManager, TwoInputStreamOperator<IN1,IN2,OUT>, YieldingOperator<OUT>
- All Known Implementing Classes:
AbstractArrowPythonAggregateFunctionOperator, AbstractAsyncStateStreamingJoinOperator, AbstractAsyncStateStreamOperator, AbstractAsyncStateStreamOperatorV2, AbstractAsyncStateUdfStreamOperator, AbstractEmbeddedDataStreamPythonFunctionOperator, AbstractEmbeddedPythonFunctionOperator, AbstractEmbeddedStatelessFunctionOperator, AbstractExternalDataStreamPythonFunctionOperator, AbstractExternalOneInputPythonFunctionOperator, AbstractExternalPythonFunctionOperator, AbstractExternalTwoInputPythonFunctionOperator, AbstractKeyedStateBackend, AbstractMapBundleOperator, AbstractOneInputEmbeddedPythonFunctionOperator, AbstractOneInputPythonFunctionOperator, AbstractPythonFunctionOperator, AbstractPythonScalarFunctionOperator, AbstractPythonStreamAggregateOperator, AbstractPythonStreamGroupAggregateOperator, AbstractStatelessFunctionOperator, AbstractStreamArrowPythonBoundedRangeOperator, AbstractStreamArrowPythonBoundedRowsOperator, AbstractStreamArrowPythonOverWindowAggregateFunctionOperator, AbstractStreamingJoinOperator, AbstractStreamingWriter, AbstractStreamOperator, AbstractStreamOperatorV2, AbstractTwoInputEmbeddedPythonFunctionOperator, AbstractUdfStreamOperator, AggregateWindowOperator, AlignedWindowTableFunctionOperator, ArrowPythonScalarFunctionOperator, AsyncKeyedStateBackendAdaptor, AsyncStateStreamingJoinOperator, AsyncWaitOperator, BaseTwoInputStreamOperatorWithStateRetention, BatchArrowPythonGroupAggregateFunctionOperator, BatchArrowPythonGroupWindowAggregateFunctionOperator, BatchArrowPythonOverWindowAggregateFunctionOperator, BatchCoBroadcastWithKeyedOperator, BatchCoBroadcastWithNonKeyedOperator, BatchCompactCoordinator, BatchCompactOperator, BatchFileWriter, BatchGroupedReduceOperator, BatchMultipleInputStreamOperator, BootstrapStreamTaskRunner, BroadcastStateBootstrapOperator, BufferDataOverWindowOperator, CacheTransformationTranslator.IdentityStreamOperator, CacheTransformationTranslator.NoOpStreamOperator, CepOperator, ChangelogKeyedStateBackend, CoBroadcastWithKeyedOperator, CoBroadcastWithNonKeyedOperator, CollectSinkFunction, CollectSinkOperator, CollectSinkOperatorCoordinator, CompactCoordinator, CompactCoordinator, CompactCoordinatorStateHandler, CompactFileWriter, CompactOperator, CompactorOperator, CompactorOperatorStateHandler, ConstraintEnforcer, ContinuousFileReaderOperator, ContinuousFileSplitEnumerator, CoProcessOperator, CoStreamFlatMap, CoStreamMap, DelegateOperatorTransformation.DelegateOperator, DoubleEmittingSourceReaderWithCheckpointsInBetween, DynamicFileSplitEnumerator, DynamicFilteringDataCollectorOperator, DynamicFilteringDataCollectorOperatorCoordinator, EmbeddedPythonBatchCoBroadcastProcessOperator, EmbeddedPythonBatchKeyedCoBroadcastProcessOperator, EmbeddedPythonCoProcessOperator, EmbeddedPythonKeyedCoProcessOperator, EmbeddedPythonKeyedProcessOperator, EmbeddedPythonProcessOperator, EmbeddedPythonScalarFunctionOperator, EmbeddedPythonTableFunctionOperator, EmbeddedPythonWindowOperator, EvictingWindowOperator, ExternalPythonBatchCoBroadcastProcessOperator, ExternalPythonBatchKeyedCoBroadcastProcessOperator, ExternalPythonCoProcessOperator, ExternalPythonKeyedCoProcessOperator, ExternalPythonKeyedProcessOperator, ExternalPythonProcessOperator, FailureMapper, FileSourceReader, FiniteTestSource, ForStIncrementalSnapshotStrategy, ForStKeyedStateBackend, ForStNativeFullSnapshotStrategy, ForStSnapshotStrategyBase, ForStSyncKeyedStateBackend, FromElementsSourceReader, FusionStreamOperatorBase, GeneratingIteratorSourceReader, GenericWriteAheadSink, GlobalCommitterOperator, GlobalRuntimeFilterBuilderOperator, GroupReduceOperator, HashJoinOperator, HeapKeyedStateBackend, HybridSourceReader, HybridSourceSplitEnumerator, InputConversionOperator, IntervalJoinOperator, IteratorSourceEnumerator, IteratorSourceReader, IteratorSourceReaderBase, KeyedCoProcessOperator, KeyedCoProcessOperatorWithWatermarkDelay, KeyedMapBundleOperator, KeyedProcessOperator, KeyedProcessOperator, KeyedSortPartitionOperator, KeyedStateBootstrapOperator, KeyedTwoInputBroadcastProcessOperator, KeyedTwoInputNonBroadcastProcessOperator, KeyedTwoOutputProcessOperator, LegacyKeyedCoProcessOperator, LegacyKeyedProcessOperator, LimitOperator, LocalRuntimeFilterBuilderOperator, LocalSlicingWindowAggOperator, MapBundleOperator, MapPartitionOperator, MiniBatchStreamingJoinOperator, MultipleInputStreamOperatorBase, NonBufferOverWindowOperator, NoOpEnumerator, OperatorCoordinatorHolder, OutputConversionOperator, PartitionAggregateOperator, PartitionCommitter, PartitionReduceOperator, ProcessOperator, ProcessOperator, ProcTimeMiniBatchAssignerOperator, ProcTimeSortOperator, PythonScalarFunctionOperator, PythonStreamGroupAggregateOperator, PythonStreamGroupTableAggregateOperator, PythonStreamGroupWindowAggregateOperator, PythonTableFunctionOperator, QueryableAppendingStateOperator, QueryableValueStateOperator, RankOperator, RateLimitedSourceReader, RecreateOnResetOperatorCoordinator, RocksDBKeyedStateBackend, RocksDBSnapshotStrategyBase, RocksIncrementalSnapshotStrategy, RocksNativeFullSnapshotStrategy, RowKindSetter, RowTimeMiniBatchAssginerOperator, RowTimeSortOperator, SingleThreadMultiplexSourceReaderBase, SinkOperator, SinkUpsertMaterializer, SortLimitOperator, SortMergeJoinOperator, SortOperator, SortPartitionOperator, SourceCoordinator, SourceOperator, SourceReaderBase, StateBootstrapOperator, StateBootstrapWrapperOperator, StaticFileSplitEnumerator, StreamArrowPythonGroupWindowAggregateFunctionOperator, StreamArrowPythonProcTimeBoundedRangeOperator, StreamArrowPythonProcTimeBoundedRowsOperator, StreamArrowPythonRowTimeBoundedRangeOperator, StreamArrowPythonRowTimeBoundedRowsOperator, StreamFilter, StreamFlatMap, StreamGroupedReduceOperator, StreamingFileSink, StreamingFileWriter, StreamingJoinOperator, StreamingSemiAntiJoinOperator, StreamMap, StreamProject, StreamRecordTimestampInserter, StreamSink, StreamSortOperator, StreamSource, TableAggregateWindowOperator, TableStreamOperator, TaskStateManagerImpl, TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator, TimestampsAndWatermarksOperator, TwoInputBroadcastProcessOperator, TwoInputNonBroadcastProcessOperator, TwoOutputProcessOperator, TwoPhaseCommitSinkFunction, UnalignedWindowTableFunctionOperator, UnionStreamOperator, WatermarkAssignerOperator, WindowAggOperator, WindowJoinOperator, WindowOperator, WindowOperator, WindowTableFunctionOperatorBase
@Public public interface CheckpointListener
This interface is typically only needed for transactional interaction with the "outside world", like committing external side effects on checkpoints. An example is committing external transactions once a checkpoint completes.
Invocation Guarantees
It is NOT guaranteed that the implementation will receive a notification for each completed or aborted checkpoint. While these notifications come in most cases, notifications might not happen, for example, when a failure/restore happens directly after a checkpoint completed.
To handle this correctly, implementations should follow the "Checkpoint Subsuming Contract" described below.
Exceptions
The notifications from this interface come "after the fact", meaning after the checkpoint has been aborted or completed. Throwing an exception will not change the completion/abortion of the checkpoint.
Exceptions thrown from these methods result in task or job failure and recovery.
Checkpoint Subsuming Contract
Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the code can assume that no checkpoints with lower ID (T-1, T-2, etc.) are pending any more. No checkpoint with lower ID will ever be committed after a checkpoint with a higher ID.
This does not necessarily mean that all of the previous checkpoints actually completed successfully. It is also possible that some checkpoint timed out or was not fully acknowledged by all tasks. Implementations must then behave as if that checkpoint did not happen. The recommended way to do this is to let the completion of a new checkpoint (higher ID) subsume the completion of all earlier checkpoints (lower ID).
This property is easy to achieve for cases where increasing "offsets", "watermarks", or other progress indicators are communicated on checkpoint completion. A newer checkpoint will have a higher "offset" (more progress) than the previous checkpoint, so it automatically subsumes the previous one. Remember the "offset to commit" for a checkpoint ID and commit it when that specific checkpoint (by ID) gets the notification that it is complete.
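As a rough illustration of the offset case, the sketch below remembers one offset per checkpoint ID and commits it when that checkpoint is confirmed. It is plain Java with no Flink dependency: OffsetStore, OffsetCommitter, advanceTo() and onSnapshot() are hypothetical names invented for this example, and only the notifyCheckpointComplete(long) signature mirrors this interface. How onSnapshot() would be wired to an operator's snapshot hook is an assumption that is left out here.

```java
import java.util.TreeMap;

/** Hypothetical stand-in for an external system that accepts offset commits. */
interface OffsetStore {
    void commit(long offset);
}

/** Sketch of an offset committer that follows the subsuming contract. */
class OffsetCommitter {
    // Checkpoint ID -> offset that was current when that checkpoint was taken.
    private final TreeMap<Long, Long> offsetsByCheckpoint = new TreeMap<>();
    private final OffsetStore store;
    private long currentOffset;

    OffsetCommitter(OffsetStore store) {
        this.store = store;
    }

    /** Records processing progress (hypothetical helper). */
    void advanceTo(long offset) {
        currentOffset = offset;
    }

    /** Assumed to be called when the operator takes a snapshot for the given checkpoint. */
    void onSnapshot(long checkpointId) {
        offsetsByCheckpoint.put(checkpointId, currentOffset);
    }

    /** Same signature as CheckpointListener#notifyCheckpointComplete(long). */
    void notifyCheckpointComplete(long checkpointId) {
        Long offset = offsetsByCheckpoint.get(checkpointId);
        if (offset != null) {
            store.commit(offset);
        }
        // The confirmed checkpoint subsumes all earlier ones, so their entries are dropped too.
        offsetsByCheckpoint.headMap(checkpointId, true).clear();
    }

    int pendingCheckpoints() {
        return offsetsByCheckpoint.size();
    }
}
```

In this sketch, if the notification for checkpoint 1 is lost and checkpoint 2 is confirmed, the offset recorded for checkpoint 2 is committed and the entry for checkpoint 1 is discarded, so a late notification for checkpoint 1 has no effect.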
If you need to publish some specific artifacts (like files) or acknowledge some specific IDs after a checkpoint, you can follow a pattern like the one below.
Implementing Checkpoint Subsuming for Committing Artifacts
The following is a sample pattern for how applications can publish specific artifacts on checkpoint. Examples would be operators that acknowledge specific IDs or publish specific files on checkpoint.
- During processing, have two sets of artifacts.
  - A "ready set": Artifacts that are ready to be published as part of the next checkpoint. Artifacts are added to this set as soon as they are ready to be committed. This set is "transient": it is neither stored in Flink's state nor persisted anywhere.
  - A "pending set": Artifacts being committed with a checkpoint. The actual publishing happens when the checkpoint is complete. This is a map of "long => List<Artifact>", mapping from the ID of the checkpoint when the artifact was ready to the artifacts.
- On checkpoint, add that set of artifacts from the "ready set" to the "pending set", associated with the checkpoint ID. The whole "pending set" gets stored in the checkpoint state.
- On notifyCheckpointComplete(), publish all IDs/artifacts from the "pending set" up to and including the checkpoint with that ID. Remove these from the "pending set".
That way, even if some checkpoints did not complete, or if the notification that they completed got lost, the artifacts will be published as part of the next checkpoint that completes.
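The steps above could be sketched roughly as follows. This is plain Java without any Flink dependency, so it only illustrates the bookkeeping: ArtifactCommitter, artifactReady(), onSnapshot() and the in-memory "published" list are hypothetical names standing in for the operator, its snapshot hook and the external publish step. In a real operator the pending set would additionally be written to checkpoint state, which this sketch does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Sketch of the ready-set / pending-set pattern; artifacts are plain strings here. */
class ArtifactCommitter {
    // "Ready set": transient, not part of the checkpointed state.
    private final List<String> readySet = new ArrayList<>();
    // "Pending set": checkpoint ID -> artifacts handed over at that checkpoint.
    private final TreeMap<Long, List<String>> pendingSet = new TreeMap<>();
    // Stand-in for the external system that artifacts are published to.
    private final List<String> published = new ArrayList<>();

    /** Called during processing whenever an artifact becomes ready. */
    void artifactReady(String artifact) {
        readySet.add(artifact);
    }

    /** On checkpoint: move the ready set into the pending set under the checkpoint ID. */
    void onSnapshot(long checkpointId) {
        pendingSet.put(checkpointId, new ArrayList<>(readySet));
        readySet.clear();
        // A real operator would store the whole pending set in checkpoint state here.
    }

    /** Same signature as CheckpointListener#notifyCheckpointComplete(long). */
    void notifyCheckpointComplete(long checkpointId) {
        // Everything pending for this checkpoint or any earlier one is subsumed.
        NavigableMap<Long, List<String>> subsumed = pendingSet.headMap(checkpointId, true);
        for (List<String> artifacts : subsumed.values()) {
            published.addAll(artifacts);
        }
        subsumed.clear();
    }

    List<String> published() {
        return published;
    }

    int pendingCheckpoints() {
        return pendingSet.size();
    }
}
```

A sorted map is used here so that one notification can publish the entries of all earlier checkpoints in ID order, which covers the case where an earlier notification was skipped.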
Method Summary
Modifier and Type | Method | Description
default void | notifyCheckpointAborted(long checkpointId) | This method is called as a notification once a distributed checkpoint has been aborted.
void | notifyCheckpointComplete(long checkpointId) | Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
Method Detail
-
notifyCheckpointComplete
void notifyCheckpointComplete(long checkpointId) throws Exception
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
- Parameters:
checkpointId - The ID of the checkpoint that has been completed.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
-
notifyCheckpointAborted
default void notifyCheckpointAborted(long checkpointId) throws Exception
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for early cleanup of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
- Parameters:
checkpointId - The ID of the checkpoint that has been aborted.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
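The cache cleanup mentioned above might look roughly like the following sketch. PerCheckpointCache and its put() method are hypothetical names for a local, auxiliary per-checkpoint cache, and the class is plain Java that does not depend on Flink. Because abort notifications can be skipped, the sketch also cleans up on completion and treats the abort callback purely as an earlier opportunity to free memory.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a local per-checkpoint cache that holds only auxiliary data. */
class PerCheckpointCache {
    // Checkpoint ID -> locally cached bytes for that checkpoint (auxiliary, never the data itself).
    private final Map<Long, byte[]> cache = new HashMap<>();

    void put(long checkpointId, byte[] data) {
        cache.put(checkpointId, data);
    }

    /** Same signature as CheckpointListener#notifyCheckpointAborted(long). */
    void notifyCheckpointAborted(long checkpointId) {
        // Frees the auxiliary entry early; no processed data or pending artifacts are discarded.
        cache.remove(checkpointId);
    }

    /** Same signature as CheckpointListener#notifyCheckpointComplete(long). */
    void notifyCheckpointComplete(long checkpointId) {
        // Abort notifications are best effort, so completion also removes all subsumed entries.
        cache.keySet().removeIf(id -> id <= checkpointId);
    }

    int size() {
        return cache.size();
    }
}
```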