public class TaskManager extends Object implements FlinkActor, LeaderSessionMessageFilter, LogMessages, LeaderRetrievalListener
- "Waiting to be registered": In this phase, the TaskManager periodically sends a RegisterTaskManager message to the JobManager. Upon successful registration, the JobManager replies with an AcknowledgeRegistration message. This stops the registration messages and initializes all fields that require the JobManager's actor reference.
- "Operational": Here the TaskManager accepts and processes task messages, such as SubmitTask, CancelTask, and FailTask. If the TaskManager disconnects from the JobManager (because the JobManager is no longer reachable), the TaskManager returns to the "waiting to be registered" state.
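The two phases described above can be pictured as a small state machine. The following is a minimal sketch only, not the actual actor implementation: the class `TaskManagerLifecycleSketch`, its method names, and the use of plain strings in place of actor references and task messages are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the two TaskManager phases; not Flink code. */
class TaskManagerLifecycleSketch {

    enum Phase { WAITING_TO_BE_REGISTERED, OPERATIONAL }

    private Phase phase = Phase.WAITING_TO_BE_REGISTERED;
    // Stand-in for the JobManager's actor reference (a String here by assumption).
    private String jobManager;
    private final List<String> acceptedTasks = new ArrayList<>();

    Phase phase() { return phase; }

    /** True while registration messages should still be sent periodically. */
    boolean shouldSendRegistration() {
        return phase == Phase.WAITING_TO_BE_REGISTERED;
    }

    /** Models receipt of an AcknowledgeRegistration message. */
    void onAcknowledgeRegistration(String jobManagerRef) {
        if (phase == Phase.WAITING_TO_BE_REGISTERED) {
            jobManager = jobManagerRef;   // fields needing the JobManager reference
            phase = Phase.OPERATIONAL;    // registration messages stop from here on
        }
    }

    /** Models a SubmitTask message; only accepted in the operational phase. */
    boolean onSubmitTask(String taskId) {
        if (phase != Phase.OPERATIONAL) {
            return false;
        }
        acceptedTasks.add(taskId);
        return true;
    }

    /** Models losing the connection to the JobManager. */
    void onJobManagerDisconnect() {
        jobManager = null;
        phase = Phase.WAITING_TO_BE_REGISTERED;
    }

    public static void main(String[] args) {
        TaskManagerLifecycleSketch tm = new TaskManagerLifecycleSketch();
        System.out.println("accepts task before registration: " + tm.onSubmitTask("t0"));
        tm.onAcknowledgeRegistration("jobmanager");
        System.out.println("accepts task after registration: " + tm.onSubmitTask("t1"));
        tm.onJobManagerDisconnect();
        System.out.println("phase after disconnect: " + tm.phase());
    }
}
```

What happens to tasks that are still running at the moment of a disconnect is not covered by the description above, so the sketch leaves that question open.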
========== Failure model of the TaskManager ==========
The TaskManager tries to compensate for task failures as far as possible by marking the task as failed and removing all its resources. This causes the JobManager to restart the task (on this same TaskManager or on a different TaskManager).
In certain cases, exceptions indicate that the TaskManager is unable to proceed. The most robust way to clean up is letting the OS/kernel do it, so we will trigger killing the process. In case of YARN (or resilient standalone mode), the process will be restarted, producing a clean state. To achieve this, we kill the TaskManager actor. The watch dog actor (process reaper) will recognize that and kill the TaskManager process.
Fatal errors that require TaskManager JVM restart include:
- Errors bringing up the Network Stack or Library Cache after the TaskManager has registered at the JobManager. The TaskManager cannot operate without them.
- Exceptions while releasing the task resources from the network stack, intermediate results, or memory manager. Those situations indicate a critical leak in the resource management, which can only be reliably fixed through a JVM restart.
- Exceptions releasing intermediate result resources. Critical resource leak, requires a clean JVM.
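The distinction drawn above, between failures that are compensated by failing the task and failures that require a JVM restart, can be sketched as a simple classification. This is an illustrative sketch only: the `Origin` and `Action` enums and the `classify` method are hypothetical names that do not appear in the TaskManager API.

```java
/** Hypothetical illustration of the failure model; not Flink code. */
class FailureModelSketch {

    /** What the TaskManager does in response to a failure. */
    enum Action { FAIL_TASK, KILL_TASK_MANAGER }

    /** Where the failure occurred (names invented for this sketch). */
    enum Origin {
        TASK_EXECUTION,
        NETWORK_STACK_STARTUP,
        LIBRARY_CACHE_STARTUP,
        TASK_RESOURCE_RELEASE,
        INTERMEDIATE_RESULT_RELEASE
    }

    static Action classify(Origin origin) {
        switch (origin) {
            case TASK_EXECUTION:
                // Compensated locally: mark the task as failed and release its
                // resources, so that the JobManager can restart the task.
                return Action.FAIL_TASK;
            case NETWORK_STACK_STARTUP:
            case LIBRARY_CACHE_STARTUP:
            case TASK_RESOURCE_RELEASE:
            case INTERMEDIATE_RESULT_RELEASE:
                // Fatal: the actor is killed, and the process reaper then
                // kills the process so that it restarts with a clean state.
                return Action.KILL_TASK_MANAGER;
            default:
                throw new IllegalArgumentException("Unknown origin: " + origin);
        }
    }

    public static void main(String[] args) {
        for (Origin origin : Origin.values()) {
            System.out.println(origin + " -> " + classify(origin));
        }
    }
}
```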
Constructor and Description |
---|
TaskManager(TaskManagerConfiguration config, InstanceConnectionInfo connectionInfo, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment network, int numberOfSlots, LeaderRetrievalService leaderRetrievalService) |
Modifier and Type | Method and Description |
---|---|
protected akka.util.Timeout | askTimeout() - The timeout for all actor ask futures |
protected BroadcastVariableManager | bcVarManager() - Handler for shared broadcast variables (shared between multiple Tasks) |
protected TaskManagerConfiguration | config() |
protected InstanceConnectionInfo | connectionInfo() |
protected scala.Option<akka.actor.ActorRef> | currentJobManager() |
static scala.concurrent.duration.FiniteDuration | DELAY_AFTER_REFUSED_REGISTRATION() |
protected FileCache | fileCache() - Handler for distributed files cached by this TaskManager |
static scala.Tuple2<String,Object> | getAndCheckJobManagerAddress(Configuration configuration) - Gets the hostname and port of the JobManager from the configuration. |
static akka.actor.ActorRef | getTaskManagerRemoteReference(String taskManagerUrl, akka.actor.ActorSystem system, scala.concurrent.duration.FiniteDuration timeout) - Resolves the TaskManager actor reference in a blocking fashion. |
void | handleError(Exception exception) - This method is called by the LeaderRetrievalService in case of an exception. |
protected void | handleJobManagerDisconnect(akka.actor.ActorRef jobManager, String msg) |
scala.PartialFunction<Object,scala.runtime.BoxedUnit> | handleMessage() - Central handling of actor messages. |
static scala.concurrent.duration.FiniteDuration | HEARTBEAT_INTERVAL() |
static scala.concurrent.duration.FiniteDuration | INITIAL_REGISTRATION_TIMEOUT() |
protected IOManager | ioManager() |
protected boolean | isConnected() - Checks whether the TaskManager is currently connected to its JobManager. |
protected LeaderRetrievalService | leaderRetrievalService() |
scala.Option<UUID> | leaderSessionID() |
grizzled.slf4j.Logger | log() |
static grizzled.slf4j.Logger | LOG() - TaskManager logger for synchronous logging (not through the logging actor) |
static void | main(String[] args) - Entry point (main method) to run the TaskManager in a standalone fashion. |
static scala.concurrent.duration.FiniteDuration | MAX_REGISTRATION_TIMEOUT() |
static long | MAX_STARTUP_CONNECT_TIME() - Maximum time (milliseconds) that the TaskManager will spend searching for a suitable network interface to use for communication |
protected MemoryManager | memoryManager() |
protected NetworkEnvironment | network() |
void | notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) - This method is called by the LeaderRetrievalService when a new leader is elected. |
protected int | numberOfSlots() |
static Configuration | parseArgsAndLoadConfig(String[] args) - Parses the command line arguments of the TaskManager and loads the configuration. |
static scala.Tuple4<TaskManagerConfiguration,NetworkEnvironmentConfiguration,InstanceConnectionInfo,MemoryType> | parseTaskManagerConfiguration(Configuration configuration, String taskManagerHostname, boolean localTaskManagerCommunication) - Utility method to extract TaskManager config parameters from the configuration and to sanity check them. |
void | postStop() - Called after the actor is stopped. |
void | preStart() - Called prior to the actor receiving any messages. |
protected HardwareDescription | resources() - The TaskManager's physical execution resources |
protected HashMap<ExecutionAttemptID,Task> | runningTasks() - Registry of all tasks currently executed by this TaskManager |
static void | runTaskManager(String taskManagerHostname, int actorSystemPort, Configuration configuration) - Starts and runs the TaskManager. |
static void | runTaskManager(String taskManagerHostname, int actorSystemPort, Configuration configuration, Class<? extends TaskManager> taskManagerClass) - Starts and runs the TaskManager. |
static int | RUNTIME_FAILURE_RETURN_CODE() - Return code for critical errors during the runtime |
static scala.Tuple2<String,Object> | selectNetworkInterfaceAndPort(Configuration configuration) |
static void | selectNetworkInterfaceAndRunTaskManager(Configuration configuration, Class<? extends TaskManager> taskManagerClass) - Starts and runs the TaskManager. |
protected void | sendHeartbeatToJobManager() - Sends a heartbeat message to the JobManager (if connected) with the current metrics report. |
static akka.actor.ActorRef | startTaskManagerComponentsAndActor(Configuration configuration, akka.actor.ActorSystem actorSystem, String taskManagerHostname, scala.Option<String> taskManagerActorName, scala.Option<LeaderRetrievalService> leaderRetrievalServiceOption, boolean localTaskManagerCommunication, Class<? extends TaskManager> taskManagerClass) |
static long | STARTUP_CONNECT_LOG_SUPPRESS() - Time (milliseconds) after which the TaskManager will start logging failed connection attempts |
static int | STARTUP_FAILURE_RETURN_CODE() - Return code for unsuccessful TaskManager startup |
static String | TASK_MANAGER_NAME() - The name of the TaskManager actor |
void | triggerTaskManagerRegistration() - Starts the TaskManager's registration process to connect to the JobManager. |
void | unhandled(Object message) - Handle unmatched messages with an exception. |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
decorateMessage, handleDiscardedMessage, receive
receive
public TaskManager(TaskManagerConfiguration config, InstanceConnectionInfo connectionInfo, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment network, int numberOfSlots, LeaderRetrievalService leaderRetrievalService)
public static grizzled.slf4j.Logger LOG()
public static int STARTUP_FAILURE_RETURN_CODE()
public static int RUNTIME_FAILURE_RETURN_CODE()
public static String TASK_MANAGER_NAME()
public static long MAX_STARTUP_CONNECT_TIME()
public static long STARTUP_CONNECT_LOG_SUPPRESS()
public static scala.concurrent.duration.FiniteDuration INITIAL_REGISTRATION_TIMEOUT()
public static scala.concurrent.duration.FiniteDuration MAX_REGISTRATION_TIMEOUT()
public static scala.concurrent.duration.FiniteDuration DELAY_AFTER_REFUSED_REGISTRATION()
public static scala.concurrent.duration.FiniteDuration HEARTBEAT_INTERVAL()
public static void main(String[] args)
args - The command line arguments.
public static Configuration parseArgsAndLoadConfig(String[] args)
args - Command line arguments
public static void selectNetworkInterfaceAndRunTaskManager(Configuration configuration, Class<? extends TaskManager> taskManagerClass)
This method first tries to select the network interface to use for the TaskManager communication. The network interface is used both for the actor communication (coordination) as well as for the data exchange between task managers. Unless the hostname/interface is explicitly configured in the configuration, this method will try out various interfaces and methods to connect to the JobManager and select the one where the connection attempt is successful.
After selecting the network interface, this method brings up an actor system for the TaskManager and its actors, starts the TaskManager's services (library cache, shuffle network stack, ...), and starts the TaskManager itself.
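The idea of trying out interfaces until a connection attempt succeeds can be sketched with plain `java.net` classes. This is a minimal sketch under stated assumptions, not the selection logic of this method: the class `InterfaceSelectionSketch`, its method names, and the choice of a plain TCP connect as the probe are all hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

/** Hypothetical illustration of connect-based interface selection; not Flink code. */
class InterfaceSelectionSketch {

    /** Tries to reach the target with a socket bound to the given local address. */
    static boolean canConnect(InetAddress local, InetSocketAddress target, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.bind(new InetSocketAddress(local, 0)); // port 0: any free local port
            socket.connect(target, timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // unreachable from this address; the caller tries the next one
        }
    }

    /** Returns the first local address from which the target can be reached, if any. */
    static Optional<InetAddress> findConnectingAddress(InetSocketAddress target, int timeoutMillis) {
        Enumeration<NetworkInterface> interfaces;
        try {
            interfaces = NetworkInterface.getNetworkInterfaces();
        } catch (SocketException e) {
            return Optional.empty();
        }
        if (interfaces == null) {
            return Optional.empty();
        }
        for (NetworkInterface nic : Collections.list(interfaces)) {
            for (InetAddress local : Collections.list(nic.getInetAddresses())) {
                if (canConnect(local, target, timeoutMillis)) {
                    return Optional.of(local);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("usage: InterfaceSelectionSketch <host> <port>");
            return;
        }
        InetSocketAddress target = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
        System.out.println(findConnectingAddress(target, 1000));
    }
}
```

In this sketch the host and port passed to `main` stand in for the JobManager address. An explicitly configured hostname or interface would bypass the search entirely, as described above.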
configuration - The configuration for the TaskManager.
taskManagerClass - The actor class to instantiate. Allows the use of TaskManager subclasses, for example for YARN.
public static scala.Tuple2<String,Object> selectNetworkInterfaceAndPort(Configuration configuration)
public static void runTaskManager(String taskManagerHostname, int actorSystemPort, Configuration configuration)
This method will also spawn a process reaper for the TaskManager (kill the process if the actor fails) and optionally start the JVM memory logging thread.
taskManagerHostname - The hostname/address of the interface where the actor system will communicate.
actorSystemPort - The port at which the actor system will communicate.
configuration - The configuration for the TaskManager.
public static void runTaskManager(String taskManagerHostname, int actorSystemPort, Configuration configuration, Class<? extends TaskManager> taskManagerClass)
This method will also spawn a process reaper for the TaskManager (kill the process if the actor fails) and optionally start the JVM memory logging thread.
taskManagerHostname - The hostname/address of the interface where the actor system will communicate.
actorSystemPort - The port at which the actor system will communicate.
configuration - The configuration for the TaskManager.
taskManagerClass - The actor class to instantiate. Allows the use of TaskManager subclasses, for example for YARN.
public static akka.actor.ActorRef startTaskManagerComponentsAndActor(Configuration configuration, akka.actor.ActorSystem actorSystem, String taskManagerHostname, scala.Option<String> taskManagerActorName, scala.Option<LeaderRetrievalService> leaderRetrievalServiceOption, boolean localTaskManagerCommunication, Class<? extends TaskManager> taskManagerClass)
configuration - The configuration for the TaskManager.
actorSystem - The actor system that should run the TaskManager actor.
taskManagerHostname - The hostname/address that describes the TaskManager's data location.
taskManagerActorName - Optionally the name of the TaskManager actor. If none is given, the actor will use a random name.
leaderRetrievalServiceOption - Optionally, a leader retrieval service can be provided. If none is given, then a LeaderRetrievalService is constructed from the configuration.
localTaskManagerCommunication - If true, the TaskManager will not initiate the TCP network stack.
taskManagerClass - The class of the TaskManager actor. May be used to give subclasses that understand additional actor messages.
IllegalConfigurationException - Thrown if the given config contains illegal values.
IOException - Thrown if any of the I/O components (such as buffer pools, I/O manager, ...) cannot be properly started.
Exception - Thrown if some other error occurs while parsing the configuration or starting the TaskManager components.
public static akka.actor.ActorRef getTaskManagerRemoteReference(String taskManagerUrl, akka.actor.ActorSystem system, scala.concurrent.duration.FiniteDuration timeout)
taskManagerUrl - The akka URL of the TaskManager.
system - The local actor system that should perform the lookup.
timeout - The maximum time to wait until the lookup fails.
IOException - Thrown if the lookup fails.
public static scala.Tuple4<TaskManagerConfiguration,NetworkEnvironmentConfiguration,InstanceConnectionInfo,MemoryType> parseTaskManagerConfiguration(Configuration configuration, String taskManagerHostname, boolean localTaskManagerCommunication)
configuration - The configuration.
taskManagerHostname - The host name under which the TaskManager communicates.
localTaskManagerCommunication - True, to skip initializing the network stack. Use only in cases where only one task manager runs.
public static scala.Tuple2<String,Object> getAndCheckJobManagerAddress(Configuration configuration)
configuration - The configuration to read the config values from.
protected TaskManagerConfiguration config()
protected InstanceConnectionInfo connectionInfo()
protected MemoryManager memoryManager()
protected IOManager ioManager()
protected NetworkEnvironment network()
protected int numberOfSlots()
protected LeaderRetrievalService leaderRetrievalService()
public grizzled.slf4j.Logger log()
log
in interface FlinkActor
protected akka.util.Timeout askTimeout()
protected HardwareDescription resources()
protected HashMap<ExecutionAttemptID,Task> runningTasks()
protected BroadcastVariableManager bcVarManager()
protected FileCache fileCache()
protected scala.Option<akka.actor.ActorRef> currentJobManager()
public scala.Option<UUID> leaderSessionID()
leaderSessionID
in interface LeaderSessionMessageFilter
public void preStart()
preStart
in interface akka.actor.Actor
public void postStop()
postStop
in interface akka.actor.Actor
public scala.PartialFunction<Object,scala.runtime.BoxedUnit> handleMessage()
handleMessage
in interface FlinkActor
public void unhandled(Object message)
unhandled
in interface akka.actor.Actor
protected boolean isConnected()
protected void handleJobManagerDisconnect(akka.actor.ActorRef jobManager, String msg)
protected void sendHeartbeatToJobManager()
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID)
Description copied from interface: LeaderRetrievalListener
This method is called by the LeaderRetrievalService when a new leader is elected.
notifyLeaderAddress
in interface LeaderRetrievalListener
leaderAddress - The address of the new leader
leaderSessionID - The new leader session ID
public void triggerTaskManagerRegistration()
public void handleError(Exception exception)
Description copied from interface: LeaderRetrievalListener
This method is called by the LeaderRetrievalService in case of an exception. This assures that the LeaderRetrievalListener is aware of any problems occurring in the LeaderRetrievalService thread.
handleError
in interface LeaderRetrievalListener
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.