@Public public interface CheckpointListener
It is NOT guaranteed that the implementation will receive a notification for each completed or aborted checkpoint. While these notifications come in most cases, notifications might not happen, for example, when a failure/restore happens directly after a checkpoint completed.
To handle this correctly, implementations should follow the "Checkpoint Subsuming Contract" described below.
The notifications from this interface come "after the fact", meaning after the checkpoint has been aborted or completed. Throwing an exception will not change the completion/abortion of the checkpoint.
Exceptions thrown from these methods result in task or job failure and recovery.
Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the code can assume that no checkpoints with lower ID (T-1, T-2, etc.) are pending any more. No checkpoint with lower ID will ever be committed after a checkpoint with a higher ID.
This does not necessarily mean that all of the previous checkpoints actually completed successfully. It is also possible that some checkpoint timed out or was not fully acknowledged by all tasks. Implementations must then behave as if that checkpoint did not happen. The recommended way to do this is to let the completion of a new checkpoint (higher ID) subsume the completion of all earlier checkpoints (lower ID).
This property is easy to achieve for cases where increasing "offsets", "watermarks", or other progress indicators are communicated on checkpoint completion. A newer checkpoint will have a higher "offset" (more progress) than the previous checkpoint, so it automatically subsumes the previous one. Remember the "offset to commit" for a checkpoint ID and commit it when that specific checkpoint (by ID) gets the notification that it is complete.
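The offset-based approach above can be sketched roughly as follows. This is a minimal illustration, not Flink code: the class `OffsetCommitter` and its methods `onSnapshot`, `committedOffset`, and `pendingCount` are hypothetical names, and the class is kept standalone so it runs without Flink on the classpath. In a real operator, the same logic would sit behind the `notifyCheckpointComplete(long)` callback of this interface.

```java
import java.util.TreeMap;

/**
 * Sketch only: remembers the "offset to commit" per checkpoint ID and commits it
 * when that checkpoint is confirmed. All names here are assumptions for illustration.
 */
class OffsetCommitter {
    // Offsets remembered per checkpoint ID, sorted by ID.
    private final TreeMap<Long, Long> offsetsByCheckpoint = new TreeMap<>();
    private long committedOffset = -1L;

    /** Called when a snapshot is taken: remember the offset that belongs to it. */
    void onSnapshot(long checkpointId, long currentOffset) {
        offsetsByCheckpoint.put(checkpointId, currentOffset);
    }

    /** Mirrors the shape of notifyCheckpointComplete(long checkpointId). */
    void notifyCheckpointComplete(long checkpointId) {
        Long offset = offsetsByCheckpoint.get(checkpointId);
        if (offset != null) {
            // Stand-in for committing the offset to an external system.
            committedOffset = offset;
        }
        // The completed checkpoint subsumes all earlier ones: drop their entries,
        // whether or not a notification for them ever arrived.
        offsetsByCheckpoint.headMap(checkpointId, true).clear();
    }

    long committedOffset() {
        return committedOffset;
    }

    int pendingCount() {
        return offsetsByCheckpoint.size();
    }
}
```

If checkpoints 1, 2, and 3 were snapshotted and only the notification for checkpoint 2 arrives, the offset of checkpoint 2 is committed and the entry for checkpoint 1 is discarded without a separate commit, because the newer offset already covers it.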
The following is a sample pattern for applications that need to publish specific artifacts (such as files) or acknowledge specific IDs after a checkpoint.
- Keep a "pending set" of artifacts: a map of "long => List<Artifact>", mapping from the ID of the checkpoint when the artifact was ready to the artifacts.
- On notifyCheckpointComplete(), publish all IDs/artifacts from the "pending set" up to the checkpoint with that ID. Remove these from the "pending set".
That way, even if some checkpoints did not complete, or if the notification that they completed got lost, the artifacts will be published as part of the next checkpoint that completes.
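A minimal sketch of the "pending set" pattern, under stated assumptions: artifacts are represented as plain strings, "publishing" just appends to an in-memory list, and the class `ArtifactPublisher` with its methods `addPending`, `published`, and `hasPending` is hypothetical. The class is standalone so it runs without Flink; a real operator would also store the pending set in checkpoint state, which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Sketch only: publishes pending artifacts up to the completed checkpoint ID. */
class ArtifactPublisher {
    // "Pending set": checkpoint ID -> artifacts that became ready with that checkpoint.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stand-in for an external system that receives published artifacts.
    private final List<String> published = new ArrayList<>();

    /** Registers an artifact under the checkpoint during which it became ready. */
    void addPending(long checkpointId, String artifact) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(artifact);
    }

    /** Mirrors the shape of notifyCheckpointComplete(long checkpointId). */
    void notifyCheckpointComplete(long checkpointId) {
        // View of all entries with ID <= checkpointId, in ascending ID order.
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> artifacts : ready.values()) {
            published.addAll(artifacts);
        }
        // Clearing the view removes the published entries from the pending set.
        ready.clear();
    }

    List<String> published() {
        return published;
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }
}
```

Using a sorted map keeps the "everything up to this ID" step a single range operation, so artifacts of checkpoints whose notification was skipped are picked up by the next notification that does arrive.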
Modifier and Type | Method and Description |
---|---|
default void | notifyCheckpointAborted(long checkpointId) This method is called as a notification once a distributed checkpoint has been aborted. |
void | notifyCheckpointComplete(long checkpointId) Notifies the listener that the checkpoint with the given checkpointId completed and was committed. |
void notifyCheckpointComplete(long checkpointId) throws Exception
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
checkpointId - The ID of the checkpoint that has been completed.
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
default void notifyCheckpointAborted(long checkpointId) throws Exception
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanup of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
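The cache-cleanup use case might look roughly like the sketch below. It is an illustration under assumptions, not Flink code: `SnapshotCache` and its methods `cache`, `contains`, and `size` are hypothetical, and the class is standalone so it runs without Flink. Only auxiliary, re-creatable data is dropped on abort; no output data or pending artifacts are touched.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: a local per-checkpoint cache that is cleared early on abort. */
class SnapshotCache {
    // Auxiliary data kept per checkpoint ID; safe to drop at any time.
    private final Map<Long, byte[]> cacheByCheckpoint = new HashMap<>();

    void cache(long checkpointId, byte[] data) {
        cacheByCheckpoint.put(checkpointId, data);
    }

    /** Mirrors the shape of notifyCheckpointAborted(long checkpointId). */
    void notifyCheckpointAborted(long checkpointId) {
        // Release only the cache entry of the aborted checkpoint.
        cacheByCheckpoint.remove(checkpointId);
    }

    /** Mirrors the shape of notifyCheckpointComplete(long checkpointId). */
    void notifyCheckpointComplete(long checkpointId) {
        // Abort notifications are best effort, so the completion path also drops
        // entries of all subsumed (lower or equal ID) checkpoints.
        cacheByCheckpoint.keySet().removeIf(id -> id <= checkpointId);
    }

    boolean contains(long checkpointId) {
        return cacheByCheckpoint.containsKey(checkpointId);
    }

    int size() {
        return cacheByCheckpoint.size();
    }
}
```

Because the abort notification can be skipped, the cleanup in the completion path is what guarantees the cache does not grow without bound; the abort handler only frees memory sooner.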
checkpointId - The ID of the checkpoint that has been aborted.
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.