Type Parameters:
E - The rich element type that contains information for split state update or timestamp extraction.
T - The final element type to emit.
SplitT - The immutable split type.
SplitStateT - The mutable type of split state.

@PublicEvolving public abstract class SourceReaderBase<E,T,SplitT extends SourceSplit,SplitStateT> extends Object implements SourceReader<T,SplitT>
An abstract implementation of SourceReader which provides some synchronization between the mailbox main thread and the SourceReader internal threads. This class allows users to provide just a SplitReader and snapshot the split state.
This implementation provides the following metrics out of the box:
Modifier and Type | Field and Description |
---|---|
protected Configuration | config - The raw configurations that may be used by subclasses. |
protected SourceReaderContext | context - The context of this source reader. |
protected RecordEvaluator<T> | eofRecordEvaluator |
protected SourceReaderOptions | options - The configuration for the reader. |
protected RecordEmitter<E,T,SplitStateT> | recordEmitter - The record emitter to handle the records read by the SplitReaders. |
protected SplitFetcherManager<E,SplitT> | splitFetcherManager - The split fetcher manager to run split fetchers. |
Constructor and Description |
---|
SourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) - Deprecated. |
SourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context) - Deprecated. |
SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) - The primary constructor for the source reader. |
SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context) |
Modifier and Type | Method and Description |
---|---|
void | addSplits(List<SplitT> splits) - Adds a list of splits for this reader to read. |
void | close() |
int | getNumberOfCurrentlyAssignedSplits() - Gets the number of splits the reader has currently assigned. |
void | handleSourceEvents(SourceEvent sourceEvent) - Handle a custom source event sent by the SplitEnumerator. |
protected abstract SplitStateT | initializedState(SplitT split) - Called when new splits are added to the reader. |
CompletableFuture<Void> | isAvailable() - Returns a future that signals that data is available from the reader. |
void | notifyNoMoreSplits() - This method is called when the reader is notified that it will not receive any further splits. |
protected abstract void | onSplitFinished(Map<String,SplitStateT> finishedSplitIds) - Handles the finished splits to clean the state if needed. |
void | pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) - Pauses or resumes reading of individual source splits. |
InputStatus | pollNext(ReaderOutput<T> output) - Poll the next available record into the ReaderOutput. |
List<SplitT> | snapshotState(long checkpointId) - Checkpoint on the state of the source. |
void | start() - Start the reader. |
protected abstract SplitT | toSplitType(String splitId, SplitStateT splitState) - Convert a mutable SplitStateT to immutable SplitT. |
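Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SourceReader: notifyCheckpointComplete
Methods inherited from interface CheckpointListener: notifyCheckpointAborted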
protected final RecordEmitter<E,T,SplitStateT> recordEmitter
protected final SplitFetcherManager<E,SplitT extends SourceSplit> splitFetcherManager
protected final SourceReaderOptions options
protected final Configuration config
protected SourceReaderContext context
@Nullable protected final RecordEvaluator<T> eofRecordEvaluator
@Deprecated public SourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context)
Deprecated. Use SourceReaderBase(SplitFetcherManager, RecordEmitter, Configuration, SourceReaderContext) instead.

@Deprecated public SourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context)
Deprecated. Use SourceReaderBase(SplitFetcherManager, RecordEmitter, RecordEvaluator, Configuration, SourceReaderContext) instead.

public SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, Configuration config, SourceReaderContext context)
The primary constructor for the source reader. The reader will use a handover queue sized as configured via SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.

public SourceReaderBase(SplitFetcherManager<E,SplitT> splitFetcherManager, RecordEmitter<E,T,SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context)
public void start()
Description copied from interface: SourceReader
Start the reader.
Specified by: start in interface SourceReader<T,SplitT extends SourceSplit>
public InputStatus pollNext(ReaderOutput<T> output) throws Exception
Description copied from interface: SourceReader
Poll the next available record into the ReaderOutput.
The implementation must make sure this method is non-blocking.
Although the implementation can emit multiple records into the given ReaderOutput, doing so is not recommended. Instead, emit one record into the ReaderOutput and return InputStatus.MORE_AVAILABLE to let the caller thread know there are more records available.
Specified by: pollNext in interface SourceReader<T,SplitT extends SourceSplit>
Throws: Exception
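
The one-record-per-call recommendation can be illustrated with a small, Flink-free sketch. Everything in it is a stand-in invented for illustration: `SketchStatus` mimics only two values of InputStatus, and `OneRecordPerPollReader` with its `enqueue` and `drain` helpers is hypothetical, not part of SourceReaderBase.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Stand-in for InputStatus; only the two values this sketch needs (assumption).
enum SketchStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

// Hypothetical reader that hands over at most one record per pollNext call.
class OneRecordPerPollReader {
    private final Queue<String> buffered = new ArrayDeque<>();

    // Simulates a record arriving from a fetcher.
    void enqueue(String record) {
        buffered.add(record);
    }

    // Non-blocking: emits at most one buffered record, then reports whether more remain.
    SketchStatus pollNext(Consumer<String> output) {
        String next = buffered.poll();
        if (next != null) {
            output.accept(next);
        }
        return buffered.isEmpty() ? SketchStatus.NOTHING_AVAILABLE : SketchStatus.MORE_AVAILABLE;
    }

    // Drives pollNext the way a caller would: repeat while MORE_AVAILABLE is returned.
    static List<String> drain(OneRecordPerPollReader reader) {
        List<String> out = new ArrayList<>();
        SketchStatus status;
        do {
            status = reader.pollNext(out::add);
        } while (status == SketchStatus.MORE_AVAILABLE);
        return out;
    }
}
```

Returning after each record keeps every call short, so the calling thread can interleave other work between records.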
public CompletableFuture<Void> isAvailable()
Description copied from interface: SourceReader
Returns a future that signals that data is available from the reader.
Once the future completes, the runtime will keep calling the SourceReader.pollNext(ReaderOutput) method until that method returns a status other than InputStatus.MORE_AVAILABLE. After that, the runtime will again call this method to obtain the next future. Once that completes, it will again call SourceReader.pollNext(ReaderOutput) and so on.
The contract is the following: If the reader has data available, then all futures previously returned by this method must eventually complete. Otherwise the source might stall indefinitely.
It is not a problem to have occasional "false positives", meaning to complete a future even if no data is available. However, one should not use an "always complete" future in cases where no data is available, because that will result in busy waiting loops calling pollNext(...) even though no data is available.
Specified by: isAvailable in interface SourceReader<T,SplitT extends SourceSplit>
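
One way to satisfy the availability contract is sketched below using only `java.util.concurrent.CompletableFuture`. This is a minimal illustration under stated assumptions, not how SourceReaderBase is implemented: the class `AvailabilitySketch` and its methods `onRecordFetched` and `poll` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical reader-side availability tracking (not a Flink class).
class AvailabilitySketch {
    private final Queue<String> buffered = new ArrayDeque<>();
    private CompletableFuture<Void> pending = new CompletableFuture<>();

    // Completed future while data is buffered; otherwise the still-open pending future,
    // so callers do not spin when nothing is available.
    synchronized CompletableFuture<Void> isAvailable() {
        return buffered.isEmpty() ? pending : CompletableFuture.completedFuture(null);
    }

    // Called by a (hypothetical) fetcher thread: buffer the record, then complete the
    // outstanding future so that every future handed out earlier completes as well.
    synchronized void onRecordFetched(String record) {
        buffered.add(record);
        pending.complete(null);
        pending = new CompletableFuture<>();
    }

    // Removes and returns the next buffered record, or null if there is none.
    synchronized String poll() {
        return buffered.poll();
    }
}
```

Because a fresh future is created only after the previous one has completed, no future returned while the buffer was empty is left dangling once data arrives.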
public List<SplitT> snapshotState(long checkpointId)
Description copied from interface: SourceReader
Checkpoint on the state of the source.
Specified by: snapshotState in interface SourceReader<T,SplitT extends SourceSplit>
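
The relationship between the immutable split type, the mutable split state, and a snapshot can be sketched without any Flink dependency. The types `OffsetSplit`, `OffsetSplitState`, and `SplitStateSketch`, and the `recordEmitted` helper, are hypothetical; the sketch only mirrors the roles of initializedState(SplitT), toSplitType(String, SplitStateT), and snapshotState(long), and assumes a split is fully described by an id and an offset.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical immutable split: an id plus the offset to resume from.
final class OffsetSplit {
    final String id;
    final long startOffset;

    OffsetSplit(String id, long startOffset) {
        this.id = id;
        this.startOffset = startOffset;
    }
}

// Hypothetical mutable per-split state, advanced as records are emitted.
final class OffsetSplitState {
    long currentOffset;

    OffsetSplitState(long currentOffset) {
        this.currentOffset = currentOffset;
    }
}

class SplitStateSketch {
    private final Map<String, OffsetSplitState> states = new LinkedHashMap<>();

    // Plays the role of initializedState(SplitT): build mutable state from a new split.
    OffsetSplitState initializedState(OffsetSplit split) {
        return new OffsetSplitState(split.startOffset);
    }

    // Plays the role of toSplitType(String, SplitStateT): freeze state back into a split.
    OffsetSplit toSplitType(String splitId, OffsetSplitState state) {
        return new OffsetSplit(splitId, state.currentOffset);
    }

    void addSplits(List<OffsetSplit> splits) {
        for (OffsetSplit split : splits) {
            states.put(split.id, initializedState(split));
        }
    }

    // Simulates emitting one record from the given split.
    void recordEmitted(String splitId) {
        states.get(splitId).currentOffset++;
    }

    // A snapshot is the list of immutable splits derived from the current states.
    // checkpointId is unused here; it is kept only to mirror the documented signature.
    List<OffsetSplit> snapshotState(long checkpointId) {
        List<OffsetSplit> snapshot = new ArrayList<>();
        states.forEach((id, state) -> snapshot.add(toSplitType(id, state)));
        return snapshot;
    }
}
```

Restoring from such a snapshot amounts to passing the returned splits to addSplits again, which rebuilds the mutable state from the recorded offsets.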
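public void addSplits(List<SplitT> splits)
Description copied from interface: SourceReader
Adds a list of splits for this reader to read. This method is called when the enumerator assigns a split via SplitEnumeratorContext.assignSplit(SourceSplit, int) or SplitEnumeratorContext.assignSplits(SplitsAssignment).
Specified by: addSplits in interface SourceReader<T,SplitT extends SourceSplit>
Parameters: splits - The splits assigned by the split enumerator.

public void notifyNoMoreSplits()
Description copied from interface: SourceReader
This method is called when the reader is notified that it will not receive any further splits.
It is triggered when the enumerator calls SplitEnumeratorContext.signalNoMoreSplits(int) with the reader's parallel subtask.
Specified by: notifyNoMoreSplits in interface SourceReader<T,SplitT extends SourceSplit>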
public void handleSourceEvents(SourceEvent sourceEvent)
Description copied from interface: SourceReader
Handle a custom source event sent by the SplitEnumerator. This method is called when the enumerator sends an event via SplitEnumeratorContext.sendEventToSourceReader(int, SourceEvent).
This method has a default implementation that does nothing, because most sources do not require any custom events.
Specified by: handleSourceEvents in interface SourceReader<T,SplitT extends SourceSplit>
Parameters: sourceEvent - the event sent by the SplitEnumerator.

public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume)
Description copied from interface: SourceReader
Pauses or resumes reading of individual source splits.
Note that no other methods can be called in parallel, so updating subscriptions can be done atomically. This method simply gives connectors with more expressive APIs the opportunity to update all subscriptions at once.
This is currently used to align the watermarks of splits, if watermark alignment is used and the source reads from more than one split.
The default implementation throws an UnsupportedOperationException and will be removed in future releases. To be compatible with future releases, it is recommended to implement this method and override the default implementation.
Specified by: pauseOrResumeSplits in interface SourceReader<T,SplitT extends SourceSplit>
Parameters:
splitsToPause - the splits to pause
splitsToResume - the splits to resume

public void close() throws Exception
Specified by: close in interface AutoCloseable
Throws: Exception
public int getNumberOfCurrentlyAssignedSplits()
Gets the number of splits the reader has currently assigned.
These are the splits that have been added via addSplits(List) and have not yet been finished by returning them from the SplitReader.fetch() as part of RecordsWithSplitIds.finishedSplits().
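
The counting rule above (added splits minus finished splits) can be sketched as follows. This is an illustration only: `AssignedSplitCounterSketch` and `onFetchFinishedSplits` are hypothetical names, and splits are reduced to plain string ids.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical bookkeeping for currently assigned splits (not a Flink class).
class AssignedSplitCounterSketch {
    private final Set<String> assigned = new HashSet<>();

    // Mirrors addSplits(List): every added split counts as assigned.
    void addSplits(List<String> splitIds) {
        assigned.addAll(splitIds);
    }

    // Stand-in for handling the finished-split ids reported with a fetched batch.
    void onFetchFinishedSplits(Collection<String> finishedSplitIds) {
        assigned.removeAll(finishedSplitIds);
    }

    // Assigned splits are those added and not yet reported as finished.
    int getNumberOfCurrentlyAssignedSplits() {
        return assigned.size();
    }
}
```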
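protected abstract void onSplitFinished(Map<String,SplitStateT> finishedSplitIds)
Handles the finished splits to clean the state if needed.

protected abstract SplitStateT initializedState(SplitT split)
Called when new splits are added to the reader.
Parameters: split - a newly added split.

protected abstract SplitT toSplitType(String splitId, SplitStateT splitState)
Convert a mutable SplitStateT to immutable SplitT.
Parameters: splitState - splitState.

Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.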