Class MailboxProcessor
- java.lang.Object
-
- org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
-
- All Implemented Interfaces:
Closeable, AutoCloseable
@Internal public class MailboxProcessor extends Object implements Closeable
This class encapsulates the logic of the mailbox-based execution model. At the core of this model is runMailboxLoop(), which continuously executes the provided MailboxDefaultAction in a loop. On each iteration, the method also checks whether there are pending actions in the mailbox and executes them. This model ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g. checkpoint trigger, timer firing, ...).
The MailboxDefaultAction interacts with this class through the MailboxProcessor.MailboxController to communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily or permanently exhausted.
The design of runMailboxLoop() is centered around the idea of keeping the expected hot path (default action, no mail) as fast as possible. This means that all checking of mail and other control flags (mailboxLoopRunning, suspendedDefaultAction) is performed only when hasMail() returns true. Consequently, control flag changes in the mailbox thread can be made directly, but we must ensure that there is at least one action in the mailbox so that the change is picked up. Control flag changes by all other threads must happen through mailbox actions, so for them this is automatically the case.
This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the encapsulated TaskMailbox (which is open-quiesce-close).
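The execution model described above can be illustrated with a small stand-alone sketch. It is not Flink's implementation: the class MailboxLoopSketch and its methods submit, stop and runLoop are hypothetical names, the queue is not thread-safe, and the hot-path optimization around hasMail() is left out. It only shows how a default action and mailbox actions interleave on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified single-threaded model of a mailbox loop (hypothetical, not Flink code). */
class MailboxLoopSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Runnable defaultAction;
    private boolean loopRunning = true;

    MailboxLoopSketch(Runnable defaultAction) {
        this.defaultAction = defaultAction;
    }

    /** Enqueues a mailbox action, e.g. a checkpoint trigger. */
    void submit(Runnable mail) {
        mailbox.add(mail);
    }

    /** Ends the loop after the current iteration. */
    void stop() {
        loopRunning = false;
    }

    void runLoop() {
        while (loopRunning) {
            // Pending mail is executed before the next default action.
            Runnable mail;
            while (loopRunning && (mail = mailbox.poll()) != null) {
                mail.run();
            }
            if (loopRunning) {
                defaultAction.run();
            }
        }
    }

    /** Processes three "records"; a "checkpoint" mail arrives after the first one. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        int[] remaining = {3};
        MailboxLoopSketch[] loop = new MailboxLoopSketch[1];
        loop[0] = new MailboxLoopSketch(() -> {
            log.add("record");
            remaining[0]--;
            if (remaining[0] == 2) {
                loop[0].submit(() -> log.add("checkpoint"));
            }
            if (remaining[0] == 0) {
                loop[0].stop();
            }
        });
        loop[0].runLoop();
        return log;
    }
}
```

In this sketch, demo() returns the log [record, checkpoint, record, record]: the mail submitted during the first default action runs before the second one, and both run on the same thread.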
-
-
Nested Class Summary
Nested Classes
Modifier and Type  Class  Description
protected static class  MailboxProcessor.MailboxController  Implementation of MailboxDefaultAction.Controller that is connected to a MailboxProcessor instance.
-
Field Summary
Fields
Modifier and Type  Field  Description
protected TaskMailbox  mailbox  The mailbox data structure that manages requests for special actions, like timers, checkpoints, ...
protected MailboxDefaultAction  mailboxDefaultAction  Action that is repeatedly executed if no action request is in the mailbox.
-
Constructor Summary
Constructors
Constructor  Description
MailboxProcessor()
MailboxProcessor(MailboxDefaultAction mailboxDefaultAction)
MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor)
MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor, MailboxMetricsController mailboxMetricsControl)
MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, StreamTaskActionExecutor actionExecutor)
-
Method Summary
All Methods  Instance Methods  Concrete Methods
Modifier and Type  Method  Description
void  allActionsCompleted()  This method must be called to end the stream task when all actions for the tasks have been performed.
void  close()  Lifecycle method to close the mailbox for action submission/retrieval.
void  drain()  Finishes running all mails in the mailbox.
MailboxExecutor  getMailboxExecutor(int priority)  Returns an executor service facade to submit actions to the mailbox.
MailboxMetricsController  getMailboxMetricsControl()  Gets MailboxMetricsController for control and access to mailbox metrics.
MailboxExecutor  getMainMailboxExecutor()
boolean  hasMail()
boolean  isDefaultActionAvailable()
boolean  isMailboxLoopRunning()
boolean  isMailboxThread()  Check if the current thread is the mailbox thread.
void  prepareClose()  Lifecycle method to close the mailbox for action submission.
void  reportThrowable(Throwable throwable)  Reports a throwable for rethrowing from the mailbox thread.
void  runMailboxLoop()  Runs the mailbox processing loop.
boolean  runMailboxStep()  Execute a single (as small as possible) step of the mailbox.
boolean  runSingleMailboxLoop()  Execute a single (as small as possible) step of the mailbox.
void  suspend()  Suspend the running of the loop which was started by runMailboxLoop().
-
-
-
Field Detail
-
mailbox
protected final TaskMailbox mailbox
The mailbox data structure that manages requests for special actions, like timers, checkpoints, ...
-
mailboxDefaultAction
protected final MailboxDefaultAction mailboxDefaultAction
Action that is repeatedly executed if no action request is in the mailbox. Typically record processing.
-
-
Constructor Detail
-
MailboxProcessor
@VisibleForTesting public MailboxProcessor()
-
MailboxProcessor
public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction)
-
MailboxProcessor
public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, StreamTaskActionExecutor actionExecutor)
-
MailboxProcessor
public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor)
-
MailboxProcessor
public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor, MailboxMetricsController mailboxMetricsControl)
-
-
Method Detail
-
getMainMailboxExecutor
public MailboxExecutor getMainMailboxExecutor()
-
getMailboxExecutor
public MailboxExecutor getMailboxExecutor(int priority)
Returns an executor service facade to submit actions to the mailbox.
- Parameters:
priority - the priority of the MailboxExecutor.
-
getMailboxMetricsControl
@VisibleForTesting public MailboxMetricsController getMailboxMetricsControl()
Gets MailboxMetricsController for control and access to mailbox metrics.
- Returns:
MailboxMetricsController.
-
prepareClose
public void prepareClose()
Lifecycle method to close the mailbox for action submission.
-
close
public void close()
Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all instances of RunnableFuture that are still contained in the mailbox.
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
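The two lifecycle methods above can be pictured as state transitions of the underlying mailbox. The following is a minimal sketch under stated assumptions, not the TaskMailbox implementation: MailboxLifecycleSketch, its State enum, and the put and tryTake methods are hypothetical, and rejecting a submission by returning false is a simplification chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical model of an open-quiesce-close mailbox lifecycle (not Flink code). */
class MailboxLifecycleSketch {
    enum State { OPEN, QUIESCED, CLOSED }

    private final Queue<Runnable> queue = new ArrayDeque<>();
    private State state = State.OPEN;

    /** Submission is only accepted while the mailbox is open. */
    boolean put(Runnable mail) {
        if (state != State.OPEN) {
            return false;
        }
        queue.add(mail);
        return true;
    }

    /** Retrieval still works after prepareClose(), but not after close(). */
    Runnable tryTake() {
        if (state == State.CLOSED) {
            throw new IllegalStateException("mailbox is closed");
        }
        return queue.poll();
    }

    /** Closes the mailbox for submission only. */
    void prepareClose() {
        if (state == State.OPEN) {
            state = State.QUIESCED;
        }
    }

    /** Closes the mailbox for submission and retrieval, cancelling leftover futures. */
    void close() {
        for (Runnable mail : queue) {
            if (mail instanceof RunnableFuture) {
                ((RunnableFuture<?>) mail).cancel(false);
            }
        }
        queue.clear();
        state = State.CLOSED;
    }

    static String demo() {
        MailboxLifecycleSketch mailbox = new MailboxLifecycleSketch();
        FutureTask<Void> pending = new FutureTask<>(() -> null);
        boolean acceptedWhileOpen = mailbox.put(pending);
        mailbox.prepareClose();
        boolean acceptedWhileQuiesced = mailbox.put(() -> {});
        mailbox.close();
        return acceptedWhileOpen + "," + acceptedWhileQuiesced + "," + pending.isCancelled();
    }
}
```

In the demo, the mail submitted while the mailbox is open is accepted, the one submitted after prepareClose() is rejected, and the future still queued at close() ends up cancelled.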
-
drain
public void drain() throws Exception
Finishes running all mails in the mailbox. If no concurrent write operations occurred, the mailbox must be empty after this method.
- Throws:
Exception
-
runMailboxLoop
public void runMailboxLoop() throws Exception
Runs the mailbox processing loop. This is where the main work is done. This loop can be suspended at any time by calling suspend(). To resume the loop, call this method again.
- Throws:
Exception
-
suspend
public void suspend()
Suspend the running of the loop which was started by runMailboxLoop().
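The interplay of suspending, resuming, and the hot-path design from the class description can be sketched as follows. This is an illustration under assumptions, not the actual implementation: SuspendableLoopSketch and its members are hypothetical names, and enqueuing an empty "wake-up" mail is just one way to satisfy the requirement that at least one action is in the mailbox when a control flag changes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical loop whose control flag is only checked when mail is present. */
class SuspendableLoopSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Runnable defaultAction;
    private boolean suspended;

    SuspendableLoopSketch(Runnable defaultAction) {
        this.defaultAction = defaultAction;
    }

    /** Sets the flag and enqueues an empty mail so the loop notices the change. */
    void suspend() {
        suspended = true;
        mailbox.add(() -> {});
    }

    /** Runs until suspended; calling it again resumes the loop. */
    void runLoop() {
        suspended = false;
        while (true) {
            // Hot path: with an empty mailbox, no control flag is inspected.
            if (!mailbox.isEmpty()) {
                Runnable mail;
                while ((mail = mailbox.poll()) != null) {
                    mail.run();
                }
                if (suspended) {
                    return;
                }
            }
            defaultAction.run();
        }
    }

    /** The default action suspends the loop after every second invocation. */
    static int[] demo() {
        int[] count = {0};
        SuspendableLoopSketch[] loop = new SuspendableLoopSketch[1];
        loop[0] = new SuspendableLoopSketch(() -> {
            if (++count[0] % 2 == 0) {
                loop[0].suspend();
            }
        });
        loop[0].runLoop();
        int afterFirstRun = count[0];
        loop[0].runLoop();
        return new int[] {afterFirstRun, count[0]};
    }
}
```

Without the wake-up mail in suspend(), this loop would never look at the flag and would keep running the default action, which is the situation the class description warns about.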
-
runSingleMailboxLoop
@VisibleForTesting public boolean runSingleMailboxLoop() throws Exception
Execute a single (as small as possible) step of the mailbox.
- Returns:
- true if something was processed.
- Throws:
Exception
-
runMailboxStep
@VisibleForTesting public boolean runMailboxStep() throws Exception
Execute a single (as small as possible) step of the mailbox.
- Returns:
- true if something was processed.
- Throws:
Exception
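What "a single step" and the boolean return value might mean can be shown with a small model. This is a sketch under assumptions only: MailboxStepSketch, runStep and setDefaultActionAvailable are hypothetical names, and the order chosen here (one mail first, otherwise the default action) is an assumption of the example rather than something this page specifies.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical single-step execution over a mailbox and a default action. */
class MailboxStepSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Runnable defaultAction;
    private boolean defaultActionAvailable = true;

    MailboxStepSketch(Runnable defaultAction) {
        this.defaultAction = defaultAction;
    }

    void submit(Runnable mail) {
        mailbox.add(mail);
    }

    void setDefaultActionAvailable(boolean available) {
        defaultActionAvailable = available;
    }

    /** Returns true if a mail or the default action was processed. */
    boolean runStep() {
        Runnable mail = mailbox.poll();
        if (mail != null) {
            mail.run();
            return true;
        }
        if (defaultActionAvailable) {
            defaultAction.run();
            return true;
        }
        return false;
    }

    static boolean[] demo() {
        MailboxStepSketch sketch = new MailboxStepSketch(() -> {});
        sketch.submit(() -> {});
        boolean first = sketch.runStep();   // processes the mail
        boolean second = sketch.runStep();  // runs the default action
        sketch.setDefaultActionAvailable(false);
        boolean third = sketch.runStep();   // nothing left to do
        return new boolean[] {first, second, third};
    }
}
```

Stepping like this is mainly useful in tests, where each call can be followed by an assertion on the state reached so far.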
-
isMailboxThread
public boolean isMailboxThread()
Check if the current thread is the mailbox thread.
- Returns:
- only true if called from the mailbox thread.
-
reportThrowable
public void reportThrowable(Throwable throwable)
Reports a throwable for rethrowing from the mailbox thread. This will clear and cancel all other pending mails.
- Parameters:
throwable - to report by rethrowing from the mailbox loop.
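The behavior described for reportThrowable can be modeled as replacing the mailbox content with a single mail that rethrows. The sketch below is an assumption-laden illustration, not Flink's code: ReportThrowableSketch, submit and processMail are hypothetical names, and wrapping the throwable in a RuntimeException is a choice made for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical model of reporting a failure through the mailbox. */
class ReportThrowableSketch {
    private final Deque<Runnable> mailbox = new ArrayDeque<>();

    synchronized void submit(Runnable mail) {
        mailbox.addLast(mail);
    }

    /** May be called from any thread; the rethrow happens in the processing thread. */
    synchronized void reportThrowable(Throwable throwable) {
        // Cancel and drop everything that is still pending.
        for (Runnable pending : mailbox) {
            if (pending instanceof RunnableFuture) {
                ((RunnableFuture<?>) pending).cancel(false);
            }
        }
        mailbox.clear();
        // The only remaining mail rethrows when the mailbox thread runs it.
        mailbox.addFirst(() -> {
            throw new RuntimeException(throwable);
        });
    }

    private synchronized Runnable poll() {
        return mailbox.pollFirst();
    }

    /** Stands in for the mail-processing part of the mailbox loop. */
    void processMail() {
        Runnable mail;
        while ((mail = poll()) != null) {
            mail.run();
        }
    }

    static String demo() {
        ReportThrowableSketch sketch = new ReportThrowableSketch();
        FutureTask<Void> pending = new FutureTask<>(() -> null);
        sketch.submit(pending);
        sketch.reportThrowable(new IllegalStateException("boom"));
        try {
            sketch.processMail();
            return "no exception";
        } catch (RuntimeException e) {
            return "cancelled=" + pending.isCancelled() + ", cause=" + e.getCause().getMessage();
        }
    }
}
```

Routing the failure through a mail keeps error handling inside the single-threaded model: the exception surfaces in the thread that runs the loop, not in the thread that detected the problem.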
-
allActionsCompleted
public void allActionsCompleted()
This method must be called to end the stream task when all actions for the tasks have been performed.
-
isDefaultActionAvailable
@VisibleForTesting public boolean isDefaultActionAvailable()
-
isMailboxLoopRunning
@VisibleForTesting public boolean isMailboxLoopRunning()
-
hasMail
public boolean hasMail()
-
-