public class AdaptiveScheduler extends Object implements SchedulerNG
A SchedulerNG implementation that uses declarative resource management and automatically adapts the parallelism if not enough resources can be acquired to run at the configured parallelism, as described in FLIP-160.
This scheduler only supports jobs with streaming semantics, i.e., all vertices are connected via pipelined data-exchanges.
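The parallelism adaptation mentioned above can be pictured with a minimal sketch. It is not the scheduler's actual algorithm: the class `ParallelismSketch`, its `adapt` method, and the assumption that all vertices share a single slot sharing group (so one slot hosts one subtask of every vertex) are hypothetical simplifications for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink. */
final class ParallelismSketch {

    /**
     * Caps each vertex's configured parallelism at the number of available slots.
     * Assumption: a single slot sharing group, so one slot can host one subtask of every vertex.
     */
    static Map<String, Integer> adapt(Map<String, Integer> configured, int availableSlots) {
        if (availableSlots < 1) {
            throw new IllegalArgumentException("At least one slot is required.");
        }
        Map<String, Integer> adapted = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> vertex : configured.entrySet()) {
            // Run at the configured parallelism if possible, otherwise scale down.
            adapted.put(vertex.getKey(), Math.min(vertex.getValue(), availableSlots));
        }
        return adapted;
    }
}
```

With four slots, a vertex configured for parallelism 8 would run at 4 in this sketch, while a vertex configured for parallelism 2 is left unchanged.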
The implementation is spread over multiple State classes that control which RPCs are allowed in a given state and which state transitions are possible (see the FLIP for an overview).
This class can thus be roughly split into two parts:
1) RPCs, which must forward the call to the state via State.tryRun(Class, ThrowingConsumer, String) or State.tryCall(Class, FunctionWithException, String).
2) Context methods, which are called by states either to transition into another state or to access functionality of some component in the scheduler.
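The two-part split can be sketched as follows. This is an illustration under assumptions, not the scheduler's code: `SketchState`, `Cancellable`, `ExecutingSketch`, `FinishedSketch`, and `SchedulerSketch` are hypothetical names, and `java.util.function` types stand in for the ThrowingConsumer and FunctionWithException parameters named above.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for the State interface; the method bodies are assumptions. */
interface SketchState {
    /** Runs the action only if this state is an instance of the requested type. */
    default <T> void tryRun(Class<T> type, Consumer<T> action, String debugMessage) {
        if (type.isInstance(this)) {
            action.accept(type.cast(this));
        } else {
            System.out.println("Ignoring " + debugMessage + " in " + getClass().getSimpleName());
        }
    }

    /** Like tryRun, but returns the action's result, or empty if the state does not match. */
    default <T, R> Optional<R> tryCall(Class<T> type, Function<T, R> action, String debugMessage) {
        return type.isInstance(this)
                ? Optional.ofNullable(action.apply(type.cast(this)))
                : Optional.empty();
    }
}

/** Hypothetical capability implemented only by states that allow cancellation. */
interface Cancellable {
    void cancel();
}

final class ExecutingSketch implements SketchState, Cancellable {
    private final SchedulerSketch context;

    ExecutingSketch(SchedulerSketch context) {
        this.context = context;
    }

    @Override
    public void cancel() {
        // The state decides on the transition and asks the context to perform it.
        context.goToFinished("CANCELED");
    }
}

final class FinishedSketch implements SketchState {
    final String status;

    FinishedSketch(String status) {
        this.status = status;
    }
}

final class SchedulerSketch {
    private SketchState state = new ExecutingSketch(this);

    // Part 1: RPCs forward to the current state, which may ignore them.
    void cancel() {
        state.tryRun(Cancellable.class, Cancellable::cancel, "cancel");
    }

    Optional<String> requestFinalStatus() {
        return state.tryCall(FinishedSketch.class, finished -> finished.status, "requestFinalStatus");
    }

    // Part 2: context methods are called by states to transition.
    void goToFinished(String status) {
        state = new FinishedSketch(status);
    }
}
```

In this sketch a second `cancel()` call after the transition is ignored, because the finished state does not implement the hypothetical `Cancellable` capability.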
Modifier and Type | Class and Description |
---|---|
static class | AdaptiveScheduler.Settings Consolidated settings for the adaptive scheduler. |
Constructor and Description |
---|
AdaptiveScheduler(AdaptiveScheduler.Settings settings, JobGraph jobGraph, JobResourceRequirements jobResourceRequirements, Configuration configuration, DeclarativeSlotPool declarativeSlotPool, SlotAllocator slotAllocator, Executor ioExecutor, ClassLoader userCodeClassLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, RestartBackoffTimeStrategy restartBackoffTimeStrategy, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, JobStatusListener jobStatusListener, Collection<FailureEnricher> failureEnrichers, ExecutionGraphFactory executionGraphFactory) |
Modifier and Type | Method and Description |
---|---|
void | acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) |
void | archiveFailure(RootExceptionHistoryEntry failure) Archive failure. |
void | cancel() |
CompletableFuture<Void> | closeAsync() Trigger the closing of the resource and return the corresponding close future. |
void | declineCheckpoint(DeclineCheckpoint decline) |
CompletableFuture<CoordinationResponse> | deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) Delivers a coordination request to the OperatorCoordinator with the given OperatorID and returns the coordinator's response. |
void | deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) Delivers the given OperatorEvent to the OperatorCoordinator with the given OperatorID. |
ArchivedExecutionGraph | getArchivedExecutionGraph(JobStatus jobStatus, Throwable cause) Creates the ArchivedExecutionGraph for the given job status and cause. |
Executor | getIOExecutor() Gets the I/O executor. |
CompletableFuture<JobStatus> | getJobTerminationFuture() |
ComponentMainThreadExecutor | getMainThreadExecutor() Gets the main thread executor. |
JobManagerJobMetricGroup | getMetricGroup() Gets the JobManagerJobMetricGroup. |
void | goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) Transitions into the Canceling state. |
void | goToCreatingExecutionGraph(ExecutionGraph previousExecutionGraph) Transitions into the CreatingExecutionGraph state. |
void | goToExecuting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) Transitions into the Executing state. |
void | goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause, List<ExceptionHistoryEntry> failureCollection) Transitions into the Failing state. |
void | goToFinished(ArchivedExecutionGraph archivedExecutionGraph) Transitions into the Finished state. |
void | goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, List<ExceptionHistoryEntry> failureCollection) Transitions into the Restarting state. |
CompletableFuture<String> | goToStopWithSavepoint(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, CompletableFuture<String> savepointFuture, List<ExceptionHistoryEntry> failureCollection) Transitions into the StopWithSavepoint state. |
void | goToWaitingForResources(ExecutionGraph previousExecutionGraph) Transitions into the WaitingForResources state. |
void | handleGlobalFailure(Throwable cause) Handles a global failure. |
boolean | hasDesiredResources() Checks whether we have the desired resources. |
boolean | hasSufficientResources() Checks if we currently have sufficient resources for executing the job. |
org.apache.flink.runtime.scheduler.adaptive.FailureResult | howToHandleFailure(Throwable failure, CompletableFuture<Map<String,String>> failureLabels) Asks how to handle the failure. |
boolean | isState(org.apache.flink.runtime.scheduler.adaptive.State expectedState) Checks whether the current state is the expected state. |
void | notifyEndOfData(ExecutionAttemptID executionAttemptID) Notifies that the task has reached the end of data. |
void | notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) |
void | notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) |
void | onFinished(ArchivedExecutionGraph archivedExecutionGraph) Callback which is called when the execution reaches the Finished state. |
void | reportCheckpointMetrics(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics) |
void | reportInitializationMetrics(JobID jobId, SubTaskInitializationMetrics initializationMetrics) |
CheckpointStatsSnapshot | requestCheckpointStats() Returns the checkpoint statistics for a given job. |
ExecutionGraphInfo | requestJob() |
JobResourceRequirements | requestJobResourceRequirements() Read current job resource requirements. |
JobStatus | requestJobStatus() |
KvStateLocation | requestKvStateLocation(JobID jobId, String registrationName) |
SerializedInputSplit | requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) |
ExecutionState | requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) |
void | runIfState(org.apache.flink.runtime.scheduler.adaptive.State expectedState, Runnable action) Run the given action if the current state equals the expected state. |
ScheduledFuture<?> | runIfState(org.apache.flink.runtime.scheduler.adaptive.State expectedState, Runnable action, Duration delay) Runs the given action after a delay if the state at this time equals the expected state. |
boolean | shouldRescale(ExecutionGraph executionGraph, boolean forceRescale) In regular mode, rescale the job if added resource meets JobManagerOptions.MIN_PARALLELISM_INCREASE. |
void | startScheduling() |
CompletableFuture<String> | stopWithSavepoint(String targetDirectory, boolean terminate, SavepointFormatType formatType) |
CompletableFuture<CompletedCheckpoint> | triggerCheckpoint(CheckpointType checkpointType) |
CompletableFuture<String> | triggerSavepoint(String targetDirectory, boolean cancelJob, SavepointFormatType formatType) |
org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.AssignmentResult | tryToAssignSlots(org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism) Try to assign slots to the created ExecutionGraph. |
void | updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) |
void | updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) Update job resource requirements. |
boolean | updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) |
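Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SchedulerNG: updateTaskExecutionState
Methods inherited from interface AutoCloseableAsync: close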
public AdaptiveScheduler(AdaptiveScheduler.Settings settings, JobGraph jobGraph, @Nullable JobResourceRequirements jobResourceRequirements, Configuration configuration, DeclarativeSlotPool declarativeSlotPool, SlotAllocator slotAllocator, Executor ioExecutor, ClassLoader userCodeClassLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, RestartBackoffTimeStrategy restartBackoffTimeStrategy, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, JobStatusListener jobStatusListener, Collection<FailureEnricher> failureEnrichers, ExecutionGraphFactory executionGraphFactory) throws JobExecutionException
Throws: JobExecutionException
public void startScheduling()
Specified by: startScheduling in interface SchedulerNG
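public CompletableFuture<Void> closeAsync()
Description copied from interface: AutoCloseableAsync
Trigger the closing of the resource and return the corresponding close future.
Specified by: closeAsync in interface AutoCloseableAsync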
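A minimal sketch of how an asynchronous close can be paired with a blocking one. The interface `AsyncCloseableSketch` and the class `ResourceSketch` are hypothetical stand-ins written for this example; they are not the AutoCloseableAsync interface itself, and the exception wrapping shown is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-in that mirrors a closeAsync()/close() pairing. */
interface AsyncCloseableSketch extends AutoCloseable {

    /** Triggers closing and returns a future that completes once closing is done. */
    CompletableFuture<Void> closeAsync();

    /** Blocking variant: waits for the close future and surfaces its failure cause. */
    @Override
    default void close() throws Exception {
        try {
            closeAsync().get();
        } catch (ExecutionException e) {
            throw new Exception("Could not close resource.", e.getCause());
        }
    }
}

final class ResourceSketch implements AsyncCloseableSketch {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    @Override
    public CompletableFuture<Void> closeAsync() {
        // Nothing to release in this sketch, so the future completes immediately.
        terminationFuture.complete(null);
        return terminationFuture;
    }

    boolean isClosed() {
        return terminationFuture.isDone();
    }
}
```

Callers that must not block can chain further work on the returned future instead of calling `close()`.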
public void cancel()
Specified by: cancel in interface SchedulerNG
public CompletableFuture<JobStatus> getJobTerminationFuture()
Specified by: getJobTerminationFuture in interface SchedulerNG
public void handleGlobalFailure(Throwable cause)
Description copied from interface: GlobalFailureHandler
Handles a global failure.
Specified by: handleGlobalFailure in interface GlobalFailureHandler
Parameters:
cause - A cause that describes the global failure.
public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState)
Specified by: updateTaskExecutionState in interface SchedulerNG
public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException
Specified by: requestNextInputSplit in interface SchedulerNG
Throws: IOException
public ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException
Specified by: requestPartitionState in interface SchedulerNG
Throws: PartitionProducerDisposedException
public ExecutionGraphInfo requestJob()
Specified by: requestJob in interface SchedulerNG
public CheckpointStatsSnapshot requestCheckpointStats()
Description copied from interface: SchedulerNG
Returns the checkpoint statistics for a given job. Although the CheckpointStatsSnapshot is included in the ExecutionGraphInfo, this method is preferred to SchedulerNG.requestJob() because it is less expensive.
Specified by: requestCheckpointStats in interface SchedulerNG
public void archiveFailure(RootExceptionHistoryEntry failure)
Archive failure.
public JobStatus requestJobStatus()
Specified by: requestJobStatus in interface SchedulerNG
public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException
Specified by: requestKvStateLocation in interface SchedulerNG
Throws: UnknownKvStateLocation, FlinkJobNotFoundException
public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException
Specified by: notifyKvStateRegistered in interface SchedulerNG
Throws: FlinkJobNotFoundException
public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) throws FlinkJobNotFoundException
Specified by: notifyKvStateUnregistered in interface SchedulerNG
Throws: FlinkJobNotFoundException
public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
Specified by: updateAccumulators in interface SchedulerNG
public CompletableFuture<String> triggerSavepoint(@Nullable String targetDirectory, boolean cancelJob, SavepointFormatType formatType)
Specified by: triggerSavepoint in interface SchedulerNG
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType)
Specified by: triggerCheckpoint in interface SchedulerNG
public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState)
Specified by: acknowledgeCheckpoint in interface SchedulerNG
public void notifyEndOfData(ExecutionAttemptID executionAttemptID)
Description copied from interface: SchedulerNG
Notifies that the task has reached the end of data.
Specified by: notifyEndOfData in interface SchedulerNG
Parameters:
executionAttemptID - The execution attempt id.
public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics)
Specified by: reportCheckpointMetrics in interface SchedulerNG
public void declineCheckpoint(DeclineCheckpoint decline)
Specified by: declineCheckpoint in interface SchedulerNG
public void reportInitializationMetrics(JobID jobId, SubTaskInitializationMetrics initializationMetrics)
Specified by: reportInitializationMetrics in interface SchedulerNG
public CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean terminate, SavepointFormatType formatType)
Specified by: stopWithSavepoint in interface SchedulerNG
public void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) throws FlinkException
Description copied from interface: SchedulerNG
Delivers the given OperatorEvent to the OperatorCoordinator with the given OperatorID.
Failure semantics: If the task manager sends an event for a non-running task or a non-existing operator coordinator, then respond with an exception to the call. If task and coordinator exist, then we assume that the call from the TaskManager was valid, and any bubbling exception needs to cause a job failure.
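The failure semantics can be modelled with a small sketch. Everything in it is hypothetical and chosen for illustration: the class `OperatorEventDeliverySketch`, the use of plain strings for task and operator IDs, and the `jobFailure` field that stands in for failing the job.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical model of the failure semantics; none of these names are Flink API. */
final class OperatorEventDeliverySketch {
    private final Set<String> runningTasks = new HashSet<>();
    private final Map<String, Consumer<String>> coordinators = new HashMap<>();
    private Throwable jobFailure; // non-null once a coordinator failure was escalated

    void addRunningTask(String taskId) {
        runningTasks.add(taskId);
    }

    void addCoordinator(String operatorId, Consumer<String> handler) {
        coordinators.put(operatorId, handler);
    }

    Throwable jobFailure() {
        return jobFailure;
    }

    void deliver(String taskId, String operatorId, String event) throws Exception {
        // Invalid call: answer the caller with an exception, the job keeps running.
        if (!runningTasks.contains(taskId)) {
            throw new Exception("Task " + taskId + " is not running.");
        }
        Consumer<String> coordinator = coordinators.get(operatorId);
        if (coordinator == null) {
            throw new Exception("No coordinator exists for operator " + operatorId + ".");
        }
        try {
            coordinator.accept(event);
        } catch (RuntimeException e) {
            // Valid call but the coordinator failed: escalate to a job failure.
            jobFailure = e;
        }
    }
}
```

The point of the split is that a bad request only affects its sender, while an exception raised for a valid request is treated as a problem of the job itself.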
Specified by: deliverOperatorEventToCoordinator in interface SchedulerNG
Throws: FlinkException - Thrown, if the task is not running or no operator/coordinator exists for the given ID.
public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException
Description copied from interface: SchedulerNG
Delivers a coordination request to the OperatorCoordinator with the given OperatorID and returns the coordinator's response.
Specified by: deliverCoordinationRequestToCoordinator in interface SchedulerNG
Throws: FlinkException - Thrown, if the task is not running, or no operator/coordinator exists for the given ID, or the coordinator cannot handle client events.
public JobResourceRequirements requestJobResourceRequirements()
Description copied from interface: SchedulerNG
Read current job resource requirements.
Specified by: requestJobResourceRequirements in interface SchedulerNG
public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements)
Description copied from interface: SchedulerNG
Update job resource requirements.
Specified by: updateJobResourceRequirements in interface SchedulerNG
Parameters:
jobResourceRequirements - new resource requirements
public boolean hasDesiredResources()
Checks whether we have the desired resources.
Returns: true if we have enough resources; otherwise false
public boolean hasSufficientResources()
Checks if we currently have sufficient resources for executing the job.
Returns: true if we have sufficient resources; otherwise false
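One plausible reading of the difference between the two checks is sketched below: "desired" means enough slots to run at the configured parallelism, while "sufficient" means enough slots to run at all, possibly at reduced parallelism. This reading, the class `ResourceCheckSketch`, and its slot-count fields are assumptions made for illustration, not the scheduler's data model.

```java
/** Hypothetical illustration; the bounds and slot counts are assumptions. */
final class ResourceCheckSketch {
    private final int minSlots;     // assumed lower bound needed to run the job at all
    private final int desiredSlots; // assumed slots needed for the configured parallelism
    private int availableSlots;

    ResourceCheckSketch(int minSlots, int desiredSlots) {
        if (minSlots < 1 || desiredSlots < minSlots) {
            throw new IllegalArgumentException("Expected 1 <= minSlots <= desiredSlots.");
        }
        this.minSlots = minSlots;
        this.desiredSlots = desiredSlots;
    }

    void setAvailableSlots(int availableSlots) {
        this.availableSlots = availableSlots;
    }

    /** True if the job could run at its configured parallelism. */
    boolean hasDesiredResources() {
        return availableSlots >= desiredSlots;
    }

    /** True if the job could run at all, possibly with reduced parallelism. */
    boolean hasSufficientResources() {
        return availableSlots >= minSlots;
    }
}
```

Under this reading, having the desired resources always implies having sufficient resources, but not the other way around.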
public ArchivedExecutionGraph getArchivedExecutionGraph(JobStatus jobStatus, @Nullable Throwable cause)
Creates the ArchivedExecutionGraph for the given job status and cause. Cause can be null if there is no failure.
Parameters:
jobStatus - jobStatus to initialize the ArchivedExecutionGraph with
cause - cause describing a failure cause; null if there is none
Returns: ArchivedExecutionGraph
public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph)
Description copied from interface: StateTransitions.ToWaitingForResources
Transitions into the WaitingForResources state.
public void goToExecuting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection)
Description copied from interface: StateTransitions.ToExecuting
Transitions into the Executing state.
Parameters:
executionGraph - executionGraph to pass to the Executing state
executionGraphHandler - executionGraphHandler to pass to the Executing state
operatorCoordinatorHandler - operatorCoordinatorHandler to pass to the Executing state
failureCollection - collection of failures that are propagated
public void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection)
Description copied from interface: StateTransitions.ToCancelling
Transitions into the Canceling state.
Parameters:
executionGraph - executionGraph to pass to the Canceling state
executionGraphHandler - executionGraphHandler to pass to the Canceling state
operatorCoordinatorHandler - operatorCoordinatorHandler to pass to the Canceling state
failureCollection - collection of failures that are propagated
public void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, List<ExceptionHistoryEntry> failureCollection)
Description copied from interface: StateTransitions.ToRestarting
Transitions into the Restarting state.
Parameters:
executionGraph - executionGraph to pass to the Restarting state
executionGraphHandler - executionGraphHandler to pass to the Restarting state
operatorCoordinatorHandler - operatorCoordinatorHandler to pass to the Restarting state
backoffTime - backoffTime to wait before transitioning to the Restarting state
failureCollection - collection of failures that are propagated
public void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause, List<ExceptionHistoryEntry> failureCollection)
Description copied from interface: StateTransitions.ToFailing
Transitions into the Failing state.
Parameters:
executionGraph - executionGraph to pass to the Failing state
executionGraphHandler - executionGraphHandler to pass to the Failing state
operatorCoordinatorHandler - operatorCoordinatorHandler to pass to the Failing state
failureCause - failureCause describing why the job execution failed
failureCollection - collection of failures that are propagated
public CompletableFuture<String> goToStopWithSavepoint(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, CompletableFuture<String> savepointFuture, List<ExceptionHistoryEntry> failureCollection)
Description copied from interface: StateTransitions.ToStopWithSavepoint
Transitions into the StopWithSavepoint state.
Parameters:
executionGraph - executionGraph to pass to the StopWithSavepoint state
executionGraphHandler - executionGraphHandler to pass to the StopWithSavepoint state
operatorCoordinatorHandler - operatorCoordinatorHandler to pass to the StopWithSavepoint state
savepointFuture - Future for the savepoint to complete.
failureCollection - collection of failures that are propagated
public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph)
Description copied from interface: StateTransitions.ToFinished
Transitions into the Finished state.
Parameters:
archivedExecutionGraph - archivedExecutionGraph which is passed to the Finished state
public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph)
Description copied from interface: StateTransitions.ToCreatingExecutionGraph
Transitions into the CreatingExecutionGraph state.
public org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.AssignmentResult tryToAssignSlots(org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism)
Try to assign slots to the created ExecutionGraph. If it is possible, then this method returns a successful AssignmentResult which contains the assigned ExecutionGraph. If not, then the assignment result is a failure.
Parameters:
executionGraphWithVertexParallelism - executionGraphWithVertexParallelism to assign slots to resources
Returns: AssignmentResult representing the result of the assignment
public boolean shouldRescale(ExecutionGraph executionGraph, boolean forceRescale)
In regular mode, rescale the job if added resource meets JobManagerOptions.MIN_PARALLELISM_INCREASE. In force mode rescale if the parallelism has changed.
Parameters:
executionGraph - executionGraph for making the scaling decision.
forceRescale - should we force rescaling
public void onFinished(ArchivedExecutionGraph archivedExecutionGraph)
Callback which is called when the execution reaches the Finished state.
Parameters:
archivedExecutionGraph - archivedExecutionGraph represents the final state of the job execution
public org.apache.flink.runtime.scheduler.adaptive.FailureResult howToHandleFailure(Throwable failure, CompletableFuture<Map<String,String>> failureLabels)
Asks how to handle the failure.
Parameters:
failure - failure describing the failure cause
failureLabels - future of labels from error classification.
Returns: FailureResult which describes how to handle the failure
public Executor getIOExecutor()
Gets the I/O executor.
public ComponentMainThreadExecutor getMainThreadExecutor()
Gets the main thread executor.
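The rescale rule documented for shouldRescale (regular mode requires a minimum parallelism increase, force mode only requires a change) can be sketched as a small decision function. The class `RescaleDecisionSketch` and its integer parameters are hypothetical; in particular, comparing a single aggregate parallelism value and returning false for a decrease in regular mode are assumptions, since the description does not cover those details.

```java
/** Hypothetical decision helper; not the scheduler's implementation. */
final class RescaleDecisionSketch {

    /**
     * @param currentParallelism parallelism the job currently runs with (assumed aggregate value)
     * @param newParallelism parallelism that the currently available resources would allow
     * @param minParallelismIncrease stand-in for JobManagerOptions.MIN_PARALLELISM_INCREASE
     * @param forceRescale whether rescaling is forced
     */
    static boolean shouldRescale(
            int currentParallelism,
            int newParallelism,
            int minParallelismIncrease,
            boolean forceRescale) {
        int change = newParallelism - currentParallelism;
        if (forceRescale) {
            // Force mode: any change in parallelism triggers a rescale.
            return change != 0;
        }
        // Regular mode: the added parallelism must meet the configured minimum.
        // Assumption: a decrease is not covered by the description and yields false here.
        return change >= minParallelismIncrease;
    }
}
```

For example, going from parallelism 4 to 5 with a minimum increase of 2 would not trigger a rescale in regular mode in this sketch, but would in force mode.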
public JobManagerJobMetricGroup getMetricGroup()
Gets the JobManagerJobMetricGroup.
public boolean isState(org.apache.flink.runtime.scheduler.adaptive.State expectedState)
Checks whether the current state is the expected state.
Parameters:
expectedState - expectedState is the expected state
Returns: true if the current state equals the expected state; otherwise false
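The state guard behind isState and the runIfState overloads listed in the method summary can be sketched as follows. The class `StateGuardSketch` is hypothetical; using `Object` for states, comparing them by identity, and scheduling on a plain `ScheduledExecutorService` are assumptions made to keep the example self-contained.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of state-guarded execution; not the scheduler's code. */
final class StateGuardSketch {
    private final ScheduledExecutorService executor;
    private volatile Object state;

    StateGuardSketch(ScheduledExecutorService executor, Object initialState) {
        this.executor = executor;
        this.state = initialState;
    }

    void transitionTo(Object newState) {
        this.state = newState;
    }

    /** Assumption: states are compared by identity. */
    boolean isState(Object expectedState) {
        return expectedState == state;
    }

    /** Runs the action immediately if the current state is the expected one. */
    void runIfState(Object expectedState, Runnable action) {
        if (isState(expectedState)) {
            action.run();
        }
    }

    /** Checks the state when the delay has elapsed, not when the action is scheduled. */
    ScheduledFuture<?> runIfState(Object expectedState, Runnable action, Duration delay) {
        return executor.schedule(
                () -> runIfState(expectedState, action), delay.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Because the check happens at execution time, an action scheduled by a state is silently skipped if the scheduler has left that state in the meantime.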
public void runIfState(org.apache.flink.runtime.scheduler.adaptive.State expectedState, Runnable action)
Run the given action if the current state equals the expected state.
Parameters:
expectedState - expectedState is the expected state
action - action to run if the current state equals the expected state
public ScheduledFuture<?> runIfState(org.apache.flink.runtime.scheduler.adaptive.State expectedState, Runnable action, Duration delay)
Runs the given action after a delay if the state at this time equals the expected state.
Parameters:
expectedState - expectedState describes the required state at the time of running the action
action - action to run if the expected state equals the actual state
delay - delay after which to run the action
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.