Type Parameters:
SplitT - the type of the splits.
@Internal public class SourceCoordinatorContext<SplitT extends SourceSplit> extends Object implements SplitEnumeratorContext<SplitT>, SupportsIntermediateNoMoreSplits, AutoCloseable
A context class for the OperatorCoordinator. Compared with SplitEnumeratorContext, this class allows interaction with state and sending OperatorEvents to the SourceOperator, while SplitEnumeratorContext only allows sending SourceEvents.
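The context serves a few purposes:
- Information provider: the context gives the enumerator the information it needs about the status of the source readers and their split assignments, which allows the split enumerator to do the coordination.
- Action taker: the context provides the actions the enumerator can take to carry out the coordination. So far there are two: 1) assign splits to the source readers, and 2) send custom SourceEvents to the source readers.
- Thread model enforcement: the context ensures that all manipulations of the coordinator state are handled by the same thread.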
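The assignment side of this context can be pictured with a small, Flink-free model. The sketch below is hypothetical: SplitAssignmentModel, registerReader, splitsOf and isSignaledNoMoreSplits are illustrative names, splits are plain String ids, and the rule that assigning to an unregistered subtask is rejected is an assumption about how a coordinator might guard assignSplits, not a description of Flink's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Flink-free model: splits are String ids, readers are bare subtask ids.
class SplitAssignmentModel {
    private final Set<Integer> registeredReaders = new HashSet<>();
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    private final Set<Integer> noMoreSplits = new HashSet<>();

    // Stand-in for a source reader registering with the coordinator.
    void registerReader(int subtaskId) {
        registeredReaders.add(subtaskId);
    }

    // Assumption: an assignment to an unregistered subtask is rejected before anything is recorded.
    void assignSplits(Map<Integer, List<String>> assignment) {
        for (int subtaskId : assignment.keySet()) {
            if (!registeredReaders.contains(subtaskId)) {
                throw new IllegalArgumentException("subtask " + subtaskId + " is not registered");
            }
        }
        // Only reached when every target is registered: append the new splits per subtask.
        assignment.forEach((subtaskId, splits) ->
                assignedSplits.computeIfAbsent(subtaskId, id -> new ArrayList<>()).addAll(splits));
    }

    // Records that the subtask will not receive any further split.
    void signalNoMoreSplits(int subtaskId) {
        noMoreSplits.add(subtaskId);
    }

    List<String> splitsOf(int subtaskId) {
        return assignedSplits.getOrDefault(subtaskId, List.of());
    }

    boolean isSignaledNoMoreSplits(int subtaskId) {
        return noMoreSplits.contains(subtaskId);
    }
}
```

Validating the whole assignment before recording any of it keeps a rejected batch from leaving a partial assignment behind.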
Constructor and Description |
---|
SourceCoordinatorContext(SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, int numWorkerThreads, OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer<SplitT> splitSerializer, boolean supportsConcurrentExecutionAttempts) |
Modifier and Type | Method and Description |
---|---|
void | assignSplits(SplitsAssignment<SplitT> assignment) Assign the splits. |
<T> void | callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler) Invoke the callable and hand over the return value to the handler, which will be executed by the source coordinator. |
<T> void | callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler, long initialDelay, long period) Invoke the given callable periodically and hand over the return value to the handler, which will be executed by the source coordinator. |
void | close() |
int | currentParallelism() Get the current parallelism of this Source. |
TernaryBoolean | isBacklog() Returns whether the Source is processing backlog data. |
SplitEnumeratorMetricGroup | metricGroup() |
Map<Integer,ReaderInfo> | registeredReaders() Get the currently registered readers. |
Map<Integer,Map<Integer,ReaderInfo>> | registeredReadersOfAttempts() Get the currently registered readers of all the subtask attempts. |
void | runInCoordinatorThread(Runnable runnable) Invoke the given runnable in the source coordinator thread. |
void | sendEventToSourceReader(int subtaskId, int attemptNumber, SourceEvent event) Send a source event to a specific execution attempt of a source reader. |
void | sendEventToSourceReader(int subtaskId, SourceEvent event) Send a source event to a source reader. |
void | setIsProcessingBacklog(boolean isProcessingBacklog) Reports to the JobManager (JM) whether this source is currently processing backlog. |
void | signalIntermediateNoMoreSplits(int subtask) Signals a subtask that it will not receive any further splits for the current source, but will receive splits for the next sources. |
void | signalNoMoreSplits(int subtask) Signals a subtask that it will not receive any further split. |
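Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SplitEnumeratorContext: assignSplit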
public SourceCoordinatorContext(SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, int numWorkerThreads, OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer<SplitT> splitSerializer, boolean supportsConcurrentExecutionAttempts)
public SplitEnumeratorMetricGroup metricGroup()
Specified by: metricGroup in interface SplitEnumeratorContext<SplitT extends SourceSplit>
public void sendEventToSourceReader(int subtaskId, SourceEvent event)
Description copied from interface: SplitEnumeratorContext
Send a source event to a source reader.
Specified by: sendEventToSourceReader in interface SplitEnumeratorContext<SplitT extends SourceSplit>
Parameters:
subtaskId - the subtask id of the source reader to send this event to.
event - the source event to send.
public void sendEventToSourceReader(int subtaskId, int attemptNumber, SourceEvent event)
Description copied from interface: SplitEnumeratorContext
Send a source event to a source reader. This works like SplitEnumeratorContext.sendEventToSourceReader(int, SourceEvent), but it is aware of the subtask execution attempt to send the event to.
The SplitEnumerator must invoke this method instead of SplitEnumeratorContext.sendEventToSourceReader(int, SourceEvent) whenever a subtask can have multiple concurrent execution attempts, e.g. if speculative execution is enabled. Otherwise, an error will be thrown when the split enumerator tries to send a custom source event.
Specified by: sendEventToSourceReader in interface SplitEnumeratorContext<SplitT extends SourceSplit>
Parameters:
subtaskId - the subtask id of the source reader to send this event to.
attemptNumber - the attempt number of the source reader to send this event to.
event - the source event to send.
public int currentParallelism()
Description copied from interface: SplitEnumeratorContext
Get the current parallelism of this Source.
Specified by: currentParallelism in interface SplitEnumeratorContext<SplitT extends SourceSplit>
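Why the attempt-aware overload sendEventToSourceReader(int, int, SourceEvent) is required under speculative execution can be shown with a minimal model. In this hypothetical sketch, AttemptAwareEventRouter, registerAttempt and eventsOf are illustrative names and String events stand in for SourceEvent; it only mirrors the documented rule that an attempt-agnostic send becomes an error once a subtask has several concurrent attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of attempt-aware routing; String events stand in for SourceEvent.
class AttemptAwareEventRouter {
    // subtask id -> (attempt number -> events delivered to that attempt)
    private final Map<Integer, Map<Integer, List<String>>> inboxes = new TreeMap<>();

    void registerAttempt(int subtaskId, int attemptNumber) {
        inboxes.computeIfAbsent(subtaskId, id -> new TreeMap<>())
                .putIfAbsent(attemptNumber, new ArrayList<>());
    }

    // Attempt-agnostic send: only unambiguous while the subtask has exactly one attempt.
    void sendEventToSourceReader(int subtaskId, String event) {
        Map<Integer, List<String>> attempts = inboxes.getOrDefault(subtaskId, Map.of());
        if (attempts.isEmpty()) {
            throw new IllegalArgumentException("subtask " + subtaskId + " is not registered");
        }
        if (attempts.size() > 1) {
            throw new IllegalStateException("subtask " + subtaskId + " has " + attempts.size()
                    + " concurrent attempts; use the attempt-aware overload");
        }
        attempts.values().iterator().next().add(event);
    }

    // Attempt-aware send: delivers to exactly one execution attempt.
    void sendEventToSourceReader(int subtaskId, int attemptNumber, String event) {
        List<String> inbox = inboxes.getOrDefault(subtaskId, Map.of()).get(attemptNumber);
        if (inbox == null) {
            throw new IllegalArgumentException(
                    "attempt " + attemptNumber + " of subtask " + subtaskId + " is not registered");
        }
        inbox.add(event);
    }

    List<String> eventsOf(int subtaskId, int attemptNumber) {
        return inboxes.get(subtaskId).get(attemptNumber);
    }
}
```

While only attempt 0 exists, both overloads reach the same reader; once a speculative attempt registers, only the attempt-aware call is unambiguous.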
public Map<Integer,ReaderInfo> registeredReaders()
Description copied from interface: SplitEnumeratorContext
Get the currently registered readers. The mapping is from subtask id to the reader info.
Note that if a subtask has multiple concurrent attempts, the map will contain the earliest attempt of that subtask. This is for compatibility purposes. It is recommended to use SplitEnumeratorContext.registeredReadersOfAttempts() instead.
Specified by: registeredReaders in interface SplitEnumeratorContext<SplitT extends SourceSplit>
public Map<Integer,Map<Integer,ReaderInfo>> registeredReadersOfAttempts()
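Description copied from interface: SplitEnumeratorContext
Get the currently registered readers of all the subtask attempts. The mapping is from subtask id to a map from attempt number to the reader info.
Specified by: registeredReadersOfAttempts in interface SplitEnumeratorContext<SplitT extends SourceSplit>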
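registeredReaders() and registeredReadersOfAttempts() are two views of the same registrations. The hypothetical helper below (ReaderViews.earliestAttemptView is an illustrative name) derives the first view from the second, with a type parameter R standing in for ReaderInfo. It assumes that the "earliest attempt" is the one with the smallest attempt number; Flink may track it differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: derives a one-reader-per-subtask view from a per-attempt view.
final class ReaderViews {
    private ReaderViews() {}

    // Assumption: the earliest attempt is the one with the smallest attempt number.
    static <R> Map<Integer, R> earliestAttemptView(Map<Integer, Map<Integer, R>> readersOfAttempts) {
        Map<Integer, R> result = new HashMap<>();
        readersOfAttempts.forEach((subtaskId, byAttempt) -> {
            if (!byAttempt.isEmpty()) {
                // TreeMap sorts attempt numbers ascending, so firstEntry() is the earliest attempt.
                result.put(subtaskId, new TreeMap<>(byAttempt).firstEntry().getValue());
            }
        });
        return result;
    }
}
```

An enumerator that only reads this collapsed view cannot see a speculative attempt at all, which is why the per-attempt view is the recommended one.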
public void assignSplits(SplitsAssignment<SplitT> assignment)
Description copied from interface: SplitEnumeratorContext
Assign the splits.
Specified by: assignSplits in interface SplitEnumeratorContext<SplitT extends SourceSplit>
Parameters:
assignment - the new split assignments to add.
public void signalNoMoreSplits(int subtask)
Description copied from interface: SplitEnumeratorContext
Signals a subtask that it will not receive any further split.
Specified by: signalNoMoreSplits in interface SplitEnumeratorContext<SplitT extends SourceSplit>
Parameters:
subtask - the index of the operator's parallel subtask that shall be signaled that it will not receive any further split.
public void signalIntermediateNoMoreSplits(int subtask)
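Description copied from interface: SupportsIntermediateNoMoreSplits
Signals a subtask that it will not receive any further splits for the current source, but will receive splits for the next sources.
Specified by: signalIntermediateNoMoreSplits in interface SupportsIntermediateNoMoreSplits
Parameters:
subtask - the index of the operator's parallel subtask that shall be signaled that it will receive splits later.
public <T> void callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler, long initialDelay, long period)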
Description copied from interface: SplitEnumeratorContext
Invoke the given callable periodically and hand over the return value to the handler, which will be executed by the source coordinator. Callables passed to this method may be executed concurrently in a thread pool.
It is important to make sure that the callable does not modify any shared state, especially the state that will be part of SplitEnumerator.snapshotState(long). Otherwise, there might be unexpected behavior, for example a snapshot that captures partially modified state.
Note that an exception thrown from the handler results in failing the job.
Specified by: callAsync in interface SplitEnumeratorContext<SplitT extends SourceSplit>
Parameters:
callable - the callable to call.
handler - a handler that handles the return value of, or the exception thrown from, the callable.
initialDelay - the initial delay of calling the callable, in milliseconds.
period - the period between two invocations of the callable, in milliseconds.
public <T> void callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler)
Description copied from interface: SplitEnumeratorContext
Invoke the callable and hand over the return value to the handler, which will be executed by the source coordinator. Callables passed to this method may be executed concurrently in a thread pool.
It is important to make sure that the callable does not modify any shared state, especially the state that will be part of SplitEnumerator.snapshotState(long). Otherwise, there might be unexpected behavior, for example a snapshot that captures partially modified state.
Note that an exception thrown from the handler results in failing the job.
Specified by: callAsync in interface SplitEnumeratorContext<SplitT extends SourceSplit>
Parameters:
callable - a callable to call.
handler - a handler that handles the return value of, or the exception thrown from, the callable.
public void runInCoordinatorThread(Runnable runnable)
Invoke the given runnable in the source coordinator thread.
This can be useful when the enumerator needs to execute some action (like assignSplits) triggered by an external event. For example, the watermark of another source has advanced, and this source is now able to assign splits to waiting readers. The trigger can be initiated from the coordinator thread of the other source. Instead of relying on locks for thread safety, this API runs such an externally triggered action in this source's coordinator thread, which ensures that all enumerator actions are serialized in a single thread.
It is important that the runnable does not block. If the runnable throws an exception, the corresponding job fails.
Specified by: runInCoordinatorThread in interface SplitEnumeratorContext<SplitT extends SourceSplit>
Parameters:
runnable - a runnable to execute.
public void close() throws InterruptedException
Specified by: close in interface AutoCloseable
Throws:
InterruptedException
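The callAsync and runInCoordinatorThread contracts above split the work between two kinds of threads: callables run in a worker pool and may overlap, while handlers and runnables run one at a time in the coordinator thread. The following is a minimal sketch of that contract using java.util.concurrent only. CoordinatorThreadingSketch is a hypothetical name, and sizing the pool with numWorkerThreads is an assumption suggested by the constructor parameter of the same name; this is not Flink's implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch of the threading contract; not Flink's SourceCoordinatorContext.
class CoordinatorThreadingSketch implements AutoCloseable {
    // One thread: handlers and runnables are serialized here, so enumerator state needs no locks.
    private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
    // Worker pool: callables may run here concurrently, so they must not modify shared state.
    private final ScheduledExecutorService workerExecutor;

    CoordinatorThreadingSketch(int numWorkerThreads) {
        this.workerExecutor = Executors.newScheduledThreadPool(numWorkerThreads);
    }

    // One-shot: run the callable on a worker, then hand the outcome to the coordinator thread.
    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        workerExecutor.execute(() -> runAndHandOver(callable, handler));
    }

    // Periodic: the same hand-over, repeated every periodMs after initialDelayMs.
    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler,
                       long initialDelayMs, long periodMs) {
        workerExecutor.scheduleAtFixedRate(
                () -> runAndHandOver(callable, handler), initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Externally triggered actions are queued behind any pending handlers.
    void runInCoordinatorThread(Runnable runnable) {
        coordinatorExecutor.execute(runnable);
    }

    private <T> void runAndHandOver(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        T result = null;
        Throwable error = null;
        try {
            result = callable.call();            // executes on a worker thread
        } catch (Throwable t) {
            error = t;
        }
        final T value = result;
        final Throwable failure = error;
        coordinatorExecutor.execute(() -> handler.accept(value, failure)); // handler on coordinator thread
    }

    @Override
    public void close() throws InterruptedException {
        workerExecutor.shutdownNow();            // cancel queued/periodic callables, interrupt running ones
        workerExecutor.awaitTermination(10, TimeUnit.SECONDS);
        coordinatorExecutor.shutdown();          // let handlers that are already queued finish
        coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Because every handler is queued on the same single-thread executor, a handler may update enumerator state without locking, whereas the callable must stay free of shared-state writes, as the Javadoc requires. Unlike Flink, this sketch does not fail a job when a handler throws.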
public void setIsProcessingBacklog(boolean isProcessingBacklog)
Description copied from interface: SplitEnumeratorContext
Reports to the JobManager (JM) whether this source is currently processing backlog.
When a source is processing backlog, the records it emits are already stale and have no processing-latency requirement. This allows downstream operators to optimize for throughput instead of reducing latency for intermediate results.
If no API has been explicitly invoked to specify the backlog status of a source, the source is considered to have isProcessingBacklog=false by default.
Specified by: setIsProcessingBacklog in interface SplitEnumeratorContext<SplitT extends SourceSplit>
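setIsProcessingBacklog takes a plain boolean, but isBacklog() returns a TernaryBoolean. A minimal sketch, assuming the third value means "never reported": the hypothetical Ternary enum stands in for Flink's TernaryBoolean so that the code compiles without Flink, and the hypothetical BacklogStatus class resolves an unreported status to false, matching the default documented above.

```java
// Hypothetical stand-in for Flink's TernaryBoolean, so that the sketch compiles without Flink.
enum Ternary {
    TRUE, FALSE, UNDEFINED;

    static Ternary fromBoolean(boolean b) {
        return b ? TRUE : FALSE;
    }

    // Resolves UNDEFINED to the given default; TRUE and FALSE map to themselves.
    boolean getOrDefault(boolean defaultValue) {
        return this == UNDEFINED ? defaultValue : this == TRUE;
    }
}

// Hypothetical model of the backlog flag kept by a source coordinator.
class BacklogStatus {
    // UNDEFINED until the enumerator reports a status explicitly.
    private volatile Ternary backlog = Ternary.UNDEFINED;

    void setIsProcessingBacklog(boolean isProcessingBacklog) {
        backlog = Ternary.fromBoolean(isProcessingBacklog);
    }

    Ternary isBacklog() {
        return backlog;
    }

    // Effective status: a source that never reported is treated as not processing backlog.
    boolean effectivelyProcessingBacklog() {
        return backlog.getOrDefault(false);
    }
}
```

Keeping the third value lets a caller tell "never reported" apart from an explicit false, while the effective status still follows the documented default.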
public TernaryBoolean isBacklog()
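Returns whether the Source is processing backlog data. UNDEFINED is returned if it has not been set by the setIsProcessingBacklog(boolean) method.
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.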