public class JobManager extends Object implements FlinkActor, LeaderSessionMessageFilter, LogMessages, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener
The JobManager actor handles, among others, the following messages:
- RegisterTaskManager is sent by a TaskManager that wants to register with the JobManager. A successful registration at the instance manager is acknowledged by AcknowledgeRegistration.
- SubmitJob is sent by a client that wants to submit a job to the system. The submit message contains the job description in the form of a JobGraph. The JobGraph is appended to the ExecutionGraph, and the corresponding ExecutionJobVertices are scheduled for execution on the TaskManagers.
- CancelJob requests the cancellation of the job with the specified jobID. A successful cancellation is indicated by CancellationSuccess and a failure by CancellationFailure.
- UpdateTaskExecutionState is sent by a TaskManager to update the state of an ExecutionVertex contained in the ExecutionGraph. A successful update is acknowledged by true, otherwise by false.
- RequestNextInputSplit requests the next input split for a running task on a TaskManager. The assigned input split, or null, is sent to the sender in the form of a NextInputSplit message.
- JobStatusChanged indicates that the status of a job (RUNNING, CANCELING, FINISHED, etc.) has changed. This message is sent by the ExecutionGraph.
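To make the request/reply pattern of these messages concrete, here is a minimal plain-Java sketch without Akka. The classes MessageSketch, CancelJob, CancellationSuccess, CancellationFailure and UpdateTaskExecutionState below are hypothetical, simplified stand-ins that only borrow the message names; they are not Flink's message classes. Cancellation is modeled as simply removing a known job, and an unmatched message throws, in the spirit of the documented unhandled() behavior.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy model of request/reply message handling (hypothetical, not Flink's classes). */
final class MessageSketch {

    // Simplified stand-ins for two of the messages above and their replies.
    static final class CancelJob {
        final String jobId;
        CancelJob(String jobId) { this.jobId = jobId; }
    }

    static final class CancellationSuccess {
        final String jobId;
        CancellationSuccess(String jobId) { this.jobId = jobId; }
    }

    static final class CancellationFailure {
        final String jobId;
        final String cause;
        CancellationFailure(String jobId, String cause) { this.jobId = jobId; this.cause = cause; }
    }

    static final class UpdateTaskExecutionState {
        final String jobId;
        UpdateTaskExecutionState(String jobId) { this.jobId = jobId; }
    }

    /** Jobs this toy JobManager currently knows about. */
    private final Set<String> currentJobs = new HashSet<>();

    MessageSketch(Set<String> initialJobs) {
        currentJobs.addAll(initialJobs);
    }

    /** Returns the reply that would be sent back to the sender of msg. */
    Object handleMessage(Object msg) {
        if (msg instanceof CancelJob) {
            String jobId = ((CancelJob) msg).jobId;
            // Simplification: a known job counts as cancelled once it is removed.
            if (currentJobs.remove(jobId)) {
                return new CancellationSuccess(jobId);
            }
            return new CancellationFailure(jobId, "No job found with ID " + jobId);
        }
        if (msg instanceof UpdateTaskExecutionState) {
            // Acknowledged with true if the job is known, otherwise false.
            return currentJobs.contains(((UpdateTaskExecutionState) msg).jobId);
        }
        // Like the documented unhandled(): unmatched messages raise an exception.
        throw new IllegalArgumentException("Received unknown message " + msg);
    }

    public static void main(String[] args) {
        MessageSketch jm = new MessageSketch(new HashSet<>(Arrays.asList("job-1")));
        // prints CancellationSuccess, then CancellationFailure (the job is gone)
        System.out.println(jm.handleMessage(new CancelJob("job-1")).getClass().getSimpleName());
        System.out.println(jm.handleMessage(new CancelJob("job-1")).getClass().getSimpleName());
    }
}
```

Returning the reply instead of sending it keeps the sketch testable; in the actor, the reply would go back to the message's sender.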
Constructor and Description |
---|
JobManager(Configuration flinkConfiguration,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
InstanceManager instanceManager,
Scheduler scheduler,
BlobLibraryCacheManager libraryCacheManager,
akka.actor.ActorRef archive,
RestartStrategyFactory restartStrategyFactory,
scala.concurrent.duration.FiniteDuration timeout,
LeaderElectionService leaderElectionService,
SubmittedJobGraphStore submittedJobGraphs,
CheckpointRecoveryFactory checkpointRecoveryFactory,
scala.concurrent.duration.FiniteDuration jobRecoveryTimeout,
scala.Option<MetricRegistry> metricsRegistry) |
Modifier and Type | Method and Description |
---|---|
protected akka.actor.ActorRef |
archive() |
protected static void |
aroundPostRestart(Throwable reason) |
protected static void |
aroundPostStop() |
protected static void |
aroundPreRestart(Throwable reason,
scala.Option<Object> message) |
protected static void |
aroundPreStart() |
protected static void |
aroundReceive(scala.PartialFunction<Object,scala.runtime.BoxedUnit> receive,
Object msg) |
protected CheckpointRecoveryFactory |
checkpointRecoveryFactory() |
static akka.actor.ActorContext |
context() |
static scala.Tuple9<InstanceManager,Scheduler,BlobLibraryCacheManager,RestartStrategyFactory,scala.concurrent.duration.FiniteDuration,Object,scala.Option<Path>,scala.concurrent.duration.FiniteDuration,scala.Option<MetricRegistry>> |
createJobManagerComponents(Configuration configuration,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
BlobStore blobStore)
Creates the JobManager components as a tuple of (instanceManager, scheduler, libraryCacheManager,
restartStrategyFactory, timeout, archiveCount, archivePath,
jobRecoveryTimeout, metricsRegistry)
|
protected scala.collection.mutable.HashMap<JobID,scala.Tuple2<ExecutionGraph,JobInfo>> |
currentJobs()
Jobs that are either running or not yet archived (i.e., whose session has not ended).
|
scala.Option<akka.actor.ActorRef> |
currentResourceManager()
The resource manager actor responsible for allocating and managing task manager resources.
|
long |
currentResourceManagerConnectionId() |
static Object |
decorateMessage(Object message) |
String |
defaultSavepointDir()
The default directory for savepoints.
|
protected Configuration |
flinkConfiguration() |
protected ScheduledExecutorService |
futureExecutor() |
scala.Option<scala.collection.Seq<scala.concurrent.Future<scala.runtime.BoxedUnit>>> |
futuresToComplete()
Futures which have to be completed before terminating the job manager
|
String |
getAddress()
Returns the address of the
LeaderContender under which other instances can connect
to it. |
static akka.actor.Props |
getArchiveProps(Class<? extends MemoryArchivist> archiveClass,
int archiveCount,
scala.Option<Path> archivePath) |
static akka.actor.Props |
getJobManagerProps(Class<? extends JobManager> jobManagerClass,
Configuration configuration,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
InstanceManager instanceManager,
Scheduler scheduler,
LibraryCacheManager libraryCacheManager,
akka.actor.ActorRef archive,
RestartStrategyFactory restartStrategyFactory,
scala.concurrent.duration.FiniteDuration timeout,
LeaderElectionService leaderElectionService,
SubmittedJobGraphStore submittedJobGraphStore,
CheckpointRecoveryFactory checkpointRecoveryFactory,
scala.concurrent.duration.FiniteDuration jobRecoveryTimeout,
scala.Option<MetricRegistry> metricsRegistry) |
void |
grantLeadership(UUID newLeaderSessionID)
Callback method which is called by the
LeaderElectionService upon selecting this
instance as the new leader. |
protected HighAvailabilityMode |
haMode() |
void |
handleError(Exception exception)
Handles errors occurring in the leader election service.
|
scala.PartialFunction<Object,scala.runtime.BoxedUnit> |
handleMessage()
Central work method of the JobManager actor.
|
protected InstanceManager |
instanceManager() |
protected Executor |
ioExecutor() |
protected scala.Option<JobManagerMetricGroup> |
jobManagerMetricGroup() |
protected scala.concurrent.duration.FiniteDuration |
jobRecoveryTimeout() |
protected LeaderElectionService |
leaderElectionService() |
scala.Option<UUID> |
leaderSessionID() |
protected BlobLibraryCacheManager |
libraryCacheManager() |
grizzled.slf4j.Logger |
log() |
static grizzled.slf4j.Logger |
LOG() |
static void |
main(String[] args)
Entry point (main method) to run the JobManager in a standalone fashion.
|
protected scala.Option<MetricRegistry> |
metricsRegistry() |
void |
onAddedJobGraph(JobID jobId)
Callback for
SubmittedJobGraph instances added by a different SubmittedJobGraphStore instance. |
void |
onRemovedJobGraph(JobID jobId)
Callback for
SubmittedJobGraph instances removed by a different SubmittedJobGraphStore instance. |
static scala.Tuple4<Configuration,JobManagerMode,String,Iterator<Integer>> |
parseArgs(String[] args)
Loads the configuration, execution mode and the listening address from the provided command
line arguments.
|
static void |
postRestart(Throwable reason) |
void |
postStop() |
static void |
preRestart(Throwable reason,
scala.Option<Object> message) |
void |
preStart()
Run when the job manager is started.
|
static scala.PartialFunction<Object,scala.runtime.BoxedUnit> |
receive() |
protected RestartStrategyFactory |
restartStrategyFactory() |
static <T> scala.util.Try<T> |
retryOnBindException(scala.Function0<T> fn,
scala.Function0<Object> stopCond,
long maxSleepBetweenRetries)
Retries a function if it fails because of a
BindException. |
void |
revokeLeadership()
Callback method which is called by the
LeaderElectionService upon revoking the
leadership of a former leader. |
static void |
runJobManager(Configuration configuration,
JobManagerMode executionMode,
String listeningAddress,
int listeningPort)
Starts and runs the JobManager with all its components.
|
static void |
runJobManager(Configuration configuration,
JobManagerMode executionMode,
String listeningAddress,
Iterator<Integer> listeningPortRange)
Starts and runs the JobManager with all its components trying to bind to
a port in the specified range.
|
static int |
RUNTIME_FAILURE_RETURN_CODE() |
protected Scheduler |
scheduler() |
static akka.actor.ActorRef |
self() |
static akka.actor.ActorRef |
sender() |
protected void |
shutdown()
Shutdown method which may be overridden for testing.
|
static akka.actor.ActorSystem |
startActorSystem(Configuration configuration,
String externalHostname,
int port)
Starts the JobManager actor system.
|
static scala.Tuple4<akka.actor.ActorRef,akka.actor.ActorRef,scala.Option<WebMonitor>,scala.Option<akka.actor.ActorRef>> |
startJobManagerActors(akka.actor.ActorSystem jobManagerSystem,
Configuration configuration,
JobManagerMode executionMode,
String externalHostname,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
HighAvailabilityServices highAvailabilityServices,
Class<? extends JobManager> jobManagerClass,
Class<? extends MemoryArchivist> archiveClass,
scala.Option<Class<? extends FlinkResourceManager<?>>> resourceManagerClass)
Starts the JobManager and all its components including the WebMonitor.
|
static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> |
startJobManagerActors(Configuration configuration,
akka.actor.ActorSystem actorSystem,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
HighAvailabilityServices highAvailabilityServices,
Class<? extends JobManager> jobManagerClass,
Class<? extends MemoryArchivist> archiveClass)
Starts the JobManager and job archiver based on the given configuration, in
the given actor system.
|
static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> |
startJobManagerActors(Configuration configuration,
akka.actor.ActorSystem actorSystem,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
HighAvailabilityServices highAvailabilityServices,
scala.Option<String> jobManagerActorName,
scala.Option<String> archiveActorName,
Class<? extends JobManager> jobManagerClass,
Class<? extends MemoryArchivist> archiveClass)
Starts the JobManager and job archiver based on the given configuration, in the
given actor system.
|
static int |
STARTUP_FAILURE_RETURN_CODE() |
protected SubmittedJobGraphStore |
submittedJobGraphs() |
static akka.actor.SupervisorStrategy |
supervisorStrategy() |
scala.collection.mutable.Map<akka.actor.ActorRef,InstanceID> |
taskManagerMap() |
protected scala.concurrent.duration.FiniteDuration |
timeout() |
scala.concurrent.duration.FiniteDuration |
triggerResourceManagerReconnectInterval() |
void |
unhandled(Object message)
Handles unmatched messages by throwing an exception.
|
int |
webMonitorPort()
The port of the web monitor as configured.
|
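Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface LeaderSessionMessageFilter: decorateMessage, handleDiscardedMessage, handleNoLeaderId, receive
Methods inherited from interface LogMessages: receive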
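retryOnBindException is documented only as retrying a function that fails with a BindException, given a stop condition and a maximum random sleep between retries. The Java sketch below models those documented semantics under assumptions of our own: the class BindRetry and its Outcome type (a stand-in for scala.util.Try) are hypothetical, the sleep is drawn uniformly from [0, maxSleepBetweenRetries) milliseconds, and any exception other than BindException is returned as a failure without retrying.

```java
import java.net.BindException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;

/** Hypothetical model of retry-on-BindException semantics (not Flink's implementation). */
final class BindRetry {

    /** Minimal stand-in for scala.util.Try: either a value or a failure. */
    static final class Outcome<T> {
        final T value;
        final Exception failure;

        private Outcome(T value, Exception failure) {
            this.value = value;
            this.failure = failure;
        }

        static <T> Outcome<T> success(T value) { return new Outcome<>(value, null); }

        static <T> Outcome<T> failure(Exception failure) { return new Outcome<>(null, failure); }

        boolean isSuccess() { return failure == null; }
    }

    /** Calls fn until it succeeds, fails with a non-bind error, or stopCond says to give up. */
    static <T> Outcome<T> retryOnBindException(Callable<T> fn,
                                               BooleanSupplier stopCond,
                                               long maxSleepBetweenRetries) {
        while (true) {
            try {
                return Outcome.success(fn.call());
            } catch (BindException e) {
                // Port still taken: stop if asked to, otherwise back off and try again.
                if (stopCond.getAsBoolean() || !sleepRandomly(maxSleepBetweenRetries)) {
                    return Outcome.failure(e);
                }
            } catch (Exception e) {
                return Outcome.failure(e); // other failures are not retried
            }
        }
    }

    /** Sleeps a random time in [0, maxMillis); returns false if interrupted. */
    private static boolean sleepRandomly(long maxMillis) {
        if (maxMillis <= 0) {
            return true;
        }
        try {
            Thread.sleep(ThreadLocalRandom.current().nextLong(maxMillis));
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Outcome<Integer> result = retryOnBindException(() -> 42, () -> false, 100L);
        System.out.println(result.isSuccess() + " " + result.value); // true 42
    }
}
```

The random sleep spreads out retries when several processes compete for the same port; a stop condition such as "the port range is exhausted" keeps the loop finite.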
public JobManager(Configuration flinkConfiguration, ScheduledExecutorService futureExecutor, Executor ioExecutor, InstanceManager instanceManager, Scheduler scheduler, BlobLibraryCacheManager libraryCacheManager, akka.actor.ActorRef archive, RestartStrategyFactory restartStrategyFactory, scala.concurrent.duration.FiniteDuration timeout, LeaderElectionService leaderElectionService, SubmittedJobGraphStore submittedJobGraphs, CheckpointRecoveryFactory checkpointRecoveryFactory, scala.concurrent.duration.FiniteDuration jobRecoveryTimeout, scala.Option<MetricRegistry> metricsRegistry)
public static grizzled.slf4j.Logger LOG()
public static int STARTUP_FAILURE_RETURN_CODE()
public static int RUNTIME_FAILURE_RETURN_CODE()
public static void main(String[] args)
Parameters:
args - The command line arguments.
public static void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, int listeningPort)
This method blocks indefinitely (or until the JobManager's actor system is shut down).
Parameters:
configuration - The configuration object for the JobManager.
executionMode - The execution mode in which to run. Execution mode LOCAL will spawn an additional TaskManager in the same process.
listeningAddress - The hostname where the JobManager should listen for messages.
listeningPort - The port where the JobManager should listen for messages.
public static void runJobManager(Configuration configuration, JobManagerMode executionMode, String listeningAddress, Iterator<Integer> listeningPortRange)
Parameters:
configuration - The configuration object for the JobManager.
executionMode - The execution mode in which to run. Execution mode LOCAL will spawn an additional TaskManager in the same process.
listeningAddress - The hostname where the JobManager should listen for messages.
listeningPortRange - The port range where the JobManager should listen for messages.
public static <T> scala.util.Try<T> retryOnBindException(scala.Function0<T> fn, scala.Function0<Object> stopCond, long maxSleepBetweenRetries)
Retries a function if it fails because of a BindException.
Parameters:
fn - The function to retry.
stopCond - Flag to signal termination.
maxSleepBetweenRetries - Maximum random sleep time between retries.
public static akka.actor.ActorSystem startActorSystem(Configuration configuration, String externalHostname, int port)
Parameters:
configuration - Configuration to use for the JobManager actor system.
externalHostname - External hostname to bind to.
port - Port to bind to.
public static scala.Tuple4<akka.actor.ActorRef,akka.actor.ActorRef,scala.Option<WebMonitor>,scala.Option<akka.actor.ActorRef>> startJobManagerActors(akka.actor.ActorSystem jobManagerSystem, Configuration configuration, JobManagerMode executionMode, String externalHostname, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass, scala.Option<Class<? extends FlinkResourceManager<?>>> resourceManagerClass)
Parameters:
configuration - The configuration object for the JobManager.
executionMode - The execution mode in which to run. Execution mode LOCAL will spawn an additional TaskManager in the same process.
externalHostname - The hostname where the JobManager is reachable for RPC communication.
futureExecutor - Executor to run the JobManager's futures.
ioExecutor - Executor to run blocking I/O operations.
highAvailabilityServices - Used to instantiate the high availability services.
jobManagerClass - The class of the JobManager to be started.
archiveClass - The class of the archivist to be started.
resourceManagerClass - Optional class of the resource manager, if one should be started.
jobManagerSystem - (undocumented)
public static scala.Tuple4<Configuration,JobManagerMode,String,Iterator<Integer>> parseArgs(String[] args)
Parameters:
args - Command line arguments.
public static scala.Tuple9<InstanceManager,Scheduler,BlobLibraryCacheManager,RestartStrategyFactory,scala.concurrent.duration.FiniteDuration,Object,scala.Option<Path>,scala.concurrent.duration.FiniteDuration,scala.Option<MetricRegistry>> createJobManagerComponents(Configuration configuration, ScheduledExecutorService futureExecutor, Executor ioExecutor, BlobStore blobStore)
Parameters:
configuration - The configuration from which to parse the config values.
futureExecutor - Executor to run the JobManager's futures.
ioExecutor - Executor to run blocking I/O operations.
blobStore - Store for persisting blobs.
public static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> startJobManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass)
Parameters:
configuration - The configuration for the JobManager.
actorSystem - The actor system running the JobManager.
futureExecutor - Executor to run the JobManager's futures.
ioExecutor - Executor to run blocking I/O operations.
jobManagerClass - The class of the JobManager to be started.
archiveClass - The class of the MemoryArchivist to be started.
highAvailabilityServices - (undocumented)
public static scala.Tuple2<akka.actor.ActorRef,akka.actor.ActorRef> startJobManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, ScheduledExecutorService futureExecutor, Executor ioExecutor, HighAvailabilityServices highAvailabilityServices, scala.Option<String> jobManagerActorName, scala.Option<String> archiveActorName, Class<? extends JobManager> jobManagerClass, Class<? extends MemoryArchivist> archiveClass)
Parameters:
configuration - The configuration for the JobManager.
actorSystem - The actor system running the JobManager.
futureExecutor - Executor to run the JobManager's futures.
ioExecutor - Executor to run blocking I/O operations.
jobManagerActorName - Optional name of the JobManager actor. If none is given, the actor gets a name generated by the actor system.
archiveActorName - Optional name of the archive actor. If none is given, the actor gets a name generated by the actor system.
jobManagerClass - The class of the JobManager to be started.
archiveClass - The class of the MemoryArchivist to be started.
highAvailabilityServices - (undocumented)
public static akka.actor.Props getArchiveProps(Class<? extends MemoryArchivist> archiveClass, int archiveCount, scala.Option<Path> archivePath)
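The port-range variant of runJobManager is documented as trying to bind to a port in the specified range. A minimal sketch of that idea follows, assuming a hypothetical PortRangeBinder helper that is not part of Flink: it tries each candidate port in iteration order, moves on when binding throws BindException, stops on any other I/O error, and reports the last BindException if the range is exhausted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical helper (not Flink's code): bind to the first free port of a range. */
final class PortRangeBinder {

    /** Binds something to a port; must throw BindException if the port is already taken. */
    interface PortBinder<T> {
        T bind(int port) throws IOException;
    }

    /** Tries the ports in order and returns the first successful binding. */
    static <T> T bindToFirstFreePort(Iterator<Integer> ports, PortBinder<T> binder) {
        BindException lastBindFailure = null;
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return binder.bind(port);
            } catch (BindException e) {
                lastBindFailure = e; // port in use: try the next candidate
            } catch (IOException e) {
                throw new UncheckedIOException(e); // some other I/O problem: give up
            }
        }
        throw new UncheckedIOException(lastBindFailure != null
                ? lastBindFailure
                : new BindException("Port range is empty"));
    }

    public static void main(String[] args) throws IOException {
        // Try two fixed ports first, then fall back to port 0 (any free port picked by the OS).
        try (ServerSocket socket = bindToFirstFreePort(
                Arrays.asList(6123, 6124, 0).iterator(),
                port -> new ServerSocket(port, 50, InetAddress.getLoopbackAddress()))) {
            System.out.println("Bound to port " + socket.getLocalPort());
        }
    }
}
```

Passing the binding step in as a function keeps the iteration logic independent of what is being bound (a plain ServerSocket here, an actor system in the real class).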
public static akka.actor.Props getJobManagerProps(Class<? extends JobManager> jobManagerClass, Configuration configuration, ScheduledExecutorService futureExecutor, Executor ioExecutor, InstanceManager instanceManager, Scheduler scheduler, LibraryCacheManager libraryCacheManager, akka.actor.ActorRef archive, RestartStrategyFactory restartStrategyFactory, scala.concurrent.duration.FiniteDuration timeout, LeaderElectionService leaderElectionService, SubmittedJobGraphStore submittedJobGraphStore, CheckpointRecoveryFactory checkpointRecoveryFactory, scala.concurrent.duration.FiniteDuration jobRecoveryTimeout, scala.Option<MetricRegistry> metricsRegistry)
public static akka.actor.ActorContext context()
public static final akka.actor.ActorRef self()
public static final akka.actor.ActorRef sender()
protected static void aroundReceive(scala.PartialFunction<Object,scala.runtime.BoxedUnit> receive, Object msg)
protected static void aroundPreStart()
protected static void aroundPostStop()
protected static void aroundPreRestart(Throwable reason, scala.Option<Object> message)
protected static void aroundPostRestart(Throwable reason)
public static akka.actor.SupervisorStrategy supervisorStrategy()
public static void preRestart(Throwable reason, scala.Option<Object> message) throws Exception
Throws:
Exception
public static scala.PartialFunction<Object,scala.runtime.BoxedUnit> receive()
protected Configuration flinkConfiguration()
protected ScheduledExecutorService futureExecutor()
protected Executor ioExecutor()
protected InstanceManager instanceManager()
protected Scheduler scheduler()
protected BlobLibraryCacheManager libraryCacheManager()
protected akka.actor.ActorRef archive()
protected RestartStrategyFactory restartStrategyFactory()
protected scala.concurrent.duration.FiniteDuration timeout()
protected LeaderElectionService leaderElectionService()
protected SubmittedJobGraphStore submittedJobGraphs()
protected CheckpointRecoveryFactory checkpointRecoveryFactory()
protected scala.concurrent.duration.FiniteDuration jobRecoveryTimeout()
protected scala.Option<MetricRegistry> metricsRegistry()
public grizzled.slf4j.Logger log()
Specified by:
log in interface FlinkActor
protected scala.collection.mutable.HashMap<JobID,scala.Tuple2<ExecutionGraph,JobInfo>> currentJobs()
protected HighAvailabilityMode haMode()
public scala.Option<UUID> leaderSessionID()
Specified by:
leaderSessionID in interface LeaderSessionMessageFilter
protected scala.Option<JobManagerMetricGroup> jobManagerMetricGroup()
public scala.Option<scala.collection.Seq<scala.concurrent.Future<scala.runtime.BoxedUnit>>> futuresToComplete()
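The method summary describes futuresToComplete() as the futures that have to be completed before the JobManager terminates. One way to express "wait for these, then shut down" in plain Java is CompletableFuture.allOf with a bounded wait. ShutdownSketch, its timeout parameter, and its choice to run the shutdown action even after a timeout are assumptions for illustration, not the JobManager's actual behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: wait (bounded) for pending futures, then run the shutdown action. */
final class ShutdownSketch {

    /**
     * Returns true if every future completed normally within the timeout.
     * The shutdown action runs in all cases, after the wait.
     */
    static boolean awaitThenShutdown(List<CompletableFuture<Void>> futures,
                                     long timeoutMillis,
                                     Runnable shutdown) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false; // some future is late or failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            shutdown.run();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        boolean clean = awaitThenShutdown(Arrays.asList(done), 1000L,
                () -> System.out.println("shutting down"));
        System.out.println(clean); // true
    }
}
```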
public int webMonitorPort()
public String defaultSavepointDir()
public scala.Option<akka.actor.ActorRef> currentResourceManager()
public long currentResourceManagerConnectionId()
public scala.collection.mutable.Map<akka.actor.ActorRef,InstanceID> taskManagerMap()
public scala.concurrent.duration.FiniteDuration triggerResourceManagerReconnectInterval()
public void preStart()
Specified by:
preStart in interface akka.actor.Actor
public void postStop()
Specified by:
postStop in interface akka.actor.Actor
public scala.PartialFunction<Object,scala.runtime.BoxedUnit> handleMessage()
Specified by:
handleMessage in interface FlinkActor
public void unhandled(Object message)
Specified by:
unhandled in interface akka.actor.Actor
Parameters:
message - (undocumented)
public void grantLeadership(UUID newLeaderSessionID)
Description copied from interface: LeaderContender
Callback method which is called by the LeaderElectionService upon selecting this instance as the new leader. The method is called with the new leader session ID.
Specified by:
grantLeadership in interface LeaderContender
Parameters:
newLeaderSessionID - New leader session ID
public void revokeLeadership()
Description copied from interface: LeaderContender
Callback method which is called by the LeaderElectionService upon revoking the leadership of a former leader. This might happen when multiple contenders have been granted leadership.
Specified by:
revokeLeadership in interface LeaderContender
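grantLeadership and revokeLeadership hand the JobManager a leader session ID, and the class also implements LeaderSessionMessageFilter and exposes leaderSessionID(). A minimal sketch of that bookkeeping follows, assuming the filter simply compares a message's session ID with the current one. LeaderSketch and LeaderSessionMessage are hypothetical names, and the real callbacks do more than store the ID.

```java
import java.util.UUID;

/** Toy leader contender with session-ID filtering (hypothetical, not Flink's implementation). */
final class LeaderSketch {

    /** A message stamped with the leader session ID its sender believes to be current. */
    static final class LeaderSessionMessage {
        final UUID leaderSessionId;
        final Object payload;

        LeaderSessionMessage(UUID leaderSessionId, Object payload) {
            this.leaderSessionId = leaderSessionId;
            this.payload = payload;
        }
    }

    /** Current leader session ID, or null while this instance is not the leader. */
    private volatile UUID leaderSessionId;

    /** Election callback: this instance became leader under the given session ID. */
    void grantLeadership(UUID newLeaderSessionId) {
        leaderSessionId = newLeaderSessionId;
    }

    /** Election callback: leadership was revoked, so no session ID is valid any more. */
    void revokeLeadership() {
        leaderSessionId = null;
    }

    /** Accepts a message only if it carries the current leader session ID. */
    boolean accept(LeaderSessionMessage msg) {
        UUID current = leaderSessionId; // read the volatile field once
        return current != null && current.equals(msg.leaderSessionId);
    }

    public static void main(String[] args) {
        LeaderSketch contender = new LeaderSketch();
        UUID session = UUID.randomUUID();
        contender.grantLeadership(session);
        System.out.println(contender.accept(new LeaderSessionMessage(session, "SubmitJob"))); // true
        contender.revokeLeadership();
        System.out.println(contender.accept(new LeaderSessionMessage(session, "SubmitJob"))); // false
    }
}
```

Stamping messages with a session ID lets a former leader recognize and discard messages meant for another leadership term.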
public void onAddedJobGraph(JobID jobId)
Description copied from interface: SubmittedJobGraphStore.SubmittedJobGraphListener
Callback for SubmittedJobGraph instances added by a different SubmittedJobGraphStore instance.
Important: It is possible to get false positives and be notified about a job graph that was added by this instance.
Specified by:
onAddedJobGraph in interface SubmittedJobGraphStore.SubmittedJobGraphListener
Parameters:
jobId - The JobID of the added job graph
public void onRemovedJobGraph(JobID jobId)
Description copied from interface: SubmittedJobGraphStore.SubmittedJobGraphListener
Callback for SubmittedJobGraph instances removed by a different SubmittedJobGraphStore instance.
Specified by:
onRemovedJobGraph in interface SubmittedJobGraphStore.SubmittedJobGraphListener
Parameters:
jobId - The JobID of the removed job graph
public String getAddress()
Description copied from interface: LeaderContender
Returns the address of the LeaderContender under which other instances can connect to it.
Specified by:
getAddress in interface LeaderContender
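Because onAddedJobGraph may also report job graphs that this very instance added (the documented false positives), a listener is safest when it treats the callback as idempotent. The hypothetical sketch below (JobGraphListenerSketch, with String job IDs standing in for JobID) counts a job graph as newly recovered only if it was not already known. This illustrates one way to handle the callback, not how the JobManager implements it.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical listener that tolerates false-positive "added" notifications. */
final class JobGraphListenerSketch {

    private final Set<String> knownJobs = new HashSet<>();
    private int recoveredJobs = 0;

    /** Records a job graph that this instance submitted itself. */
    synchronized void submitLocally(String jobId) {
        knownJobs.add(jobId);
    }

    /** May also fire for graphs added by this instance; those are ignored. */
    synchronized void onAddedJobGraph(String jobId) {
        if (knownJobs.add(jobId)) {
            recoveredJobs++; // genuinely new: a real listener might trigger recovery here
        }
    }

    synchronized void onRemovedJobGraph(String jobId) {
        knownJobs.remove(jobId);
    }

    synchronized int recoveredCount() {
        return recoveredJobs;
    }

    public static void main(String[] args) {
        JobGraphListenerSketch listener = new JobGraphListenerSketch();
        listener.submitLocally("job-a");
        listener.onAddedJobGraph("job-a"); // false positive: already known, ignored
        listener.onAddedJobGraph("job-b"); // added elsewhere: counted once
        System.out.println(listener.recoveredCount()); // 1
    }
}
```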
public void handleError(Exception exception)
Specified by:
handleError in interface LeaderContender
Parameters:
exception - Exception being thrown in the leader election service
protected void shutdown()
Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.