public class HybridSourceSplitEnumerator extends Object implements SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
This enumerator delegates to the current underlying split enumerator and transitions to the next source once all readers have indicated via SourceReaderFinishedEvent that all input was consumed.
Switching between enumerators occurs by creating the new enumerator via Source.createEnumerator(SplitEnumeratorContext). The start position can be fixed at pipeline construction time through the source, or supplied at switch time through a converter function that uses the end state of the previous enumerator.
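The switching rule described above can be pictured with a small, self-contained model. This is a hypothetical sketch, not Flink's implementation: the class HybridSwitchSketch, its methods, and the use of a single long value as each enumerator's "end state" and "start position" are assumptions made for illustration. The model advances to the next source only after every registered reader has reported that the current source is exhausted, and it computes the next start position by applying a converter to the previous source's end state. A converter that ignores its input stands in for a start position fixed at construction time.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongUnaryOperator;

// Hypothetical model of the hybrid switching rule; NOT the Flink implementation.
class HybridSwitchSketch {
    // converters.get(i) derives the start position of source i + 1 from the end state of source i.
    private final List<LongUnaryOperator> converters;
    private final Set<Integer> registeredReaders = new HashSet<>();
    private final Set<Integer> finishedReaders = new HashSet<>();
    private int currentSourceIndex = 0;
    private long currentStartPosition;

    HybridSwitchSketch(long firstStartPosition, List<LongUnaryOperator> converters) {
        this.currentStartPosition = firstStartPosition;
        this.converters = new ArrayList<>(converters);
    }

    void addReader(int subtaskId) {
        registeredReaders.add(subtaskId);
    }

    /**
     * A reader reports that it consumed all input of the current source.
     * endState stands in for the end state of the current enumerator.
     * Returns true if this report triggered a switch to the next source.
     */
    boolean onReaderFinished(int subtaskId, long endState) {
        finishedReaders.add(subtaskId);
        if (!finishedReaders.containsAll(registeredReaders)) {
            return false; // other readers are still consuming the current source
        }
        if (currentSourceIndex >= converters.size()) {
            return false; // already on the last source: nothing to switch to
        }
        // Start position of the next source, derived from the previous end state.
        currentStartPosition = converters.get(currentSourceIndex).applyAsLong(endState);
        currentSourceIndex++;
        finishedReaders.clear(); // every reader must finish the new source again
        return true;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    long currentStartPosition() {
        return currentStartPosition;
    }
}
```

In the documented class, readers report completion with SourceReaderFinishedEvent and the next enumerator is created via Source.createEnumerator(SplitEnumeratorContext); the model folds both steps into onReaderFinished.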
During subtask recovery, splits that have been assigned since the last checkpoint are added back by the source coordinator. These splits may originate from a previous enumerator that is no longer active. In that case, HybridSourceSplitEnumerator suspends forwarding to the current enumerator and replays the returned splits by activating the previous readers. Once the returned splits have been processed, delegation to the current underlying enumerator resumes.
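The replay behavior can be sketched in the same way. Again this is a hypothetical model, not Flink code: ReplaySketch, TaggedSplit, and the rule that replayed splits stay with the subtask that returned them are illustrative assumptions. Splits tagged with an earlier source index are parked per subtask; while any are pending, the subtask is pointed back at that earlier source, and once its queue drains it returns to the current source. Splits of the current source go straight back to the active enumerator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of replaying returned splits from an earlier source; NOT Flink code.
class ReplaySketch {
    /** A split tagged with the index of the source that created it (illustrative type). */
    static final class TaggedSplit {
        final int sourceIndex;
        final String splitId;

        TaggedSplit(int sourceIndex, String splitId) {
            this.sourceIndex = sourceIndex;
            this.splitId = splitId;
        }
    }

    private final int currentSourceIndex;
    // Splits from earlier sources, parked per subtask until they are replayed.
    private final Map<Integer, Deque<TaggedSplit>> pendingReplay = new HashMap<>();
    // Splits of the active source, which can go straight back to its enumerator.
    private final List<TaggedSplit> forwardedToCurrent = new ArrayList<>();

    ReplaySketch(int currentSourceIndex) {
        this.currentSourceIndex = currentSourceIndex;
    }

    void addSplitsBack(List<TaggedSplit> splits, int subtaskId) {
        for (TaggedSplit split : splits) {
            if (split.sourceIndex < currentSourceIndex) {
                pendingReplay.computeIfAbsent(subtaskId, k -> new ArrayDeque<>()).add(split);
            } else {
                forwardedToCurrent.add(split);
            }
        }
    }

    /** Source whose reader the subtask should run: an earlier one while replay is pending. */
    int activeSourceFor(int subtaskId) {
        Deque<TaggedSplit> pending = pendingReplay.get(subtaskId);
        return (pending == null || pending.isEmpty()) ? currentSourceIndex : pending.peek().sourceIndex;
    }

    /** Next split to replay on the subtask, or null once delegation to the current source resumes. */
    TaggedSplit nextReplaySplit(int subtaskId) {
        Deque<TaggedSplit> pending = pendingReplay.get(subtaskId);
        return pending == null ? null : pending.poll();
    }

    List<TaggedSplit> forwardedToCurrent() {
        return forwardedToCurrent;
    }
}
```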
Constructor and Description |
---|
HybridSourceSplitEnumerator(SplitEnumeratorContext<HybridSourceSplit> context, List<org.apache.flink.connector.base.source.hybrid.HybridSource.SourceListEntry> sources, int initialSourceIndex, HybridSourceEnumeratorState restoredEnumeratorState) |
Modifier and Type | Method and Description |
---|---|
void | addReader(int subtaskId): Add a new source reader with the given subtask ID. |
void | addSplitsBack(List<HybridSourceSplit> splits, int subtaskId): Add splits back to the split enumerator. |
void | close(): Called to close the enumerator, in case it holds on to any resources, like threads or network connections. |
void | handleSourceEvent(int subtaskId, SourceEvent sourceEvent): Handles a custom source event from the source reader. |
void | handleSplitRequest(int subtaskId, String requesterHostname): Handles the request for a split. |
void | notifyCheckpointAborted(long checkpointId): This method is called as a notification once a distributed checkpoint has been aborted. |
void | notifyCheckpointComplete(long checkpointId): Called when the checkpoint with the given ID has completed. SplitEnumerator provides an empty default implementation because most split enumerators do not need to implement this method. |
HybridSourceEnumeratorState | snapshotState(long checkpointId): Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint. |
void | start(): Start the split enumerator. |
public HybridSourceSplitEnumerator(SplitEnumeratorContext<HybridSourceSplit> context, List<org.apache.flink.connector.base.source.hybrid.HybridSource.SourceListEntry> sources, int initialSourceIndex, HybridSourceEnumeratorState restoredEnumeratorState)
public void start()
Description copied from interface: SplitEnumerator
Start the split enumerator.
The default behavior does nothing.
Specified by:
start in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
public void handleSplitRequest(int subtaskId, String requesterHostname)
Description copied from interface: SplitEnumerator
Handles the request for a split. This method is called when the reader with the given subtask ID calls the SourceReaderContext.sendSplitRequest() method.
Specified by:
handleSplitRequest in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
subtaskId - the subtask ID of the source reader that sent the split request.
requesterHostname - Optional, the hostname where the requesting task is running. This can be used to make split assignments locality-aware.
public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId)
Description copied from interface: SplitEnumerator
Add splits back to the split enumerator. This will only happen when a SourceReader fails and there are splits assigned to it after the last successful checkpoint.
Specified by:
addSplitsBack in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
splits - The splits to add back to the enumerator for reassignment.
subtaskId - The ID of the subtask to which the returned splits belong.
public void addReader(int subtaskId)
Description copied from interface: SplitEnumerator
Add a new source reader with the given subtask ID.
Specified by:
addReader in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
subtaskId - the subtask ID of the new source reader.
public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception
Description copied from interface: SplitEnumerator
Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint.
The snapshot should contain the latest state of the enumerator: it should assume that all operations that happened before the snapshot have successfully completed. For example, splits already assigned to readers via SplitEnumeratorContext.assignSplit(SourceSplit, int) and SplitEnumeratorContext.assignSplits(SplitsAssignment) no longer need to be included in the snapshot.
This method takes the ID of the checkpoint for which the state is snapshotted. Most implementations can ignore this parameter, because the contents of the snapshot do not depend on which checkpoint it is created for. The parameter can be useful for source connectors whose external systems are themselves aware of checkpoints, for example when the enumerator notifies such a system that a specific checkpoint has been triggered.
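A minimal sketch of this contract, using a hypothetical enumerator that hands out string split IDs (SnapshotSketch and its methods are illustrative, not part of Flink): assigning a split removes it from enumerator state, so a snapshot taken afterwards contains only the splits the enumerator still owns, while a split added back after a reader failure becomes part of the state again. The snapshot is a copy, so later assignments cannot change an already taken snapshot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Hypothetical enumerator model illustrating the snapshot contract; NOT Flink code.
class SnapshotSketch {
    // Splits the enumerator still owns, i.e. not yet assigned to any reader.
    private final Deque<String> unassigned = new ArrayDeque<>();

    SnapshotSketch(Collection<String> splits) {
        unassigned.addAll(splits);
    }

    /** Assigning a split removes it from enumerator state: the reader now owns it. */
    String assignNext() {
        return unassigned.poll();
    }

    /** A split returned by a failed reader becomes enumerator state again. */
    void addSplitBack(String split) {
        unassigned.addFirst(split);
    }

    /** Copy of what the enumerator still owns; the checkpoint ID is deliberately ignored. */
    List<String> snapshotState(long checkpointId) {
        return new ArrayList<>(unassigned);
    }
}
```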
Specified by:
snapshotState in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
checkpointId - The ID of the checkpoint for which the snapshot is created.
Throws:
Exception - when the snapshot cannot be taken.
public void notifyCheckpointComplete(long checkpointId) throws Exception
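Description copied from interface: SplitEnumerator
Called when the checkpoint with the given ID has completed. SplitEnumerator provides an empty default implementation because most split enumerators do not need to implement this method.
Specified by:
notifyCheckpointComplete in interface CheckpointListener
notifyCheckpointComplete in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
See Also:
CheckpointListener.notifyCheckpointComplete(long)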
public void notifyCheckpointAborted(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs of CheckpointListener for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanup of auxiliary resources. One example is proactively clearing a local per-checkpoint state cache upon checkpoint failure.
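The cleanup use case can be illustrated with a hypothetical per-checkpoint cache (CheckpointCacheSketch is not a Flink class). Under the subsuming contract, completing checkpoint N makes the entries for N and all earlier checkpoints obsolete, while an abort only lets the cache drop the aborted checkpoint's own entry early. Because abort notifications are best effort, the sketch stays correct if one is skipped: the next completion still removes the stale entry.

```java
import java.util.TreeMap;

// Hypothetical per-checkpoint cache illustrating complete/abort handling; NOT Flink code.
class CheckpointCacheSketch {
    // Auxiliary data recorded per checkpoint ID, ordered by ID.
    private final TreeMap<Long, String> cacheByCheckpoint = new TreeMap<>();

    void onSnapshot(long checkpointId, String auxData) {
        cacheByCheckpoint.put(checkpointId, auxData);
    }

    /** A completed checkpoint subsumes all earlier ones, so their entries are dropped too. */
    void notifyCheckpointComplete(long checkpointId) {
        cacheByCheckpoint.headMap(checkpointId, true).clear();
    }

    /**
     * Early, optional cleanup of the aborted checkpoint's own entry. No data is discarded:
     * the next successful checkpoint simply covers a longer time span.
     */
    void notifyCheckpointAborted(long checkpointId) {
        cacheByCheckpoint.remove(checkpointId);
    }

    int size() {
        return cacheByCheckpoint.size();
    }
}
```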
Specified by:
notifyCheckpointAborted in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been aborted.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task or job.
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent)
Description copied from interface: SplitEnumerator
Handles a custom source event from the source reader.
This method has a default implementation that does nothing, because it is only required to be implemented by some sources, which have a custom event protocol between reader and enumerator. The common events for reader registration and split requests are not dispatched to this method, but rather invoke the SplitEnumerator.addReader(int) and SplitEnumerator.handleSplitRequest(int, String) methods.
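For a wrapping enumerator, one plausible pattern is to consume the events of its own protocol and forward everything else to the active underlying enumerator. The sketch below assumes that division of labor; the event types are hypothetical stand-ins (they are not Flink's SourceEvent or SourceReaderFinishedEvent), and the delegate is modeled as a plain BiConsumer.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-ins for source events; NOT Flink's SourceEvent classes.
interface SketchSourceEvent extends Serializable {}

/** Wrapper-level event: the reader finished its current source. */
final class SketchReaderFinishedEvent implements SketchSourceEvent {}

/** Event belonging to the custom protocol of an underlying source. */
final class SketchCustomEvent implements SketchSourceEvent {
    final String payload;

    SketchCustomEvent(String payload) {
        this.payload = payload;
    }
}

// Consumes its own protocol event and forwards all other events to the active delegate.
class EventDispatchSketch {
    private final BiConsumer<Integer, SketchSourceEvent> currentDelegate;
    final List<Integer> finishedSubtasks = new ArrayList<>();

    EventDispatchSketch(BiConsumer<Integer, SketchSourceEvent> currentDelegate) {
        this.currentDelegate = currentDelegate;
    }

    void handleSourceEvent(int subtaskId, SketchSourceEvent event) {
        if (event instanceof SketchReaderFinishedEvent) {
            finishedSubtasks.add(subtaskId); // handled by the wrapper itself
        } else {
            currentDelegate.accept(subtaskId, event); // unknown to the wrapper: pass it on
        }
    }
}
```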
Specified by:
handleSourceEvent in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
subtaskId - the subtask ID of the source reader that sent the source event.
sourceEvent - the source event from the source reader.
public void close() throws IOException
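Description copied from interface: SplitEnumerator
Called to close the enumerator, in case it holds on to any resources, like threads or network connections.
Specified by:
close in interface AutoCloseable
close in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Throws:
IOException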
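As an illustration of the kind of resources close() is meant to release, here is a hypothetical enumerator (ClosingSketch is not a Flink class) that owns a background thread pool and a currently active delegate, modeled as a Closeable. Making repeated calls harmless is a design choice of the sketch: AutoCloseable does not require close() to be idempotent, although its documentation strongly encourages it.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical enumerator owning threads and a delegate; NOT Flink code.
class ClosingSketch implements AutoCloseable {
    // Background threads, e.g. for periodic split discovery (illustrative).
    private final ExecutorService discoveryThreads = Executors.newSingleThreadExecutor();
    // The currently active underlying enumerator, modeled as a Closeable.
    private final Closeable currentDelegate;
    private boolean closed;

    ClosingSketch(Closeable currentDelegate) {
        this.currentDelegate = currentDelegate;
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return; // repeated calls are harmless (a design choice of this sketch)
        }
        closed = true;
        discoveryThreads.shutdownNow(); // stop background work owned by this enumerator
        currentDelegate.close(); // release the delegate's resources; IOException propagates
    }

    boolean threadsStopped() {
        return discoveryThreads.isShutdown();
    }
}
```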
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.