public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor
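Worker allocation works as follows. The resource manager calls requestNewWorkers(int), which triggers requests for more containers. After that, getNumWorkerRequestsPending() should reflect the pending requests. Once containers have been started but their TaskManagers have not yet registered at the JobManager, getNumWorkersPendingRegistration() should reflect them. When a TaskManager registers, workerStarted(ResourceID) will be called to inform about a registered worker.

Modifier and Type | Field and Description |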
---|---|
protected Configuration | config: The Flink configuration object |
protected static int | EXIT_CODE_FATAL_ERROR: The exit code with which the process is stopped in case of a fatal error |
static String | RESOURCE_MANAGER_NAME: The default name of the resource manager actor |

Fields inherited from class FlinkUntypedActor: LOG

Modifier | Constructor and Description |
---|---|
protected | FlinkResourceManager(int numInitialTaskManagers, Configuration flinkConfig, LeaderRetrievalService leaderRetriever): Creates a FlinkResourceManager actor. |

Modifier and Type | Method and Description |
---|---|
Collection<WorkerType> | allStartedWorkers(): Gets all currently started TaskManagers. |
protected abstract void | fatalError(String message, Throwable error): Notifies the resource master of a fatal error. |
int | getDesignatedWorkerPoolSize(): Gets the current designated worker pool size, meaning the number of workers that the resource master strives to maintain. |
protected UUID | getLeaderSessionID(): Returns the current leader session ID associated with this actor. |
int | getNumberOfStartedTaskManagers(): Gets the number of currently started TaskManagers. |
protected abstract int | getNumWorkerRequestsPending(): Gets the number of requested workers that have not yet been granted. |
protected abstract int | getNumWorkersPendingRegistration(): Gets the number of containers that have been started, but where the TaskManager has not yet registered at the JobManager. |
static akka.actor.Props | getResourceManagerProps(Class<? extends FlinkResourceManager> resourceManagerClass, Configuration configuration, LeaderRetrievalService leaderRetrievalService) |
Collection<WorkerType> | getStartedTaskManagers(): Gets the currently registered resources. |
protected void | handleMessage(Object message): Receives the actor messages after they have been filtered for a match with the leader session. |
protected abstract void | initialize(): Initializes the framework-specific components. |
boolean | isStarted(ResourceID resourceId): Checks whether a started worker is available for the given resource ID. |
void | notifyWorkerFailed(ResourceID resourceID, String message): Should be called by the framework once it detects that a currently registered worker has failed. |
void | postStop() |
void | preStart() |
protected abstract Collection<WorkerType> | reacceptRegisteredWorkers(Collection<ResourceID> registered): Called when the resource manager starts after a failure and reconnects to the leader JobManager, which still has some workers registered. |
protected abstract void | releasePendingWorker(ResourceID resourceID): Triggers the release of a pending worker. |
protected abstract void | releaseStartedWorker(WorkerType resourceID): Triggers the release of a started worker. |
protected abstract void | requestNewWorkers(int numWorkers): Requests the allocation of a certain number of new workers. |
protected void | sendInfoMessage(String message) |
protected abstract void | shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics): The framework-specific code for shutting down the application. |
static akka.actor.ActorRef | startResourceManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, LeaderRetrievalService leaderRetriever, Class<? extends FlinkResourceManager<?>> resourceManagerClass): Starts the resource manager actors. |
static akka.actor.ActorRef | startResourceManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, LeaderRetrievalService leaderRetriever, Class<? extends FlinkResourceManager<?>> resourceManagerClass, String resourceManagerActorName): Starts the resource manager actors. |
void | triggerCheckWorkers(): Causes the resource framework master to asynchronously re-examine the set of available and pending worker containers, and to release or allocate containers if needed. |
protected void | triggerConnectingToJobManager(String leaderAddress): Causes the resource manager to announce itself at the new leader JobManager and to obtain its connection information and currently known TaskManagers. |
protected abstract WorkerType | workerStarted(ResourceID resourceID): Callback when a worker was started. |
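
The reacceptRegisteredWorkers(Collection<ResourceID>) row above describes how a restarted resource manager consolidates its view with a leader JobManager that still has workers registered. The following is a minimal, self-contained sketch of that matching step, not Flink's implementation. It assumes the framework can list the containers it still sees as running, uses plain String IDs in place of ResourceID, and introduces the hypothetical types RecoveredContainer and ReacceptResult. Splitting out the unknown IDs is also an assumption of this sketch; the real method returns only the accepted workers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a framework-specific WorkerType.
final class RecoveredContainer {
    final String resourceId;

    RecoveredContainer(String resourceId) {
        this.resourceId = resourceId;
    }
}

// Hypothetical result type: accepted workers plus IDs the framework cannot match.
final class ReacceptResult {
    final List<RecoveredContainer> accepted = new ArrayList<>();
    final List<String> unknown = new ArrayList<>();
}

final class ReacceptSketch {
    // Containers the framework still reports as running after the restart (assumed input).
    private final Map<String, RecoveredContainer> runningContainers = new HashMap<>();

    void containerStillRunning(String resourceId) {
        runningContainers.put(resourceId, new RecoveredContainer(resourceId));
    }

    // Plays the role of reacceptRegisteredWorkers: keep every worker the JobManager
    // reports that can still be matched to a running container.
    ReacceptResult reacceptRegisteredWorkers(Collection<String> registered) {
        ReacceptResult result = new ReacceptResult();
        for (String id : registered) {
            RecoveredContainer container = runningContainers.get(id);
            if (container != null) {
                result.accepted.add(container);
            } else {
                result.unknown.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ReacceptSketch rm = new ReacceptSketch();
        rm.containerStillRunning("tm-1");
        rm.containerStillRunning("tm-2");
        ReacceptResult r = rm.reacceptRegisteredWorkers(Arrays.asList("tm-1", "tm-3"));
        System.out.println("accepted=" + r.accepted.size() + ", unknown=" + r.unknown);
        // prints "accepted=1, unknown=[tm-3]"
    }
}
```

How unmatched IDs should be handled (for example, by reporting them as failed workers) is left to the concrete framework.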

Methods inherited from class FlinkUntypedActor: decorateMessage, onReceive
Methods inherited from class akka.actor.UntypedActor: akka$actor$Actor$_setter_$context_$eq, akka$actor$Actor$_setter_$self_$eq, aroundPostRestart, aroundPostStop, aroundPreRestart, aroundPreStart, aroundReceive, context, getContext, getSelf, getSender, postRestart, preRestart, receive, self, sender, supervisorStrategy, unhandled

protected static final int EXIT_CODE_FATAL_ERROR
public static final String RESOURCE_MANAGER_NAME
protected final Configuration config

protected FlinkResourceManager(int numInitialTaskManagers, Configuration flinkConfig, LeaderRetrievalService leaderRetriever)
Parameters:
flinkConfig - The Flink configuration object.

public void preStart()
Specified by: preStart in interface akka.actor.Actor
Overrides: preStart in class akka.actor.UntypedActor

public void postStop()
Specified by: postStop in interface akka.actor.Actor
Overrides: postStop in class akka.actor.UntypedActor

protected void handleMessage(Object message)
Specified by: handleMessage in class FlinkUntypedActor
Parameters:
message - The incoming actor message.

protected final UUID getLeaderSessionID()
Description copied from class: FlinkUntypedActor
Specified by: getLeaderSessionID in class FlinkUntypedActor
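
The method summary says handleMessage receives messages only after they have been filtered for a match with the leader session, and getLeaderSessionID() supplies the session to compare against. The sketch below illustrates that filtering under stated assumptions: SessionEnvelope, SessionFilteringActor, RecordingActor and the receive entry point are hypothetical names, since FlinkUntypedActor's own message classes and dispatch code are not shown on this page. An enveloped message is unwrapped and forwarded only when its session ID equals the current one; otherwise it is discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

// Hypothetical envelope tagging a payload with the sender's leader session ID.
final class SessionEnvelope {
    final UUID leaderSessionId;
    final Object payload;

    SessionEnvelope(UUID leaderSessionId, Object payload) {
        this.leaderSessionId = leaderSessionId;
        this.payload = payload;
    }
}

abstract class SessionFilteringActor {
    // Entry point for all messages: enveloped messages reach handleMessage
    // only if their session ID matches getLeaderSessionID().
    final void receive(Object message) {
        if (message instanceof SessionEnvelope) {
            SessionEnvelope envelope = (SessionEnvelope) message;
            if (Objects.equals(envelope.leaderSessionId, getLeaderSessionID())) {
                handleMessage(envelope.payload);
            } else {
                discard(envelope);
            }
        } else {
            // Assumption of this sketch: un-enveloped messages pass through unchanged.
            handleMessage(message);
        }
    }

    protected abstract UUID getLeaderSessionID();

    protected abstract void handleMessage(Object message);

    // Called for messages from a different (stale) leader session.
    protected void discard(SessionEnvelope envelope) {
    }
}

final class RecordingActor extends SessionFilteringActor {
    final UUID session;
    final List<Object> handled = new ArrayList<>();
    int discarded;

    RecordingActor(UUID session) {
        this.session = session;
    }

    @Override
    protected UUID getLeaderSessionID() {
        return session;
    }

    @Override
    protected void handleMessage(Object message) {
        handled.add(message);
    }

    @Override
    protected void discard(SessionEnvelope envelope) {
        discarded++;
    }

    public static void main(String[] args) {
        UUID current = UUID.randomUUID();
        RecordingActor actor = new RecordingActor(current);
        actor.receive(new SessionEnvelope(current, "fresh"));
        actor.receive(new SessionEnvelope(UUID.randomUUID(), "stale"));
        System.out.println(actor.handled + ", discarded=" + actor.discarded);
    }
}
```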
public int getDesignatedWorkerPoolSize()
public int getNumberOfStartedTaskManagers()
public Collection<WorkerType> getStartedTaskManagers()
public boolean isStarted(ResourceID resourceId)
Parameters:
resourceId - The resource ID for the worker.

public Collection<WorkerType> allStartedWorkers()

protected void triggerConnectingToJobManager(String leaderAddress)
Parameters:
leaderAddress - The Akka actor URL of the new leader JobManager.

public void triggerCheckWorkers()
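
triggerCheckWorkers() asks the resource manager to re-examine available and pending worker containers and to allocate or release containers as needed. One plausible way to do that arithmetic, sketched here under assumptions rather than taken from Flink's source, is to treat started, requested-but-not-granted, and pending-registration workers as disjoint stages and compare their sum with getDesignatedWorkerPoolSize(). The sketch runs synchronously, whereas the real method is asynchronous, and releaseWorkers(int) and FixedPool are hypothetical names; the real class releases workers one at a time through releasePendingWorker and releaseStartedWorker.

```java
import java.util.ArrayList;
import java.util.List;

abstract class PoolChecker {
    protected abstract int getDesignatedWorkerPoolSize();
    protected abstract int getNumberOfStartedTaskManagers();
    protected abstract int getNumWorkerRequestsPending();
    protected abstract int getNumWorkersPendingRegistration();
    protected abstract void requestNewWorkers(int numWorkers);

    // Hypothetical aggregate release hook (not part of FlinkResourceManager).
    protected abstract void releaseWorkers(int numWorkers);

    // Synchronous stand-in for the re-examination that triggerCheckWorkers() schedules.
    final void checkWorkers() {
        int accountedFor = getNumberOfStartedTaskManagers()
                + getNumWorkerRequestsPending()
                + getNumWorkersPendingRegistration();
        int missing = getDesignatedWorkerPoolSize() - accountedFor;
        if (missing > 0) {
            requestNewWorkers(missing);
        } else if (missing < 0) {
            releaseWorkers(-missing);
        }
        // missing == 0: pool already matches the designated size, nothing to do.
    }
}

// Test double with fixed counters that records the resulting actions.
final class FixedPool extends PoolChecker {
    final int designated, started, pending, pendingRegistration;
    final List<String> actions = new ArrayList<>();

    FixedPool(int designated, int started, int pending, int pendingRegistration) {
        this.designated = designated;
        this.started = started;
        this.pending = pending;
        this.pendingRegistration = pendingRegistration;
    }

    @Override protected int getDesignatedWorkerPoolSize() { return designated; }
    @Override protected int getNumberOfStartedTaskManagers() { return started; }
    @Override protected int getNumWorkerRequestsPending() { return pending; }
    @Override protected int getNumWorkersPendingRegistration() { return pendingRegistration; }
    @Override protected void requestNewWorkers(int numWorkers) { actions.add("request " + numWorkers); }
    @Override protected void releaseWorkers(int numWorkers) { actions.add("release " + numWorkers); }

    public static void main(String[] args) {
        FixedPool pool = new FixedPool(5, 2, 1, 1);
        pool.checkWorkers();
        System.out.println(pool.actions); // prints "[request 1]"
    }
}
```

Counting pending requests and pending registrations keeps the check from requesting the same worker again while its container is still being granted or started.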

public void notifyWorkerFailed(ResourceID resourceID, String message)
Parameters:
resourceID - ID of the worker that has failed.
message - An informational message that explains why the worker failed.

protected abstract void initialize() throws Exception
Throws:
Exception - Exceptions during initialization cause the resource manager to fail. If the framework is able to recover this resource manager, it will be restarted.

protected abstract void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics)
Parameters:
finalStatus - The application status to report.
optionalDiagnostics - An optional diagnostics message.

protected abstract void fatalError(String message, Throwable error)
IMPORTANT: This should not cleanly shut down this master, but exit it in such a way that a high-availability setup would restart this master or fail over to another one.
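
The note above says fatalError should not shut the master down cleanly but exit so that a high-availability setup restarts it or fails over, and EXIT_CODE_FATAL_ERROR names the exit code used in that case. A minimal sketch of that contrast, assuming a hypothetical FatalErrorExit helper with an injectable terminator so it can be exercised without killing the JVM. The exit code is passed in because this page does not state the value of EXIT_CODE_FATAL_ERROR; the 1 used in main is a placeholder.

```java
import java.util.function.IntConsumer;

final class FatalErrorExit {
    private final int exitCode;
    private final IntConsumer terminator;

    FatalErrorExit(int exitCode, IntConsumer terminator) {
        this.exitCode = exitCode;
        this.terminator = terminator;
    }

    // Counterpart of fatalError(String, Throwable): report, then terminate abruptly.
    // No graceful, shutdownApplication-style cleanup happens here, so an external
    // supervisor sees a failed process and can restart it or fail over.
    void fatalError(String message, Throwable error) {
        System.err.println("Fatal error: " + message);
        if (error != null) {
            error.printStackTrace();
        }
        terminator.accept(exitCode);
    }

    public static void main(String[] args) {
        // In a real process the terminator could be Runtime.getRuntime()::halt or System::exit;
        // here it only prints the code so the demo keeps running.
        int placeholderExitCode = 1; // placeholder, not the real value of EXIT_CODE_FATAL_ERROR
        FatalErrorExit handler = new FatalErrorExit(
                placeholderExitCode,
                code -> System.out.println("would exit with " + code));
        handler.fatalError("lost connection", new IllegalStateException("demo"));
    }
}
```

Runtime.halt terminates the JVM without running shutdown hooks, whereas System.exit runs them; which one a concrete resource manager should use is a design choice this page does not settle.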

protected abstract void requestNewWorkers(int numWorkers)
Parameters:
numWorkers - The number of workers to allocate.

protected abstract void releasePendingWorker(ResourceID resourceID)
Parameters:
resourceID - The worker resource ID.

protected abstract void releaseStartedWorker(WorkerType resourceID)
Parameters:
resourceID - The worker resource ID.

protected abstract WorkerType workerStarted(ResourceID resourceID)
Parameters:
resourceID - The worker resource ID.

protected abstract Collection<WorkerType> reacceptRegisteredWorkers(Collection<ResourceID> registered)
See also: notifyWorkerFailed(ResourceID, String).
Parameters:
registered - The list of TaskManagers that the JobManager knows.

protected abstract int getNumWorkerRequestsPending()
protected abstract int getNumWorkersPendingRegistration()
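
getNumWorkerRequestsPending() and getNumWorkersPendingRegistration() report two consecutive stages of the worker allocation steps described for this class: requested but not yet granted, then started but not yet registered at the JobManager, followed by the workerStarted(ResourceID) callback. The sketch below models those transitions with in-memory counters. It is a hypothetical illustration (the class WorkerBookkeeping, the method containerLaunched, String IDs in place of ResourceID, and a boolean return from workerStarted instead of a WorkerType are all assumptions), not how any concrete Flink resource manager stores its state.

```java
import java.util.HashSet;
import java.util.Set;

final class WorkerBookkeeping {
    private int requestsPending;                                     // requested, not yet granted
    private final Set<String> pendingRegistration = new HashSet<>(); // container started, TaskManager not registered
    private final Set<String> started = new HashSet<>();             // TaskManager registered

    // Step 1: ask the framework for more containers.
    void requestNewWorkers(int numWorkers) {
        if (numWorkers < 0) {
            throw new IllegalArgumentException("numWorkers must not be negative");
        }
        requestsPending += numWorkers;
    }

    // Step 2 (hypothetical hook): a container was granted and a TaskManager launched in it.
    void containerLaunched(String resourceId) {
        if (requestsPending == 0) {
            throw new IllegalStateException("no outstanding request");
        }
        if (pendingRegistration.contains(resourceId) || started.contains(resourceId)) {
            throw new IllegalStateException("duplicate resource ID " + resourceId);
        }
        requestsPending--;
        pendingRegistration.add(resourceId);
    }

    // Step 3: the TaskManager registered; returns false for IDs not awaiting registration.
    boolean workerStarted(String resourceId) {
        if (!pendingRegistration.remove(resourceId)) {
            return false;
        }
        started.add(resourceId);
        return true;
    }

    int getNumWorkerRequestsPending() { return requestsPending; }
    int getNumWorkersPendingRegistration() { return pendingRegistration.size(); }
    int getNumberOfStartedTaskManagers() { return started.size(); }

    public static void main(String[] args) {
        WorkerBookkeeping pool = new WorkerBookkeeping();
        pool.requestNewWorkers(2);
        pool.containerLaunched("tm-1");
        pool.workerStarted("tm-1");
        System.out.println(pool.getNumWorkerRequestsPending() + " pending, "
                + pool.getNumWorkersPendingRegistration() + " registering, "
                + pool.getNumberOfStartedTaskManagers() + " started");
        // prints "1 pending, 0 registering, 1 started"
    }
}
```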
protected void sendInfoMessage(String message)

public static akka.actor.ActorRef startResourceManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, LeaderRetrievalService leaderRetriever, Class<? extends FlinkResourceManager<?>> resourceManagerClass)
Parameters:
configuration - The configuration for the resource manager.
actorSystem - The actor system to start the resource manager in.
leaderRetriever - The leader retriever service to initialize the resource manager.
resourceManagerClass - The class of the ResourceManager to be started.

public static akka.actor.ActorRef startResourceManagerActors(Configuration configuration, akka.actor.ActorSystem actorSystem, LeaderRetrievalService leaderRetriever, Class<? extends FlinkResourceManager<?>> resourceManagerClass, String resourceManagerActorName)
Parameters:
configuration - The configuration for the resource manager.
actorSystem - The actor system to start the resource manager in.
leaderRetriever - The leader retriever service to initialize the resource manager.
resourceManagerClass - The class of the ResourceManager to be started.
resourceManagerActorName - The name of the resource manager actor.

public static akka.actor.Props getResourceManagerProps(Class<? extends FlinkResourceManager> resourceManagerClass, Configuration configuration, LeaderRetrievalService leaderRetrievalService)

Copyright © 2014–2019 The Apache Software Foundation. All rights reserved.