Class SourceReaderBase<E,T,SplitT extends SourceSplit,SplitStateT>
- java.lang.Object
  - org.apache.flink.connector.base.source.reader.SourceReaderBase<E,T,SplitT,SplitStateT>
- Type Parameters:
E - The rich element type that contains information for split state update or timestamp extraction.
T - The final element type to emit.
SplitT - The immutable split type.
SplitStateT - The mutable type of split state.
- All Implemented Interfaces:
AutoCloseable, CheckpointListener, SourceReader<T,SplitT>
- Direct Known Subclasses:
SingleThreadMultiplexSourceReaderBase
@PublicEvolving public abstract class SourceReaderBase<E,T,SplitT extends SourceSplit,SplitStateT> extends Object implements SourceReader<T,SplitT>
An abstract implementation of SourceReader that provides synchronization between the mailbox main thread and the SourceReader's internal threads. With this class, users only need to provide a SplitReader and snapshot the split state.
This implementation provides the following metrics out of the box:
-
-
Field Summary
Fields
- protected Configuration config
  The raw configurations that may be used by subclasses.
- protected SourceReaderContext context
  The context of this source reader.
- protected RecordEvaluator<T> eofRecordEvaluator
- protected SourceReaderOptions options
  The configuration for the reader.
- protected RecordEmitter<E,T,SplitStateT> recordEmitter
  The record emitter to handle the records read by the SplitReaders.
- protected SplitFetcherManager<E,SplitT> splitFetcherManager
  The split fetcher manager to run split fetchers.
-
Constructor Summary
Constructors
- SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context)
  The primary constructor for the source reader.
- SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context)
-
Method Summary
Methods
- void addSplits(List<SplitT> splits)
  Adds a list of splits for this reader to read.
- void close()
- int getNumberOfCurrentlyAssignedSplits()
  Gets the number of splits the reader has currently assigned.
- void handleSourceEvents(SourceEvent sourceEvent)
  Handle a custom source event sent by the SplitEnumerator.
- protected abstract SplitStateT initializedState(SplitT split)
  Called when new splits are added to the reader.
- CompletableFuture<Void> isAvailable()
  Returns a future that signals that data is available from the reader.
- void notifyNoMoreSplits()
  This method is called when the reader is notified that it will not receive any further splits.
- protected abstract void onSplitFinished(Map<String,SplitStateT> finishedSplitIds)
  Handles the finished splits to clean the state if needed.
- void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume)
  Pauses or resumes reading of individual source splits.
- InputStatus pollNext(ReaderOutput<T> output)
  Poll the next available record into the ReaderOutput.
- List<SplitT> snapshotState(long checkpointId)
  Checkpoint on the state of the source.
- void start()
  Start the reader.
- protected abstract SplitT toSplitType(String splitId, SplitStateT splitState)
  Convert a mutable SplitStateT to an immutable SplitT.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener
notifyCheckpointAborted
-
Methods inherited from interface org.apache.flink.api.connector.source.SourceReader
notifyCheckpointComplete
-
Field Detail
-
recordEmitter
protected final RecordEmitter<E,T,SplitStateT> recordEmitter
The record emitter to handle the records read by the SplitReaders.
-
splitFetcherManager
protected final SplitFetcherManager<E,SplitT> splitFetcherManager
The split fetcher manager to run split fetchers.
-
options
protected final SourceReaderOptions options
The configuration for the reader.
-
config
protected final Configuration config
The raw configurations that may be used by subclasses.
-
context
protected SourceReaderContext context
The context of this source reader.
-
eofRecordEvaluator
@Nullable protected final RecordEvaluator<T> eofRecordEvaluator
-
-
Constructor Detail
-
SourceReaderBase
public SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context)
The primary constructor for the source reader.
The reader will use a handover queue sized as configured via SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.
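The handover queue mentioned above can be pictured as a bounded queue between a fetcher thread and the consuming thread. The following is only a minimal sketch using plain java.util.concurrent classes; HandoverQueueSketch and its run method are hypothetical names for illustration and are not part of Flink, and the real reader does not block its main thread the way this sketch does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for illustration; not a Flink class.
final class HandoverQueueSketch {

    /** Starts one "fetcher" thread and drains its records on the calling thread. */
    static List<Integer> run(int queueCapacity, int recordCount) {
        BlockingQueue<Integer> handover = new ArrayBlockingQueue<>(queueCapacity);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < recordCount; i++) {
                    handover.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        List<Integer> received = new ArrayList<>();
        try {
            while (received.size() < recordCount) {
                // Blocking take() is used only to keep the sketch short.
                received.add(handover.take());
            }
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return received;
    }
}
```

A small capacity bounds how far the fetcher can run ahead of the consumer; records still arrive in the order they were produced.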
-
SourceReaderBase
public SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context)
-
-
Method Detail
-
start
public void start()
Description copied from interface: SourceReader
Start the reader.
- Specified by:
start in interface SourceReader<T,SplitT>
-
pollNext
public InputStatus pollNext(ReaderOutput<T> output) throws Exception
Description copied from interface: SourceReader
Poll the next available record into the ReaderOutput.
The implementation must make sure this method is non-blocking.
Although the implementation can emit multiple records into the given ReaderOutput, doing so is not recommended. Instead, emit one record into the ReaderOutput and return InputStatus.MORE_AVAILABLE to let the caller thread know there are more records available.
- Specified by:
pollNext in interface SourceReader<T,SplitT>
- Returns:
The InputStatus of the SourceReader after the method invocation.
- Throws:
Exception
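The "one record per call" recommendation can be illustrated with a small stand-alone sketch. PollNextSketch, its Status enum, and the drain helper are hypothetical stand-ins written for this example; they only mimic the shape of pollNext and do not use any Flink types.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; none of these are Flink types.
final class PollNextSketch {

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    private final Queue<String> buffered = new ArrayDeque<>();

    void buffer(String record) {
        buffered.add(record);
    }

    /** Emits at most one buffered record and never blocks. */
    Status pollNext(Consumer<String> output) {
        String next = buffered.poll(); // returns null instead of waiting
        if (next != null) {
            output.accept(next);
        }
        return buffered.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
    }

    /** Caller loop: keep polling while the reader reports more records. */
    static int drain(PollNextSketch reader, List<String> sink) {
        int calls = 0;
        Status status;
        do {
            status = reader.pollNext(sink::add);
            calls++;
        } while (status == Status.MORE_AVAILABLE);
        return calls;
    }
}
```

Returning after every record hands control back to the caller between records, which is the point of the recommendation.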
-
isAvailable
public CompletableFuture<Void> isAvailable()
Description copied from interface: SourceReader
Returns a future that signals that data is available from the reader.
Once the future completes, the runtime will keep calling the SourceReader.pollNext(ReaderOutput) method until that method returns a status other than InputStatus.MORE_AVAILABLE. After that, the runtime will again call this method to obtain the next future. Once that completes, it will again call SourceReader.pollNext(ReaderOutput) and so on.
The contract is the following: If the reader has data available, then all futures previously returned by this method must eventually complete. Otherwise the source might stall indefinitely.
It is not a problem to have occasional "false positives", meaning to complete a future even if no data is available. However, one should not use an "always complete" future when no data is available, because that will result in busy-waiting loops calling pollNext(...) even though no data is available.
- Specified by:
isAvailable in interface SourceReader<T,SplitT>
- Returns:
a future that will be completed once there is a record available to poll.
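One way to satisfy the availability contract is to hand out a single pending future while the buffer is empty and complete it when a record arrives. The sketch below is an assumption about how such bookkeeping could look, not the actual Flink implementation; AvailabilitySketch, onRecordFetched, and poll are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for illustration; not a Flink class.
final class AvailabilitySketch {

    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final Queue<String> buffered = new ArrayDeque<>();
    private CompletableFuture<Void> pending = new CompletableFuture<>();

    /** Completed future if data is buffered, otherwise the shared pending future. */
    synchronized CompletableFuture<Void> isAvailable() {
        return buffered.isEmpty() ? pending : AVAILABLE;
    }

    /** Called by a fetcher: buffers the record and wakes up every waiter. */
    synchronized void onRecordFetched(String record) {
        buffered.add(record);
        pending.complete(null); // completes all futures handed out while empty
    }

    /** Non-blocking poll; installs a fresh pending future once the buffer is drained. */
    synchronized String poll() {
        String next = buffered.poll();
        if (buffered.isEmpty() && pending.isDone()) {
            pending = new CompletableFuture<>();
        }
        return next;
    }
}
```

Because every future returned while the buffer was empty is the same object, completing it once fulfils the requirement that all previously returned futures eventually complete, and an empty buffer never yields an "always complete" future.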
-
snapshotState
public List<SplitT> snapshotState(long checkpointId)
Description copied from interface: SourceReader
Checkpoint on the state of the source.
- Specified by:
snapshotState in interface SourceReader<T,SplitT>
- Returns:
the state of the source.
-
addSplits
public void addSplits(List<SplitT> splits)
Description copied from interface: SourceReader
Adds a list of splits for this reader to read. This method is called when the enumerator assigns a split via SplitEnumeratorContext.assignSplit(SourceSplit, int) or SplitEnumeratorContext.assignSplits(SplitsAssignment).
- Specified by:
addSplits in interface SourceReader<T,SplitT>
- Parameters:
splits - The splits assigned by the split enumerator.
-
notifyNoMoreSplits
public void notifyNoMoreSplits()
Description copied from interface: SourceReader
This method is called when the reader is notified that it will not receive any further splits.
It is triggered when the enumerator calls SplitEnumeratorContext.signalNoMoreSplits(int) with the reader's parallel subtask.
- Specified by:
notifyNoMoreSplits in interface SourceReader<T,SplitT>
-
handleSourceEvents
public void handleSourceEvents(SourceEvent sourceEvent)
Description copied from interface: SourceReader
Handle a custom source event sent by the SplitEnumerator. This method is called when the enumerator sends an event via SplitEnumeratorContext.sendEventToSourceReader(int, SourceEvent).
This method has a default implementation that does nothing, because most sources do not require any custom events.
- Specified by:
handleSourceEvents in interface SourceReader<T,SplitT>
- Parameters:
sourceEvent - the event sent by the SplitEnumerator.
-
pauseOrResumeSplits
public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume)
Description copied from interface: SourceReader
Pauses or resumes reading of individual source splits.
Note that no other methods can be called in parallel, so updating subscriptions can be done atomically. This method simply gives connectors with more expressive APIs the opportunity to update all subscriptions at once.
This is currently used to align the watermarks of splits, if watermark alignment is used and the source reads from more than one split.
The default implementation throws an UnsupportedOperationException; that default implementation will be removed in future releases. To be compatible with future releases, it is recommended to implement this method and override the default implementation.
- Specified by:
pauseOrResumeSplits in interface SourceReader<T,SplitT>
- Parameters:
splitsToPause - the splits to pause
splitsToResume - the splits to resume
-
close
public void close() throws Exception
- Specified by:
close in interface AutoCloseable
- Throws:
Exception
-
getNumberOfCurrentlyAssignedSplits
public int getNumberOfCurrentlyAssignedSplits()
Gets the number of splits the reader has currently assigned.
These are the splits that have been added via addSplits(List) and have not yet been finished by returning them from SplitReader.fetch() as part of RecordsWithSplitIds.finishedSplits().
-
onSplitFinished
protected abstract void onSplitFinished(Map<String,SplitStateT> finishedSplitIds)
Handles the finished splits to clean the state if needed.
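The relationship between added splits, finished splits, and the assigned-split count can be sketched as simple map bookkeeping. This is an illustrative assumption rather than the class's actual code: AssignedSplitsSketch, finishSplits, and the Long-valued split state are hypothetical, and split IDs stand in for full split objects.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not a Flink class.
final class AssignedSplitsSketch {

    // Split ID -> per-split state (a plain offset in this sketch).
    private final Map<String, Long> splitStates = new LinkedHashMap<>();

    // Records what the hook below has been told, so the effect is observable.
    final Map<String, Long> cleanedUp = new LinkedHashMap<>();

    void addSplits(Collection<String> splitIds) {
        for (String id : splitIds) {
            splitStates.put(id, 0L);
        }
    }

    /** Removes finished splits from the bookkeeping and notifies the hook once. */
    void finishSplits(Collection<String> finishedIds) {
        Map<String, Long> finished = new LinkedHashMap<>();
        for (String id : finishedIds) {
            Long state = splitStates.remove(id);
            if (state != null) {
                finished.put(id, state);
            }
        }
        if (!finished.isEmpty()) {
            onSplitFinished(finished);
        }
    }

    /** Hook where a subclass would release resources tied to the finished splits. */
    void onSplitFinished(Map<String, Long> finishedSplitIds) {
        cleanedUp.putAll(finishedSplitIds);
    }

    int getNumberOfCurrentlyAssignedSplits() {
        return splitStates.size();
    }
}
```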
-
initializedState
protected abstract SplitStateT initializedState(SplitT split)
Called when new splits are added to the reader, to initialize the state of the new splits.
- Parameters:
split - a newly added split.
-
toSplitType
protected abstract SplitT toSplitType(String splitId, SplitStateT splitState)
Convert a mutable SplitStateT to an immutable SplitT.
- Parameters:
splitState - splitState.
- Returns:
an immutable Split state.
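The round trip between the immutable split type and the mutable split state can be sketched as follows. All names here (SplitStateSketch, OffsetSplit, OffsetSplitState, addSplit, recordEmitted) are hypothetical and chosen for illustration; only the method names initializedState, toSplitType, and snapshotState mirror the ones documented above, and how the base class wires them together is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these are Flink types.
final class SplitStateSketch {

    /** Immutable split: the form that is assigned and checkpointed. */
    static final class OffsetSplit {
        final String id;
        final long offset;

        OffsetSplit(String id, long offset) {
            this.id = id;
            this.offset = offset;
        }
    }

    /** Mutable state: updated while records of the split are emitted. */
    static final class OffsetSplitState {
        long offset;

        OffsetSplitState(long offset) {
            this.offset = offset;
        }
    }

    private final Map<String, OffsetSplitState> states = new LinkedHashMap<>();

    OffsetSplitState initializedState(OffsetSplit split) {
        return new OffsetSplitState(split.offset);
    }

    OffsetSplit toSplitType(String splitId, OffsetSplitState splitState) {
        return new OffsetSplit(splitId, splitState.offset);
    }

    void addSplit(OffsetSplit split) {
        states.put(split.id, initializedState(split));
    }

    void recordEmitted(String splitId) {
        states.get(splitId).offset++; // progress lives only in the mutable state
    }

    /** Converts every mutable state back to an immutable split for the checkpoint. */
    List<OffsetSplit> snapshotState() {
        List<OffsetSplit> snapshot = new ArrayList<>();
        for (Map.Entry<String, OffsetSplitState> e : states.entrySet()) {
            snapshot.add(toSplitType(e.getKey(), e.getValue()));
        }
        return snapshot;
    }
}
```

Keeping the checkpointed type immutable means a snapshot cannot change after it is taken, even while the mutable state keeps advancing.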
-
-