Type Parameters:
Type
- The type of the messages created by the source.
UId
- The type of unique IDs which may be used to acknowledge elements.

@PublicEvolving public abstract class MessageAcknowledgingSourceBase<Type,UId> extends RichSourceFunction<Type> implements CheckpointedFunction, CheckpointListener
The mechanism for this source assumes that messages are identified by a unique ID. When a message is taken from the message queue, it must not be dropped immediately but must be retained until it is acknowledged. Messages that are not acknowledged within a certain time interval are served again (to a different connection, established by the recovered source).
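To make the retain-until-acknowledged contract concrete, here is a minimal sketch of the queue side. InMemoryAckQueue is a hypothetical class, not part of Flink or of any real broker client. Each message serves as its own unique ID, and time is a logical clock passed in by the caller so the behavior is deterministic. The sketch does not model separate connections: a timed-out message is simply served to whoever calls retrieve next.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a message queue with acknowledgements.
// Each message doubles as its own unique ID, and "now" is a caller-supplied
// logical clock, so the sketch stays short and deterministic.
class InMemoryAckQueue<T> {
    private final Deque<T> ready = new ArrayDeque<>();
    // Delivered but not yet acknowledged: message -> time of delivery.
    private final Map<T, Long> inFlight = new LinkedHashMap<>();
    private final long redeliveryTimeout;

    InMemoryAckQueue(long redeliveryTimeout) {
        this.redeliveryTimeout = redeliveryTimeout;
    }

    void publish(T msg) {
        ready.addLast(msg);
    }

    // Hands out the next message (or null if none is ready) without dropping it:
    // the message is retained in the in-flight map until acknowledge() is called.
    T retrieve(long now) {
        requeueExpired(now);
        T msg = ready.pollFirst();
        if (msg != null) {
            inFlight.put(msg, now);
        }
        return msg;
    }

    // Only an acknowledgement removes a message for good.
    void acknowledge(T msg) {
        inFlight.remove(msg);
    }

    // Messages not acknowledged within the timeout go back to the end of the queue.
    private void requeueExpired(long now) {
        Iterator<Map.Entry<T, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<T, Long> entry = it.next();
            if (now - entry.getValue() >= redeliveryTimeout) {
                ready.addLast(entry.getKey());
                it.remove();
            }
        }
    }
}
```

With a timeout of 10, a message delivered at time 1 and never acknowledged is served again by the first retrieve call at time 11 or later, while an acknowledged message never reappears.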
Note that this source can give no guarantees about message order in the case of failures: messages that were retrieved but not yet acknowledged are returned again later, after messages that had not been retrieved before the failure.
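A short worked trace shows how the reordering arises. It assumes a hypothetical broker that appends timed-out messages to the end of the queue; real message queues may differ in where they place redelivered messages, and RedeliveryOrderExample is an illustrative name only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical trace of redelivery order: the broker model assumed here puts
// unacknowledged messages back behind the ones still waiting in the queue.
class RedeliveryOrderExample {
    static List<Integer> orderSeenAfterRecovery() {
        List<Integer> queue = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));

        // Before the failure, the source retrieved messages 1, 2 and 3 ...
        List<Integer> unacknowledged = new ArrayList<>(queue.subList(0, 3));
        queue.subList(0, 3).clear();

        // ... but only message 1 was covered by a completed checkpoint and acknowledged.
        unacknowledged.remove(Integer.valueOf(1));

        // After the redelivery timeout, 2 and 3 are served again, after 4 and 5,
        // which had not been retrieved before the failure.
        queue.addAll(unacknowledged);
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(orderSeenAfterRecovery()); // prints [4, 5, 2, 3]
    }
}
```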
Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain that it has been successfully processed throughout the topology and the updates to any state caused by that message are persistent.
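The per-checkpoint bookkeeping can be modeled without Flink. The sketch below is an approximation under stated assumptions, not the class's actual source: PerCheckpointIdTracker, snapshot and checkpointComplete are hypothetical names standing in for addId, snapshotState, notifyCheckpointComplete and acknowledgeIDs, and a Map.Entry replaces Flink's Tuple2 so the block compiles with the JDK alone.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Flink-free model of the per-checkpoint ID bookkeeping.
// A Map.Entry stands in for Flink's Tuple2<Long, Set<UId>>.
class PerCheckpointIdTracker<UId> {
    // IDs of elements emitted since the last snapshot.
    private Set<UId> idsForCurrentCheckpoint = new HashSet<>();
    // (checkpointId, IDs) pairs that were snapshotted but not yet confirmed, oldest first.
    private final ArrayDeque<Map.Entry<Long, Set<UId>>> pendingCheckpoints = new ArrayDeque<>();
    // Stand-in for acknowledgeIDs(): records what would be sent back to the queue.
    final List<UId> acknowledged = new ArrayList<>();

    void addId(UId id) {
        idsForCurrentCheckpoint.add(id);
    }

    // On snapshot: seal the IDs gathered so far under this checkpoint's ID.
    void snapshot(long checkpointId) {
        pendingCheckpoints.addLast(
                new AbstractMap.SimpleEntry<>(checkpointId, idsForCurrentCheckpoint));
        idsForCurrentCheckpoint = new HashSet<>();
    }

    // On completion: acknowledge every pending checkpoint up to and including this one.
    void checkpointComplete(long checkpointId) {
        while (!pendingCheckpoints.isEmpty()
                && pendingCheckpoints.peekFirst().getKey() <= checkpointId) {
            acknowledged.addAll(pendingCheckpoints.removeFirst().getValue());
        }
    }
}
```

Keeping the pending checkpoints in a deque in trigger order lets completion drain them from the front, so an ID is only acknowledged once a checkpoint covering it has completed.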
All messages that are emitted and successfully processed by the streaming program will eventually be acknowledged. In corner cases, the source may receive certain IDs multiple times, if a failure occurs while acknowledging. To cope with this situation, an additional Set stores all processed IDs. IDs are only removed after they have been acknowledged.
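The additional set of processed IDs behaves like a duplicate filter. A minimal sketch, assuming a plain HashSet; ProcessedIdFilter and onAcknowledged are hypothetical names, not Flink API.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the additional Set of processed IDs. An ID stays in the
// set from the moment it is first emitted until its acknowledgement has gone
// through, so redeliveries that arrive in between are recognized as duplicates.
class ProcessedIdFilter<UId> {
    private final Set<UId> processedButNotAcknowledged = new HashSet<>();

    // Returns true if the ID is new (emit the message), false for a duplicate.
    boolean addId(UId id) {
        return processedButNotAcknowledged.add(id);
    }

    // Called once the IDs have been acknowledged back to the queue.
    void onAcknowledged(Collection<UId> ids) {
        processedButNotAcknowledged.removeAll(ids);
    }

    boolean isTracked(UId id) {
        return processedButNotAcknowledged.contains(id);
    }
}
```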
A typical way to use this base in a source function is by implementing a run() method as follows:
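public void run(SourceContext<Type> ctx) throws Exception {
    // running, queue and Message are provided by the implementing subclass.
    while (running) {
        // Take the next message; it stays unacknowledged in the queue for now.
        Message msg = queue.retrieve();
        // Record the ID and emit the element atomically with respect to checkpoints.
        synchronized (ctx.getCheckpointLock()) {
            // Emit only if addId accepts the ID, i.e. it is not a duplicate delivery.
            if (addId(msg.getMessageId())) {
                ctx.collect(msg.getMessageData());
            }
        }
    }
}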
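The loop above depends on a message type, a queue client and a running flag that the implementing subclass provides. The following sketch replaces all of them, and Flink's SourceContext, with hypothetical stand-ins (QueueMessage, FakeSourceContext, RunLoopSketch) so the control flow runs on a plain JDK; its addId is a simplified duplicate check, not the base class method.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for what the run() example takes from its environment.
// None of these are Flink classes; they only mimic the calls the loop makes.
final class QueueMessage {
    private final long messageId;
    private final String messageData;

    QueueMessage(long messageId, String messageData) {
        this.messageId = messageId;
        this.messageData = messageData;
    }

    long getMessageId() { return messageId; }
    String getMessageData() { return messageData; }
}

final class FakeSourceContext {
    private final Object checkpointLock = new Object();
    final List<String> collected = new ArrayList<>();

    Object getCheckpointLock() { return checkpointLock; }
    void collect(String element) { collected.add(element); }
}

class RunLoopSketch {
    private final Set<Long> processedIds = new HashSet<>(); // stand-in for the base class state
    private volatile boolean running = true;

    // Simplified addId(): true only the first time an ID is seen.
    boolean addId(long id) {
        return processedIds.add(id);
    }

    void run(FakeSourceContext ctx, Deque<QueueMessage> queue) {
        while (running) {
            QueueMessage msg = queue.pollFirst();
            if (msg == null) {
                running = false; // sketch only: stop once the queue is drained
                continue;
            }
            // Recording the ID and emitting the element happen under the checkpoint
            // lock, so a checkpoint cannot be taken between the two steps.
            synchronized (ctx.getCheckpointLock()) {
                if (addId(msg.getMessageId())) {
                    ctx.collect(msg.getMessageData());
                }
            }
        }
    }
}
```

A message redelivered with an ID that is already tracked is skipped, so only the first copy reaches the collected output.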
NOTE: This source has a parallelism of 1.

Nested classes/interfaces inherited from interface SourceFunction: SourceFunction.SourceContext<T>
Modifier and Type | Field and Description |
---|---|
protected ArrayDeque<Tuple2<Long,Set<UId>>> | pendingCheckpoints: The list of IDs from checkpoints that were triggered but not yet completed or notified of completion. |
Modifier | Constructor and Description |
---|---|
protected | MessageAcknowledgingSourceBase(Class<UId> idClass): Creates a new MessageAcknowledgingSourceBase for IDs of the given type. |
protected | MessageAcknowledgingSourceBase(TypeInformation<UId> idTypeInfo): Creates a new MessageAcknowledgingSourceBase for IDs of the given type. |
Modifier and Type | Method and Description |
---|---|
protected abstract void | acknowledgeIDs(long checkpointId, Set<UId> uIds): This method must be implemented to acknowledge the given set of IDs back to the message queue. |
protected boolean | addId(UId uid): Adds an ID to be stored with the current checkpoint. |
void | close(): Tear-down method for the user code. |
void | initializeState(FunctionInitializationContext context): This method is called when the parallel function instance is created during distributed execution. |
void | notifyCheckpointAborted(long checkpointId): This method is called as a notification once a distributed checkpoint has been aborted. |
void | notifyCheckpointComplete(long checkpointId): Notifies the listener that the checkpoint with the given checkpointId completed and was committed. |
void | snapshotState(FunctionSnapshotContext context): This method is called when a snapshot for a checkpoint is requested. |
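Methods inherited from class AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, open, setRuntimeContext
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SourceFunction: cancel, run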
protected transient ArrayDeque<Tuple2<Long,Set<UId>>> pendingCheckpoints
protected MessageAcknowledgingSourceBase(Class<UId> idClass)
Parameters:
idClass
- The class of the message ID type, used to create a serializer for the message IDs.

protected MessageAcknowledgingSourceBase(TypeInformation<UId> idTypeInfo)
Parameters:
idTypeInfo
- The type information of the message ID type, used to create a serializer for the message IDs.

public void initializeState(FunctionInitializationContext context) throws Exception
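Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution.
Specified by: initializeState in interface CheckpointedFunction
Parameters:
context
- the context for initializing the operator
Throws:
Exception
- Thrown, if state could not be created or restored.

public void close() throws Exception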
Description copied from interface: RichFunction
Tear-down method for the user code. This method can be used for clean up work.
Specified by: close in interface RichFunction
Overrides: close in class AbstractRichFunction
Throws:
Exception
- Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.

protected abstract void acknowledgeIDs(long checkpointId, Set<UId> uIds)
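This method must be implemented to acknowledge the given set of IDs back to the message queue.
Parameters:
uIds
- The set of IDs to acknowledge.

protected boolean addId(UId uid)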
Adds an ID to be stored with the current checkpoint.
Parameters:
uid
- The ID to add.

public void snapshotState(FunctionSnapshotContext context) throws Exception
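Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
Specified by: snapshotState in interface CheckpointedFunction
Parameters:
context
- the context for drawing a snapshot of the operator
Throws:
Exception
- Thrown, if state could not be created or restored.

public void notifyCheckpointComplete(long checkpointId) throws Exception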
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
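The subsuming behavior can be sketched with a sorted map. SubsumingAcknowledger is a hypothetical class, not Flink code: each completion notice acknowledges the IDs of every pending checkpoint up to and including the notified one, so skipped notices lose nothing and a late notice for an older checkpoint is harmless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of pending IDs honoring the "Checkpoint Subsuming Contract":
// a completion notice for checkpoint N also covers all earlier checkpoints.
class SubsumingAcknowledger {
    // checkpointId -> IDs recorded for that checkpoint, sorted by checkpoint ID.
    private final TreeMap<Long, Set<String>> pending = new TreeMap<>();
    final List<String> acknowledged = new ArrayList<>();

    void snapshot(long checkpointId, Set<String> ids) {
        pending.put(checkpointId, ids);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // View of all pending checkpoints with ID <= checkpointId, in ascending order.
        NavigableMap<Long, Set<String>> subsumed = pending.headMap(checkpointId, true);
        for (Set<String> ids : subsumed.values()) {
            acknowledged.addAll(ids);
        }
        // Clearing the view removes the entries from 'pending' too, so a later
        // notice for an older checkpoint finds nothing left to acknowledge.
        subsumed.clear();
    }
}
```

The class's own pendingCheckpoints field is an ArrayDeque kept in trigger order rather than a sorted map; draining it from the front up to the notified checkpoint ID has the same effect as long as checkpoint IDs increase.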
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
Specified by: notifyCheckpointComplete in interface CheckpointListener
Parameters:
checkpointId
- The ID of the checkpoint that has been completed.
Throws:
Exception
- This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.

public void notifyCheckpointAborted(long checkpointId)
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanup of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
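For a source like this one, an abort is mostly a non-event: the IDs recorded for the aborted checkpoint must not be dropped, only acknowledged later. The sketch below, built around a hypothetical AbortAwareTracker class and an equally hypothetical per-checkpoint request cache, shows the one thing an abort handler might reasonably do, namely clean up auxiliary data early.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: an aborted checkpoint only triggers early cleanup of
// auxiliary data; the IDs it covered stay pending until a later checkpoint completes.
class AbortAwareTracker {
    private final TreeMap<Long, List<String>> pendingIds = new TreeMap<>();
    // Auxiliary per-checkpoint cache, e.g. a pre-built acknowledgement request (hypothetical).
    private final TreeMap<Long, String> ackRequestCache = new TreeMap<>();
    final List<String> acknowledged = new ArrayList<>();

    void snapshot(long checkpointId, List<String> ids) {
        pendingIds.put(checkpointId, ids);
        ackRequestCache.put(checkpointId, "ack " + ids);
    }

    void notifyCheckpointAborted(long checkpointId) {
        // Drop only the cache entry; the pending IDs are kept, as if this
        // checkpoint had never been triggered.
        ackRequestCache.remove(checkpointId);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // The next successful checkpoint also covers the aborted one's IDs.
        Map<Long, List<String>> done = pendingIds.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            acknowledged.addAll(ids);
        }
        done.clear();
        ackRequestCache.headMap(checkpointId, true).clear();
    }

    int cachedRequests() {
        return ackRequestCache.size();
    }
}
```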
Specified by: notifyCheckpointAborted in interface CheckpointListener
Parameters:
checkpointId
- The ID of the checkpoint that has been aborted.

Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.