Type Parameters: OUT, Operator

@Internal public abstract class StreamTask<OUT,Operator extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StreamTaskStateList>
Each task runs one or more StreamOperators, which form the Task's operator chain. Operators that are chained together execute synchronously in the same thread and hence on the same stream partition. A common case for such chains is a sequence of successive map/flatmap/filter tasks.
The task chain contains one "head" operator and multiple chained operators. The StreamTask is specialized for the type of the head operator: there are one-input and two-input tasks, as well as tasks for sources, iteration heads, and iteration tails.
The Task class deals with the setup of the streams read by the head operator, and the streams produced by the operators at the ends of the operator chain. Note that the chain may fork and thus have multiple ends. The life cycle of the task is set up as follows:

    -- restoreState() -> restores state of all operators in the chain

    -- invoke()
          |
          +----> Create basic utils (config, etc) and load the chain of operators
          +----> operators.setup()
          +----> task specific init()
          +----> open-operators()
          +----> run()
          +----> close-operators()
          +----> dispose-operators()
          +----> common cleanup
          +----> task specific cleanup()
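The invoke() ordering above is a template method: a final invoke() calls the task-specific hooks in a fixed sequence and uses finally blocks so that disposal and cleanup run even when run() fails. The following is a minimal, self-contained sketch of that ordering only. The names SketchOperator and SketchTask are hypothetical and are not Flink's API; restoreState(), the "common cleanup" step, and operator-chain construction are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a chained operator; not Flink's StreamOperator.
interface SketchOperator {
    void setup();
    void open() throws Exception;
    void close() throws Exception;
    void dispose();
}

// Hypothetical base class mirroring the documented invoke() ordering.
abstract class SketchTask {
    protected final List<SketchOperator> operators = new ArrayList<>();

    protected abstract void init() throws Exception;
    protected abstract void run() throws Exception;
    protected abstract void cleanup() throws Exception;

    // Final, so subclasses can only fill in the hooks, not reorder them.
    public final void invoke() throws Exception {
        try {
            for (SketchOperator op : operators) {
                op.setup();            // operators.setup()
            }
            init();                    // task specific init()
            for (SketchOperator op : operators) {
                op.open();             // open-operators()
            }
            run();                     // run()
            for (SketchOperator op : operators) {
                op.close();            // close-operators()
            }
        } finally {
            // dispose-operators() runs even if run() or close() threw
            for (SketchOperator op : operators) {
                op.dispose();
            }
            cleanup();                 // task specific cleanup()
        }
    }
}
```

A subclass supplies only init(), run(), and cleanup(); because invoke() is final, the order of the lifecycle steps cannot be changed by a specific task type.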
The StreamTask has a lock object called lock. All calls to methods on a StreamOperator must be synchronized on this lock object to ensure that no methods are called concurrently.
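A minimal sketch of this locking discipline, assuming a plain Object serves as the lock (as returned by getCheckpointLock()): the processing thread updates state and emits inside synchronized (lock), so a snapshot taken under the same lock never observes a half-applied update. The class LockedCounterTask and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical task illustrating the checkpoint-lock discipline.
class LockedCounterTask {
    private final Object lock = new Object();    // analogous to getCheckpointLock()
    private long count;                          // operator state
    private final List<Long> emitted = new ArrayList<>();

    Object getCheckpointLock() {
        return lock;
    }

    // Emitting a record and updating state are atomic with respect to snapshots.
    void processElement(long value) {
        synchronized (lock) {
            emitted.add(value);
            count++;
        }
    }

    // A snapshot reads state under the same lock, so count and output agree.
    long[] snapshot() {
        synchronized (lock) {
            return new long[] {count, emitted.size()};
        }
    }
}
```

Without the lock, a snapshot could see the record already emitted but the counter not yet incremented, which is exactly the kind of inconsistency a checkpoint must not capture.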
Modifier and Type | Field and Description |
---|---|
protected Operator | headOperator: The head operator that consumes the input streams of this task. |
protected static org.slf4j.Logger | LOG: The logger used by the StreamTask and its subclasses. |
static ThreadGroup | TRIGGER_THREAD_GROUP: The thread group that holds all trigger timer threads. |
Constructor and Description |
---|
StreamTask() |
Modifier and Type | Method and Description |
---|---|
void | abortCheckpointOnBarrier(long checkpointId, Throwable cause): Aborts a checkpoint as the result of receiving possibly some checkpoint barriers, but at least one CancelCheckpointMarker. |
void | cancel(): This method is called when a task is canceled either as a result of a user abort or an execution failure. |
protected abstract void | cancelTask() |
void | checkTimerException(): Checks whether an exception was thrown in a thread other than the main thread. |
protected abstract void | cleanup() |
AbstractStateBackend | createStateBackend(String operatorIdentifier, TypeSerializer<?> keySerializer) |
void | failExternally(Throwable cause): Marks task execution failed for an external reason (a reason other than the task code itself throwing an exception). |
protected void | finalize(): The finalize method shuts down the timer. |
Map<String,Accumulator<?,?>> | getAccumulatorMap() |
Object | getCheckpointLock(): Gets the lock object on which all operations that involve data and state mutation have to lock. |
StreamConfig | getConfiguration() |
long | getCurrentProcessingTime() |
Output<StreamRecord<OUT>> | getHeadOutput() |
String | getName(): Gets the name of the task, in the form "taskname (2/5)". |
RecordWriterOutput<?>[] | getStreamOutputs() |
protected abstract void | init() |
void | invoke(): Starts the execution. |
boolean | isCanceled() |
boolean | isRunning() |
protected boolean | isSerializingTimestamps() |
void | notifyCheckpointComplete(long checkpointId): Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received the notification from all participating tasks. |
ScheduledFuture<?> | registerTimer(long timestamp, Triggerable target): Registers a timer. |
protected abstract void | run() |
void | setInitialState(StreamTaskStateList initialState): Sets the initial state of the operator, upon recovery. |
void | setTimeService(TimeServiceProvider timeProvider): Allows the user to specify their own TimeServiceProvider. |
String | toString() |
boolean | triggerCheckpoint(long checkpointId, long timestamp): This method is either called directly and asynchronously by the checkpoint coordinator (in the case of functions that are directly notified, usually the data sources), or called synchronously when all incoming channels have reported a checkpoint barrier. |
void | triggerCheckpointOnBarrier(long checkpointId, long timestamp): This method is called when a checkpoint is triggered as a result of receiving checkpoint barriers on all input streams. |
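The name format "taskname (2/5)" reads as subtask 2 out of a parallelism of 5. A small sketch of producing such a name, assuming the subtask index is held zero-based (as with getIndexInSubtaskGroup()) and displayed one-based; the helper TaskNames.nameWithSubtask is hypothetical.

```java
// Hypothetical helper; assumes a zero-based subtask index.
final class TaskNames {
    private TaskNames() {}

    static String nameWithSubtask(String taskName, int subtaskIndex, int numberOfSubtasks) {
        if (subtaskIndex < 0 || subtaskIndex >= numberOfSubtasks) {
            throw new IllegalArgumentException("subtask index out of range");
        }
        // Displayed one-based: index 1 of 5 becomes "(2/5)".
        return taskName + " (" + (subtaskIndex + 1) + "/" + numberOfSubtasks + ")";
    }
}
```

For example, TaskNames.nameWithSubtask("taskname", 1, 5) returns "taskname (2/5)".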
Methods inherited from class AbstractInvokable: getCurrentNumberOfSubtasks, getEnvironment, getExecutionConfig, getIndexInSubtaskGroup, getJobConfiguration, getTaskConfiguration, getUserCodeClassLoader, setEnvironment
public static final ThreadGroup TRIGGER_THREAD_GROUP
protected static final org.slf4j.Logger LOG
protected Operator headOperator
public void setTimeService(TimeServiceProvider timeProvider)
Allows the user to specify their own TimeServiceProvider. By default a DefaultTimerService is provided. Changing it can be useful for testing processing-time functionality, such as WindowAssigners and Triggers.

public long getCurrentProcessingTime()
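Swapping the time service is what makes processing-time behavior testable: a test can move time forward by hand instead of sleeping. The sketch below uses a hypothetical ProcessingTimeSource interface and a ManualTimeSource test double; Flink's actual TimeServiceProvider has its own method set, which is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical abstraction; not Flink's TimeServiceProvider.
interface ProcessingTimeSource {
    long getCurrentProcessingTime();
    void registerTimer(long timestamp, Runnable callback);
}

// Test double: time only moves when advanceTo() is called.
class ManualTimeSource implements ProcessingTimeSource {
    private long now;
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();

    @Override
    public long getCurrentProcessingTime() {
        return now;
    }

    @Override
    public void registerTimer(long timestamp, Runnable callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    // Advances time, firing every due timer in timestamp order.
    void advanceTo(long time) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<Runnable>> due = timers.pollFirstEntry();
            now = Math.max(now, due.getKey());   // callbacks see the timer's time
            for (Runnable callback : due.getValue()) {
                callback.run();
            }
        }
        now = Math.max(now, time);
    }
}
```

A window test could register a timer at the window's end and call advanceTo() past it, checking the result deterministically without any wall-clock waiting.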
public final void invoke() throws Exception
Description copied from class: AbstractInvokable
Must be overridden by the concrete task implementation. This method is called by the TaskManager when the actual execution of the task starts.
All resources should be cleaned up when the method returns. Make sure to guard the code with try-finally blocks where necessary.
Specified by: invoke in class AbstractInvokable
Throws:
Exception - Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.

public void failExternally(Throwable cause)
This method never blocks.
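public final void cancel() throws Exception
Description copied from class: AbstractInvokable
This method is called when a task is canceled either as a result of a user abort or an execution failure.
Overrides: cancel in class AbstractInvokable
Throws:
Exception - thrown if any exception occurs during the execution of the user code

public final boolean isRunning()
public final boolean isCanceled()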
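A sketch of how cancel(), isRunning(), and isCanceled() can cooperate through volatile flags, so that a run loop on the task thread notices a cancel() issued from another thread. The class CancellableTask and its exact flag handling are assumptions for illustration, not StreamTask's implementation.

```java
// Hypothetical sketch of cooperative cancellation; not StreamTask's code.
abstract class CancellableTask {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running;
    private volatile boolean canceled;

    protected abstract void run() throws Exception;
    protected abstract void cancelTask() throws Exception;

    public final void invoke() throws Exception {
        running = true;
        try {
            // If cancel() already happened, skip run(); if it happens later,
            // it resets running to false and the run loop exits.
            if (!canceled) {
                run();
            }
        } finally {
            running = false;
        }
    }

    public final void cancel() throws Exception {
        canceled = true;
        running = false;
        cancelTask();   // hook for the subclass to unblock waiting work
    }

    public final boolean isRunning() {
        return running;
    }

    public final boolean isCanceled() {
        return canceled;
    }
}
```

A run() implementation is expected to loop on isRunning(); cancelTask() is the place to interrupt anything that blocks and would not otherwise re-check the flag.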
protected void finalize() throws Throwable
The finalize method shuts down the timer. This should not be relied upon: it causes shutdown to happen much later than an explicit shutdown would, and causes threads to linger longer than needed.
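Because finalization timing is unpredictable, timer resources are better released explicitly. A sketch, assuming the timers run on a java.util.concurrent ScheduledExecutorService (the page itself only says that finalize() shuts down "the timer"): registerTimer() converts an absolute timestamp into a delay, and close() shuts the executor down deterministically. The class SketchTimerService is hypothetical, and a LongConsumer stands in for Flink's Triggerable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical timer service: released by close(), not by finalize().
class SketchTimerService implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Turns an absolute processing-time timestamp into a relative delay.
    ScheduledFuture<?> registerTimer(long timestamp, LongConsumer target) {
        long delay = Math.max(0L, timestamp - System.currentTimeMillis());
        return executor.schedule(() -> target.accept(timestamp), delay, TimeUnit.MILLISECONDS);
    }

    boolean awaitTermination(long timeoutMillis) throws InterruptedException {
        return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Deterministic shutdown: the timer thread stops now, not at some later GC.
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Wrapping the service in try-with-resources, or closing it in the task's cleanup step, makes finalize() only a last-resort fallback rather than the normal shutdown path.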
protected boolean isSerializingTimestamps()
public String getName()
public Object getCheckpointLock()
public StreamConfig getConfiguration()
public Map<String,Accumulator<?,?>> getAccumulatorMap()
public Output<StreamRecord<OUT>> getHeadOutput()
public RecordWriterOutput<?>[] getStreamOutputs()
public void setInitialState(StreamTaskStateList initialState)
Description copied from interface: StatefulTask
Sets the initial state of the operator, upon recovery.
Specified by: setInitialState in interface StatefulTask<StreamTaskStateList>
Parameters:
initialState - The handle to the state.

public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception
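Description copied from interface: StatefulTask
This method is either called directly and asynchronously by the checkpoint coordinator (in the case of functions that are directly notified, usually the data sources), or called synchronously when all incoming channels have reported a checkpoint barrier.
Specified by: triggerCheckpoint in interface StatefulTask<StreamTaskStateList>
Parameters:
checkpointId - The ID of the checkpoint, incrementing.
timestamp - The timestamp when the checkpoint was triggered at the JobManager.
Returns:
false if the checkpoint cannot be carried out, true otherwise
Throws:
Exception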
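The boolean result of triggerCheckpoint() and the later notifyCheckpointComplete() call can be illustrated with a pending-checkpoint map: a checkpoint is refused when the task is no longer running, its snapshot is remembered under its ID, and buffered output is committed only once completion is notified. This is a hypothetical sketch of a common two-phase pattern (SketchCheckpointingTask and its fields are invented), not StreamTask's logic; the timestamp parameter is unused here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical two-phase pattern: snapshot on trigger, commit on completion.
class SketchCheckpointingTask {
    private final Object lock = new Object();
    private volatile boolean running = true;
    private final List<String> buffer = new ArrayList<>();              // not yet snapshotted
    private final TreeMap<Long, List<String>> pending = new TreeMap<>(); // snapshotted, not committed
    final List<String> committed = new ArrayList<>();

    void process(String record) {
        synchronized (lock) {
            buffer.add(record);
        }
    }

    void stop() {
        running = false;
    }

    // Returns false if the checkpoint cannot be carried out, true otherwise.
    boolean triggerCheckpoint(long checkpointId, long timestamp) {
        synchronized (lock) {
            if (!running) {
                return false;
            }
            pending.put(checkpointId, new ArrayList<>(buffer));
            buffer.clear();
            return true;
        }
    }

    // Commits this checkpoint's data and that of any older, still-pending ones.
    void notifyCheckpointComplete(long checkpointId) {
        synchronized (lock) {
            Map<Long, List<String>> done = pending.headMap(checkpointId, true);
            for (List<String> records : done.values()) {
                committed.addAll(records);
            }
            done.clear();   // removes the entries from 'pending' (headMap is a view)
        }
    }
}
```

Committing older pending checkpoints along with the completed one is a design choice of this sketch, so that data is not stranded if the notification for an earlier checkpoint never arrives.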
public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception
Description copied from interface: StatefulTask
This method is called when a checkpoint is triggered as a result of receiving checkpoint barriers on all input streams.
Specified by: triggerCheckpointOnBarrier in interface StatefulTask<StreamTaskStateList>
Parameters:
checkpointId - The ID of the checkpoint, incrementing.
timestamp - The timestamp when the checkpoint was triggered at the JobManager.
Throws:
Exception - Exceptions thrown as the result of triggering a checkpoint are forwarded.

public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception
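Description copied from interface: StatefulTask
Aborts a checkpoint as the result of receiving possibly some checkpoint barriers, but at least one CancelCheckpointMarker.
This requires implementing tasks to forward a CancelCheckpointMarker to their outputs.
Specified by: abortCheckpointOnBarrier in interface StatefulTask<StreamTaskStateList>
Parameters:
checkpointId - The ID of the checkpoint to be aborted.
cause - The reason why the checkpoint was aborted during alignment.
Throws:
Exception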
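The two barrier callbacks can be pictured as the outcome of alignment bookkeeping: the checkpoint is triggered once a barrier has arrived on every input channel, and aborted as soon as a CancelCheckpointMarker for it shows up. A minimal sketch, assuming numbered input channels and one checkpoint aligned at a time; the class SketchBarrierAligner and its callbacks are hypothetical, and buffering of data from already-aligned channels is omitted.

```java
import java.util.BitSet;
import java.util.function.LongConsumer;

// Hypothetical barrier aligner for a single in-flight checkpoint.
class SketchBarrierAligner {
    private final int numChannels;
    private final LongConsumer onTrigger;   // stands in for triggerCheckpointOnBarrier
    private final LongConsumer onAbort;     // stands in for abortCheckpointOnBarrier
    private final BitSet arrived = new BitSet();
    private long currentCheckpoint = -1;    // checkpoint being aligned, or -1
    private long lastFinished = -1;         // highest checkpoint triggered or aborted

    SketchBarrierAligner(int numChannels, LongConsumer onTrigger, LongConsumer onAbort) {
        this.numChannels = numChannels;
        this.onTrigger = onTrigger;
        this.onAbort = onAbort;
    }

    void onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastFinished || checkpointId < currentCheckpoint) {
            return;                          // stale barrier
        }
        if (checkpointId != currentCheckpoint) {
            arrived.clear();                 // a newer checkpoint starts aligning
            currentCheckpoint = checkpointId;
        }
        arrived.set(channel);
        if (arrived.cardinality() == numChannels) {
            finish(checkpointId);
            onTrigger.accept(checkpointId);  // barriers seen on all inputs
        }
    }

    void onCancelMarker(long checkpointId) {
        if (checkpointId <= lastFinished || checkpointId < currentCheckpoint) {
            return;
        }
        finish(checkpointId);
        onAbort.accept(checkpointId);        // abort without waiting for other inputs
    }

    private void finish(long checkpointId) {
        arrived.clear();
        currentCheckpoint = -1;
        lastFinished = checkpointId;
    }
}
```

When a newer barrier interrupts an alignment, this sketch silently drops the older checkpoint; a real implementation would also need to report that checkpoint as aborted.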
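public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: StatefulTask
Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received the notification from all participating tasks.
Specified by: notifyCheckpointComplete in interface StatefulTask<StreamTaskStateList>
Parameters:
checkpointId - The ID of the checkpoint that is complete.
Throws:
Exception - The notification method may forward its exceptions.

public AbstractStateBackend createStateBackend(String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception
Throws:
Exception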
public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target)
public void checkTimerException() throws AsynchronousException
Checks whether an exception was thrown in a thread other than the main thread. This must be called in the main loop of StreamTask subclasses to ensure that failures are propagated.
Throws:
AsynchronousException
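An exception thrown on a timer thread would otherwise be lost, which is why the main loop has to poll for it. A sketch of that hand-off with an AtomicReference: the timer thread stores the first failure, and the main loop rethrows it wrapped. SketchAsyncException is a hypothetical stand-in for Flink's AsynchronousException so that the example stays self-contained, and SketchTimerErrors is likewise invented.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Flink's AsynchronousException.
class SketchAsyncException extends Exception {
    SketchAsyncException(Throwable cause) {
        super(cause);
    }
}

// Hypothetical hand-off of timer-thread failures to the main task thread.
class SketchTimerErrors {
    private final AtomicReference<Throwable> asyncException = new AtomicReference<>();

    // Called on a timer thread; only the first failure is kept.
    void reportTimerFailure(Throwable t) {
        asyncException.compareAndSet(null, t);
    }

    // Called regularly from the main loop so that failures are propagated.
    void checkTimerException() throws SketchAsyncException {
        Throwable t = asyncException.get();
        if (t != null) {
            throw new SketchAsyncException(t);
        }
    }
}
```

Keeping only the first failure avoids later, usually secondary, errors masking the root cause.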
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.