Class TestingSplitEnumeratorContext<SplitT extends SourceSplit>
- java.lang.Object
-
- org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext<SplitT>
-
- Type Parameters:
SplitT - The generic type of the splits.
- All Implemented Interfaces:
SplitEnumeratorContext<SplitT>
public class TestingSplitEnumeratorContext<SplitT extends SourceSplit> extends Object implements SplitEnumeratorContext<SplitT>
A test implementation of the SplitEnumeratorContext, with manual, non-concurrent interaction and interception of state.
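The "manual, non-concurrent interaction" can be pictured as an executor that only queues work until the test explicitly triggers it. The following is a minimal JDK-only sketch of that idea, not the code of this class; `ManualExecutor`, `pendingCount`, `triggerAll`, and `demo` are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical stand-in (not a Flink class): queues tasks and runs them only on request.
class ManualExecutor implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        pending.add(task); // queued only; nothing runs yet
    }

    int pendingCount() {
        return pending.size();
    }

    // Runs queued tasks on the calling thread in submission order,
    // including tasks that get queued while draining.
    void triggerAll() {
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
        }
    }

    static List<String> demo() {
        ManualExecutor executor = new ManualExecutor();
        List<String> log = new ArrayList<>();
        executor.execute(() -> log.add("discover"));
        executor.execute(() -> log.add("assign"));
        log.add("queued=" + executor.pendingCount());
        executor.triggerAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [queued=2, discover, assign]
    }
}
```

Because everything runs on the test thread, the order of effects is deterministic, which is what makes this style convenient for assertions.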
-
-
Nested Class Summary
static class TestingSplitEnumeratorContext.SplitAssignmentState<SplitT extends SourceSplit>
    The state of the split assignment for a subtask.
-
Constructor Summary
TestingSplitEnumeratorContext(int parallelism)
-
Method Summary
void assignSplits(SplitsAssignment<SplitT> newSplitAssignments)
    Assign the splits.
<T> void callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler)
    Invoke the callable and hand over the return value to the handler, which will be executed by the source coordinator.
<T> void callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler, long initialDelay, long period)
    Invoke the given callable periodically and hand over the return value to the handler, which will be executed by the source coordinator.
int currentParallelism()
    Get the current parallelism of this Source.
ManuallyTriggeredScheduledExecutorService getExecutorService()
Map<Integer,List<SourceEvent>> getSentEvents()
Map<Integer,TestingSplitEnumeratorContext.SplitAssignmentState<SplitT>> getSplitAssignments()
SplitEnumeratorMetricGroup metricGroup()
Map<Integer,ReaderInfo> registeredReaders()
    Get the currently registered readers.
void registerReader(int subtask, String hostname)
void runInCoordinatorThread(Runnable runnable)
    Invoke the given runnable in the source coordinator thread.
void sendEventToSourceReader(int subtaskId, int attemptNumber, SourceEvent event)
    Send a source event to a source reader.
void sendEventToSourceReader(int subtaskId, SourceEvent event)
    Send a source event to a source reader.
void setIsProcessingBacklog(boolean isProcessingBacklog)
    Reports to the JobManager (JM) whether this source is currently processing backlog.
void signalNoMoreSplits(int subtask)
    Signals a subtask that it will not receive any further split.
void triggerAllActions()
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.api.connector.source.SplitEnumeratorContext
assignSplit, registeredReadersOfAttempts
-
Method Detail
-
triggerAllActions
public void triggerAllActions()
-
getExecutorService
public ManuallyTriggeredScheduledExecutorService getExecutorService()
-
getSplitAssignments
public Map<Integer,TestingSplitEnumeratorContext.SplitAssignmentState<SplitT>> getSplitAssignments()
-
getSentEvents
public Map<Integer,List<SourceEvent>> getSentEvents()
-
registerReader
public void registerReader(int subtask, String hostname)
-
metricGroup
public SplitEnumeratorMetricGroup metricGroup()
- Specified by:
metricGroup in interface SplitEnumeratorContext<SplitT extends SourceSplit>
-
sendEventToSourceReader
public void sendEventToSourceReader(int subtaskId, SourceEvent event)
Description copied from interface: SplitEnumeratorContext
Send a source event to a source reader. The source reader is identified by its subtask id.
- Specified by:
sendEventToSourceReader in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Parameters:
subtaskId - the subtask id of the source reader to send this event to.
event - the source event to send.
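Since getSentEvents() returns a Map<Integer,List<SourceEvent>>, a test context plausibly records each event under its subtask id instead of delivering it. The sketch below shows that kind of interception using only the JDK; `SentEventRecorder` and its `send` method are hypothetical, the type parameter `E` stands in for SourceEvent, and nothing here is taken from the actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical recorder (not a Flink class): keeps events per subtask for later assertions.
class SentEventRecorder<E> {
    private final Map<Integer, List<E>> sentEvents = new HashMap<>();

    // Records the event instead of sending it anywhere.
    void send(int subtaskId, E event) {
        sentEvents.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(event);
    }

    Map<Integer, List<E>> getSentEvents() {
        return sentEvents;
    }

    public static void main(String[] args) {
        SentEventRecorder<String> recorder = new SentEventRecorder<>();
        recorder.send(0, "pause");
        recorder.send(0, "resume");
        recorder.send(2, "pause");
        System.out.println(recorder.getSentEvents().get(0)); // prints [pause, resume]
    }
}
```

A test can then assert on the recorded list for a given subtask after driving the enumerator.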
-
sendEventToSourceReader
public void sendEventToSourceReader(int subtaskId, int attemptNumber, SourceEvent event)
Description copied from interface: SplitEnumeratorContext
Send a source event to a source reader. The source reader is identified by its subtask id and attempt number. It is similar to SplitEnumeratorContext.sendEventToSourceReader(int, SourceEvent), but it is aware of the subtask execution attempt to send this event to.
The SplitEnumerator must invoke this method instead of SplitEnumeratorContext.sendEventToSourceReader(int, SourceEvent) if a subtask can have multiple concurrent execution attempts, e.g. if speculative execution is enabled. Otherwise an error will be thrown when the split enumerator tries to send a custom source event.
- Specified by:
sendEventToSourceReader in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Parameters:
subtaskId - the subtask id of the source reader to send this event to.
attemptNumber - the attempt number of the source reader to send this event to.
event - the source event to send.
-
currentParallelism
public int currentParallelism()
Description copied from interface: SplitEnumeratorContext
Get the current parallelism of this Source. Note that due to auto-scaling, the parallelism may change over time. Therefore the SplitEnumerator should not cache the return value of this method, but always invoke this method to get the latest parallelism.
- Specified by:
currentParallelism in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Returns:
the parallelism of the Source.
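The advice not to cache the parallelism can be illustrated with a small JDK-only sketch. `ParallelismSketch` and `ownerOf` are hypothetical names, the modulo rule is just one possible assignment scheme, and the `IntSupplier` stands in for a call to currentParallelism().

```java
import java.util.function.IntSupplier;

class ParallelismSketch {
    // Hypothetical round-robin rule: reads the parallelism on every call instead of caching it.
    static int ownerOf(int splitIndex, IntSupplier currentParallelism) {
        return splitIndex % currentParallelism.getAsInt();
    }

    public static void main(String[] args) {
        int[] parallelism = {4};                  // mutable stand-in for a job that rescales
        IntSupplier current = () -> parallelism[0];
        System.out.println(ownerOf(6, current));  // prints 2 (6 % 4)
        parallelism[0] = 2;                       // simulated scale-down
        System.out.println(ownerOf(6, current));  // prints 0 (6 % 2)
    }
}
```

Had the value 4 been cached, the second call would still have returned 2, a subtask index that no longer exists after the scale-down.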
-
registeredReaders
public Map<Integer,ReaderInfo> registeredReaders()
Description copied from interface: SplitEnumeratorContext
Get the currently registered readers. The mapping is from subtask id to the reader info.
Note that if a subtask has multiple concurrent attempts, the map will contain the earliest attempt of that subtask. This is for compatibility purposes. It's recommended to use SplitEnumeratorContext.registeredReadersOfAttempts() instead.
- Specified by:
registeredReaders in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Returns:
the currently registered readers.
-
assignSplits
public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments)
Description copied from interface: SplitEnumeratorContext
Assign the splits.
- Specified by:
assignSplits in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Parameters:
newSplitAssignments - the new split assignments to add.
-
signalNoMoreSplits
public void signalNoMoreSplits(int subtask)
Description copied from interface: SplitEnumeratorContext
Signals a subtask that it will not receive any further split.
- Specified by:
signalNoMoreSplits in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Parameters:
subtask - The index of the operator's parallel subtask that shall be signaled that it will not receive any further split.
-
callAsync
public <T> void callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler)
Description copied from interface: SplitEnumeratorContext
Invoke the callable and hand over the return value to the handler, which will be executed by the source coordinator. When this method is invoked multiple times, the Callables may be executed in a thread pool concurrently.
It is important to make sure that the callable does not modify any shared state, especially the states that will be a part of the SplitEnumerator.snapshotState(long). Otherwise, there might be unexpected behavior.
Note that an exception thrown from the handler would result in failing the job.
- Specified by:
callAsync in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Parameters:
callable - a callable to call.
handler - a handler that handles the return value of or the exception thrown from the callable.
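The callable/handler contract described here can be sketched with plain JDK types: the handler receives either the callable's return value or the exception it threw. This is a simplified, synchronous illustration under stated assumptions, not the behavior of this class; `CallAsyncSketch`, `callAndHandle`, and `demo` are hypothetical names, and a real implementation may run the callable on a worker thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;

class CallAsyncSketch {
    // Hypothetical helper: runs the callable, then passes (value, null) or (null, error) to the handler.
    static <T> void callAndHandle(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        T value = null;
        Throwable error = null;
        try {
            value = callable.call();
        } catch (Throwable t) {
            error = t;
        }
        // Deliberately outside the try block: an exception thrown by the handler propagates.
        handler.accept(value, error);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        BiConsumer<Integer, Throwable> handler = (count, error) -> {
            if (error != null) {
                log.add("failed: " + error.getMessage());
            } else {
                log.add("found " + count + " splits");
            }
        };
        callAndHandle(() -> 3, handler);
        callAndHandle(() -> { throw new IllegalStateException("listing timed out"); }, handler);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [found 3 splits, failed: listing timed out]
    }
}
```

Keeping all state changes inside the handler, and none inside the callable, follows the advice above about not modifying shared state from the callable.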
-
callAsync
public <T> void callAsync(Callable<T> callable, BiConsumer<T,Throwable> handler, long initialDelay, long period)
Description copied from interface: SplitEnumeratorContext
Invoke the given callable periodically and hand over the return value to the handler, which will be executed by the source coordinator. When this method is invoked multiple times, the Callables may be executed in a thread pool concurrently.
It is important to make sure that the callable does not modify any shared state, especially the states that will be a part of the SplitEnumerator.snapshotState(long). Otherwise, there might be unexpected behavior.
Note that an exception thrown from the handler would result in failing the job.
- Specified by:
callAsync in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Parameters:
callable - the callable to call.
handler - a handler that handles the return value of or the exception thrown from the callable.
initialDelay - the initial delay of calling the callable, in milliseconds.
period - the period between two invocations of the callable, in milliseconds.
-
runInCoordinatorThread
public void runInCoordinatorThread(Runnable runnable)
Description copied from interface: SplitEnumeratorContext
Invoke the given runnable in the source coordinator thread.
This can be useful when the enumerator needs to execute some action (like assignSplits) triggered by an external event. For example, the watermark from another source may advance so that this source is now able to assign splits to awaiting readers. The trigger can be initiated from the coordinator thread of the other source. Instead of using a lock for thread safety, this API allows such externally triggered actions to run in the coordinator thread. Hence, all enumerator actions are serialized in the single coordinator thread.
It is important that the runnable does not block.
- Specified by:
runInCoordinatorThread in interface SplitEnumeratorContext<SplitT extends SourceSplit>
- Parameters:
runnable - a runnable to execute
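The idea of serializing externally triggered actions on one thread, rather than guarding state with a lock, can be sketched with a JDK single-thread executor. This is an illustration only: `CoordinatorThreadSketch` and `run` are hypothetical names, and the executor merely stands in for the source coordinator thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CoordinatorThreadSketch {
    static int run() {
        // Stand-in for the coordinator thread: one thread, tasks run one at a time.
        ExecutorService coordinator = Executors.newSingleThreadExecutor();
        int[] assigned = {0}; // plain, unsynchronized state touched only by coordinator tasks

        // Four "external" threads each hand 1000 actions to the coordinator.
        Thread[] externalTriggers = new Thread[4];
        for (int i = 0; i < externalTriggers.length; i++) {
            externalTriggers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    coordinator.execute(() -> assigned[0]++);
                }
            });
            externalTriggers[i].start();
        }
        try {
            for (Thread trigger : externalTriggers) {
                trigger.join();
            }
            // Read the state through the coordinator as well, after all queued actions.
            return coordinator.submit(() -> assigned[0]).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            coordinator.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 4000
    }
}
```

No increments are lost even though the counter is not atomic, because every update runs on the same thread. The same design also explains why a blocking runnable is harmful: it stalls every action queued behind it.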
-
setIsProcessingBacklog
public void setIsProcessingBacklog(boolean isProcessingBacklog)
Description copied from interface: SplitEnumeratorContext
Reports to the JobManager (JM) whether this source is currently processing backlog.
When a source is processing backlog, the records being emitted by this source are already stale and there is no processing latency requirement for these records. This allows downstream operators to optimize throughput instead of reducing latency for intermediate results.
If no API has been explicitly invoked to specify the backlog status of a source, the source is considered to have isProcessingBacklog=false by default.
- Specified by:
setIsProcessingBacklog in interface SplitEnumeratorContext<SplitT extends SourceSplit>
-
-