public class TaskManager extends Object implements FlinkActor, LeaderSessionMessageFilter, LogMessages, LeaderRetrievalListener
The TaskManager has the following phases:
- "Waiting to be registered": In this phase, the TaskManager periodically sends a RegisterTaskManager message to the JobManager. Upon successful registration, the JobManager replies with an AcknowledgeRegistration message. This stops the registration messages and initializes all fields that require the JobManager's actor reference.
- "Operational": Here the TaskManager accepts and processes task messages, like SubmitTask, CancelTask, and FailTask. If the TaskManager disconnects from the JobManager (because the JobManager is no longer reachable), the TaskManager returns to the "waiting to be registered" state.
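The two phases can be read as a small state machine. The following is a minimal sketch under that reading, not Flink code: the class `TaskManagerPhasesSketch`, its `Phase` enum, and all method names are hypothetical, and tasks are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two TaskManager phases; this is not Flink code.
class TaskManagerPhasesSketch {
    enum Phase { WAITING_TO_BE_REGISTERED, OPERATIONAL }

    private Phase phase = Phase.WAITING_TO_BE_REGISTERED;
    private final List<String> acceptedTasks = new ArrayList<>();

    Phase phase() { return phase; }

    List<String> acceptedTasks() { return acceptedTasks; }

    // Periodic tick: a registration message is only sent while unregistered.
    boolean shouldSendRegistration() {
        return phase == Phase.WAITING_TO_BE_REGISTERED;
    }

    // The JobManager acknowledged the registration.
    void onAcknowledgeRegistration() {
        phase = Phase.OPERATIONAL;
    }

    // Task messages are only processed in the operational phase.
    boolean onSubmitTask(String taskName) {
        if (phase != Phase.OPERATIONAL) {
            return false;
        }
        acceptedTasks.add(taskName);
        return true;
    }

    // The JobManager became unreachable: go back to registering.
    void onJobManagerDisconnect() {
        phase = Phase.WAITING_TO_BE_REGISTERED;
    }

    public static void main(String[] args) {
        TaskManagerPhasesSketch tm = new TaskManagerPhasesSketch();
        System.out.println(tm.onSubmitTask("early-task")); // rejected while unregistered
        tm.onAcknowledgeRegistration();
        System.out.println(tm.onSubmitTask("map-task"));   // accepted once operational
        tm.onJobManagerDisconnect();
        System.out.println(tm.shouldSendRegistration());   // registration resumes
    }
}
```

The sketch deliberately leaves open what happens to already accepted tasks on disconnect, since the description above does not specify it.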
========== Failure model of the TaskManager ==========
The TaskManager tries to compensate for task failures as far as possible by marking the task as failed and removing all its resources. This causes the JobManager to restart the task (on this same TaskManager or on a different TaskManager).
In certain cases, exceptions indicate that the TaskManager is unable to proceed. The most robust way to clean up is to let the OS/kernel do it, so we kill the process. On YARN (or in resilient standalone mode), the process will be restarted, producing a clean state. To achieve this, we kill the TaskManager actor. The watchdog actor (process reaper) will detect that and kill the TaskManager process.
Fatal errors that require TaskManager JVM restart include:
- Errors bringing up the Network Stack or Library Cache after the TaskManager has registered at the JobManager. The TaskManager cannot operate without them.
- Exceptions while releasing the task resources from the network stack, intermediate results, or memory manager. Those situations indicate a critical leak in the resource management, which can only be reliably fixed through a JVM restart.
- Exceptions releasing intermediate result resources. Critical resource leak, requires a clean JVM.
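The failure model above boils down to a two-way decision: compensate locally, or kill the actor so the process gets restarted. A minimal sketch of that decision follows. The enum constants and the class `FailureModelSketch` are hypothetical names chosen for illustration, and the real TaskManager classifies exceptions in its own way.

```java
// Hypothetical classification following the failure model above; not Flink code.
class FailureModelSketch {
    enum FailureKind {
        TASK_FAILURE,                // an individual task failed
        NETWORK_STACK_STARTUP,       // network stack could not be brought up
        LIBRARY_CACHE_STARTUP,       // library cache could not be brought up
        TASK_RESOURCE_RELEASE,       // releasing task resources failed
        INTERMEDIATE_RESULT_RELEASE  // releasing intermediate results failed
    }

    enum Reaction { FAIL_TASK_AND_RELEASE_RESOURCES, KILL_TASK_MANAGER_ACTOR }

    // Task failures are compensated; everything else is treated as fatal.
    static Reaction reactionFor(FailureKind kind) {
        switch (kind) {
            case TASK_FAILURE:
                return Reaction.FAIL_TASK_AND_RELEASE_RESOURCES;
            default:
                return Reaction.KILL_TASK_MANAGER_ACTOR;
        }
    }

    public static void main(String[] args) {
        for (FailureKind kind : FailureKind.values()) {
            System.out.println(kind + " -> " + reactionFor(kind));
        }
    }
}
```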
| Constructor and Description |
|---|
| TaskManager(TaskManagerConfiguration config, ResourceID resourceID, TaskManagerLocation location, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment network, int numberOfSlots, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricsRegistry) |
| Modifier and Type | Method | Description |
|---|---|---|
| protected static void | aroundPostRestart(Throwable reason) | |
| protected static void | aroundPostStop() | |
| protected static void | aroundPreRestart(Throwable reason, scala.Option<Object> message) | |
| protected static void | aroundPreStart() | |
| protected static void | aroundReceive(scala.PartialFunction<Object,scala.runtime.BoxedUnit> receive, Object msg) | |
| protected akka.util.Timeout | askTimeout() | The timeout for all actor ask futures |
| protected BroadcastVariableManager | bcVarManager() | Handler for shared broadcast variables (shared between multiple Tasks) |
| protected TaskManagerConfiguration | config() | |
| static akka.actor.ActorContext | context() | |
| protected scala.Option<akka.actor.ActorRef> | currentJobManager() | |
| static Object | decorateMessage(Object message) | |
| protected FileCache | fileCache() | Handler for distributed files cached by this TaskManager |
| static akka.actor.Props | getTaskManagerProps(Class<? extends TaskManager> taskManagerClass, TaskManagerConfiguration taskManagerConfig, ResourceID resourceID, TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricsRegistry) | |
| static akka.actor.ActorRef | getTaskManagerRemoteReference(String taskManagerUrl, akka.actor.ActorSystem system, scala.concurrent.duration.FiniteDuration timeout) | Resolves the TaskManager actor reference in a blocking fashion. |
| void | handleError(Exception exception) | This method is called by the LeaderRetrievalService in case of an exception. |
| protected void | handleJobManagerDisconnect(String msg) | |
| scala.PartialFunction<Object,scala.runtime.BoxedUnit> | handleMessage() | Central handling of actor messages. |
| static scala.concurrent.duration.FiniteDuration | HEARTBEAT_INTERVAL() | |
| protected HighAvailabilityServices | highAvailabilityServices() | |
| protected IOManager | ioManager() | |
| protected boolean | isConnected() | Checks whether the TaskManager is currently connected to the JobManager. |
| protected LeaderRetrievalService | leaderRetrievalService() | |
| scala.Option<UUID> | leaderSessionID() | |
| protected TaskManagerLocation | location() | |
| grizzled.slf4j.Logger | log() | |
| static grizzled.slf4j.Logger | LOG() | TaskManager logger for synchronous logging (not through the logging actor) |
| static void | main(String[] args) | Entry point (main method) to run the TaskManager in a standalone fashion. |
| static long | MAX_STARTUP_CONNECT_TIME() | Maximum time (milli seconds) that the TaskManager will spend searching for a |
| protected MemoryManager | memoryManager() | |
| protected MetricRegistry | metricsRegistry() | |
| protected NetworkEnvironment | network() | |
| void | notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) | This method is called by the LeaderRetrievalService when a new leader is elected. |
| protected int | numberOfSlots() | |
| static Configuration | parseArgsAndLoadConfig(String[] args) | Parse the command line arguments of the TaskManager and loads the configuration. |
| static void | postRestart(Throwable reason) | |
| void | postStop() | Called after the actor is stopped. |
| static void | preRestart(Throwable reason, scala.Option<Object> message) | |
| void | preStart() | Called prior to the actor receiving any messages. |
| static scala.PartialFunction<Object,scala.runtime.BoxedUnit> | receive() | |
| protected ResourceID | resourceID() | |
| protected HardwareDescription | resources() | The TaskManager's physical execution resources |
| protected HashMap<ExecutionAttemptID,Task> | runningTasks() | Registry of all tasks currently executed by this TaskManager |
| static void | runTaskManager(String taskManagerHostname, ResourceID resourceID, int actorSystemPort, Configuration configuration, HighAvailabilityServices highAvailabilityServices) | Starts and runs the TaskManager. |
| static void | runTaskManager(String taskManagerHostname, ResourceID resourceID, int actorSystemPort, Configuration configuration, HighAvailabilityServices highAvailabilityServices, Class<? extends TaskManager> taskManagerClass) | Starts and runs the TaskManager. |
| static int | RUNTIME_FAILURE_RETURN_CODE() | Return code for critical errors during the runtime |
| static scala.Tuple2<String,Object> | selectNetworkInterfaceAndPort(Configuration configuration, HighAvailabilityServices highAvailabilityServices) | |
| static void | selectNetworkInterfaceAndRunTaskManager(Configuration configuration, ResourceID resourceID, Class<? extends TaskManager> taskManagerClass) | Starts and runs the TaskManager. |
| static akka.actor.ActorRef | self() | |
| static akka.actor.ActorRef | sender() | |
| protected void | sendHeartbeatToJobManager() | Sends a heartbeat message to the JobManager (if connected) with the current metrics report. |
| protected void | shutdown() | |
| static akka.actor.ActorRef | startTaskManagerComponentsAndActor(Configuration configuration, ResourceID resourceID, akka.actor.ActorSystem actorSystem, HighAvailabilityServices highAvailabilityServices, String taskManagerHostname, scala.Option<String> taskManagerActorName, boolean localTaskManagerCommunication, Class<? extends TaskManager> taskManagerClass) | Starts the task manager actor. |
| static long | STARTUP_CONNECT_LOG_SUPPRESS() | Time (milli seconds) after which the TaskManager will start logging failed |
| static int | STARTUP_FAILURE_RETURN_CODE() | Return code for unsuccessful TaskManager startup |
| static akka.actor.SupervisorStrategy | supervisorStrategy() | |
| void | triggerTaskManagerRegistration() | Starts the TaskManager's registration process to connect to the JobManager. |
| void | unhandled(Object message) | Handle unmatched messages with an exception. |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
decorateMessage, handleDiscardedMessage, handleNoLeaderId, receive
receive
public TaskManager(TaskManagerConfiguration config, ResourceID resourceID, TaskManagerLocation location, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment network, int numberOfSlots, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricsRegistry)
public static grizzled.slf4j.Logger LOG()
public static int STARTUP_FAILURE_RETURN_CODE()
public static int RUNTIME_FAILURE_RETURN_CODE()
public static long MAX_STARTUP_CONNECT_TIME()
public static long STARTUP_CONNECT_LOG_SUPPRESS()
public static scala.concurrent.duration.FiniteDuration HEARTBEAT_INTERVAL()
public static void main(String[] args)
Parameters:
args - The command line arguments.
public static Configuration parseArgsAndLoadConfig(String[] args) throws Exception
Parameters:
args - Command line arguments
Throws:
Exception
public static void selectNetworkInterfaceAndRunTaskManager(Configuration configuration, ResourceID resourceID, Class<? extends TaskManager> taskManagerClass) throws Exception
Starts and runs the TaskManager.
This method first tries to select the network interface to use for the TaskManager communication. The network interface is used both for the actor communication (coordination) and for the data exchange between task managers. Unless the hostname/interface is explicitly set in the configuration, this method tries out various interfaces and methods to connect to the JobManager and selects the one where the connection attempt succeeds.
After selecting the network interface, this method brings up an actor system for the TaskManager and its actors, starts the TaskManager's services (library cache, shuffle network stack, ...), and starts the TaskManager itself.
Parameters:
configuration - The configuration for the TaskManager.
taskManagerClass - The actor class to instantiate. Allows the use of TaskManager subclasses, for example for YARN.
resourceID - (undocumented)
Throws:
Exception
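The selection rule described for selectNetworkInterfaceAndRunTaskManager (use the configured host if there is one, otherwise probe candidates until a connection attempt succeeds) might be sketched as follows. This is an illustration under assumptions, not the Flink implementation: `InterfaceSelectionSketch`, `selectAddress`, and the `canReachJobManager` predicate are hypothetical, and candidate interfaces are modelled as address strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of the interface selection rule; not Flink code.
class InterfaceSelectionSketch {

    // Returns the configured host if set, else the first candidate for which
    // the (assumed) connection probe to the JobManager succeeds.
    static Optional<String> selectAddress(
            String configuredHost,
            List<String> candidates,
            Predicate<String> canReachJobManager) {
        if (configuredHost != null && !configuredHost.isEmpty()) {
            return Optional.of(configuredHost);
        }
        for (String candidate : candidates) {
            if (canReachJobManager.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> candidates = Arrays.asList("127.0.0.1", "10.0.0.5", "192.168.1.7");
        // Stand-in probe: pretend only the 10.x address can reach the JobManager.
        Predicate<String> probe = address -> address.startsWith("10.");
        System.out.println(selectAddress(null, candidates, probe));
        System.out.println(selectAddress("tm-host", candidates, probe));
    }
}
```

Passing the probe as a predicate keeps the rule testable without opening sockets; a real probe would attempt an actual connection.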
public static scala.Tuple2<String,Object> selectNetworkInterfaceAndPort(Configuration configuration, HighAvailabilityServices highAvailabilityServices) throws IOException, IllegalConfigurationException
public static void runTaskManager(String taskManagerHostname, ResourceID resourceID, int actorSystemPort, Configuration configuration, HighAvailabilityServices highAvailabilityServices) throws Exception
Starts and runs the TaskManager.
This method will also spawn a process reaper for the TaskManager (kill the process if the actor fails) and optionally start the JVM memory logging thread.
Parameters:
taskManagerHostname - The hostname/address of the interface where the actor system will communicate.
resourceID - The id of the resource which the task manager will run on.
actorSystemPort - The port at which the actor system will communicate.
configuration - The configuration for the TaskManager.
highAvailabilityServices - Service factory for high availability services
Throws:
Exception
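The process reaper mentioned above ties the life of the process to the life of the actor. A minimal sketch of that idea follows, assuming the reaper is told when the watched actor terminates. `ProcessReaperSketch`, `onWatchedActorTerminated`, and the exit code value are hypothetical; the real reaper is an actor and is not reproduced here.

```java
import java.util.function.IntConsumer;

// Hypothetical watchdog: runs an exit action once the watched actor is gone.
class ProcessReaperSketch {
    private final IntConsumer exitAction;
    private final int exitCode;
    private boolean triggered;

    ProcessReaperSketch(IntConsumer exitAction, int exitCode) {
        this.exitAction = exitAction;
        this.exitCode = exitCode;
    }

    // Called when the watched actor has terminated; acts at most once.
    void onWatchedActorTerminated() {
        if (!triggered) {
            triggered = true;
            exitAction.accept(exitCode);
        }
    }

    public static void main(String[] args) {
        // The exit code 2 is an arbitrary placeholder. A real deployment might
        // pass System::exit here; the demo only prints so it can finish normally.
        ProcessReaperSketch reaper = new ProcessReaperSketch(
                code -> System.out.println("would exit with code " + code), 2);
        reaper.onWatchedActorTerminated();
    }
}
```

Injecting the exit action keeps the sketch testable, since calling `System.exit` directly would end the test JVM.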
public static void runTaskManager(String taskManagerHostname, ResourceID resourceID, int actorSystemPort, Configuration configuration, HighAvailabilityServices highAvailabilityServices, Class<? extends TaskManager> taskManagerClass) throws Exception
Starts and runs the TaskManager.
This method will also spawn a process reaper for the TaskManager (kill the process if the actor fails) and optionally start the JVM memory logging thread.
Parameters:
taskManagerHostname - The hostname/address of the interface where the actor system will communicate.
resourceID - The id of the resource which the task manager will run on.
actorSystemPort - The port at which the actor system will communicate.
configuration - The configuration for the TaskManager.
highAvailabilityServices - Service factory for high availability services
taskManagerClass - The actor class to instantiate. Allows the use of TaskManager subclasses, for example for YARN.
Throws:
Exception
public static akka.actor.ActorRef startTaskManagerComponentsAndActor(Configuration configuration, ResourceID resourceID, akka.actor.ActorSystem actorSystem, HighAvailabilityServices highAvailabilityServices, String taskManagerHostname, scala.Option<String> taskManagerActorName, boolean localTaskManagerCommunication, Class<? extends TaskManager> taskManagerClass) throws IllegalConfigurationException, IOException, Exception
Starts the task manager actor.
Parameters:
configuration - The configuration for the TaskManager.
resourceID - The id of the resource which the task manager will run on.
actorSystem - The actor system that should run the TaskManager actor.
highAvailabilityServices - Factory to create high availability services
taskManagerHostname - The hostname/address that describes the TaskManager's data location.
taskManagerActorName - Optionally the name of the TaskManager actor. If none is given, the actor will use a random name.
localTaskManagerCommunication - If true, the TaskManager will not initiate the TCP network stack.
taskManagerClass - The class of the TaskManager actor. May be used to give subclasses that understand additional actor messages.
Throws:
IllegalConfigurationException - Thrown if the given config contains illegal values.
IOException - Thrown if any of the I/O components (such as buffer pools, I/O manager, ...) cannot be properly started.
Exception - Thrown if some other error occurs while parsing the configuration or starting the TaskManager components.
public static akka.actor.Props getTaskManagerProps(Class<? extends TaskManager> taskManagerClass, TaskManagerConfiguration taskManagerConfig, ResourceID resourceID, TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricsRegistry)
public static akka.actor.ActorRef getTaskManagerRemoteReference(String taskManagerUrl, akka.actor.ActorSystem system, scala.concurrent.duration.FiniteDuration timeout) throws IOException
Resolves the TaskManager actor reference in a blocking fashion.
Parameters:
taskManagerUrl - The akka URL of the TaskManager.
system - The local actor system that should perform the lookup.
timeout - The maximum time to wait until the lookup fails.
Throws:
IOException - Thrown if the lookup fails.
public static akka.actor.ActorContext context()
public static final akka.actor.ActorRef self()
public static final akka.actor.ActorRef sender()
protected static void aroundReceive(scala.PartialFunction<Object,scala.runtime.BoxedUnit> receive, Object msg)
protected static void aroundPreStart()
protected static void aroundPostStop()
protected static void aroundPreRestart(Throwable reason, scala.Option<Object> message)
protected static void aroundPostRestart(Throwable reason)
public static akka.actor.SupervisorStrategy supervisorStrategy()
public static void preRestart(Throwable reason, scala.Option<Object> message) throws Exception
Throws:
Exception
public static scala.PartialFunction<Object,scala.runtime.BoxedUnit> receive()
protected TaskManagerConfiguration config()
protected ResourceID resourceID()
protected TaskManagerLocation location()
protected MemoryManager memoryManager()
protected IOManager ioManager()
protected NetworkEnvironment network()
protected int numberOfSlots()
protected HighAvailabilityServices highAvailabilityServices()
protected MetricRegistry metricsRegistry()
public grizzled.slf4j.Logger log()
Specified by:
log in interface FlinkActor
protected akka.util.Timeout askTimeout()
protected HardwareDescription resources()
protected HashMap<ExecutionAttemptID,Task> runningTasks()
protected BroadcastVariableManager bcVarManager()
protected FileCache fileCache()
protected LeaderRetrievalService leaderRetrievalService()
protected scala.Option<akka.actor.ActorRef> currentJobManager()
public scala.Option<UUID> leaderSessionID()
Specified by:
leaderSessionID in interface LeaderSessionMessageFilter
public void preStart()
Specified by:
preStart in interface akka.actor.Actor
public void postStop()
Specified by:
postStop in interface akka.actor.Actor
public scala.PartialFunction<Object,scala.runtime.BoxedUnit> handleMessage()
Specified by:
handleMessage in interface FlinkActor
public void unhandled(Object message)
Specified by:
unhandled in interface akka.actor.Actor
Parameters:
message - (undocumented)
protected boolean isConnected()
protected void handleJobManagerDisconnect(String msg)
protected void sendHeartbeatToJobManager()
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID)
This method is called by the LeaderRetrievalService when a new leader is elected.
Specified by:
notifyLeaderAddress in interface LeaderRetrievalListener
Parameters:
leaderAddress - The address of the new leader
leaderSessionID - The new leader session ID
public void triggerTaskManagerRegistration()
public void handleError(Exception exception)
This method is called by the LeaderRetrievalService in case of an exception. This ensures that the LeaderRetrievalListener is aware of any problems occurring in the LeaderRetrievalService thread.
Specified by:
handleError in interface LeaderRetrievalListener
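The two listener callbacks, notifyLeaderAddress and handleError, can be illustrated with a small stand-in. The sketch below is an assumption-laden model rather than Flink code: `LeaderListenerSketch` only mirrors the two documented signatures, `RecordingLeaderListener` is hypothetical, and counting a fresh registration for every new leader is an assumed reaction, not documented behaviour.

```java
import java.util.UUID;

// Hypothetical listener that records what the retrieval service reports.
class RecordingLeaderListener implements LeaderListenerSketch {
    private String leaderAddress;
    private UUID leaderSessionID;
    private Exception lastError;
    private int registrationsTriggered;

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        this.leaderAddress = leaderAddress;
        this.leaderSessionID = leaderSessionID;
        registrationsTriggered++; // assumption: a new leader means registering again
    }

    @Override
    public void handleError(Exception exception) {
        this.lastError = exception; // make the service's problem visible to the listener
    }

    String leaderAddress() { return leaderAddress; }
    UUID leaderSessionID() { return leaderSessionID; }
    Exception lastError() { return lastError; }
    int registrationsTriggered() { return registrationsTriggered; }

    public static void main(String[] args) {
        RecordingLeaderListener listener = new RecordingLeaderListener();
        listener.notifyLeaderAddress("akka.tcp://flink@jm-host:6123/user/jobmanager", UUID.randomUUID());
        listener.handleError(new Exception("retrieval service failed"));
        System.out.println(listener.leaderAddress());
        System.out.println(listener.lastError().getMessage());
    }
}

// Hypothetical stand-in mirroring the two callbacks documented above.
interface LeaderListenerSketch {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
    void handleError(Exception exception);
}
```

The address string in `main` is an invented example value.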
protected void shutdown()
Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.