public abstract class ClusterClient extends Object
Modifier and Type | Class and Description |
---|---|
protected static class |
ClusterClient.LazyActorSystemLoader
Utility class to lazily instantiate an
ActorSystem . |
Modifier and Type | Field and Description |
---|---|
protected ClusterClient.LazyActorSystemLoader |
actorSystemLoader
The actor system used to communicate with the JobManager.
|
protected Configuration |
flinkConfig
Configuration of the client.
|
protected HighAvailabilityServices |
highAvailabilityServices
Service factory for high availability.
|
protected org.slf4j.Logger |
log |
protected scala.concurrent.duration.FiniteDuration |
timeout
Timeout for futures.
|
Constructor and Description |
---|
ClusterClient(Configuration flinkConfig)
Creates an instance that submits the programs to the JobManager defined in the
configuration.
|
ClusterClient(Configuration flinkConfig,
HighAvailabilityServices highAvailabilityServices)
Creates an instance that submits the programs to the JobManager defined in the
configuration.
|
Modifier and Type | Method and Description |
---|---|
void |
cancel(JobID jobId)
Cancels a job identified by the job id.
|
String |
cancelWithSavepoint(JobID jobId,
String savepointDirectory)
Cancels a job identified by the job id and triggers a savepoint.
|
JobListeningContext |
connectToJob(JobID jobID)
Reattaches to a running job with the given job id.
|
void |
endSession(JobID jobId)
Tells the JobManager to finish the session (job) defined by the given ID.
|
void |
endSessions(List<JobID> jobIds)
Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
|
protected abstract void |
finalizeCluster()
Request the cluster to shut down or disconnect.
|
Map<String,Object> |
getAccumulators(JobID jobID)
Requests and returns the accumulators for the given job identifier.
|
Map<String,Object> |
getAccumulators(JobID jobID,
ClassLoader loader)
Requests and returns the accumulators for the given job identifier.
|
abstract String |
getClusterIdentifier()
Returns a string representation of the cluster.
|
abstract GetClusterStatusResponse |
getClusterStatus()
Returns the latest cluster status, including the number of TaskManagers and slots.
|
Configuration |
getFlinkConfiguration()
Return the Flink configuration object.
|
JobGraph |
getJobGraph(PackagedProgram prog,
FlinkPlan optPlan,
SavepointRestoreSettings savepointSettings) |
InetSocketAddress |
getJobManagerAddress()
Gets the current JobManager address (may change in case of a HA setup).
|
ActorGateway |
getJobManagerGateway()
Returns the
ActorGateway of the current job manager leader using
the LeaderRetrievalService . |
abstract int |
getMaxSlots()
The client may define an upper limit on the number of slots to use.
|
protected abstract List<String> |
getNewMessages()
May return new messages from the cluster.
|
static FlinkPlan |
getOptimizedPlan(Optimizer compiler,
PackagedProgram prog,
int parallelism) |
static OptimizedPlan |
getOptimizedPlan(Optimizer compiler,
Plan p,
int parallelism) |
static String |
getOptimizedPlanAsJson(Optimizer compiler,
PackagedProgram prog,
int parallelism) |
boolean |
getPrintStatusDuringExecution() |
abstract String |
getWebInterfaceURL()
Returns a URL (as a string) to the JobManager web interface.
|
abstract boolean |
hasUserJarsInClassPath(List<URL> userJarFiles)
Returns true if the client already has the user jar and providing it again would
result in duplicate uploading of the jar.
|
boolean |
isDetached()
A flag to indicate whether this client submits jobs detached.
|
CompletableFuture<Collection<JobStatusMessage>> |
listJobs()
Lists the currently running and finished jobs on the cluster.
|
protected void |
logAndSysout(String message)
Logs and prints to sysout if printing to stdout is enabled.
|
JobExecutionResult |
retrieveJob(JobID jobID)
Reattaches to a running job from the supplied job id.
|
JobSubmissionResult |
run(FlinkPlan compiledPlan,
List<URL> libraries,
List<URL> classpaths,
ClassLoader classLoader) |
JobSubmissionResult |
run(FlinkPlan compiledPlan,
List<URL> libraries,
List<URL> classpaths,
ClassLoader classLoader,
SavepointRestoreSettings savepointSettings) |
JobExecutionResult |
run(JobGraph jobGraph,
ClassLoader classLoader)
Submits a JobGraph blocking.
|
JobSubmissionResult |
run(JobWithJars program,
int parallelism) |
JobSubmissionResult |
run(JobWithJars jobWithJars,
int parallelism,
SavepointRestoreSettings savepointSettings)
Runs a program on the Flink cluster to which this client is connected.
|
JobSubmissionResult |
run(PackagedProgram prog,
int parallelism)
General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending
on whether the client was configured with
setDetached(true) or setDetached(false) . |
JobSubmissionResult |
runDetached(JobGraph jobGraph,
ClassLoader classLoader)
Submits a JobGraph detached.
|
void |
setDetached(boolean isDetached)
Set the mode of this client (detached or blocking job execution).
|
void |
setPrintStatusDuringExecution(boolean print)
Configures whether the client should print progress updates during the execution to
System.out . |
void |
shutdown()
Shuts down the client.
|
void |
stop(JobID jobId)
Stops a program on the Flink cluster whose JobManager is configured in this client's configuration.
|
protected abstract JobSubmissionResult |
submitJob(JobGraph jobGraph,
ClassLoader classLoader)
Calls the subclasses' submitJob method.
|
CompletableFuture<String> |
triggerSavepoint(JobID jobId,
String savepointDirectory)
Triggers a savepoint for the job identified by the job id.
|
abstract void |
waitForClusterToBeReady()
Blocks until the client has determined that the cluster is ready for Job submission.
|
protected final org.slf4j.Logger log
protected final ClusterClient.LazyActorSystemLoader actorSystemLoader
protected final Configuration flinkConfig
protected final scala.concurrent.duration.FiniteDuration timeout
protected final HighAvailabilityServices highAvailabilityServices
public ClusterClient(Configuration flinkConfig) throws Exception
flinkConfig
- The config used to obtain the job-manager's address, and used to configure the optimizer.
Exception
- if we cannot create the high availability services
public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices)
flinkConfig
- The config used to obtain the job-manager's address, and used to configure the optimizer.
highAvailabilityServices
- HighAvailabilityServices to use for leader retrieval
public void shutdown() throws Exception
Exception
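public void setPrintStatusDuringExecution(boolean print)
Configures whether the client should print progress updates during the execution to System.out.
All updates are logged via the SLF4J loggers regardless of this setting.
print
- True to print updates to standard out during execution, false to not print them.
public boolean getPrintStatusDuringExecution()
Returns whether the client prints progress updates during the execution to System.out.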
public InetSocketAddress getJobManagerAddress()
public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException
public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException
public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException
CompilerException
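public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException
General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending on whether the client was configured with setDetached(true) or setDetached(false).
prog
- the packaged program
parallelism
- the parallelism to execute the contained Flink job
ProgramMissingJobException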
ProgramInvocationException
public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException
ProgramInvocationException
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException
jobWithJars
- The program to be executed.
parallelism
- The default parallelism to use when running the program. The default parallelism is used
when the program does not set a parallelism by itself.
CompilerException
- Thrown, if the compiler encounters an illegal situation.
ProgramInvocationException
- Thrown, if the program could not be instantiated from its jar file,
or if the submission failed. That might be either due to an I/O problem,
i.e. the job-manager is unreachable, or due to the fact that the
parallel execution failed.
public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException
ProgramInvocationException
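The `parallelism` argument of `run(JobWithJars, int, SavepointRestoreSettings)` is documented above as a default that applies only when the program does not set a parallelism by itself. The rule can be sketched as a small fallback function. This is an illustration under stated assumptions, not Flink's implementation: the class `ParallelismSketch`, the method `effectiveParallelism`, and the `UNSET` sentinel are all hypothetical names chosen for this example.

```java
// Hypothetical illustration, not Flink code: resolves the parallelism a job
// would run with when the client supplies only a default.
final class ParallelismSketch {

    // Assumed sentinel meaning "the program did not choose a parallelism".
    static final int UNSET = -1;

    static int effectiveParallelism(int programParallelism, int clientDefault) {
        if (clientDefault < 1) {
            throw new IllegalArgumentException("client default must be at least 1");
        }
        // The program's own setting wins; the client default is only a fallback.
        return programParallelism == UNSET ? clientDefault : programParallelism;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(UNSET, 4)); // program left it unset
        System.out.println(effectiveParallelism(8, 4));     // program chose 8
    }
}
```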
public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException
ProgramInvocationException
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException
jobGraph
- The JobGraph
classLoader
- User code class loader to deserialize the results and errors (may contain custom classes).
ProgramInvocationException
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException
jobGraph
- The JobGraph
classLoader
- User code class loader to deserialize the results and errors (may contain custom classes).
ProgramInvocationException
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException
jobID
- The job id of the job to attach to
JobExecutionException
- if an error occurs during monitoring the job execution
public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException
jobID
- The job id of the job to attach to
JobExecutionException
- if an error occurs during monitoring the job execution
public void cancel(JobID jobId) throws Exception
jobId
- the job id
Exception
- In case an error occurred.
public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception
jobId
- the job id
savepointDirectory
- directory the savepoint should be written to
Exception
- In case an error occurred.
public void stop(JobID jobId) throws Exception
jobId
- the job ID of the streaming program to stop
Exception
- If the job ID is invalid (i.e., is unknown or refers to a batch job) or if sending the stop signal
failed. That might be due to an I/O problem, i.e., the job-manager is unreachable.
public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception
Triggers a savepoint for the job identified by the job id. The savepoint is written to the given savepoint directory, or to CoreOptions.SAVEPOINT_DIRECTORY if it is null.
jobId
- job id
savepointDirectory
- directory the savepoint should be written to
Exception
- if no connection to the cluster could be established
public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception
Exception
- if no connection to the cluster could be established
public Map<String,Object> getAccumulators(JobID jobID) throws Exception
jobID
- The job identifier of a job.
Exception
public Map<String,Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception
jobID
- The job identifier of a job.
loader
- The class loader for deserializing the accumulator results.
Exception
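Both `getAccumulators` overloads return a `Map<String,Object>`, so callers have to recover the concrete type of each accumulator value themselves. One way to do that without unchecked casts is sketched below. The class `AccumulatorLookupSketch`, the method `accumulator`, and the accumulator names are hypothetical; the hand-built map stands in for the result of a real `getAccumulators` call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Flink: type-safe lookup in a
// Map<String, Object> such as the one returned by getAccumulators.
final class AccumulatorLookupSketch {

    // Returns the named value if it is present and has the expected type,
    // otherwise an empty Optional (missing key or unexpected type).
    static <T> Optional<T> accumulator(Map<String, Object> accumulators, String name, Class<T> type) {
        Object value = accumulators.get(name);
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for the map a real client call would return (assumed names).
        Map<String, Object> results = new HashMap<>();
        results.put("records-read", 1200L);
        results.put("last-key", "k-17");

        System.out.println(accumulator(results, "records-read", Long.class).orElse(0L));
        System.out.println(accumulator(results, "last-key", String.class).orElse("<none>"));
        System.out.println(accumulator(results, "missing", Long.class).isPresent());
    }
}
```

Returning `Optional` makes a missing accumulator and a type mismatch look the same to the caller; a stricter variant could throw on a mismatch instead.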
public void endSession(JobID jobId) throws Exception
jobId
- The ID that identifies the session.
Exception
public void endSessions(List<JobID> jobIds) throws Exception
jobIds
- The IDs that identify the sessions.
Exception
public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException
ProgramInvocationException
public ActorGateway getJobManagerGateway() throws Exception
Returns the ActorGateway of the current job manager leader using the LeaderRetrievalService.
Exception
protected void logAndSysout(String message)
message
- The message to log/print
public abstract void waitForClusterToBeReady()
This is delayed until right before job submission to report any other errors first (e.g. invalid job definitions/errors in the user jar)
public abstract String getWebInterfaceURL()
public abstract GetClusterStatusResponse getClusterStatus()
protected abstract List<String> getNewMessages()
public abstract String getClusterIdentifier()
protected abstract void finalizeCluster()
public void setDetached(boolean isDetached)
isDetached
- If true, the client will submit programs detached via the run method
public boolean isDetached()
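The detached flag set through `setDetached` and read through `isDetached` selects between the blocking and the detached submission path. A minimal sketch of that dispatch follows, using stand-in types rather than Flink's classes. `DetachedModeSketch`, `SketchClient`, `submit`, `runBlocking`, and the string results are all hypothetical; whether a concrete `ClusterClient` subclass dispatches in exactly this way is an assumption.

```java
// Hypothetical stand-in, not Flink code: models a client whose detached flag
// chooses between a blocking and a detached submission path.
final class DetachedModeSketch {

    abstract static class SketchClient {
        private boolean detached;

        void setDetached(boolean isDetached) {
            this.detached = isDetached;
        }

        boolean isDetached() {
            return detached;
        }

        // Blocking path: would return only after the job has finished.
        abstract String runBlocking(String job);

        // Detached path: would return as soon as the job has been accepted.
        abstract String runDetached(String job);

        // Dispatches on the flag, in the spirit of the documented run method.
        String submit(String job) {
            return isDetached() ? runDetached(job) : runBlocking(job);
        }
    }

    // A trivial subclass that only records which path was taken.
    static SketchClient recordingClient() {
        return new SketchClient() {
            @Override
            String runBlocking(String job) {
                return "finished:" + job;
            }

            @Override
            String runDetached(String job) {
                return "submitted:" + job;
            }
        };
    }

    public static void main(String[] args) {
        SketchClient client = recordingClient();
        System.out.println(client.submit("wordcount"));
        client.setDetached(true);
        System.out.println(client.submit("wordcount"));
    }
}
```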
public Configuration getFlinkConfiguration()
public abstract int getMaxSlots()
public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles)
protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException
jobGraph
- The JobGraph to be submitted
ProgramInvocationException
Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.