public abstract class SchedulerBase extends Object implements SchedulerNG, CheckpointScheduling
Base class which can be used to implement SchedulerNG.
Modifier and Type | Field and Description |
---|---|
protected ExecutionVertexVersioner | executionVertexVersioner |
protected InputsLocationsRetriever | inputsLocationsRetriever |
protected JobInfo | jobInfo |
protected JobManagerJobMetricGroup | jobManagerJobMetricGroup |
protected OperatorCoordinatorHandler | operatorCoordinatorHandler |
protected StateLocationRetriever | stateLocationRetriever |
Constructor and Description |
---|
SchedulerBase(org.slf4j.Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, ExecutionVertexVersioner executionVertexVersioner, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, VertexParallelismStore vertexParallelismStore) |
Modifier and Type | Method and Description |
---|---|
void | acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) |
protected void | archiveFromFailureHandlingResult(FailureHandlingResultSnapshot failureHandlingResult) |
protected void | archiveGlobalFailure(Throwable failure, CompletableFuture<Map<String,String>> failureLabels) |
void | cancel() |
protected abstract void | cancelAllPendingSlotRequestsInternal() |
CompletableFuture<Void> | closeAsync(): Trigger the closing of the resource and return the corresponding close future. |
static VertexParallelismStore | computeVertexParallelismStore(Iterable<JobVertex> vertices): Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms. |
static VertexParallelismStore | computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc) |
static VertexParallelismStore | computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc, Function<Integer,Integer> normalizeParallelismFunc): Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms, with a custom function for default max parallelism calculation and a custom function for normalizing vertex parallelism. |
static VertexParallelismStore | computeVertexParallelismStore(JobGraph jobGraph): Compute the VertexParallelismStore for all vertices of a given job graph, which will set defaults and ensure that the returned store contains valid parallelisms. |
void | declineCheckpoint(DeclineCheckpoint decline) |
CompletableFuture<CoordinationResponse> | deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request): Delivers a coordination request to the OperatorCoordinator with the given OperatorID and returns the coordinator's response. |
void | deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt): Delivers the given OperatorEvent to the OperatorCoordinator with the given OperatorID. |
protected void | failJob(Throwable cause, long timestamp, CompletableFuture<Map<String,String>> failureLabels) |
static int | getDefaultMaxParallelism(JobVertex vertex): Get a default value to use for a given vertex's max parallelism if none was specified. |
Iterable<RootExceptionHistoryEntry> | getExceptionHistory() |
ExecutionGraph | getExecutionGraph(): ExecutionGraph is exposed to make it easier to rework tests to be based on the new scheduler. |
ExecutionJobVertex | getExecutionJobVertex(JobVertexID jobVertexId) |
ExecutionVertex | getExecutionVertex(ExecutionVertexID executionVertexId) |
protected JobGraph | getJobGraph() |
protected JobID | getJobId() |
CompletableFuture<JobStatus> | getJobTerminationFuture() |
protected ComponentMainThreadExecutor | getMainThreadExecutor() |
protected MarkPartitionFinishedStrategy | getMarkPartitionFinishedStrategy() |
protected abstract long | getNumberOfRestarts() |
protected ResultPartitionAvailabilityChecker | getResultPartitionAvailabilityChecker() |
protected SchedulingTopology | getSchedulingTopology() |
void | notifyEndOfData(ExecutionAttemptID executionAttemptID): Notifies that the task has reached the end of data. |
void | notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) |
void | notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) |
protected abstract void | onTaskFailed(Execution execution) |
protected abstract void | onTaskFinished(Execution execution, IOMetrics ioMetrics) |
static void | registerJobMetrics(MetricGroup metrics, JobStatusProvider jobStatusProvider, Gauge<Long> numberOfRestarts, DeploymentStateTimeMetrics deploymentTimeMetrics, Consumer<JobStatusListener> jobStatusListenerRegistrar, long initializationTimestamp, MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) |
void | reportCheckpointMetrics(JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics) |
void | reportInitializationMetrics(JobID jobId, SubTaskInitializationMetrics initializationMetrics) |
CheckpointStatsSnapshot | requestCheckpointStats(): Returns the checkpoint statistics for a given job. |
ExecutionGraphInfo | requestJob() |
JobStatus | requestJobStatus() |
KvStateLocation | requestKvStateLocation(JobID jobId, String registrationName) |
SerializedInputSplit | requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) |
ExecutionState | requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) |
protected void | resetForNewExecution(ExecutionVertexID executionVertexId) |
protected void | resetForNewExecutions(Collection<ExecutionVertexID> vertices) |
protected void | restoreState(Set<ExecutionVertexID> vertices, boolean isGlobalRecovery) |
protected void | setGlobalFailureCause(Throwable cause, long timestamp) |
void | startCheckpointScheduler(): Starts the periodic scheduling if possible. |
void | startScheduling() |
protected abstract void | startSchedulingInternal() |
void | stopCheckpointScheduler(): Stops the periodic scheduling if possible. |
CompletableFuture<String> | stopWithSavepoint(String targetDirectory, boolean terminate, SavepointFormatType formatType) |
protected void | transitionExecutionGraphState(JobStatus current, JobStatus newState) |
protected void | transitionToRunning() |
protected void | transitionToScheduled(List<ExecutionVertexID> verticesToDeploy) |
CompletableFuture<CompletedCheckpoint> | triggerCheckpoint(CheckpointType checkpointType) |
CompletableFuture<String> | triggerSavepoint(String targetDirectory, boolean cancelJob, SavepointFormatType formatType) |
void | updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) |
boolean | updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) |
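Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SchedulerNG: requestJobResourceRequirements, updateJobResourceRequirements, updateTaskExecutionState
Methods inherited from interface GlobalFailureHandler: handleGlobalFailure
Methods inherited from interface AutoCloseableAsync: close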
protected final JobInfo jobInfo
protected final StateLocationRetriever stateLocationRetriever
protected final InputsLocationsRetriever inputsLocationsRetriever
protected final JobManagerJobMetricGroup jobManagerJobMetricGroup
protected final ExecutionVertexVersioner executionVertexVersioner
protected final OperatorCoordinatorHandler operatorCoordinatorHandler
public SchedulerBase(org.slf4j.Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, ExecutionVertexVersioner executionVertexVersioner, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, VertexParallelismStore vertexParallelismStore) throws Exception
Throws: Exception
public static int getDefaultMaxParallelism(JobVertex vertex)
Get a default value to use for a given vertex's max parallelism if none was specified.
Parameters:
vertex - the vertex to compute a default max parallelism for
public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc)
public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc, Function<Integer,Integer> normalizeParallelismFunc)
Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms, with a custom function for default max parallelism calculation and a custom function for normalizing vertex parallelism.
Parameters:
vertices - the vertices to compute parallelism for
defaultMaxParallelismFunc - a function for computing a default max parallelism if none is specified on a given vertex
normalizeParallelismFunc - a function for normalizing vertex parallelism
public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices)
Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms.
Parameters:
vertices - the vertices to compute parallelism for
public static VertexParallelismStore computeVertexParallelismStore(JobGraph jobGraph)
Compute the VertexParallelismStore for all vertices of a given job graph, which will set defaults and ensure that the returned store contains valid parallelisms.
Parameters:
jobGraph - the job graph to retrieve vertices from
protected void resetForNewExecutions(Collection<ExecutionVertexID> vertices)
protected void resetForNewExecution(ExecutionVertexID executionVertexId)
protected void restoreState(Set<ExecutionVertexID> vertices, boolean isGlobalRecovery) throws Exception
Throws: Exception
protected void transitionToScheduled(List<ExecutionVertexID> verticesToDeploy)
protected void setGlobalFailureCause(@Nullable Throwable cause, long timestamp)
protected ComponentMainThreadExecutor getMainThreadExecutor()
protected void failJob(Throwable cause, long timestamp, CompletableFuture<Map<String,String>> failureLabels)
protected final SchedulingTopology getSchedulingTopology()
protected final ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker()
protected final void transitionToRunning()
public ExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId)
public ExecutionJobVertex getExecutionJobVertex(JobVertexID jobVertexId)
protected JobGraph getJobGraph()
protected abstract long getNumberOfRestarts()
protected MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy()
protected abstract void cancelAllPendingSlotRequestsInternal()
protected void transitionExecutionGraphState(JobStatus current, JobStatus newState)
@VisibleForTesting public ExecutionGraph getExecutionGraph()
public final void startScheduling()
Specified by: startScheduling in interface SchedulerNG
public static void registerJobMetrics(MetricGroup metrics, JobStatusProvider jobStatusProvider, Gauge<Long> numberOfRestarts, DeploymentStateTimeMetrics deploymentTimeMetrics, Consumer<JobStatusListener> jobStatusListenerRegistrar, long initializationTimestamp, MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings)
protected abstract void startSchedulingInternal()
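The pairing of a public final startScheduling() with a protected abstract startSchedulingInternal() suggests a template-method arrangement: the base class owns the entry point and subclasses supply the strategy-specific step. The following is a minimal sketch of that pattern only. SketchSchedulerBase, SketchEagerScheduler, and the logged step names are hypothetical stand-ins, not Flink classes, and the shared step shown here is an assumption rather than what SchedulerBase actually does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a scheduler base class; not the Flink implementation.
abstract class SketchSchedulerBase {
    // Records the order of steps so the control flow is observable.
    final List<String> steps = new ArrayList<>();

    // Final entry point: subclasses cannot reorder or skip the shared step.
    public final void startScheduling() {
        steps.add("shared setup");   // assumed common bookkeeping done by the base class
        startSchedulingInternal();   // strategy-specific hook supplied by the subclass
    }

    // Hook that each concrete scheduler must implement.
    protected abstract void startSchedulingInternal();
}

// Hypothetical concrete scheduler that only fills in the hook.
class SketchEagerScheduler extends SketchSchedulerBase {
    @Override
    protected void startSchedulingInternal() {
        steps.add("deploy all vertices");
    }
}
```

Because the entry point is final, callers always go through the base-class logic first, and a subclass only decides what happens inside the hook.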
public CompletableFuture<Void> closeAsync()
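Description copied from interface: AutoCloseableAsync
Trigger the closing of the resource and return the corresponding close future.
Specified by: closeAsync in interface AutoCloseableAsync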
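The relationship between closeAsync() and the inherited close() can be pictured with a small stand-alone sketch: a blocking close() that waits on the future returned by closeAsync(). SketchAsyncCloseable and SketchResource are hypothetical names introduced for illustration, and the blocking default shown here is an assumption about how such an interface may be written, not a copy of Flink's AutoCloseableAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for an asynchronously closeable resource.
interface SketchAsyncCloseable extends AutoCloseable {
    // Starts closing and returns a future that completes when closing is done.
    CompletableFuture<Void> closeAsync();

    // Assumed default: block until the close future completes.
    @Override
    default void close() throws Exception {
        try {
            closeAsync().get();
        } catch (ExecutionException e) {
            // Surface the underlying failure instead of the wrapper exception.
            throw new Exception("Could not close resource.", e.getCause());
        }
    }
}

// Hypothetical resource whose shutdown work runs on another thread.
class SketchResource implements SketchAsyncCloseable {
    volatile boolean closed = false;

    @Override
    public CompletableFuture<Void> closeAsync() {
        return CompletableFuture.runAsync(() -> closed = true);
    }
}
```

A caller that must not block can chain further work on the returned future, while try-with-resources code can rely on close().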
public void cancel()
Specified by: cancel in interface SchedulerNG
public CompletableFuture<JobStatus> getJobTerminationFuture()
Specified by: getJobTerminationFuture in interface SchedulerNG
protected final void archiveGlobalFailure(Throwable failure, CompletableFuture<Map<String,String>> failureLabels)
protected final void archiveFromFailureHandlingResult(FailureHandlingResultSnapshot failureHandlingResult)
public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState)
Specified by: updateTaskExecutionState in interface SchedulerNG
protected abstract void onTaskFailed(Execution execution)
public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException
Specified by: requestNextInputSplit in interface SchedulerNG
Throws: IOException
public ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException
Specified by: requestPartitionState in interface SchedulerNG
Throws: PartitionProducerDisposedException
@VisibleForTesting public Iterable<RootExceptionHistoryEntry> getExceptionHistory()
public ExecutionGraphInfo requestJob()
Specified by: requestJob in interface SchedulerNG
public CheckpointStatsSnapshot requestCheckpointStats()
Description copied from interface: SchedulerNG
Returns the checkpoint statistics for a given job. Although the CheckpointStatsSnapshot is included in the ExecutionGraphInfo, this method is preferred to SchedulerNG.requestJob() because it is less expensive.
Specified by: requestCheckpointStats in interface SchedulerNG
public JobStatus requestJobStatus()
Specified by: requestJobStatus in interface SchedulerNG
public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException
Specified by: requestKvStateLocation in interface SchedulerNG
Throws: UnknownKvStateLocation, FlinkJobNotFoundException
public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException
Specified by: notifyKvStateRegistered in interface SchedulerNG
Throws: FlinkJobNotFoundException
public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) throws FlinkJobNotFoundException
Specified by: notifyKvStateUnregistered in interface SchedulerNG
Throws: FlinkJobNotFoundException
public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
Specified by: updateAccumulators in interface SchedulerNG
public CompletableFuture<String> triggerSavepoint(String targetDirectory, boolean cancelJob, SavepointFormatType formatType)
Specified by: triggerSavepoint in interface SchedulerNG
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType)
Specified by: triggerCheckpoint in interface SchedulerNG
public void stopCheckpointScheduler()
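Description copied from interface: CheckpointScheduling
Stops the periodic scheduling if possible.
Specified by: stopCheckpointScheduler in interface CheckpointScheduling
public void startCheckpointScheduler()
Description copied from interface: CheckpointScheduling
Starts the periodic scheduling if possible.
Specified by: startCheckpointScheduler in interface CheckpointScheduling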
public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState)
Specified by: acknowledgeCheckpoint in interface SchedulerNG
public void declineCheckpoint(DeclineCheckpoint decline)
Specified by: declineCheckpoint in interface SchedulerNG
public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics)
Specified by: reportCheckpointMetrics in interface SchedulerNG
public void reportInitializationMetrics(JobID jobId, SubTaskInitializationMetrics initializationMetrics)
Specified by: reportInitializationMetrics in interface SchedulerNG
public CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean terminate, SavepointFormatType formatType)
Specified by: stopWithSavepoint in interface SchedulerNG
public void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt) throws FlinkException
Description copied from interface: SchedulerNG
Delivers the given OperatorEvent to the OperatorCoordinator with the given OperatorID.
Failure semantics: If the task manager sends an event for a non-running task or a non-existing operator coordinator, the call is answered with an exception. If both the task and the coordinator exist, the call from the TaskManager is assumed to be valid, and any exception that bubbles up must cause a job failure.
Specified by: deliverOperatorEventToCoordinator in interface SchedulerNG
Throws: FlinkException - thrown if the task is not running or no operator/coordinator exists for the given ID.
public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException
Description copied from interface: SchedulerNG
Delivers a coordination request to the OperatorCoordinator with the given OperatorID and returns the coordinator's response.
Specified by: deliverCoordinationRequestToCoordinator in interface SchedulerNG
Throws: FlinkException - thrown if the task is not running, no operator/coordinator exists for the given ID, or the coordinator cannot handle client events.
public void notifyEndOfData(ExecutionAttemptID executionAttemptID)
Description copied from interface: SchedulerNG
Notifies that the task has reached the end of data.
Specified by: notifyEndOfData in interface SchedulerNG
Parameters:
executionAttemptID - The execution attempt id.
@VisibleForTesting protected JobID getJobId()
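The failure semantics stated for deliverOperatorEventToCoordinator distinguish two cases: an invalid call is rejected with an exception to the caller, while a failure inside an existing coordinator is escalated to a job failure. The sketch below illustrates only that branching. SketchEventRouter, its string-based IDs, the jobFailure field, and the use of an unchecked IllegalStateException in place of FlinkException are all assumptions made to keep the example self-contained; none of it is Flink code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical router that mimics the two failure paths described above.
class SketchEventRouter {
    // Operator ID -> coordinator callback (stand-in for an operator coordinator).
    private final Map<String, Consumer<String>> coordinators = new HashMap<>();

    // Set when a valid call fails inside the coordinator; null otherwise.
    Throwable jobFailure;

    void register(String operatorId, Consumer<String> coordinator) {
        coordinators.put(operatorId, coordinator);
    }

    void deliver(boolean taskRunning, String operatorId, String event) {
        Consumer<String> coordinator = coordinators.get(operatorId);
        if (!taskRunning || coordinator == null) {
            // Invalid call: answer the caller with an exception, the job is untouched.
            throw new IllegalStateException(
                    "Task is not running or no coordinator exists for " + operatorId);
        }
        try {
            coordinator.accept(event);
        } catch (Throwable t) {
            // Valid call, but the coordinator failed: escalate to a job failure.
            jobFailure = t;
        }
    }
}
```

Keeping the two paths separate means a misbehaving caller cannot fail the job, whereas a coordinator error on a legitimate event is never silently dropped.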
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.