Class OperatorChain<OUT,OP extends StreamOperator<OUT>>
- java.lang.Object
-
- org.apache.flink.streaming.runtime.tasks.OperatorChain<OUT,OP>
-
- Type Parameters:
OUT
- The type of elements accepted by the chain, i.e., the input type of the chain's main operator.
- All Implemented Interfaces:
Closeable, AutoCloseable, BoundedMultiInput
- Direct Known Subclasses:
FinishedOperatorChain, RegularOperatorChain
public abstract class OperatorChain<OUT,OP extends StreamOperator<OUT>> extends Object implements BoundedMultiInput, Closeable
The OperatorChain contains all operators that are executed as one chain within a single StreamTask.
The main entry point to the chain is its mainOperator. The mainOperator drives the execution of the StreamTask by pulling records from network inputs and/or source inputs and pushing produced records to the remaining chained operators.
-
-
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
static class
OperatorChain.ChainedSource
Wrapper class to access the chained sources and their outputs.
-
Field Summary
Fields
Modifier and Type, Field, Description
protected Map<StreamConfig.SourceInputConfig,OperatorChain.ChainedSource>
chainedSources
protected org.apache.flink.shaded.guava32.com.google.common.io.Closer
closer
protected FinishedOnRestoreInput
finishedOnRestoreInput
protected StreamOperatorWrapper<?,?>
firstOperatorWrapper
protected boolean
isClosed
protected WatermarkGaugeExposingOutput<StreamRecord<OUT>>
mainOperatorOutput
protected StreamOperatorWrapper<OUT,OP>
mainOperatorWrapper
For iteration, StreamIterationHead and StreamIterationTail, which are used for executing feedback edges, do not contain any operators; in that case, mainOperatorWrapper and tailOperatorWrapper are null.
protected int
numOperators
protected OperatorEventDispatcherImpl
operatorEventDispatcher
protected RecordWriterOutput<?>[]
streamOutputs
protected StreamOperatorWrapper<?,?>
tailOperatorWrapper
-
Constructor Summary
Constructors
Constructor, Description
OperatorChain(StreamTask<OUT,OP> containingTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate)
-
Method Summary
Modifier and Type, Method, Description
void
abortCheckpoint(long checkpointId, CheckpointException cause)
void
alignedBarrierTimeout(long checkpointId)
void
broadcastEvent(AbstractEvent event)
void
broadcastEvent(AbstractEvent event, boolean isPriorityEvent)
void
close()
This method releases all resources of the record writer output.
void
closeAllOperators()
Executes StreamOperator.close() of each operator in the chain of this StreamTask.
abstract void
dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event)
abstract void
endInput(int inputId)
Ends the main operator input specified by inputId.
abstract void
finishOperators(StreamTaskActionExecutor actionExecutor, StopMode stopMode)
Closes all operators in the chain in a cascading fashion.
void
flushOutputs()
This method should be called before finishing the record emission, to make sure any data that is still buffered will be sent.
Iterable<StreamOperatorWrapper<?,?>>
getAllOperators()
Returns an Iterable which traverses all operators in forward topological order.
protected Iterable<StreamOperatorWrapper<?,?>>
getAllOperators(boolean reverse)
Returns an Iterable which traverses all operators in forward or reverse topological order.
OperatorChain.ChainedSource
getChainedSource(StreamConfig.SourceInputConfig sourceInput)
List<Output<StreamRecord<?>>>
getChainedSourceOutputs()
Input
getFinishedOnRestoreInputOrDefault(Input defaultInput)
OP
getMainOperator()
WatermarkGaugeExposingOutput<StreamRecord<OUT>>
getMainOperatorOutput()
int
getNumberOfOperators()
OperatorEventDispatcher
getOperatorEventDispatcher()
StreamTaskSourceInput<?>
getSourceTaskInput(StreamConfig.SourceInputConfig sourceInput)
List<StreamTaskSourceInput<?>>
getSourceTaskInputs()
RecordWriterOutput<?>[]
getStreamOutputs()
protected StreamOperator<?>
getTailOperator()
abstract void
initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer)
Initializes state and opens all operators in the chain from tail to heads, contrary to StreamOperator.close(), which happens heads to tail (see finishOperators(StreamTaskActionExecutor, StopMode)).
boolean
isClosed()
abstract boolean
isTaskDeployedAsFinished()
abstract void
notifyCheckpointAborted(long checkpointId)
abstract void
notifyCheckpointComplete(long checkpointId)
abstract void
notifyCheckpointSubsumed(long checkpointId)
abstract void
prepareSnapshotPreBarrier(long checkpointId)
protected void
sendAcknowledgeCheckpointEvent(long checkpointId)
protected void
snapshotChannelStates(StreamOperator<?> op, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, OperatorSnapshotFutures snapshotInProgress)
abstract void
snapshotState(Map<OperatorID,OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, Supplier<Boolean> isRunning, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory storage)
-
-
-
Field Detail
-
streamOutputs
protected final RecordWriterOutput<?>[] streamOutputs
-
mainOperatorOutput
protected final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput
-
mainOperatorWrapper
@Nullable protected final StreamOperatorWrapper<OUT,OP> mainOperatorWrapper
For iteration, StreamIterationHead and StreamIterationTail, which are used for executing feedback edges, do not contain any operators; in that case, mainOperatorWrapper and tailOperatorWrapper are null.
Usually the first operator in the chain is the same as mainOperatorWrapper, but that is not the case if there are chained source inputs. In that case, one of the source inputs will be the first operator. For example, the following operator chain is possible:
first
     \
     main (multi-input) -> ... -> tail
     /
second
Where "first" and "second" (there can be more) are chained source operators. When it comes to things like closing, state initialization, or state snapshotting, the operator chain is traversed in the order first, second, main, ..., tail, or in reversed order: tail, ..., main, second, first.
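The traversal order described above can be illustrated with a small, self-contained sketch. It does not use any Flink classes: ChainTraversalSketch, forwardOrder, and reverseOrder are hypothetical names, and operators are modelled as plain strings purely to show the ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a chain; this is not Flink's OperatorChain. */
final class ChainTraversalSketch {

    /** Forward order: chained sources first, then the main operator, then downstream operators up to the tail. */
    static List<String> forwardOrder(List<String> chainedSources, String main, List<String> downstream) {
        List<String> order = new ArrayList<>(chainedSources);
        order.add(main);
        order.addAll(downstream);
        return order;
    }

    /** Reverse order is the forward order read backwards. */
    static List<String> reverseOrder(List<String> forward) {
        List<String> reversed = new ArrayList<>(forward);
        Collections.reverse(reversed);
        return reversed;
    }

    public static void main(String[] args) {
        List<String> forward = forwardOrder(List.of("first", "second"), "main", List.of("tail"));
        System.out.println(forward);               // [first, second, main, tail]
        System.out.println(reverseOrder(forward)); // [tail, main, second, first]
    }
}
```

In the real class, the corresponding iteration is exposed through getAllOperators() and getAllOperators(boolean reverse).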
-
firstOperatorWrapper
@Nullable protected final StreamOperatorWrapper<?,?> firstOperatorWrapper
-
tailOperatorWrapper
@Nullable protected final StreamOperatorWrapper<?,?> tailOperatorWrapper
-
chainedSources
protected final Map<StreamConfig.SourceInputConfig,OperatorChain.ChainedSource> chainedSources
-
numOperators
protected final int numOperators
-
operatorEventDispatcher
protected final OperatorEventDispatcherImpl operatorEventDispatcher
-
closer
protected final org.apache.flink.shaded.guava32.com.google.common.io.Closer closer
-
finishedOnRestoreInput
@Nullable protected final FinishedOnRestoreInput finishedOnRestoreInput
-
isClosed
protected boolean isClosed
-
-
Constructor Detail
-
OperatorChain
public OperatorChain(StreamTask<OUT,OP> containingTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate)
-
-
Method Detail
-
isTaskDeployedAsFinished
public abstract boolean isTaskDeployedAsFinished()
-
dispatchOperatorEvent
public abstract void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException
- Throws:
FlinkException
-
prepareSnapshotPreBarrier
public abstract void prepareSnapshotPreBarrier(long checkpointId) throws Exception
- Throws:
Exception
-
endInput
public abstract void endInput(int inputId) throws Exception
Ends the main operator input specified by inputId.
- Specified by:
endInput
in interface BoundedMultiInput
- Parameters:
inputId
- the input ID, starting from 1, which indicates the first input.
- Throws:
Exception
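Because inputId starts from 1 rather than 0, an off-by-one error is easy to make when mapping inputs to array or bit indices. The following is a minimal sketch of that convention only; BoundedInputsSketch and allInputsEnded are hypothetical and are not part of Flink.

```java
import java.util.BitSet;

/** Hypothetical multi-input model; it only mirrors the 1-based inputId convention. */
final class BoundedInputsSketch {
    private final int numInputs;
    private final BitSet ended = new BitSet();

    BoundedInputsSketch(int numInputs) {
        this.numInputs = numInputs;
    }

    /** Marks the given input as finished; inputId 1 is the first input. */
    void endInput(int inputId) {
        if (inputId < 1 || inputId > numInputs) {
            throw new IllegalArgumentException(
                    "inputId must be in [1, " + numInputs + "], got " + inputId);
        }
        ended.set(inputId - 1); // translate the 1-based ID to a 0-based bit index
    }

    boolean allInputsEnded() {
        return ended.cardinality() == numInputs;
    }

    public static void main(String[] args) {
        BoundedInputsSketch inputs = new BoundedInputsSketch(2);
        inputs.endInput(1);
        System.out.println(inputs.allInputsEnded()); // false
        inputs.endInput(2);
        System.out.println(inputs.allInputsEnded()); // true
    }
}
```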
-
initializeStateAndOpenOperators
public abstract void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception
Initializes state and opens all operators in the chain from tail to heads, contrary to StreamOperator.close(), which happens heads to tail (see finishOperators(StreamTaskActionExecutor, StopMode)).
- Throws:
Exception
-
finishOperators
public abstract void finishOperators(StreamTaskActionExecutor actionExecutor, StopMode stopMode) throws Exception
Closes all operators in the chain in a cascading fashion. Closing happens from heads to tail operator in the chain, contrary to StreamOperator.open(), which happens tail to heads (see initializeStateAndOpenOperators(StreamTaskStateInitializer)).
- Throws:
Exception
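The opposite directions of opening and finishing can be sketched without any Flink types. LifecycleOrderSketch below is hypothetical and models operators as strings. The comments give one plausible rationale for the ordering, which is an assumption and is not stated on this page.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the open/finish ordering; this is not Flink code. */
final class LifecycleOrderSketch {

    /** Returns the lifecycle calls in the order they would be issued for a chain listed head to tail. */
    static List<String> run(List<String> headToTail) {
        List<String> log = new ArrayList<>();
        // Open from tail to head (assumed rationale: a downstream operator is ready
        // before anything upstream of it can emit records).
        for (int i = headToTail.size() - 1; i >= 0; i--) {
            log.add("open:" + headToTail.get(i));
        }
        // Finish from head to tail (assumed rationale: records flushed by an upstream
        // operator while finishing can still be processed downstream).
        for (String op : headToTail) {
            log.add("finish:" + op);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("main", "map", "tail")));
        // [open:tail, open:map, open:main, finish:main, finish:map, finish:tail]
    }
}
```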
-
notifyCheckpointComplete
public abstract void notifyCheckpointComplete(long checkpointId) throws Exception
- Throws:
Exception
-
notifyCheckpointAborted
public abstract void notifyCheckpointAborted(long checkpointId) throws Exception
- Throws:
Exception
-
notifyCheckpointSubsumed
public abstract void notifyCheckpointSubsumed(long checkpointId) throws Exception
- Throws:
Exception
-
snapshotState
public abstract void snapshotState(Map<OperatorID,OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, Supplier<Boolean> isRunning, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory storage) throws Exception
- Throws:
Exception
-
getOperatorEventDispatcher
public OperatorEventDispatcher getOperatorEventDispatcher()
-
broadcastEvent
public void broadcastEvent(AbstractEvent event) throws IOException
- Throws:
IOException
-
broadcastEvent
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException
- Throws:
IOException
-
alignedBarrierTimeout
public void alignedBarrierTimeout(long checkpointId) throws IOException
- Throws:
IOException
-
abortCheckpoint
public void abortCheckpoint(long checkpointId, CheckpointException cause)
-
closeAllOperators
public void closeAllOperators() throws Exception
Executes StreamOperator.close() of each operator in the chain of this StreamTask. Closing happens from tail to head operator in the chain.
- Throws:
Exception
-
getStreamOutputs
public RecordWriterOutput<?>[] getStreamOutputs()
-
getAllOperators
@VisibleForTesting public Iterable<StreamOperatorWrapper<?,?>> getAllOperators()
Returns an Iterable which traverses all operators in forward topological order.
-
getAllOperators
protected Iterable<StreamOperatorWrapper<?,?>> getAllOperators(boolean reverse)
Returns an Iterable which traverses all operators in forward or reverse topological order.
-
getFinishedOnRestoreInputOrDefault
public Input getFinishedOnRestoreInputOrDefault(Input defaultInput)
-
getNumberOfOperators
public int getNumberOfOperators()
-
getMainOperatorOutput
public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput()
-
getChainedSource
public OperatorChain.ChainedSource getChainedSource(StreamConfig.SourceInputConfig sourceInput)
-
getChainedSourceOutputs
public List<Output<StreamRecord<?>>> getChainedSourceOutputs()
-
getSourceTaskInput
public StreamTaskSourceInput<?> getSourceTaskInput(StreamConfig.SourceInputConfig sourceInput)
-
getSourceTaskInputs
public List<StreamTaskSourceInput<?>> getSourceTaskInputs()
-
flushOutputs
public void flushOutputs() throws IOException
This method should be called before finishing the record emission, to make sure any data that is still buffered will be sent. It also ensures that all data sending related exceptions are recognized.
- Throws:
IOException
- Thrown, if the buffered data cannot be pushed into the output streams.
-
close
public void close() throws IOException
This method releases all resources of the record writer output. It stops the output flushing thread (if there is one) and releases all buffers currently held by the output serializers. This method should never fail.
- Specified by:
close
in interface AutoCloseable
- Specified by:
close
in interface Closeable
- Throws:
IOException
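Since close() releases buffers while flushOutputs() is what actually sends buffered data, the two are normally used in sequence: flush first, then close. The sketch below illustrates that pattern with a hypothetical BufferedOutput class. It is a simplified assumption about the interaction and does not reproduce Flink's record writer.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical flush-then-close model; this is not Flink code. */
final class FlushThenCloseSketch {

    /** Hypothetical buffered output that stands in for a record writer output. */
    static final class BufferedOutput implements Closeable {
        private final List<String> buffer = new ArrayList<>();
        final List<String> sent = new ArrayList<>();
        private boolean closed;

        void emit(String record) {
            if (closed) {
                throw new IllegalStateException("output is closed");
            }
            buffer.add(record);
        }

        /** Sends everything that is still buffered. */
        void flush() {
            sent.addAll(buffer);
            buffer.clear();
        }

        /** Releases buffers; anything not flushed before this call is dropped. */
        @Override
        public void close() {
            buffer.clear();
            closed = true;
        }
    }

    /** Emits all records, flushes, and always closes the output afterwards. */
    static List<String> emitFlushClose(List<String> records) {
        BufferedOutput out = new BufferedOutput();
        try {
            for (String record : records) {
                out.emit(record);
            }
            out.flush(); // flush before close so buffered records are not lost
        } finally {
            out.close();
        }
        return out.sent;
    }

    public static void main(String[] args) {
        System.out.println(emitFlushClose(List.of("a", "b"))); // [a, b]
    }
}
```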
-
getMainOperator
@Nullable public OP getMainOperator()
-
getTailOperator
@Nullable protected StreamOperator<?> getTailOperator()
-
snapshotChannelStates
protected void snapshotChannelStates(StreamOperator<?> op, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, OperatorSnapshotFutures snapshotInProgress)
-
isClosed
public boolean isClosed()
-
sendAcknowledgeCheckpointEvent
protected void sendAcknowledgeCheckpointEvent(long checkpointId)
-
-