Class SchedulerBase
- java.lang.Object
-
- org.apache.flink.runtime.scheduler.SchedulerBase
-
- All Implemented Interfaces:
AutoCloseable, CheckpointScheduling, GlobalFailureHandler, SchedulerNG, AutoCloseableAsync
- Direct Known Subclasses:
DefaultScheduler
public abstract class SchedulerBase extends Object implements SchedulerNG, CheckpointScheduling
Base class which can be used to implement SchedulerNG.
-
-
Field Summary
Modifier and Type, Field, Description
protected ExecutionVertexVersioner
executionVertexVersioner
protected InputsLocationsRetriever
inputsLocationsRetriever
protected JobInfo
jobInfo
protected JobManagerJobMetricGroup
jobManagerJobMetricGroup
protected OperatorCoordinatorHandler
operatorCoordinatorHandler
protected StateLocationRetriever
stateLocationRetriever
-
Constructor Summary
Constructor, Description
SchedulerBase(org.slf4j.Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, ExecutionVertexVersioner executionVertexVersioner, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, VertexParallelismStore vertexParallelismStore)
-
Method Summary
Modifier and Type, Method, Description
void
acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState)
protected void
archiveFromFailureHandlingResult(FailureHandlingResultSnapshot failureHandlingResult)
protected void
archiveGlobalFailure(Throwable failure, CompletableFuture<Map<String,String>> failureLabels)
void
cancel()
protected abstract void
cancelAllPendingSlotRequestsInternal()
CompletableFuture<Void>
closeAsync()
Trigger the closing of the resource and return the corresponding close future.
static VertexParallelismStore
computeVertexParallelismStore(Iterable<JobVertex> vertices)
Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms.
static VertexParallelismStore
computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc)
static VertexParallelismStore
computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc, Function<Integer,Integer> normalizeParallelismFunc)
Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms, with a custom function for default max parallelism calculation and a custom function for normalizing vertex parallelism.
static VertexParallelismStore
computeVertexParallelismStore(JobGraph jobGraph)
Compute the VertexParallelismStore for all vertices of a given job graph, which will set defaults and ensure that the returned store contains valid parallelisms.
void
declineCheckpoint(DeclineCheckpoint decline)
CompletableFuture<CoordinationResponse>
deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request)
Delivers a coordination request to the OperatorCoordinator with the given OperatorID and returns the coordinator's response.
void
deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt)
Delivers the given OperatorEvent to the OperatorCoordinator with the given OperatorID.
protected void
failJob(Throwable cause, long timestamp, CompletableFuture<Map<String,String>> failureLabels)
static int
getDefaultMaxParallelism(JobVertex vertex)
Get a default value to use for a given vertex's max parallelism if none was specified.
Iterable<RootExceptionHistoryEntry>
getExceptionHistory()
ExecutionGraph
getExecutionGraph()
ExecutionGraph is exposed to make it easier to rework tests to be based on the new scheduler.
ExecutionJobVertex
getExecutionJobVertex(JobVertexID jobVertexId)
ExecutionVertex
getExecutionVertex(ExecutionVertexID executionVertexId)
protected JobGraph
getJobGraph()
protected JobID
getJobId()
CompletableFuture<JobStatus>
getJobTerminationFuture()
protected ComponentMainThreadExecutor
getMainThreadExecutor()
protected MarkPartitionFinishedStrategy
getMarkPartitionFinishedStrategy()
protected abstract long
getNumberOfRestarts()
protected ResultPartitionAvailabilityChecker
getResultPartitionAvailabilityChecker()
protected SchedulingTopology
getSchedulingTopology()
void
notifyEndOfData(ExecutionAttemptID executionAttemptID)
Notifies that the task has reached the end of data.
void
notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress)
void
notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName)
protected abstract void
onTaskFailed(Execution execution)
protected abstract void
onTaskFinished(Execution execution, IOMetrics ioMetrics)
static void
registerJobMetrics(MetricGroup metrics, JobStatusProvider jobStatusProvider, Gauge<Long> numberOfRestarts, DeploymentStateTimeMetrics deploymentTimeMetrics, Consumer<JobStatusListener> jobStatusListenerRegistrar, long initializationTimestamp, MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings)
void
reportCheckpointMetrics(JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics)
void
reportInitializationMetrics(JobID jobId, ExecutionAttemptID executionAttemptId, SubTaskInitializationMetrics initializationMetrics)
CheckpointStatsSnapshot
requestCheckpointStats()
Returns the checkpoint statistics for a given job.
ExecutionGraphInfo
requestJob()
JobStatus
requestJobStatus()
KvStateLocation
requestKvStateLocation(JobID jobId, String registrationName)
SerializedInputSplit
requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt)
ExecutionState
requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId)
protected void
resetForNewExecution(ExecutionVertexID executionVertexId)
protected void
resetForNewExecutions(Collection<ExecutionVertexID> vertices)
protected void
restoreState(Set<ExecutionVertexID> vertices, boolean isGlobalRecovery)
protected void
setGlobalFailureCause(Throwable cause, long timestamp)
void
startCheckpointScheduler()
Starts the periodic scheduling if possible.
void
startScheduling()
protected abstract void
startSchedulingInternal()
void
stopCheckpointScheduler()
Stops the periodic scheduling if possible.
CompletableFuture<String>
stopWithSavepoint(String targetDirectory, boolean terminate, SavepointFormatType formatType)
protected void
transitionExecutionGraphState(JobStatus current, JobStatus newState)
protected void
transitionToRunning()
protected void
transitionToScheduled(List<ExecutionVertexID> verticesToDeploy)
CompletableFuture<CompletedCheckpoint>
triggerCheckpoint(CheckpointType checkpointType)
CompletableFuture<String>
triggerSavepoint(String targetDirectory, boolean cancelJob, SavepointFormatType formatType)
void
updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
boolean
updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.util.AutoCloseableAsync
close
-
Methods inherited from interface org.apache.flink.runtime.scheduler.GlobalFailureHandler
handleGlobalFailure
-
Methods inherited from interface org.apache.flink.runtime.scheduler.SchedulerNG
requestJobResourceRequirements, updateJobResourceRequirements, updateTaskExecutionState
-
-
-
-
Field Detail
-
jobInfo
protected final JobInfo jobInfo
-
stateLocationRetriever
protected final StateLocationRetriever stateLocationRetriever
-
inputsLocationsRetriever
protected final InputsLocationsRetriever inputsLocationsRetriever
-
jobManagerJobMetricGroup
protected final JobManagerJobMetricGroup jobManagerJobMetricGroup
-
executionVertexVersioner
protected final ExecutionVertexVersioner executionVertexVersioner
-
operatorCoordinatorHandler
protected final OperatorCoordinatorHandler operatorCoordinatorHandler
-
-
Constructor Detail
-
SchedulerBase
public SchedulerBase(org.slf4j.Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, ExecutionVertexVersioner executionVertexVersioner, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, VertexParallelismStore vertexParallelismStore) throws Exception
- Throws:
Exception
-
-
Method Detail
-
getDefaultMaxParallelism
public static int getDefaultMaxParallelism(JobVertex vertex)
Get a default value to use for a given vertex's max parallelism if none was specified.
- Parameters:
vertex - the vertex to compute a default max parallelism for
- Returns:
the computed max parallelism
-
computeVertexParallelismStore
public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc)
-
computeVertexParallelismStore
public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex,Integer> defaultMaxParallelismFunc, Function<Integer,Integer> normalizeParallelismFunc)
Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms, with a custom function for default max parallelism calculation and a custom function for normalizing vertex parallelism.
- Parameters:
vertices - the vertices to compute parallelism for
defaultMaxParallelismFunc - a function for computing a default max parallelism if none is specified on a given vertex
normalizeParallelismFunc - a function for normalizing vertex parallelism
- Returns:
the computed parallelism store
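To make the roles of the two function parameters concrete, here is a minimal, self-contained sketch. It is not Flink's implementation: `SketchVertex`, `ParallelismStoreSketch`, the use of `-1` for "not specified", the plain `Map` standing in for `VertexParallelismStore`, and the validity rule are all assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for JobVertex; -1 means "not specified" (assumption).
class SketchVertex {
    final String id;
    final int parallelism;
    final int maxParallelism;

    SketchVertex(String id, int parallelism, int maxParallelism) {
        this.id = id;
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }
}

class ParallelismStoreSketch {
    // Returns id -> {parallelism, maxParallelism}; a Map stands in for the real store.
    static Map<String, int[]> compute(
            Iterable<SketchVertex> vertices,
            Function<SketchVertex, Integer> defaultMaxParallelismFunc,
            Function<Integer, Integer> normalizeParallelismFunc) {
        Map<String, int[]> store = new LinkedHashMap<>();
        for (SketchVertex v : vertices) {
            // Normalize whatever parallelism the vertex carries.
            int parallelism = normalizeParallelismFunc.apply(v.parallelism);
            // Fall back to the default function only if no max parallelism was set.
            int max = v.maxParallelism == -1
                    ? defaultMaxParallelismFunc.apply(v)
                    : v.maxParallelism;
            // Assumed validity rule: 1 <= parallelism <= max parallelism.
            if (parallelism < 1 || parallelism > max) {
                throw new IllegalArgumentException(
                        "Invalid parallelism " + parallelism + " for " + v.id);
            }
            store.put(v.id, new int[] {parallelism, max});
        }
        return store;
    }

    static Map<String, int[]> demo() {
        return compute(
                Arrays.asList(
                        new SketchVertex("source", -1, -1),
                        new SketchVertex("map", 4, 64)),
                v -> 128,               // assumed default max parallelism
                p -> p == -1 ? 1 : p);  // assumed normalization: unset becomes 1
    }

    public static void main(String[] args) {
        for (Map.Entry<String, int[]> e : demo().entrySet()) {
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
        }
    }
}
```

In this sketch the vertex with nothing specified ends up with the normalized parallelism and the default max parallelism, while the vertex with explicit values keeps them.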
-
computeVertexParallelismStore
public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices)
Compute the VertexParallelismStore for all given vertices, which will set defaults and ensure that the returned store contains valid parallelisms.
- Parameters:
vertices - the vertices to compute parallelism for
- Returns:
the computed parallelism store
-
computeVertexParallelismStore
public static VertexParallelismStore computeVertexParallelismStore(JobGraph jobGraph)
Compute the VertexParallelismStore for all vertices of a given job graph, which will set defaults and ensure that the returned store contains valid parallelisms.
- Parameters:
jobGraph - the job graph to retrieve vertices from
- Returns:
the computed parallelism store
-
resetForNewExecutions
protected void resetForNewExecutions(Collection<ExecutionVertexID> vertices)
-
resetForNewExecution
protected void resetForNewExecution(ExecutionVertexID executionVertexId)
-
restoreState
protected void restoreState(Set<ExecutionVertexID> vertices, boolean isGlobalRecovery) throws Exception
- Throws:
Exception
-
transitionToScheduled
protected void transitionToScheduled(List<ExecutionVertexID> verticesToDeploy)
-
setGlobalFailureCause
protected void setGlobalFailureCause(@Nullable Throwable cause, long timestamp)
-
getMainThreadExecutor
protected ComponentMainThreadExecutor getMainThreadExecutor()
-
failJob
protected void failJob(Throwable cause, long timestamp, CompletableFuture<Map<String,String>> failureLabels)
-
getSchedulingTopology
protected final SchedulingTopology getSchedulingTopology()
-
getResultPartitionAvailabilityChecker
protected final ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker()
-
transitionToRunning
protected final void transitionToRunning()
-
getExecutionVertex
public ExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId)
-
getExecutionJobVertex
public ExecutionJobVertex getExecutionJobVertex(JobVertexID jobVertexId)
-
getJobGraph
protected JobGraph getJobGraph()
-
getNumberOfRestarts
protected abstract long getNumberOfRestarts()
-
getMarkPartitionFinishedStrategy
protected MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy()
-
cancelAllPendingSlotRequestsInternal
protected abstract void cancelAllPendingSlotRequestsInternal()
-
transitionExecutionGraphState
protected void transitionExecutionGraphState(JobStatus current, JobStatus newState)
-
getExecutionGraph
@VisibleForTesting public ExecutionGraph getExecutionGraph()
ExecutionGraph is exposed to make it easier to rework tests to be based on the new scheduler. It is expected to be used only for state checks. For now, however, until all actions are factored out of ExecutionGraph and its sub-components, some actions may still be performed directly on it.
-
startScheduling
public final void startScheduling()
- Specified by:
startScheduling in interface SchedulerNG
-
registerJobMetrics
public static void registerJobMetrics(MetricGroup metrics, JobStatusProvider jobStatusProvider, Gauge<Long> numberOfRestarts, DeploymentStateTimeMetrics deploymentTimeMetrics, Consumer<JobStatusListener> jobStatusListenerRegistrar, long initializationTimestamp, MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings)
-
startSchedulingInternal
protected abstract void startSchedulingInternal()
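The signatures on this page show `startScheduling()` as `public final` and `startSchedulingInternal()` as `protected abstract`, which is the usual shape of a template method. The sketch below illustrates that shape only. `MiniSchedulerBase`, `MiniDefaultScheduler` and the logged steps are hypothetical, and what the real base class does before or after calling the hook is not stated here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a scheduler base class; not Flink code.
abstract class MiniSchedulerBase {
    protected final List<String> log = new ArrayList<>();

    // Final entry point: subclasses cannot override it.
    public final void startScheduling() {
        log.add("base: common start-up"); // assumed shared work
        startSchedulingInternal();        // delegate to the subclass hook
    }

    // Hook that every concrete scheduler must implement.
    protected abstract void startSchedulingInternal();
}

class MiniDefaultScheduler extends MiniSchedulerBase {
    @Override
    protected void startSchedulingInternal() {
        log.add("subclass: schedule vertices");
    }
}

class TemplateMethodDemo {
    static List<String> run() {
        MiniDefaultScheduler scheduler = new MiniDefaultScheduler();
        scheduler.startScheduling();
        return scheduler.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping the entry point final means shared bookkeeping cannot be skipped by a subclass, while the abstract hook carries the scheduler-specific logic.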
-
closeAsync
public CompletableFuture<Void> closeAsync()
Description copied from interface: AutoCloseableAsync
Trigger the closing of the resource and return the corresponding close future.
- Specified by:
closeAsync in interface AutoCloseableAsync
- Returns:
Future which is completed once the resource has been closed
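As an illustration of the close-future contract, the following sketch models a resource whose `closeAsync()` returns a future that completes once the release work has finished. `SketchAsyncCloseable` and `SketchResource` are hypothetical types, and the blocking `close()` built on top of `closeAsync()` is an assumption about how the two methods could relate, not a statement about Flink's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical mirror of an asynchronously closeable resource; not Flink's interface.
interface SketchAsyncCloseable extends AutoCloseable {
    CompletableFuture<Void> closeAsync();

    // Blocking close layered on the async variant (assumed behaviour).
    @Override
    default void close() throws Exception {
        try {
            closeAsync().get();
        } catch (ExecutionException e) {
            throw new Exception("Could not close resource.", e.getCause());
        }
    }
}

class SketchResource implements SketchAsyncCloseable {
    volatile boolean released = false;
    private CompletableFuture<Void> closeFuture; // guarded by "this"

    @Override
    public synchronized CompletableFuture<Void> closeAsync() {
        // Start the release work once; repeated calls return the same future.
        if (closeFuture == null) {
            closeFuture = CompletableFuture.runAsync(() -> released = true);
        }
        return closeFuture;
    }
}

class CloseAsyncDemo {
    static boolean run() {
        SketchResource resource = new SketchResource();
        CompletableFuture<Void> closed = resource.closeAsync();
        closed.join(); // wait until closing has finished
        return resource.released;
    }

    public static void main(String[] args) {
        System.out.println("released = " + run());
    }
}
```

A caller that must not block can chain follow-up work on the returned future (for example with `thenRun`) instead of calling `join()`.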
-
cancel
public void cancel()
- Specified by:
cancel in interface SchedulerNG
-
getJobTerminationFuture
public CompletableFuture<JobStatus> getJobTerminationFuture()
- Specified by:
getJobTerminationFuture in interface SchedulerNG
-
archiveGlobalFailure
protected final void archiveGlobalFailure(Throwable failure, CompletableFuture<Map<String,String>> failureLabels)
-
archiveFromFailureHandlingResult
protected final void archiveFromFailureHandlingResult(FailureHandlingResultSnapshot failureHandlingResult)
-
updateTaskExecutionState
public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState)
- Specified by:
updateTaskExecutionState in interface SchedulerNG
-
onTaskFailed
protected abstract void onTaskFailed(Execution execution)
-
requestNextInputSplit
public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException
- Specified by:
requestNextInputSplit in interface SchedulerNG
- Throws:
IOException
-
requestPartitionState
public ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException
- Specified by:
requestPartitionState in interface SchedulerNG
- Throws:
PartitionProducerDisposedException
-
getExceptionHistory
@VisibleForTesting public Iterable<RootExceptionHistoryEntry> getExceptionHistory()
-
requestJob
public ExecutionGraphInfo requestJob()
- Specified by:
requestJob in interface SchedulerNG
-
requestCheckpointStats
public CheckpointStatsSnapshot requestCheckpointStats()
Description copied from interface: SchedulerNG
Returns the checkpoint statistics for a given job. Although the CheckpointStatsSnapshot is included in the ExecutionGraphInfo, this method is preferred to SchedulerNG.requestJob() because it is less expensive.
- Specified by:
requestCheckpointStats in interface SchedulerNG
- Returns:
checkpoint statistics snapshot for job graph
-
requestJobStatus
public JobStatus requestJobStatus()
- Specified by:
requestJobStatus in interface SchedulerNG
-
requestKvStateLocation
public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException
- Specified by:
requestKvStateLocation in interface SchedulerNG
- Throws:
UnknownKvStateLocation
FlinkJobNotFoundException
-
notifyKvStateRegistered
public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException
- Specified by:
notifyKvStateRegistered in interface SchedulerNG
- Throws:
FlinkJobNotFoundException
-
notifyKvStateUnregistered
public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) throws FlinkJobNotFoundException
- Specified by:
notifyKvStateUnregistered in interface SchedulerNG
- Throws:
FlinkJobNotFoundException
-
updateAccumulators
public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
- Specified by:
updateAccumulators in interface SchedulerNG
-
triggerSavepoint
public CompletableFuture<String> triggerSavepoint(String targetDirectory, boolean cancelJob, SavepointFormatType formatType)
- Specified by:
triggerSavepoint in interface SchedulerNG
-
triggerCheckpoint
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType)
- Specified by:
triggerCheckpoint in interface SchedulerNG
-
stopCheckpointScheduler
public void stopCheckpointScheduler()
Description copied from interface: CheckpointScheduling
Stops the periodic scheduling if possible.
- Specified by:
stopCheckpointScheduler in interface CheckpointScheduling
-
startCheckpointScheduler
public void startCheckpointScheduler()
Description copied from interface: CheckpointScheduling
Starts the periodic scheduling if possible.
- Specified by:
startCheckpointScheduler in interface CheckpointScheduling
-
acknowledgeCheckpoint
public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState)
- Specified by:
acknowledgeCheckpoint in interface SchedulerNG
-
declineCheckpoint
public void declineCheckpoint(DeclineCheckpoint decline)
- Specified by:
declineCheckpoint in interface SchedulerNG
-
reportCheckpointMetrics
public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics)
- Specified by:
reportCheckpointMetrics in interface SchedulerNG
-
reportInitializationMetrics
public void reportInitializationMetrics(JobID jobId, ExecutionAttemptID executionAttemptId, SubTaskInitializationMetrics initializationMetrics)
- Specified by:
reportInitializationMetrics in interface SchedulerNG
-
stopWithSavepoint
public CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean terminate, SavepointFormatType formatType)
- Specified by:
stopWithSavepoint in interface SchedulerNG
-
deliverOperatorEventToCoordinator
public void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt) throws FlinkException
Description copied from interface: SchedulerNG
Delivers the given OperatorEvent to the OperatorCoordinator with the given OperatorID.
Failure semantics: If the task manager sends an event for a non-running task or a non-existing operator coordinator, then respond with an exception to the call. If task and coordinator exist, then we assume that the call from the TaskManager was valid, and any bubbling exception needs to cause a job failure.
- Specified by:
deliverOperatorEventToCoordinator in interface SchedulerNG
- Throws:
FlinkException - Thrown, if the task is not running or no operator/coordinator exists for the given ID.
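The two failure paths described above can be modelled in a few lines. This is a sketch of the stated semantics only: `EventDeliverySketch`, `SketchDeliveryException`, the string ids, and the `jobFailureCause` field are hypothetical and do not correspond to Flink classes or fields.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical checked exception playing the role of the exception returned to the caller.
class SketchDeliveryException extends Exception {
    SketchDeliveryException(String message) {
        super(message);
    }
}

class EventDeliverySketch {
    final Set<String> runningTasks = new HashSet<>();
    final Map<String, Consumer<String>> coordinators = new HashMap<>();
    Throwable jobFailureCause; // non-null once the (modelled) job has been failed

    void deliver(String taskId, String operatorId, String event)
            throws SketchDeliveryException {
        // Invalid call: answer the caller with an exception; the job is untouched.
        if (!runningTasks.contains(taskId)) {
            throw new SketchDeliveryException("Task " + taskId + " is not running.");
        }
        Consumer<String> coordinator = coordinators.get(operatorId);
        if (coordinator == null) {
            throw new SketchDeliveryException("No coordinator for operator " + operatorId + ".");
        }
        // Valid call: an exception bubbling out of the coordinator fails the job.
        try {
            coordinator.accept(event);
        } catch (Throwable t) {
            jobFailureCause = t;
        }
    }

    static String demo() {
        EventDeliverySketch s = new EventDeliverySketch();
        s.runningTasks.add("task-1");
        s.coordinators.put("op-1", evt -> {
            throw new IllegalStateException("coordinator failed on " + evt);
        });
        String rejected = "not-rejected";
        try {
            s.deliver("task-2", "op-1", "event"); // unknown task
        } catch (SketchDeliveryException e) {
            rejected = "rejected";
        }
        boolean untouched = s.jobFailureCause == null;
        try {
            s.deliver("task-1", "op-1", "event"); // valid call, coordinator throws
        } catch (SketchDeliveryException e) {
            return "unexpected";
        }
        return rejected + "/" + untouched + "/" + (s.jobFailureCause != null);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the split is that a bad request only affects the caller, whereas a failure while handling a legitimate request is treated as a fault of the job itself.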
-
deliverCoordinationRequestToCoordinator
public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException
Description copied from interface: SchedulerNG
Delivers a coordination request to the OperatorCoordinator with the given OperatorID and returns the coordinator's response.
- Specified by:
deliverCoordinationRequestToCoordinator in interface SchedulerNG
- Returns:
A future containing the response.
- Throws:
FlinkException - Thrown, if the task is not running, or no operator/coordinator exists for the given ID, or the coordinator cannot handle client events.
-
notifyEndOfData
public void notifyEndOfData(ExecutionAttemptID executionAttemptID)
Description copied from interface: SchedulerNG
Notifies that the task has reached the end of data.
- Specified by:
notifyEndOfData in interface SchedulerNG
- Parameters:
executionAttemptID - The execution attempt id.
-
getJobId
@VisibleForTesting protected JobID getJobId()
-
-