public class JobManager extends Object implements FlinkActor, LeaderSessionMessageFilter, LogMessages, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener
- RegisterTaskManager is sent by a TaskManager that wants to register at the job manager. A successful registration at the instance manager is acknowledged by AcknowledgeRegistration.
- SubmitJob is sent by a client that wants to submit a job to the system. The submit message contains the job description in the form of the JobGraph. The JobGraph is appended to the ExecutionGraph, and the corresponding ExecutionJobVertices are scheduled for execution on the TaskManagers.
- CancelJob requests cancellation of the job with the specified jobID. A successful cancellation is indicated by CancellationSuccess and a failure by CancellationFailure.
- UpdateTaskExecutionState is sent by a TaskManager to update the state of an ExecutionVertex contained in the ExecutionGraph. A successful update is acknowledged by true, an unsuccessful one by false.
- RequestNextInputSplit requests the next input split for a running task on a TaskManager. The assigned input split, or null, is sent to the sender in the form of the message NextInputSplit.
- JobStatusChanged indicates that the status of a job (RUNNING, CANCELING, FINISHED, etc.) has changed. This message is sent by the ExecutionGraph.
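The request/response pairs in the list above can be illustrated with a small, self-contained sketch. It is not the JobManager's actual implementation: the message classes here are hypothetical stand-ins that only borrow the names from the list, jobs are identified by plain strings, and the set of running jobs replaces the ExecutionGraph bookkeeping. Unmatched messages raise an exception, in the spirit of unhandled(Object).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-ins for the real message classes.
// Only the names mirror the message list above; fields and behavior are assumptions.
class JobManagerMessageSketch {
    static final class CancelJob {
        final String jobId;
        CancelJob(String jobId) { this.jobId = jobId; }
    }

    static final class CancellationSuccess {
        final String jobId;
        CancellationSuccess(String jobId) { this.jobId = jobId; }
    }

    static final class CancellationFailure {
        final String jobId;
        final String reason;
        CancellationFailure(String jobId, String reason) {
            this.jobId = jobId;
            this.reason = reason;
        }
    }

    static final class UpdateTaskExecutionState {
        final String jobId;
        UpdateTaskExecutionState(String jobId) { this.jobId = jobId; }
    }

    // Stand-in for the bookkeeping of currently running jobs.
    private final Set<String> runningJobs = new HashSet<>();

    void addRunningJob(String jobId) {
        runningJobs.add(jobId);
    }

    // Dispatches on the message type and returns the reply for the sender.
    Object handle(Object message) {
        if (message instanceof CancelJob) {
            String id = ((CancelJob) message).jobId;
            if (runningJobs.remove(id)) {
                return new CancellationSuccess(id);
            }
            return new CancellationFailure(id, "no running job with this ID");
        }
        if (message instanceof UpdateTaskExecutionState) {
            // Acknowledged with true if the job is known, false otherwise.
            return runningJobs.contains(((UpdateTaskExecutionState) message).jobId);
        }
        // Unmatched messages are rejected with an exception.
        throw new IllegalArgumentException("Unhandled message: " + message);
    }
}
```

A caller would register a job with addRunningJob("job-1") and then pass new CancelJob("job-1") to handle, receiving a CancellationSuccess the first time and a CancellationFailure on a repeated request.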
Constructor and Description |
---|
JobManager(Configuration flinkConfiguration,
ExecutorService executorService,
InstanceManager instanceManager,
Scheduler scheduler,
BlobLibraryCacheManager libraryCacheManager,
akka.actor.ActorRef archive,
RestartStrategy defaultRestartStrategy,
scala.concurrent.duration.FiniteDuration timeout,
LeaderElectionService leaderElectionService,
SubmittedJobGraphStore submittedJobGraphs,
CheckpointRecoveryFactory checkpointRecoveryFactory,
SavepointStore savepointStore,
scala.concurrent.duration.FiniteDuration jobRecoveryTimeout) |
Modifier and Type | Method and Description |
---|---|
static String |
ARCHIVE_NAME()
Name of the archive actor
|
protected akka.actor.ActorRef |
archive() |
protected CheckpointRecoveryFactory |
checkpointRecoveryFactory() |
static scala.Tuple12<ExecutorService,InstanceManager,Scheduler,BlobLibraryCacheManager,RestartStrategy,scala.concurrent.duration.FiniteDuration,Object,LeaderElectionService,SubmittedJobGraphStore,CheckpointRecoveryFactory,SavepointStore,scala.concurrent.duration.FiniteDuration> |
createJobManagerComponents(Configuration configuration,
scala.Option<LeaderElectionService> leaderElectionServiceOption)
Creates the job manager components. The description names them as (instanceManager, scheduler, libraryCacheManager,
archiverProps, defaultExecutionRetries, delayBetweenRetries, timeout),
although the declared return type is a 12-element tuple.
|
protected scala.collection.mutable.HashMap<JobID,scala.Tuple2<ExecutionGraph,JobInfo>> |
currentJobs()
Either running or not yet archived jobs (session hasn't been ended).
|
protected RestartStrategy |
defaultRestartStrategy() |
protected scala.concurrent.ExecutionContext |
executionContext()
The extra execution context, for futures, with a custom logging reporter
|
protected ExecutorService |
executorService() |
protected Configuration |
flinkConfiguration() |
scala.Option<scala.collection.Seq<scala.concurrent.Future<scala.runtime.BoxedUnit>>> |
futuresToComplete()
Futures which have to be completed before terminating the job manager
|
String |
getAddress()
Returns the address of the
LeaderContender under which other instances can connect
to it. |
static akka.actor.ActorRef |
getJobManagerActorRef(InetSocketAddress address,
akka.actor.ActorSystem system,
Configuration config)
Resolves the JobManager actor reference in a blocking fashion.
|
static akka.actor.ActorRef |
getJobManagerActorRef(InetSocketAddress address,
akka.actor.ActorSystem system,
scala.concurrent.duration.FiniteDuration timeout)
Resolves the JobManager actor reference in a blocking fashion.
|
static akka.actor.ActorRef |
getJobManagerActorRef(String jobManagerUrl,
akka.actor.ActorSystem system,
scala.concurrent.duration.FiniteDuration timeout)
Resolves the JobManager actor reference in a blocking fashion.
|
static scala.concurrent.Future<akka.actor.ActorRef> |
getJobManagerActorRefFuture(InetSocketAddress address,
akka.actor.ActorSystem system,
scala.concurrent.duration.FiniteDuration timeout) |
static String |
getJobManagerAkkaURL(akka.actor.ActorSystem system,
scala.Option<String> name) |
static String |
getLocalJobManagerAkkaURL(scala.Option<String> name)
Builds the akka actor path for the JobManager actor to address the actor within
its own actor system.
|
static String |
getRemoteJobManagerAkkaURL(Configuration config)
Returns the JobManager actor's remote Akka URL, given the configured hostname and port.
|
static String |
getRemoteJobManagerAkkaURL(InetSocketAddress address,
scala.Option<String> name)
Builds the akka actor path for the JobManager actor, given the socket address
where the JobManager's actor system runs.
|
void |
grantLeadership(UUID newLeaderSessionID)
Callback method which is called by the
LeaderElectionService upon selecting this
instance as the new leader. |
void |
handleError(Exception exception)
Handles error occurring in the leader election service
|
scala.PartialFunction<Object,scala.runtime.BoxedUnit> |
handleMessage()
Central work method of the JobManager actor.
|
protected InstanceManager |
instanceManager() |
static String |
JOB_MANAGER_NAME()
Name of the JobManager actor
|
protected scala.concurrent.duration.FiniteDuration |
jobRecoveryTimeout() |
protected LeaderElectionService |
leaderElectionService() |
scala.Option<UUID> |
leaderSessionID() |
protected BlobLibraryCacheManager |
libraryCacheManager() |
grizzled.slf4j.Logger |
log() |
static grizzled.slf4j.Logger |
LOG() |
static void |
main(String[] args)
Entry point (main method) to run the JobManager in a standalone fashion.
|
void |
onAddedJobGraph(JobID jobId)
Callback for
SubmittedJobGraph instances added by a different SubmittedJobGraphStore instance. |
void |
onRemovedJobGraph(JobID jobId)
Callback for
SubmittedJobGraph instances removed by a different SubmittedJobGraphStore instance. |
static scala.Tuple4<Configuration,JobManagerMode,String,Iterator<Integer>> |
parseArgs(String[] args)
Loads the configuration, execution mode and the listening address from the provided command
line arguments.
|
void |
postStop() |
void |
preStart()
Run when the job manager is started.
|
protected RecoveryMode |
recoveryMode() |
static <T> scala.util.Try<T> |
retryOnBindException(scala.Function0<T> fn,
scala.Function0<Object> stopCond,
long maxSleepBetweenRetries)
Retries a function if it fails because of a
BindException . |
void |
revokeLeadership()
Callback method which is called by the
LeaderElectionService upon revoking the
leadership of a former leader. |
static void |
runJobManager(Configuration configuration,
JobManagerMode executionMode,
String listeningAddress,
int listeningPort)
Starts and runs the JobManager with all its components.
|
static void |
runJobManager(Configuration configuration,
JobManagerMode executionMode,
String listeningAddress,
Iterator<Integer> listeningPortRange)
Starts and runs the JobManager with all its components trying to bind to
a port in the specified range.
|
static int |
RUNTIME_FAILURE_RETURN_CODE() |
protected SavepointStore |
savepointStore() |
protected Scheduler |
scheduler() |
static akka.actor.ActorRef |
startActor(akka.actor.Props props,
akka.actor.ActorSystem actorSystem) |
static scala.Tuple4<akka.actor.ActorSystem,akka.actor.ActorRef,akka.actor.ActorRef,scala.Option<WebMonitor>> |
startActorSystemAndJobManagerActors(Configuration configuration,
JobManagerMode executionMode,
String listeningAddress,
int listeningPort,
Class<? extends JobManager> jobManagerClass,
Class<? extends MemoryArchivist> archiveClass)
Starts an ActorSystem, the JobManager and all its components including the WebMonitor.
|
static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> |
startJobManagerActors(Configuration configuration,
akka.actor.ActorSystem actorSystem,
Class<? extends JobManager> jobManagerClass,
Class<? extends MemoryArchivist> archiveClass)
Starts the JobManager and job archiver based on the given configuration, in the
given actor system.
|
static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> |
startJobManagerActors(Configuration configuration,
akka.actor.ActorSystem actorSystem,
scala.Option<String> jobMangerActorName,
scala.Option<String> archiveActorName,
Class<? extends JobManager> jobManagerClass,
Class<? extends MemoryArchivist> archiveClass)
Starts the JobManager and job archiver based on the given configuration, in the
given actor system.
|
static int |
STARTUP_FAILURE_RETURN_CODE() |
protected SubmittedJobGraphStore |
submittedJobGraphs() |
protected scala.concurrent.duration.FiniteDuration |
timeout() |
void |
unhandled(Object message)
Handle unmatched messages with an exception.
|
int |
webMonitorPort()
The port of the web monitor as configured.
|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
decorateMessage, handleDiscardedMessage, receive
receive
public JobManager(Configuration flinkConfiguration, ExecutorService executorService, InstanceManager instanceManager, Scheduler scheduler, BlobLibraryCacheManager libraryCacheManager, akka.actor.ActorRef archive, RestartStrategy defaultRestartStrategy, scala.concurrent.duration.FiniteDuration timeout, LeaderElectionService leaderElectionService, SubmittedJobGraphStore submittedJobGraphs, CheckpointRecoveryFactory checkpointRecoveryFactory, SavepointStore savepointStore, scala.concurrent.duration.FiniteDuration jobRecoveryTimeout)
public static grizzled.slf4j.Logger LOG()
public static int STARTUP_FAILURE_RETURN_CODE()
public static int RUNTIME_FAILURE_RETURN_CODE()
public static String JOB_MANAGER_NAME()
public static String ARCHIVE_NAME()
public static void main(String[] args)
Entry point (main method) to run the JobManager in a standalone fashion.
args - The command line arguments.
public static void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, int listeningPort)
Starts and runs the JobManager with all its components. This method blocks indefinitely (or until the JobManager's actor system is shut down).
configuration - The configuration object for the JobManager.
executionMode - The execution mode in which to run. Execution mode LOCAL will spawn an additional TaskManager in the same process.
listeningAddress - The hostname where the JobManager should listen for messages.
listeningPort - The port where the JobManager should listen for messages.
public static void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, Iterator<Integer> listeningPortRange)
Starts and runs the JobManager with all its components, trying to bind to a port in the specified range.
configuration - The configuration object for the JobManager.
executionMode - The execution mode in which to run. Execution mode LOCAL will spawn an additional TaskManager in the same process.
listeningAddress - The hostname where the JobManager should listen for messages.
listeningPortRange - The port range where the JobManager should listen for messages.
public static <T> scala.util.Try<T> retryOnBindException(scala.Function0<T> fn, scala.Function0<Object> stopCond, long maxSleepBetweenRetries)
Retries a function if it fails because of a BindException.
fn - The function to retry
stopCond - Flag to signal termination
maxSleepBetweenRetries - Max random sleep time between retries
public static scala.Tuple4<akka.actor.ActorSystem,akka.actor.ActorRef,akka.actor.ActorRef,scala.Option<WebMonitor>> startActorSystemAndJobManagerActors(Configuration configuration, JobManagerMode executionMode, String listeningAddress, int listeningPort, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass)
Starts an ActorSystem, the JobManager and all its components including the WebMonitor.
configuration - The configuration object for the JobManager
executionMode - The execution mode in which to run. Execution mode LOCAL will spawn an additional TaskManager in the same process.
listeningAddress - The hostname where the JobManager should listen for messages.
listeningPort - The port where the JobManager should listen for messages
jobManagerClass - The class of the JobManager to be started
archiveClass - The class of the Archivist to be started
public static scala.Tuple4<Configuration,JobManagerMode,String,Iterator<Integer>> parseArgs(String[] args)
Loads the configuration, execution mode and the listening address from the provided command line arguments.
args - command line arguments
public static scala.Tuple12<ExecutorService,InstanceManager,Scheduler,BlobLibraryCacheManager,RestartStrategy,scala.concurrent.duration.FiniteDuration,Object,LeaderElectionService,SubmittedJobGraphStore,CheckpointRecoveryFactory,SavepointStore,scala.concurrent.duration.FiniteDuration> createJobManagerComponents(Configuration configuration, scala.Option<LeaderElectionService> leaderElectionServiceOption)
configuration - The configuration from which to parse the config values.
leaderElectionServiceOption - LeaderElectionService which shall be returned if the option is defined
public static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> startJobManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass)
Starts the JobManager and job archiver based on the given configuration, in the given actor system.
configuration - The configuration for the JobManager
actorSystem - The actor system running the JobManager
jobManagerClass - The class of the JobManager to be started
archiveClass - The class of the MemoryArchivist to be started
public static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> startJobManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, scala.Option<String> jobMangerActorName, scala.Option<String> archiveActorName, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass)
Starts the JobManager and job archiver based on the given configuration, in the given actor system.
configuration - The configuration for the JobManager
actorSystem - The actor system running the JobManager
jobMangerActorName - Optionally the name of the JobManager actor. If none is given, the actor will have the name generated by the actor system.
archiveActorName - Optionally the name of the archive actor. If none is given, the actor will have the name generated by the actor system.
jobManagerClass - The class of the JobManager to be started
archiveClass - The class of the MemoryArchivist to be started
public static akka.actor.ActorRef startActor(akka.actor.Props props, akka.actor.ActorSystem actorSystem)
public static String getRemoteJobManagerAkkaURL(InetSocketAddress address, scala.Option<String> name)
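Builds the akka actor path for the JobManager actor, given the socket address where the JobManager's actor system runs.
address - The address of the JobManager's actor system.
public static String getRemoteJobManagerAkkaURL(Configuration config)
Returns the JobManager actor's remote Akka URL, given the configured hostname and port.
config - The configuration to parse
public static String getLocalJobManagerAkkaURL(scala.Option<String> name)
Builds the akka actor path for the JobManager actor to address the actor within its own actor system.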
public static String getJobManagerAkkaURL(akka.actor.ActorSystem system, scala.Option<String> name)
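As an illustration of what the URL-building methods above produce, the following sketch assembles remote and local actor paths from a socket address and an optional actor name. The actor system name "flink", the default actor name "jobmanager", and the helper names are assumptions made for this example and are not taken from this page; java.util.Optional stands in for scala.Option, and IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.Optional;

// Illustrative only: the constants below are assumed values, not read from the JobManager class.
class AkkaUrlSketch {
    static final String ACTOR_SYSTEM_NAME = "flink";       // assumed actor system name
    static final String DEFAULT_ACTOR_NAME = "jobmanager"; // assumed default actor name

    // Remote path: protocol, actor system, host and port, then the actor under /user.
    static String remoteUrl(InetSocketAddress address, Optional<String> name) {
        String hostPort = address.getHostString() + ":" + address.getPort();
        return "akka.tcp://" + ACTOR_SYSTEM_NAME + "@" + hostPort
                + "/user/" + name.orElse(DEFAULT_ACTOR_NAME);
    }

    // Local path: no host or port, because the actor lives in the caller's own actor system.
    static String localUrl(Optional<String> name) {
        return "akka://" + ACTOR_SYSTEM_NAME + "/user/" + name.orElse(DEFAULT_ACTOR_NAME);
    }
}
```

When no name is supplied, the sketch falls back to the default actor name, which mirrors the optional name parameter in the signatures above.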
public static scala.concurrent.Future<akka.actor.ActorRef> getJobManagerActorRefFuture(InetSocketAddress address, akka.actor.ActorSystem system, scala.concurrent.duration.FiniteDuration timeout)
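The blocking getJobManagerActorRef variants documented on this page take a timeout and report a failed lookup as an IOException. One way a blocking lookup can be layered on top of a future-returning one is sketched below. This is a generic pattern rather than the library's code: it uses java.util.concurrent.CompletableFuture in place of a Scala future, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for an asynchronous lookup and maps every failure to IOException.
class BlockingLookupSketch {
    static <T> T awaitLookup(CompletableFuture<T> lookup, long timeout, TimeUnit unit)
            throws IOException {
        try {
            // Block until the lookup completes or the timeout elapses.
            return lookup.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new IOException("Lookup timed out after " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            // The lookup itself failed; surface its cause.
            throw new IOException("Lookup failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for the lookup", e);
        }
    }
}
```

Callers that cannot afford to block would use the future-returning lookup directly and attach a callback instead.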
public static akka.actor.ActorRef getJobManagerActorRef(String jobManagerUrl, akka.actor.ActorSystem system, scala.concurrent.duration.FiniteDuration timeout)
Resolves the JobManager actor reference in a blocking fashion.
jobManagerUrl - The akka URL of the JobManager.
system - The local actor system that should perform the lookup.
timeout - The maximum time to wait until the lookup fails.
IOException - Thrown, if the lookup fails.
public static akka.actor.ActorRef getJobManagerActorRef(InetSocketAddress address, akka.actor.ActorSystem system, scala.concurrent.duration.FiniteDuration timeout)
Resolves the JobManager actor reference in a blocking fashion.
address - The socket address of the JobManager's actor system.
system - The local actor system that should perform the lookup.
timeout - The maximum time to wait until the lookup fails.
IOException - Thrown, if the lookup fails.
public static akka.actor.ActorRef getJobManagerActorRef(InetSocketAddress address, akka.actor.ActorSystem system, Configuration config)
Resolves the JobManager actor reference in a blocking fashion.
address - The socket address of the JobManager's actor system.
system - The local actor system that should perform the lookup.
config - The config describing the maximum time to wait until the lookup fails.
IOException - Thrown, if the lookup fails.
protected Configuration flinkConfiguration()
protected ExecutorService executorService()
protected InstanceManager instanceManager()
protected Scheduler scheduler()
protected BlobLibraryCacheManager libraryCacheManager()
protected akka.actor.ActorRef archive()
protected RestartStrategy defaultRestartStrategy()
protected scala.concurrent.duration.FiniteDuration timeout()
protected LeaderElectionService leaderElectionService()
protected SubmittedJobGraphStore submittedJobGraphs()
protected CheckpointRecoveryFactory checkpointRecoveryFactory()
protected SavepointStore savepointStore()
protected scala.concurrent.duration.FiniteDuration jobRecoveryTimeout()
public grizzled.slf4j.Logger log()
log in interface FlinkActor
protected scala.concurrent.ExecutionContext executionContext()
The extra execution context, for futures, with a custom logging reporter
protected scala.collection.mutable.HashMap<JobID,scala.Tuple2<ExecutionGraph,JobInfo>> currentJobs()
Either running or not yet archived jobs (session hasn't been ended).
protected RecoveryMode recoveryMode()
public scala.Option<UUID> leaderSessionID()
leaderSessionID in interface LeaderSessionMessageFilter
public scala.Option<scala.collection.Seq<scala.concurrent.Future<scala.runtime.BoxedUnit>>> futuresToComplete()
Futures which have to be completed before terminating the job manager
public int webMonitorPort()
The port of the web monitor as configured.
public void preStart()
Run when the job manager is started.
preStart in interface akka.actor.Actor
public void postStop()
postStop in interface akka.actor.Actor
public scala.PartialFunction<Object,scala.runtime.BoxedUnit> handleMessage()
Central work method of the JobManager actor.
handleMessage in interface FlinkActor
public void unhandled(Object message)
Handle unmatched messages with an exception.
unhandled in interface akka.actor.Actor
public void grantLeadership(UUID newLeaderSessionID)
Callback method which is called by the LeaderElectionService upon selecting this instance as the new leader. The method is called with the new leader session ID.
grantLeadership in interface LeaderContender
newLeaderSessionID - New leader session ID
public void revokeLeadership()
Callback method which is called by the LeaderElectionService upon revoking the leadership of a former leader. This might happen if multiple contenders have been granted leadership.
revokeLeadership in interface LeaderContender
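The two leadership callbacks above, together with the leaderSessionID() accessor, suggest a simple piece of state: a session ID that is present while this instance is leader and absent otherwise. The sketch below shows that state machine in isolation. The accepts check is an assumption about how a leader session filter might use the ID, inferred from the interface name LeaderSessionMessageFilter rather than from documented behavior; java.util.Optional stands in for scala.Option.

```java
import java.util.Optional;
import java.util.UUID;

// Minimal stand-in for a leader contender that tracks its current leader session.
class LeaderSessionSketch {
    // Empty while this instance is not the leader.
    private Optional<UUID> leaderSessionID = Optional.empty();

    // Called when this instance is selected as the new leader.
    void grantLeadership(UUID newLeaderSessionID) {
        leaderSessionID = Optional.of(newLeaderSessionID);
    }

    // Called when leadership is taken away again.
    void revokeLeadership() {
        leaderSessionID = Optional.empty();
    }

    Optional<UUID> leaderSessionID() {
        return leaderSessionID;
    }

    // Assumed filtering rule: a message is accepted only if it carries the current session ID.
    boolean accepts(UUID messageSessionID) {
        return leaderSessionID.isPresent() && leaderSessionID.get().equals(messageSessionID);
    }
}
```

Under this rule, messages stamped with an ID from an earlier leadership term are rejected after a revoke and re-grant, because each grant installs a fresh session ID.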
public void onAddedJobGraph(JobID jobId)
Callback for SubmittedJobGraph instances added by a different SubmittedJobGraphStore instance.
Important: It is possible to get false positives and be notified about a job graph that was added by this instance.
onAddedJobGraph in interface SubmittedJobGraphStore.SubmittedJobGraphListener
jobId - The JobID of the added job graph
public void onRemovedJobGraph(JobID jobId)
Callback for SubmittedJobGraph instances removed by a different SubmittedJobGraphStore instance.
onRemovedJobGraph in interface SubmittedJobGraphStore.SubmittedJobGraphListener
jobId - The JobID of the removed job graph
public String getAddress()
Returns the address of the LeaderContender under which other instances can connect to it.
getAddress in interface LeaderContender
public void handleError(Exception exception)
Handles errors occurring in the leader election service.
handleError in interface LeaderContender
exception - Exception being thrown in the leader election service
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.