Class StreamTask<OUT, OP extends StreamOperator<OUT>>

  • Type Parameters:
    OUT -
    OP -
    All Implemented Interfaces:
    CheckpointableTask, CoordinatedTask, TaskInvokable, AsyncExceptionHandler, ContainingTaskDetails
    Direct Known Subclasses:
    AbstractTwoInputStreamTask, MultipleInputStreamTask, OneInputStreamTask, SourceOperatorStreamTask, SourceStreamTask

    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
    extends Object
    implements TaskInvokable, CheckpointableTask, CoordinatedTask, AsyncExceptionHandler, ContainingTaskDetails
    Base class for all streaming tasks. A task is the unit of local processing that is deployed and executed by the TaskManagers. Each task runs one or more StreamOperators which form the Task's operator chain. Operators that are chained together execute synchronously in the same thread and hence on the same stream partition. A common example of such a chain is a sequence of successive map/flatmap/filter operations.
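
    As an illustration of why chained operators run synchronously in one thread, the sketch below models a chain in which each operator hands its result to the next one through a plain method call. The ChainSketch class and its Stage interface are hypothetical and are not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of an operator chain; these are not Flink classes.
class ChainSketch {
    /** One link in the chain: receives a record and pushes results downstream. */
    interface Stage<T> {
        void process(T record);
    }

    /** A map stage that transforms the record and calls the next stage directly. */
    static <I, O> Stage<I> map(Function<I, O> fn, Stage<O> next) {
        return record -> next.process(fn.apply(record));
    }

    /** A filter stage that forwards only the records accepted by the predicate. */
    static <T> Stage<T> filter(Predicate<T> keep, Stage<T> next) {
        return record -> {
            if (keep.test(record)) {
                next.process(record);
            }
        };
    }

    /** Builds the chain map(x * 2) -> filter(x > 4) -> sink and runs it in the calling thread. */
    static List<Integer> run(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        Stage<Integer> sink = out::add;
        Stage<Integer> keepLarge = filter(x -> x > 4, sink);
        Stage<Integer> head = map(x -> x * 2, keepLarge);
        for (Integer record : input) {
            head.process(record); // the whole chain runs before the next record is read
        }
        return out;
    }
}
```

    Calling ChainSketch.run(List.of(1, 2, 3, 4)) returns [6, 8]. Every record passes through the complete chain on the caller's thread before the next record is touched.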

    The task chain contains one "head" operator and multiple chained operators. The StreamTask is specialized for the type of the head operator: one-input and two-input tasks, as well as for sources, iteration heads and iteration tails.

    The Task class deals with the setup of the streams read by the head operator, and the streams produced by the operators at the ends of the operator chain. Note that the chain may fork and thus have multiple ends.

    The life cycle of the task is set up as follows:

    
     -- setInitialState -> provides state of all operators in the chain
    
     -- invoke()
           |
           +----> Create basic utils (config, etc) and load the chain of operators
           +----> operators.setup()
           +----> task specific init()
           +----> initialize-operator-states()
           +----> open-operators()
           +----> run()
           +----> finish-operators()
           +----> close-operators()
           +----> common cleanup
           +----> task specific cleanup()
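
    The order of the invoke() steps in this diagram can be read as a template method. The following is a minimal sketch under that reading: the LifecycleSketch class and its step names are hypothetical and only record the order of the steps, without mirroring the real method signatures or the error handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the life cycle; the step names are labels, not real methods.
class LifecycleSketch {
    final List<String> trace = new ArrayList<>();

    /** Fixed skeleton: common steps run in a set order around the task-specific hooks. */
    void invoke() {
        step("load-operator-chain");
        step("operators.setup");
        init();
        step("initialize-operator-states");
        step("open-operators");
        step("run");
        step("finish-operators");
        step("close-operators");
        step("common-cleanup");
        cleanup();
    }

    /** Hook that a specialized task would override. */
    void init() {
        step("task-specific-init");
    }

    /** Hook that a specialized task would override. */
    void cleanup() {
        step("task-specific-cleanup");
    }

    void step(String name) {
        trace.add(name);
    }
}
```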
     

    The StreamTask has a lock object called lock. All calls to methods on a StreamOperator must be synchronized on this lock object to ensure that no methods are called concurrently.
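
    The locking rule can be sketched as follows. This is a simplified, hypothetical model (LockSketch, processElement and onTimer are illustrative names, and the counter stands in for operator state): two threads call into the same operator, and every call is wrapped in a synchronized block on the shared lock.

```java
// Hypothetical model of the locking rule; these are not Flink classes.
class LockSketch {
    private final Object lock = new Object();
    private long count; // stand-in for operator state

    /** Called from the thread that processes records. */
    void processElement() {
        synchronized (lock) {
            count++;
        }
    }

    /** Called from a timer thread. */
    void onTimer() {
        synchronized (lock) {
            count += 10;
        }
    }

    long count() {
        synchronized (lock) {
            return count;
        }
    }

    /** Runs both callers concurrently; no update is lost because all of them hold the lock. */
    static long demo() {
        LockSketch task = new LockSketch();
        Thread records = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                task.processElement();
            }
        });
        Thread timers = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                task.onTimer();
            }
        });
        records.start();
        timers.start();
        try {
            records.join();
            timers.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.count();
    }
}
```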

    • Field Detail

      • TRIGGER_THREAD_GROUP

        public static final ThreadGroup TRIGGER_THREAD_GROUP
        The thread group that holds all trigger timer threads.
      • LOG

        protected static final org.slf4j.Logger LOG
        The logger used by the StreamTask and its subclasses.
      • mainOperator

        protected OP mainOperator
        The main operator that consumes the input streams of this task.
      • configuration

        protected final StreamConfig configuration
        The configuration of this streaming task.
      • stateBackend

        protected final StateBackend stateBackend
        Our state backend. We use this to create a keyed state backend.
      • checkpointStorage

        protected final CheckpointStorage checkpointStorage
        Our checkpoint storage. We use this to create checkpoint streams.
      • timerService

        protected final TimerService timerService
        The internal TimerService used to define the current processing time (default = System.currentTimeMillis()) and register timers for tasks to be executed in the future.
      • systemTimerService

        protected final TimerService systemTimerService
        In contrast to timerService, no user timers should be registered here. It should be used only for system-level timers.
    • Method Detail

      • processInput

        protected void processInput​(MailboxDefaultAction.Controller controller)
                             throws Exception
        This method implements the default action of the task (e.g. processing one event from the input). Implementations should (in general) be non-blocking.
        Parameters:
        controller - controller object for collaborative interaction between the action and the stream task.
        Throws:
        Exception - on any problems in the action.
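
        To show why the default action should not block, the sketch below models a loop that alternates between running queued mail and invoking the default action once. It is a simplified, hypothetical model: MailboxSketch, its Controller and DefaultAction interfaces, and send are illustrative stand-ins, not the real mailbox classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a mailbox loop; these are not Flink classes.
class MailboxSketch {
    /** Lets the default action tell the loop that there is nothing left to do. */
    interface Controller {
        void allActionsCompleted();
    }

    /** Stand-in for processInput: handles at most one event per call. */
    interface DefaultAction {
        void runDefaultAction(Controller controller);
    }

    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private boolean done;

    void send(Runnable mail) {
        mailbox.add(mail);
    }

    void runLoop(DefaultAction action) {
        Controller controller = () -> done = true;
        while (!done) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run(); // queued mail runs between two default actions
            }
            action.runDefaultAction(controller);
        }
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Deque<String> input = new ArrayDeque<>(List.of("a", "b", "c"));
        MailboxSketch task = new MailboxSketch();
        task.runLoop(controller -> {
            String record = input.poll();
            if (record == null) {
                controller.allActionsCompleted();
                return;
            }
            trace.add("record " + record);
            if (record.equals("a")) {
                task.send(() -> trace.add("mail"));
            }
        });
        return trace;
    }
}
```

        MailboxSketch.demo() returns [record a, mail, record b, record c]. The mail runs promptly only because each default action handles one record and then returns to the loop.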
      • notifyEndOfData

        protected void notifyEndOfData()
      • setSynchronousSavepoint

        protected void setSynchronousSavepoint​(long checkpointId)
      • advanceToEndOfEventTime

        protected void advanceToEndOfEventTime()
                                        throws Exception
        Emits the MAX_WATERMARK so that all registered timers are fired.

        This is used by the source task when the job is TERMINATED. In that case, we want all the timers registered throughout the pipeline to fire and the related state (e.g. windows) to be flushed.

        For tasks other than the source task, this method does nothing.
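
        The effect of emitting the maximum watermark can be sketched with a small timer queue. This is a hypothetical model (WatermarkSketch and its methods are illustrative names), and it assumes that MAX_WATERMARK carries the timestamp Long.MAX_VALUE, so that every registered event-time timer is at or below it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of event-time timers; these are not Flink classes.
class WatermarkSketch {
    /** Assumption: the maximum watermark carries the largest possible timestamp. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Fires, in timestamp order, every timer that is not later than the watermark. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

        After registering timers at 100, 50 and 7000, advanceWatermark(60) fires only the timer at 50. A later advanceWatermark(MAX_WATERMARK) fires the remaining two and leaves no timer pending.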

        Throws:
        Exception
      • setupNumRecordsInCounter

        protected Counter setupNumRecordsInCounter​(StreamOperator streamOperator)
      • cancel

        public final void cancel()
                          throws Exception
        Description copied from interface: TaskInvokable
        This method is called when a task is canceled either as a result of a user abort or an execution failure. It can be overridden to shut down the user code properly.
        Specified by:
        cancel in interface TaskInvokable
        Throws:
        Exception
      • hasMail

        public boolean hasMail()
      • isRunning

        public final boolean isRunning()
      • isCanceled

        public final boolean isCanceled()
      • isFailing

        public final boolean isFailing()
      • finalize

        protected void finalize()
                         throws Throwable
        The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original shutdown method was never called.

        This should not be relied upon! It will cause shutdown to happen much later than if manual shutdown is attempted, and cause threads to linger for longer than needed.

        Overrides:
        finalize in class Object
        Throws:
        Throwable
      • getName

        public final String getName()
        Gets the name of the task, in the form "taskname (2/5)".
        Returns:
        The name of the task.
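
        The name format can be sketched as below. The helper nameWithSubtask is hypothetical, and the sketch assumes that the first number is the one-based position of the subtask (a zero-based index plus one) and the second is the parallelism.

```java
// Hypothetical helper; this is not part of Flink's API.
class TaskNameSketch {
    /** Formats a name such as "taskname (2/5)" from a zero-based subtask index. */
    static String nameWithSubtask(String taskName, int subtaskIndex, int parallelism) {
        return String.format("%s (%d/%d)", taskName, subtaskIndex + 1, parallelism);
    }
}
```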
      • getCheckpointBarrierHandler

        protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler()
        Acquires the optional CheckpointBarrierHandler associated with this stream task. The CheckpointBarrierHandler should exist if the task has data inputs and needs to align the barriers.
      • triggerCheckpointOnBarrier

        public void triggerCheckpointOnBarrier​(CheckpointMetaData checkpointMetaData,
                                               CheckpointOptions checkpointOptions,
                                               CheckpointMetricsBuilder checkpointMetrics)
                                        throws IOException
        Description copied from interface: CheckpointableTask
        This method is called when a checkpoint is triggered as a result of receiving checkpoint barriers on all input streams.
        Specified by:
        triggerCheckpointOnBarrier in interface CheckpointableTask
        Parameters:
        checkpointMetaData - Metadata about this checkpoint
        checkpointOptions - Options for performing this checkpoint
        checkpointMetrics - Metrics about this checkpoint
        Throws:
        IOException - Exceptions thrown as the result of triggering a checkpoint are forwarded.
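
        The condition "checkpoint barriers on all input streams" can be sketched as a small alignment tracker. This is a simplified, hypothetical model (BarrierAlignmentSketch and onBarrier are illustrative names); it tracks only which inputs have delivered a barrier and leaves out buffering, timeouts and unaligned checkpoints.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of barrier alignment; this is not Flink's CheckpointBarrierHandler.
class BarrierAlignmentSketch {
    private final int numInputs;
    private final Set<Integer> inputsWithBarrier = new HashSet<>();
    private long currentCheckpointId = -1L;

    BarrierAlignmentSketch(int numInputs) {
        this.numInputs = numInputs;
    }

    /** Returns true when this barrier completes the alignment, i.e. the checkpoint should be triggered. */
    boolean onBarrier(int inputIndex, long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            return false; // barrier of an older checkpoint: ignore it
        }
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId; // a newer checkpoint starts a fresh alignment
            inputsWithBarrier.clear();
        }
        boolean firstFromThisInput = inputsWithBarrier.add(inputIndex);
        return firstFromThisInput && inputsWithBarrier.size() == numInputs;
    }
}
```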
      • declineCheckpoint

        protected void declineCheckpoint​(long checkpointId)
      • getAsyncOperationsThreadPool

        public final ExecutorService getAsyncOperationsThreadPool()
      • notifyCheckpointCompleteAsync

        public Future<Void> notifyCheckpointCompleteAsync​(long checkpointId)
        Description copied from interface: CheckpointableTask
        Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received the notification from all participating tasks.
        Specified by:
        notifyCheckpointCompleteAsync in interface CheckpointableTask
        Parameters:
        checkpointId - The ID of the checkpoint that is complete.
        Returns:
        future that completes when the notification has been processed by the task.
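
        The asynchronous contract can be sketched as follows. This is a hypothetical, single-threaded model (NotifySketch, its queue and drain are illustrative names): the caller enqueues the notification for the task and immediately receives a future, which completes only once the task has processed the queued action.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical model of an asynchronous notification; these are not Flink classes.
class NotifySketch {
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();
    long lastCompletedCheckpointId = -1L;

    /** Enqueues the notification and returns without waiting for the task. */
    Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        taskQueue.add(() -> {
            lastCompletedCheckpointId = checkpointId;
            result.complete(null); // completes once the task has handled the notification
        });
        return result;
    }

    /** Stand-in for the task working through its queued actions. */
    void drain() {
        Runnable action;
        while ((action = taskQueue.poll()) != null) {
            action.run();
        }
    }
}
```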
      • notifyCheckpointAbortAsync

        public Future<Void> notifyCheckpointAbortAsync​(long checkpointId,
                                                       long latestCompletedCheckpointId)
        Description copied from interface: CheckpointableTask
        Invoked when a checkpoint has been aborted, i.e., when the checkpoint coordinator has received a decline message from one task and tries to abort the targeted checkpoint by notification.
        Specified by:
        notifyCheckpointAbortAsync in interface CheckpointableTask
        Parameters:
        checkpointId - The ID of the checkpoint that is aborted.
        latestCompletedCheckpointId - The ID of the latest completed checkpoint.
        Returns:
        future that completes when the notification has been processed by the task.
      • notifyCheckpointSubsumedAsync

        public Future<Void> notifyCheckpointSubsumedAsync​(long checkpointId)
        Description copied from interface: CheckpointableTask
        Invoked when a checkpoint has been subsumed, i.e., when the checkpoint coordinator has confirmed that one checkpoint has finished and tries to remove the first previous checkpoint.
        Specified by:
        notifyCheckpointSubsumedAsync in interface CheckpointableTask
        Parameters:
        checkpointId - The ID of the checkpoint that is subsumed.
        Returns:
        future that completes when the notification has been processed by the task.
      • handleAsyncException

        public void handleAsyncException​(String message,
                                         Throwable exception)
        Handles an exception thrown by a thread other than the one executing the main task (e.g. a TriggerTask) by failing the task entirely.

        In more detail, it marks task execution as failed for an external reason (a reason other than the task code itself throwing an exception). If the task is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the task is already canceling, this does nothing. Otherwise, it sets the state to FAILED and, if the invokable code is running, starts an asynchronous thread that aborts that code.

        This method never blocks.
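
        The state transition described here can be sketched with a compare-and-set loop, which also shows why the call does not need to block. This is a simplified, hypothetical model: AsyncFailureSketch, its reduced State enum and failExternally are illustrative names, and the asynchronous abort of running code is left out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the failure transition; these are not Flink classes.
class AsyncFailureSketch {
    /** Reduced set of states, chosen for illustration only. */
    enum State { RUNNING, CANCELING, FINISHED, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile Throwable failureCause;

    State state() {
        return state.get();
    }

    Throwable failureCause() {
        return failureCause;
    }

    void cancel() {
        state.compareAndSet(State.RUNNING, State.CANCELING);
    }

    /** Returns true if this call moved the task to FAILED, false if it was a no-op. */
    boolean failExternally(Throwable cause) {
        while (true) {
            State current = state.get();
            if (current != State.RUNNING) {
                return false; // terminal state or already canceling: do nothing
            }
            if (state.compareAndSet(current, State.FAILED)) {
                failureCause = cause;
                return true;
            }
            // another thread changed the state concurrently: re-read and retry
        }
    }
}
```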

        Specified by:
        handleAsyncException in interface AsyncExceptionHandler
      • getAsyncCheckpointStartDelayNanos

        protected long getAsyncCheckpointStartDelayNanos()
      • maybeInterruptOnCancel

        public void maybeInterruptOnCancel​(Thread toInterrupt,
                                           @Nullable
                                           String taskName,
                                           @Nullable
                                           Long timeout)
        Description copied from interface: TaskInvokable
        Checks whether the task should be interrupted during cancellation and, if so, executes the specified Runnable interruptAction.
        Specified by:
        maybeInterruptOnCancel in interface TaskInvokable
        Parameters:
        taskName - optional taskName to log stack trace
        timeout - optional timeout to log stack trace
      • getEnvironment

        public final Environment getEnvironment()