Type Parameters: OUT - The type of elements accepted by the chain, i.e., the input type of the chain's main operator.
public abstract class OperatorChain<OUT,OP extends StreamOperator<OUT>> extends Object implements BoundedMultiInput, Closeable
The OperatorChain contains all operators that are executed as one chain within a single StreamTask.
The main entry point to the chain is its mainOperator. The mainOperator drives the execution of the StreamTask by pulling the records from network inputs and/or source inputs and pushing produced records to the remaining chained operators.
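The pull-then-push flow described above can be illustrated with a small, self-contained sketch. It does not use Flink's API: `ChainPushSketch`, `chain`, and `run` are hypothetical names, and each "operator" is reduced to a plain function whose result is handed directly to the next stage, which is roughly what chaining means here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for an operator chain; not Flink's StreamOperator API. */
final class ChainPushSketch {

    /** Wraps a processing step so that its result is pushed straight to the next stage. */
    static <I, O> Consumer<I> chain(Function<I, O> step, Consumer<O> downstream) {
        return record -> downstream.accept(step.apply(record));
    }

    /** Builds main -> second -> tail, then lets the "main operator" drive the chain. */
    static List<String> run(Iterable<Integer> input) {
        List<String> sink = new ArrayList<>();
        Consumer<String> tail = sink::add;                         // last stage collects results
        Consumer<Integer> second = chain(i -> "value-" + i, tail); // chained operator
        Consumer<Integer> main = chain(i -> i * 2, second);        // main operator

        // The main operator pulls each record from the input and pushes it down the chain.
        for (Integer record : input) {
            main.accept(record);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3)));
    }
}
```

In this sketch no record is queued between stages: each record travels through the whole chain within a single call from the main operator.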
Modifier and Type | Class and Description |
---|---|
static class | OperatorChain.ChainedSource Wrapper class to access the chained sources and their outputs. |
Modifier and Type | Field and Description |
---|---|
protected Map<StreamConfig.SourceInputConfig,OperatorChain.ChainedSource> | chainedSources |
protected org.apache.flink.shaded.guava31.com.google.common.io.Closer | closer |
protected FinishedOnRestoreInput | finishedOnRestoreInput |
protected StreamOperatorWrapper<?,?> | firstOperatorWrapper |
protected boolean | isClosed |
protected WatermarkGaugeExposingOutput<StreamRecord<OUT>> | mainOperatorOutput |
protected StreamOperatorWrapper<OUT,OP> | mainOperatorWrapper For iteration, StreamIterationHead and StreamIterationTail used for executing feedback edges do not contain any operators, in which case, mainOperatorWrapper and tailOperatorWrapper are null. |
protected int | numOperators |
protected OperatorEventDispatcherImpl | operatorEventDispatcher |
protected RecordWriterOutput<?>[] | streamOutputs |
protected StreamOperatorWrapper<?,?> | tailOperatorWrapper |
Constructor and Description |
---|
OperatorChain(StreamTask<OUT,OP> containingTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) |
Modifier and Type | Method and Description |
---|---|
void | abortCheckpoint(long checkpointId, CheckpointException cause) |
void | alignedBarrierTimeout(long checkpointId) |
void | broadcastEvent(AbstractEvent event) |
void | broadcastEvent(AbstractEvent event, boolean isPriorityEvent) |
void | close() This method releases all resources of the record writer output. |
void | closeAllOperators() Executes StreamOperator.close() of each operator in the chain of this StreamTask. |
abstract void | dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) |
abstract void | endInput(int inputId) Ends the main operator input specified by inputId. |
abstract void | finishOperators(StreamTaskActionExecutor actionExecutor, StopMode stopMode) Closes all operators in a chain-effect way, from the heads of the chain to the tail. |
void | flushOutputs() This method should be called before finishing the record emission, to make sure any data that is still buffered will be sent. |
Iterable<StreamOperatorWrapper<?,?>> | getAllOperators() Returns an Iterable which traverses all operators in forward topological order. |
protected Iterable<StreamOperatorWrapper<?,?>> | getAllOperators(boolean reverse) Returns an Iterable which traverses all operators in forward or reverse topological order. |
OperatorChain.ChainedSource | getChainedSource(StreamConfig.SourceInputConfig sourceInput) |
List<Output<StreamRecord<?>>> | getChainedSourceOutputs() |
Input | getFinishedOnRestoreInputOrDefault(Input defaultInput) |
OP | getMainOperator() |
WatermarkGaugeExposingOutput<StreamRecord<OUT>> | getMainOperatorOutput() |
int | getNumberOfOperators() |
OperatorEventDispatcher | getOperatorEventDispatcher() |
StreamTaskSourceInput<?> | getSourceTaskInput(StreamConfig.SourceInputConfig sourceInput) |
List<StreamTaskSourceInput<?>> | getSourceTaskInputs() |
RecordWriterOutput<?>[] | getStreamOutputs() |
protected StreamOperator<?> | getTailOperator() |
abstract void | initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) Initialize state and open all operators in the chain from tail to heads, contrary to StreamOperator.close() which happens heads to tail (see finishOperators(StreamTaskActionExecutor, StopMode)). |
boolean | isClosed() |
abstract boolean | isTaskDeployedAsFinished() |
abstract void | notifyCheckpointAborted(long checkpointId) |
abstract void | notifyCheckpointComplete(long checkpointId) |
abstract void | notifyCheckpointSubsumed(long checkpointId) |
abstract void | prepareSnapshotPreBarrier(long checkpointId) |
protected void | sendAcknowledgeCheckpointEvent(long checkpointId) |
protected void | snapshotChannelStates(StreamOperator<?> op, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, OperatorSnapshotFutures snapshotInProgress) |
abstract void | snapshotState(Map<OperatorID,OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, java.util.function.Supplier<Boolean> isRunning, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory storage) |
protected final RecordWriterOutput<?>[] streamOutputs
protected final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput
@Nullable protected final StreamOperatorWrapper<OUT,OP extends StreamOperator<OUT>> mainOperatorWrapper
For iteration, StreamIterationHead and StreamIterationTail used for executing feedback edges do not contain any operators, in which case, mainOperatorWrapper and tailOperatorWrapper are null.
Usually the first operator in the chain is the same as mainOperatorWrapper, but that's not the case if there are chained source inputs. In this case, one of the source inputs will be the first operator. For example, the following operator chain is possible:

 first
      \
      main (multi-input) -> ... -> tail
      /
 second

Where "first" and "second" (there can be more) are chained source operators. When it comes to things like closing, state initialization or state snapshotting, the operator chain is traversed: first, second, main, ..., tail or in reversed order: tail, ..., main, second, first.
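The traversal order described above can be sketched with plain lists. This is an illustrative model only: `ChainTraversalSketch` and `allOperators` are hypothetical names, operators are represented by their names, and the real class links StreamOperatorWrapper instances rather than building a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the chain traversal order; not Flink code. */
final class ChainTraversalSketch {

    /**
     * Returns operator names in forward order (chained sources first, then main through tail),
     * or in reversed order when {@code reverse} is true.
     */
    static List<String> allOperators(
            List<String> chainedSources, List<String> mainToTail, boolean reverse) {
        List<String> order = new ArrayList<>(chainedSources);
        order.addAll(mainToTail);
        if (reverse) {
            Collections.reverse(order);
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> sources = List.of("first", "second");
        List<String> rest = List.of("main", "tail");
        System.out.println(allOperators(sources, rest, false));
        System.out.println(allOperators(sources, rest, true));
    }
}
```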
@Nullable protected final StreamOperatorWrapper<?,?> firstOperatorWrapper
@Nullable protected final StreamOperatorWrapper<?,?> tailOperatorWrapper
protected final Map<StreamConfig.SourceInputConfig,OperatorChain.ChainedSource> chainedSources
protected final int numOperators
protected final OperatorEventDispatcherImpl operatorEventDispatcher
protected final org.apache.flink.shaded.guava31.com.google.common.io.Closer closer
@Nullable protected final FinishedOnRestoreInput finishedOnRestoreInput
protected boolean isClosed
public OperatorChain(StreamTask<OUT,OP> containingTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate)
public abstract boolean isTaskDeployedAsFinished()
public abstract void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException
Throws: FlinkException
public abstract void prepareSnapshotPreBarrier(long checkpointId) throws Exception
Throws: Exception
public abstract void endInput(int inputId) throws Exception
Ends the main operator input specified by inputId.
Specified by: endInput in interface BoundedMultiInput
Parameters: inputId - the input ID starts from 1 which indicates the first input.
Throws: Exception
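Because input IDs start from 1, an implementation has to translate them before using them as zero-based indexes. The sketch below shows one way to track ended inputs under that convention. `EndInputSketch` and its methods are hypothetical and only mirror the shape of `endInput(int inputId)`; they are not part of Flink.

```java
import java.util.BitSet;

/** Hypothetical tracker for ended inputs with 1-based input IDs; not Flink code. */
final class EndInputSketch {
    private final int numInputs;
    private final BitSet ended = new BitSet();

    EndInputSketch(int numInputs) {
        this.numInputs = numInputs;
    }

    /** Marks the given input as ended. IDs start from 1, so 1 is the first input. */
    void endInput(int inputId) {
        if (inputId < 1 || inputId > numInputs) {
            throw new IllegalArgumentException("inputId must be in [1, " + numInputs + "]");
        }
        ended.set(inputId - 1); // translate the 1-based ID to a 0-based bit index
    }

    /** Returns true once every input has been ended. */
    boolean allInputsEnded() {
        return ended.cardinality() == numInputs;
    }

    public static void main(String[] args) {
        EndInputSketch tracker = new EndInputSketch(2);
        tracker.endInput(1);
        System.out.println(tracker.allInputsEnded());
        tracker.endInput(2);
        System.out.println(tracker.allInputsEnded());
    }
}
```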
public abstract void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception
Initialize state and open all operators in the chain from tail to heads, contrary to StreamOperator.close() which happens heads to tail (see finishOperators(StreamTaskActionExecutor, StopMode)).
Throws: Exception
public abstract void finishOperators(StreamTaskActionExecutor actionExecutor, StopMode stopMode) throws Exception
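Closes all operators in a chain-effect way. Closing happens from heads to tail, contrary to StreamOperator.open() which happens tail to heads (see initializeStateAndOpenOperators(StreamTaskStateInitializer)).
Throws: Exception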
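The opposite orders of opening and finishing can be sketched as follows. This is a simplified model with hypothetical names (`LifecycleOrderSketch`, `lifecycle`), not Flink's implementation. A plausible reading of the ordering, stated here as an assumption, is that downstream operators are opened first so they are ready before an upstream operator emits anything, and upstream operators are finished first so records they flush while finishing can still be processed downstream.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of operator lifecycle ordering in a chain; not Flink code. */
final class LifecycleOrderSketch {

    /**
     * Takes operator names in forward (heads to tail) order and returns the order of
     * lifecycle calls: open from tail to heads, then finish from heads to tail.
     */
    static List<String> lifecycle(List<String> forwardOrder) {
        List<String> calls = new ArrayList<>();
        // Open in reverse: the tail is ready before anything upstream can emit.
        for (int i = forwardOrder.size() - 1; i >= 0; i--) {
            calls.add("open " + forwardOrder.get(i));
        }
        // Finish in forward order: upstream output is flushed into still-running operators.
        for (String operator : forwardOrder) {
            calls.add("finish " + operator);
        }
        return calls;
    }

    public static void main(String[] args) {
        lifecycle(List.of("head", "middle", "tail")).forEach(System.out::println);
    }
}
```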
public abstract void notifyCheckpointComplete(long checkpointId) throws Exception
Throws: Exception
public abstract void notifyCheckpointAborted(long checkpointId) throws Exception
Throws: Exception
public abstract void notifyCheckpointSubsumed(long checkpointId) throws Exception
Throws: Exception
public abstract void snapshotState(Map<OperatorID,OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, java.util.function.Supplier<Boolean> isRunning, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory storage) throws Exception
Throws: Exception
public OperatorEventDispatcher getOperatorEventDispatcher()
public void broadcastEvent(AbstractEvent event) throws IOException
Throws: IOException
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException
Throws: IOException
public void alignedBarrierTimeout(long checkpointId) throws IOException
Throws: IOException
public void abortCheckpoint(long checkpointId, CheckpointException cause)
public void closeAllOperators() throws Exception
Executes StreamOperator.close() of each operator in the chain of this StreamTask. Closing happens from tail to head operator in the chain.
Throws: Exception
public RecordWriterOutput<?>[] getStreamOutputs()
@VisibleForTesting public Iterable<StreamOperatorWrapper<?,?>> getAllOperators()
Returns an Iterable which traverses all operators in forward topological order.
protected Iterable<StreamOperatorWrapper<?,?>> getAllOperators(boolean reverse)
Returns an Iterable which traverses all operators in forward or reverse topological order.
public Input getFinishedOnRestoreInputOrDefault(Input defaultInput)
public int getNumberOfOperators()
public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput()
public OperatorChain.ChainedSource getChainedSource(StreamConfig.SourceInputConfig sourceInput)
public List<Output<StreamRecord<?>>> getChainedSourceOutputs()
public StreamTaskSourceInput<?> getSourceTaskInput(StreamConfig.SourceInputConfig sourceInput)
public List<StreamTaskSourceInput<?>> getSourceTaskInputs()
public void flushOutputs() throws IOException
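This method should be called before finishing the record emission, to make sure any data that is still buffered will be sent.
Throws: IOException - Thrown, if the buffered data cannot be pushed into the output streams.
public void close() throws IOException
This method releases all resources of the record writer output. This method should never fail.
Specified by: close in interface Closeable
Specified by: close in interface AutoCloseable
Throws: IOException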
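The relationship between flushing and closing can be illustrated with a minimal buffered output. This is a sketch under assumptions, not the record writer implementation: `BufferedOutputSketch` and its methods are hypothetical, and the model simply assumes that records still buffered at close time are dropped, which is why flushing is expected to happen first.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffered output showing flush-before-close; not Flink code. */
final class BufferedOutputSketch implements Closeable {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> sent = new ArrayList<>();
    private boolean closed;

    /** Buffers a record without sending it yet. */
    void emit(String record) {
        buffer.add(record);
    }

    /** Sends everything that is still buffered; fails if the output was already closed. */
    void flush() throws IOException {
        if (closed) {
            throw new IOException("output already closed");
        }
        sent.addAll(buffer);
        buffer.clear();
    }

    /** Releases resources and does not throw; unflushed records are dropped in this model. */
    @Override
    public void close() {
        buffer.clear();
        closed = true;
    }

    List<String> sent() {
        return List.copyOf(sent);
    }

    boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) throws IOException {
        BufferedOutputSketch out = new BufferedOutputSketch();
        out.emit("a");
        out.flush();  // "a" is sent
        out.emit("b");
        out.close();  // "b" was never flushed, so it is dropped
        System.out.println(out.sent());
    }
}
```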
@Nullable protected StreamOperator<?> getTailOperator()
protected void snapshotChannelStates(StreamOperator<?> op, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, OperatorSnapshotFutures snapshotInProgress)
public boolean isClosed()
protected void sendAcknowledgeCheckpointEvent(long checkpointId)
Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.