Class HybridSourceSplitEnumerator
java.lang.Object
    org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator
All Implemented Interfaces:
AutoCloseable, CheckpointListener, SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>, SupportsBatchSnapshot
public class HybridSourceSplitEnumerator extends Object implements SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>, SupportsBatchSnapshot
Wraps the actual split enumerators and facilitates source switching. Enumerators are created lazily when a source switch occurs, to support runtime position conversion.

This enumerator delegates to the current underlying split enumerator and transitions to the next source once all readers have indicated via SourceReaderFinishedEvent that all input was consumed.

Switching between enumerators occurs by creating the new enumerator via Source.createEnumerator(SplitEnumeratorContext). The start position can be fixed at pipeline construction time through the source, or supplied at switch time through a converter function that uses the end state of the previous enumerator.

During subtask recovery, splits that have been assigned since the last checkpoint will be added back by the source coordinator. These splits may originate from a previous enumerator that is no longer active. In that case, HybridSourceSplitEnumerator will suspend forwarding to the current enumerator and replay the returned splits by activating the previous readers. After the returned splits have been processed, delegation to the current underlying enumerator resumes.
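The switching behavior described above can be pictured with a small, stdlib-only model. This is not Flink's implementation: `ToySource`, `ToyEnumerator`, `ToySwitcher`, and the use of a plain `long` as both "end state" and "start position" are hypothetical stand-ins. Each source either carries a start position fixed at construction time or derives one from the previous enumerator's end state through a converter function, and the next enumerator is only created when the switch happens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for an underlying enumerator; positions are plain longs.
final class ToyEnumerator {
    final String name;
    final long startPosition;
    long endPosition; // hypothetical "end state", known once the source is exhausted

    ToyEnumerator(String name, long startPosition) {
        this.name = name;
        this.startPosition = startPosition;
        this.endPosition = startPosition;
    }
}

// Hypothetical source entry: a start position fixed at construction time,
// or a converter that derives it from the previous enumerator's end state.
final class ToySource {
    final String name;
    final Long fixedStart;                // null means "derive at switch time"
    final Function<Long, Long> converter; // used only when fixedStart is null

    ToySource(String name, Long fixedStart, Function<Long, Long> converter) {
        this.name = name;
        this.fixedStart = fixedStart;
        this.converter = converter;
    }

    ToyEnumerator createEnumerator(ToyEnumerator previous) {
        if (fixedStart != null) {
            return new ToyEnumerator(name, fixedStart);
        }
        if (previous == null || converter == null) {
            throw new IllegalStateException(name + " has neither a fixed start nor a predecessor");
        }
        return new ToyEnumerator(name, converter.apply(previous.endPosition));
    }
}

final class ToySwitcher {
    private final List<ToySource> sources;
    private int currentIndex = -1;
    private ToyEnumerator current; // created lazily, only when a switch occurs
    final List<String> created = new ArrayList<>();

    ToySwitcher(List<ToySource> sources) {
        this.sources = sources;
    }

    // Creates the next enumerator, passing in the end state of the current one.
    boolean switchToNext() {
        if (currentIndex + 1 >= sources.size()) {
            return false;
        }
        currentIndex++;
        current = sources.get(currentIndex).createEnumerator(current);
        created.add(current.name + "@" + current.startPosition);
        return true;
    }

    ToyEnumerator current() {
        return current;
    }
}
```

For example, a bounded "files" source starting at 0 followed by a "kafka" source whose converter is `endTs -> endTs + 1` yields a second enumerator starting at 1001 once the first reports an end position of 1000. The converter here is purely illustrative; a real conversion depends on the concrete sources.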
Constructor Summary
HybridSourceSplitEnumerator(SplitEnumeratorContext<HybridSourceSplit> context, List<org.apache.flink.connector.base.source.hybrid.HybridSource.SourceListEntry> sources, int initialSourceIndex, HybridSourceEnumeratorState restoredEnumeratorState)
Method Summary
void addReader(int subtaskId): Add a new source reader with the given subtask ID.
void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId): Add splits back to the split enumerator.
void close(): Called to close the enumerator, in case it holds on to any resources, like threads or network connections.
void handleSourceEvent(int subtaskId, SourceEvent sourceEvent): Handles a custom source event from the source reader.
void handleSplitRequest(int subtaskId, String requesterHostname): Handles the request for a split.
void notifyCheckpointAborted(long checkpointId): This method is called as a notification once a distributed checkpoint has been aborted.
void notifyCheckpointComplete(long checkpointId): We have an empty default implementation here because most source readers do not have to implement the method.
HybridSourceEnumeratorState snapshotState(long checkpointId): Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint.
void start(): Start the split enumerator.
Constructor Detail
HybridSourceSplitEnumerator
public HybridSourceSplitEnumerator(SplitEnumeratorContext<HybridSourceSplit> context, List<org.apache.flink.connector.base.source.hybrid.HybridSource.SourceListEntry> sources, int initialSourceIndex, HybridSourceEnumeratorState restoredEnumeratorState)
Method Detail

start
public void start()
Description copied from interface: SplitEnumerator
Start the split enumerator. The default behavior does nothing.
Specified by:
start in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>

handleSplitRequest
public void handleSplitRequest(int subtaskId, String requesterHostname)
Description copied from interface: SplitEnumerator
Handles the request for a split. This method is called when the reader with the given subtask ID calls the SourceReaderContext.sendSplitRequest() method.
Specified by:
handleSplitRequest in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
subtaskId - the subtask ID of the source reader that sent the split request.
requesterHostname - optional; the hostname where the requesting task is running. This can be used to make split assignments locality-aware.
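The `requesterHostname` parameter is what makes locality-aware assignment possible. The sketch below is a generic illustration, not HybridSourceSplitEnumerator's behavior (per the class description, it delegates to the current underlying enumerator). `HostSplit` and `LocalityAwareAssigner` are hypothetical: each split records a preferred host, a matching split is preferred, and any remaining split is handed out otherwise.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

// Hypothetical split type carrying an optional preferred host.
final class HostSplit {
    final String id;
    final String preferredHost;

    HostSplit(String id, String preferredHost) {
        this.id = id;
        this.preferredHost = preferredHost;
    }
}

final class LocalityAwareAssigner {
    private final Deque<HostSplit> unassigned = new ArrayDeque<>();

    void add(HostSplit split) {
        unassigned.addLast(split);
    }

    // Prefer a split whose preferred host matches the requester;
    // otherwise fall back to the oldest unassigned split (empty if none remain).
    Optional<HostSplit> nextSplit(String requesterHostname) {
        if (requesterHostname != null) {
            Iterator<HostSplit> it = unassigned.iterator();
            while (it.hasNext()) {
                HostSplit s = it.next();
                if (requesterHostname.equals(s.preferredHost)) {
                    it.remove();
                    return Optional.of(s);
                }
            }
        }
        return Optional.ofNullable(unassigned.pollFirst());
    }
}
```

Treating the hostname as optional (a `null` check before matching) follows the parameter description above, which marks it as optional.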

addSplitsBack
public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId)
Description copied from interface: SplitEnumerator
Add splits back to the split enumerator. This only happens when a SourceReader fails and there are splits that were assigned to it after the last successful checkpoint.
Specified by:
addSplitsBack in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
splits - the splits to add back to the enumerator for reassignment.
subtaskId - the ID of the subtask to which the returned splits belong.
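The class description notes that returned splits may belong to a source whose enumerator is no longer active, and that they are replayed before delegation to the current enumerator resumes. A simplified model of that bookkeeping follows; `ReturnedSplit` and `ReplayBuffer` are hypothetical names, splits are tagged with the index of the source that produced them, and the real class's internal data structures may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical split tagged with the index of the source that produced it.
final class ReturnedSplit {
    final int sourceIndex;
    final String id;

    ReturnedSplit(int sourceIndex, String id) {
        this.sourceIndex = sourceIndex;
        this.id = id;
    }
}

final class ReplayBuffer {
    private final int currentSourceIndex;
    // Returned splits grouped by source index; TreeMap keeps earlier sources first.
    private final TreeMap<Integer, List<ReturnedSplit>> pending = new TreeMap<>();
    final List<ReturnedSplit> forwardedToCurrent = new ArrayList<>();

    ReplayBuffer(int currentSourceIndex) {
        this.currentSourceIndex = currentSourceIndex;
    }

    void addSplitsBack(List<ReturnedSplit> splits) {
        for (ReturnedSplit s : splits) {
            if (s.sourceIndex < currentSourceIndex) {
                // Produced by an enumerator that is no longer active: hold for replay.
                pending.computeIfAbsent(s.sourceIndex, k -> new ArrayList<>()).add(s);
            } else {
                // Belongs to the current source: hand back to the current enumerator.
                forwardedToCurrent.add(s);
            }
        }
    }

    // Delegation to the current enumerator stays suspended while replay work remains.
    boolean delegationSuspended() {
        return !pending.isEmpty();
    }

    // Next batch to replay, earliest source first; null when nothing is left.
    List<ReturnedSplit> nextReplayBatch() {
        Map.Entry<Integer, List<ReturnedSplit>> e = pending.pollFirstEntry();
        return e == null ? null : e.getValue();
    }
}
```

Grouping by source index and draining the lowest index first keeps replay in the same order in which the sources were originally consumed.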

addReader
public void addReader(int subtaskId)
Description copied from interface: SplitEnumerator
Add a new source reader with the given subtask ID.
Specified by:
addReader in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
subtaskId - the subtask ID of the new source reader.

snapshotState
public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception
Description copied from interface: SplitEnumerator
Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint.
The snapshot should contain the latest state of the enumerator: it should assume that all operations that happened before the snapshot have successfully completed. For example, all splits assigned to readers via SplitEnumeratorContext.assignSplit(SourceSplit, int) and SplitEnumeratorContext.assignSplits(SplitsAssignment) don't need to be included in the snapshot anymore.
This method takes the ID of the checkpoint for which the state is snapshotted. Most implementations should be able to ignore this parameter, because the contents of the snapshot do not depend on the checkpoint for which it is created. This parameter can be useful for source connectors whose external systems are themselves aware of checkpoints, for example when the enumerator notifies that system that a specific checkpoint has been triggered.
Specified by:
snapshotState in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
checkpointId - the ID of the checkpoint for which the snapshot is created.
Returns:
an object containing the state of the split enumerator.
Throws:
Exception - when the snapshot cannot be taken.
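The rule that splits already assigned to readers need not appear in the snapshot can be illustrated with a toy enumerator. `ToySnapshot` and `SnapshottingEnumerator` are hypothetical; the snapshot holds the active source index plus the splits not yet handed out, which is an assumption for illustration and not the actual layout of HybridSourceEnumeratorState.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Hypothetical immutable snapshot: which source is active and which splits are still unassigned.
final class ToySnapshot {
    final int currentSourceIndex;
    final List<String> unassignedSplits;

    ToySnapshot(int currentSourceIndex, Collection<String> unassignedSplits) {
        this.currentSourceIndex = currentSourceIndex;
        this.unassignedSplits = List.copyOf(unassignedSplits);
    }
}

final class SnapshottingEnumerator {
    private final int currentSourceIndex;
    private final Deque<String> unassigned = new ArrayDeque<>();

    SnapshottingEnumerator(int currentSourceIndex, List<String> splits) {
        this.currentSourceIndex = currentSourceIndex;
        unassigned.addAll(splits);
    }

    // Once assigned, a split is the reader's responsibility and leaves enumerator state.
    String assignNext() {
        return unassigned.pollFirst();
    }

    // checkpointId is ignored: the snapshot contents do not depend on it.
    ToySnapshot snapshotState(long checkpointId) {
        return new ToySnapshot(currentSourceIndex, unassigned);
    }
}
```

Copying the deque into an immutable list keeps the snapshot stable even if further splits are assigned after `snapshotState` returns.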

notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: SplitEnumerator
We have an empty default implementation here because most source readers do not have to implement the method.
Specified by:
notifyCheckpointComplete in interface CheckpointListener
Specified by:
notifyCheckpointComplete in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
checkpointId - the ID of the checkpoint that has been completed.
Throws:
Exception - this method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
See Also:
CheckpointListener.notifyCheckpointComplete(long)
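Under the checkpoint subsuming contract described in CheckpointListener, completing a checkpoint also covers earlier checkpoints whose notifications may have been skipped. The sketch below shows per-checkpoint bookkeeping an enumerator could keep under that assumption; `PendingPerCheckpoint` and its methods are hypothetical and unrelated to HybridSourceSplitEnumerator's own implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical bookkeeping: work recorded per checkpoint, awaiting confirmation.
final class PendingPerCheckpoint {
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    void onSnapshot(long checkpointId, List<String> work) {
        pending.put(checkpointId, work);
    }

    // A completed checkpoint subsumes all earlier ones, so everything up to and
    // including checkpointId is confirmed, even if earlier notifications were skipped.
    List<String> notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = pending.headMap(checkpointId, true);
        List<String> confirmed = new ArrayList<>();
        done.values().forEach(confirmed::addAll);
        done.clear(); // clearing the view also removes the entries from the backing map
        return confirmed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```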

notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs of CheckpointListener for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanup of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
Specified by:
notifyCheckpointAborted in interface CheckpointListener
Parameters:
checkpointId - the ID of the checkpoint that has been aborted.
Throws:
Exception - this method can propagate exceptions, which leads to a failure/recovery for the task or job.
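The example given above, proactively clearing a local per-checkpoint cache when a checkpoint is aborted, might look like this. `PerCheckpointCache` is hypothetical. Because abort notifications are best effort and must not discard data, the sketch only drops auxiliary, recomputable entries, and it assumes that completion of a later checkpoint evicts any entries a skipped abort notification left behind.

```java
import java.util.HashMap;
import java.util.Map;

final class PerCheckpointCache {
    // Auxiliary, recomputable data keyed by checkpoint ID (never the source data itself).
    private final Map<Long, byte[]> cache = new HashMap<>();

    void put(long checkpointId, byte[] data) {
        cache.put(checkpointId, data);
    }

    // Early cleanup only; nothing needed for correctness is discarded.
    void notifyCheckpointAborted(long checkpointId) {
        cache.remove(checkpointId);
    }

    // Safety net for skipped abort notifications: completion evicts all entries up to that ID.
    void notifyCheckpointComplete(long checkpointId) {
        cache.keySet().removeIf(id -> id <= checkpointId);
    }

    boolean contains(long checkpointId) {
        return cache.containsKey(checkpointId);
    }

    int size() {
        return cache.size();
    }
}
```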

handleSourceEvent
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent)
Description copied from interface: SplitEnumerator
Handles a custom source event from the source reader.
This method has a default implementation that does nothing, because it is only required to be implemented by some sources, which have a custom event protocol between reader and enumerator. The common events for reader registration and split requests are not dispatched to this method, but rather invoke the SplitEnumerator.addReader(int) and SplitEnumerator.handleSplitRequest(int, String) methods.
Specified by:
handleSourceEvent in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Parameters:
subtaskId - the subtask ID of the source reader that sent the source event.
sourceEvent - the source event from the source reader.
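Per the class description, readers signal through SourceReaderFinishedEvent that their input is consumed, and the enumerator moves to the next source once all readers have done so. A stdlib-only sketch of that dispatch follows; `ToyEvent`, `ReaderFinished`, `CustomEvent`, and `EventDispatcher` are hypothetical stand-ins rather than Flink types, and other events are assumed to be forwarded to the current underlying enumerator (recorded in a list here).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical event hierarchy standing in for SourceEvent subclasses.
interface ToyEvent {}

final class ReaderFinished implements ToyEvent {}

final class CustomEvent implements ToyEvent {
    final String payload;

    CustomEvent(String payload) {
        this.payload = payload;
    }
}

final class EventDispatcher {
    private final Set<Integer> registeredReaders = new HashSet<>();
    private final Set<Integer> finishedReaders = new HashSet<>();
    private int currentSourceIndex = 0;
    final List<String> forwarded = new ArrayList<>();

    void addReader(int subtaskId) {
        registeredReaders.add(subtaskId);
    }

    void handleSourceEvent(int subtaskId, ToyEvent event) {
        if (event instanceof ReaderFinished) {
            finishedReaders.add(subtaskId);
            // Switch only once every registered reader has consumed its input.
            if (finishedReaders.containsAll(registeredReaders)) {
                currentSourceIndex++;
                finishedReaders.clear();
            }
        } else if (event instanceof CustomEvent) {
            // Belongs to the current underlying enumerator's own event protocol.
            forwarded.add(subtaskId + ":" + ((CustomEvent) event).payload);
        }
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }
}
```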

close
public void close() throws IOException
Description copied from interface: SplitEnumerator
Called to close the enumerator, in case it holds on to any resources, like threads or network connections.
Specified by:
close in interface AutoCloseable
Specified by:
close in interface SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState>
Throws:
IOException
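Because the class implements AutoCloseable, close() is the place to release resources such as threads. A minimal sketch, assuming a hypothetical enumerator (`ThreadOwningEnumerator`) that owns a single worker thread; it illustrates the contract only and does not describe HybridSourceSplitEnumerator's own close logic.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical enumerator that owns a background thread, e.g. for periodic split discovery.
final class ThreadOwningEnumerator implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void start() {
        worker.submit(() -> {
            // placeholder for background work such as split discovery
        });
    }

    boolean isClosed() {
        return worker.isShutdown();
    }

    @Override
    public void close() throws IOException {
        worker.shutdownNow(); // interrupt running work and reject new tasks
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IOException("worker thread did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IOException("interrupted while closing", e);
        }
    }
}
```

Using try-with-resources, for example `try (ThreadOwningEnumerator e = new ThreadOwningEnumerator()) { e.start(); }`, guarantees close() runs even if start() throws.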