Type Parameters:
E - The rich element type that contains information for split state update or timestamp extraction.
T - The final element type to emit.
SplitT - the immutable split type.
SplitStateT - the mutable type of split state.
@PublicEvolving public abstract class SourceReaderBase<E,T,SplitT extends SourceSplit,SplitStateT> extends Object implements SourceReader<T,SplitT>
An abstract implementation of SourceReader which provides some synchronization between the mailbox main thread and the SourceReader internal threads. This class allows users to provide just a SplitReader and snapshot the split state.
This implementation provides the following metrics out of the box:
Modifier and Type | Field and Description |
---|---|
protected Configuration | config - The raw configurations that may be used by subclasses. |
protected SourceReaderContext | context - The context of this source reader. |
protected SourceReaderOptions | options - The configuration for the reader. |
protected RecordEmitter<E,T,SplitStateT> | recordEmitter - The record emitter to handle the records read by the SplitReaders. |
protected SplitFetcherManager<E,SplitT> | splitFetcherManager - The split fetcher manager to run split fetchers. |
Constructor and Description |
---|
SourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) |
Modifier and Type | Method and Description |
---|---|
void | addSplits(List<SplitT> splits) - Adds a list of splits for this reader to read. |
void | close() |
int | getNumberOfCurrentlyAssignedSplits() - Gets the number of splits the reader has currently assigned. |
void | handleSourceEvents(SourceEvent sourceEvent) - Handle a custom source event sent by the SplitEnumerator. |
protected abstract SplitStateT | initializedState(SplitT split) - Called when new splits are added to the reader; returns the initial state for the given split. |
CompletableFuture<Void> | isAvailable() - Returns a future that signals that data is available from the reader. |
void | notifyNoMoreSplits() - This method is called when the reader is notified that it will not receive any further splits. |
protected abstract void | onSplitFinished(Map<String,SplitStateT> finishedSplitIds) - Handles the finished splits to clean the state if needed. |
InputStatus | pollNext(ReaderOutput<T> output) - Poll the next available record into the SourceOutput. |
List<SplitT> | snapshotState(long checkpointId) - Checkpoint on the state of the source. |
void | start() - Start the reader. |
protected abstract SplitT | toSplitType(String splitId, SplitStateT splitState) - Convert a mutable SplitStateT to immutable SplitT. |
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from implemented interfaces: notifyCheckpointComplete, notifyCheckpointAborted
protected final RecordEmitter<E,T,SplitStateT> recordEmitter
protected final SplitFetcherManager<E,SplitT extends SourceSplit> splitFetcherManager
protected final SourceReaderOptions options
protected final Configuration config
protected SourceReaderContext context
public SourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context)
public void start()
Start the reader.
Specified by: start in interface SourceReader<T,SplitT extends SourceSplit>
public InputStatus pollNext(ReaderOutput<T> output) throws Exception
Poll the next available record into the SourceOutput.
The implementation must make sure this method is non-blocking.
Although the implementation can emit multiple records into the given SourceOutput, doing so is not recommended. Instead, emit one record into the SourceOutput and return InputStatus.MORE_AVAILABLE to let the caller thread know there are more records available.
Specified by: pollNext in interface SourceReader<T,SplitT extends SourceSplit>
Throws: Exception
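The one-record-per-call pattern described above can be sketched with a toy model. This is only an illustration under stated assumptions: `PollNextSketch`, `ToyReader`, and the `Status` enum are hypothetical stand-ins and are not Flink classes, and a plain `List<String>` takes the place of the SourceOutput.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: none of these types are Flink classes.
final class PollNextSketch {

    /** Hypothetical stand-in for the status returned by pollNext. */
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical reader that hands out buffered records one call at a time. */
    static final class ToyReader {
        private final Queue<String> buffered = new ArrayDeque<>();
        private boolean noMoreInput;

        void enqueue(String record) {
            buffered.add(record);
        }

        void markFinished() {
            noMoreInput = true;
        }

        /** Never blocks: emits at most one record, then reports what is left. */
        Status pollNext(List<String> output) {
            String next = buffered.poll(); // returns null instead of waiting
            if (next != null) {
                output.add(next);
            }
            if (!buffered.isEmpty()) {
                return Status.MORE_AVAILABLE; // tell the caller to poll again
            }
            return noMoreInput ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
        }
    }

    public static void main(String[] args) {
        ToyReader reader = new ToyReader();
        reader.enqueue("a");
        reader.enqueue("b");
        List<String> out = new ArrayList<>();
        System.out.println(reader.pollNext(out)); // MORE_AVAILABLE
        System.out.println(reader.pollNext(out)); // NOTHING_AVAILABLE
        reader.markFinished();
        System.out.println(reader.pollNext(out)); // END_OF_INPUT
        System.out.println(out);                  // [a, b]
    }
}
```

Returning after a single record keeps each call short, so the calling thread can interleave other work between records.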
public CompletableFuture<Void> isAvailable()
Returns a future that signals that data is available from the reader.
Once the future completes, the runtime will keep calling the SourceReader.pollNext(ReaderOutput) method until that method returns a status other than InputStatus.MORE_AVAILABLE. After that, the runtime will again call this method to obtain the next future. Once that completes, it will again call SourceReader.pollNext(ReaderOutput), and so on.
The contract is the following: If the reader has data available, then all futures previously returned by this method must eventually complete. Otherwise the source might stall indefinitely.
It is not a problem to have occasional "false positives", meaning to complete a future even if no data is available. However, one should not use an "always complete" future in cases where no data is available, because that will result in busy waiting loops calling pollNext(...) even though no data is available.
Specified by: isAvailable in interface SourceReader<T,SplitT extends SourceSplit>
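One way to satisfy the availability contract is to complete the pending future whenever a record arrives and to re-arm it once the buffer is drained. The sketch below assumes a hypothetical `ToyBuffer`; it is not Flink's FutureCompletingBlockingQueue and only illustrates the idea.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Toy model only: ToyBuffer is a hypothetical class, not part of Flink.
final class AvailabilitySketch {

    static final class ToyBuffer {
        private final Queue<String> records = new ArrayDeque<>();
        private CompletableFuture<Void> pending = new CompletableFuture<>();

        /** Called by a producer thread when a record arrives. */
        synchronized void put(String record) {
            records.add(record);
            pending.complete(null); // every previously returned future completes
        }

        /** Called by the consumer; returns null when nothing is buffered. */
        synchronized String poll() {
            String next = records.poll();
            if (records.isEmpty() && pending.isDone()) {
                // Re-arm so callers do not spin on an always-complete future.
                pending = new CompletableFuture<>();
            }
            return next;
        }

        synchronized CompletableFuture<Void> isAvailable() {
            return pending;
        }
    }

    public static void main(String[] args) {
        ToyBuffer buffer = new ToyBuffer();
        CompletableFuture<Void> first = buffer.isAvailable();
        System.out.println(first.isDone());                // false
        buffer.put("a");
        System.out.println(first.isDone());                // true
        buffer.poll();
        System.out.println(buffer.isAvailable().isDone()); // false
    }
}
```

Because the future is replaced only after it has completed, no caller is left holding a future that will never fire.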
public List<SplitT> snapshotState(long checkpointId)
Checkpoint on the state of the source.
Specified by: snapshotState in interface SourceReader<T,SplitT extends SourceSplit>
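The relationship between the immutable split type, the mutable split state, and the snapshot can be illustrated with a toy model. This is a sketch under assumptions, not the actual implementation: `ToySplit`, `ToySplitState`, `ToyReader`, and `advance` are hypothetical, and only the method names `initializedState` and `toSplitType` mirror the abstract methods of this class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these types are hypothetical and do not depend on Flink.
final class SnapshotSketch {

    /** Hypothetical immutable split: an id plus the offset to resume from. */
    static final class ToySplit {
        final String id;
        final long offset;

        ToySplit(String id, long offset) {
            this.id = id;
            this.offset = offset;
        }
    }

    /** Hypothetical mutable state that advances as records are emitted. */
    static final class ToySplitState {
        long offset;

        ToySplitState(long offset) {
            this.offset = offset;
        }
    }

    static final class ToyReader {
        private final Map<String, ToySplitState> states = new LinkedHashMap<>();

        void addSplits(List<ToySplit> splits) {
            for (ToySplit split : splits) {
                states.put(split.id, initializedState(split));
            }
        }

        /** Hypothetical helper simulating progress on one split. */
        void advance(String splitId, long records) {
            states.get(splitId).offset += records;
        }

        /** Freezes every mutable state into an immutable split. */
        List<ToySplit> snapshotState() {
            List<ToySplit> snapshot = new ArrayList<>();
            states.forEach((id, state) -> snapshot.add(toSplitType(id, state)));
            return snapshot;
        }

        ToySplitState initializedState(ToySplit split) {
            return new ToySplitState(split.offset);
        }

        ToySplit toSplitType(String splitId, ToySplitState state) {
            return new ToySplit(splitId, state.offset);
        }
    }

    public static void main(String[] args) {
        ToyReader reader = new ToyReader();
        reader.addSplits(Arrays.asList(new ToySplit("s0", 5L)));
        reader.advance("s0", 3L);
        System.out.println(reader.snapshotState().get(0).offset); // 8
    }
}
```

Copying the state into a fresh immutable object means later progress on the split cannot alter a snapshot that was already taken.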
public void addSplits(List<SplitT> splits)
Adds a list of splits for this reader to read. This method is called when the enumerator assigns splits via SplitEnumeratorContext.assignSplit(SourceSplit, int) or SplitEnumeratorContext.assignSplits(SplitsAssignment).
Specified by: addSplits in interface SourceReader<T,SplitT extends SourceSplit>
Parameters: splits - The splits assigned by the split enumerator.
public void notifyNoMoreSplits()
This method is called when the reader is notified that it will not receive any further splits.
It is triggered when the enumerator calls SplitEnumeratorContext.signalNoMoreSplits(int) with the reader's parallel subtask.
Specified by: notifyNoMoreSplits in interface SourceReader<T,SplitT extends SourceSplit>
public void handleSourceEvents(SourceEvent sourceEvent)
Handle a custom source event sent by the SplitEnumerator. This method is called when the enumerator sends an event via SplitEnumeratorContext.sendEventToSourceReader(int, SourceEvent).
This method has a default implementation that does nothing, because most sources do not require any custom events.
Specified by: handleSourceEvents in interface SourceReader<T,SplitT extends SourceSplit>
Parameters: sourceEvent - the event sent by the SplitEnumerator.
public void close() throws Exception
Specified by: close in interface AutoCloseable
Throws: Exception
public int getNumberOfCurrentlyAssignedSplits()
Gets the number of splits the reader has currently assigned.
These are the splits that have been added via addSplits(List) and have not yet been finished by returning them from the SplitReader.fetch() as part of RecordsWithSplitIds.finishedSplits().
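The counting rule can be pictured as a map that gains an entry for each added split and loses it once a fetch reports the split as finished. The following is a minimal sketch, assuming a hypothetical `ToyReader` and a hypothetical `onFetchResult` hook; it does not reproduce the internals of this class.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: ToyReader and onFetchResult are hypothetical names.
final class AssignedSplitsSketch {

    static final class ToyReader {
        // Split id mapped to a placeholder state value.
        private final Map<String, Long> stateBySplitId = new HashMap<>();

        void addSplits(List<String> splitIds) {
            for (String id : splitIds) {
                stateBySplitId.put(id, 0L);
            }
        }

        /** Hypothetical hook: receives the ids a fetch reported as finished. */
        void onFetchResult(Set<String> finishedSplitIds) {
            stateBySplitId.keySet().removeAll(finishedSplitIds);
        }

        int getNumberOfCurrentlyAssignedSplits() {
            return stateBySplitId.size();
        }
    }

    public static void main(String[] args) {
        ToyReader reader = new ToyReader();
        reader.addSplits(Arrays.asList("s0", "s1"));
        System.out.println(reader.getNumberOfCurrentlyAssignedSplits()); // 2
        reader.onFetchResult(Collections.singleton("s0"));
        System.out.println(reader.getNumberOfCurrentlyAssignedSplits()); // 1
    }
}
```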
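protected abstract void onSplitFinished(Map<String,SplitStateT> finishedSplitIds)
Handles the finished splits to clean the state if needed.
protected abstract SplitStateT initializedState(SplitT split)
Called when new splits are added to the reader; returns the initial state for the given split.
Parameters: split - a newly added split.
protected abstract SplitT toSplitType(String splitId, SplitStateT splitState)
Convert a mutable SplitStateT to immutable SplitT.
Parameters: splitState - splitState.
Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.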