Interface ExecutionGraph
- All Superinterfaces:
  AccessExecutionGraph, JobStatusProvider
- All Known Implementing Classes:
  DefaultExecutionGraph
public interface ExecutionGraph extends AccessExecutionGraph
The execution graph is the central data structure that coordinates the distributed execution of a data flow. It keeps representations of each parallel task, each intermediate stream, and the communication between them.

The execution graph consists of the following constructs:
- The ExecutionJobVertex represents one vertex from the JobGraph (usually one operation like "map" or "join") during execution. It holds the aggregated state of all parallel subtasks. The ExecutionJobVertex is identified inside the graph by the JobVertexID, which it takes from the JobGraph's corresponding JobVertex.
- The ExecutionVertex represents one parallel subtask. For each ExecutionJobVertex, there are as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by the ExecutionJobVertex and the index of the parallel subtask.
- The Execution is one attempt to execute an ExecutionVertex. There may be multiple Executions for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed because it is no longer available when requested by later operations. An Execution is always identified by an ExecutionAttemptID. All messages between the JobManager and the TaskManager about deployment of tasks and updates in the task status always use the ExecutionAttemptID to address the message receiver.
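The three-level structure can be sketched with plain Java types. In this minimal, self-contained model the nested types are hypothetical stand-ins named after their roles, not Flink's ExecutionJobVertex, ExecutionVertex, and Execution classes (which carry far more state), and random UUIDs are assumed as attempt IDs. It shows one subtask per degree of parallelism, a new attempt per retry, and a registry that addresses attempts by ID, as the message routing described above requires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the ExecutionGraph hierarchy; not Flink's actual classes.
class ExecutionHierarchySketch {

    // Stand-in for ExecutionAttemptID: unique per attempt (random UUID assumed).
    record AttemptId(UUID value) {
        static AttemptId random() { return new AttemptId(UUID.randomUUID()); }
    }

    // Stand-in for Execution: one attempt to run one subtask.
    record Attempt(AttemptId id, String jobVertexId, int subtaskIndex, int attemptNumber) {}

    // Stand-in for ExecutionVertex: identified by its job vertex and subtask index,
    // and keeps every attempt made so far.
    static final class Subtask {
        final String jobVertexId;
        final int subtaskIndex;
        final List<Attempt> attempts = new ArrayList<>();

        Subtask(String jobVertexId, int subtaskIndex) {
            this.jobVertexId = jobVertexId;
            this.subtaskIndex = subtaskIndex;
        }

        // Creates a new attempt, e.g. after a failure or for recomputation,
        // and registers it so that messages can be addressed by attempt ID.
        Attempt newAttempt(Map<AttemptId, Attempt> registry) {
            Attempt a = new Attempt(AttemptId.random(), jobVertexId, subtaskIndex, attempts.size());
            attempts.add(a);
            registry.put(a.id(), a);
            return a;
        }
    }

    // Stand-in for ExecutionJobVertex: one Subtask per degree of parallelism.
    static final class JobVertexSketch {
        final String id;
        final List<Subtask> subtasks = new ArrayList<>();

        JobVertexSketch(String id, int parallelism) {
            this.id = id;
            for (int i = 0; i < parallelism; i++) {
                subtasks.add(new Subtask(id, i));
            }
        }
    }

    public static void main(String[] args) {
        Map<AttemptId, Attempt> registry = new HashMap<>();
        JobVertexSketch mapVertex = new JobVertexSketch("map", 3);
        for (Subtask s : mapVertex.subtasks) {
            s.newAttempt(registry);
        }
        // Simulate a failure of subtask 1: it gets a second attempt.
        Attempt retry = mapVertex.subtasks.get(1).newAttempt(registry);
        System.out.println("subtasks=" + mapVertex.subtasks.size()
                + " attempts=" + registry.size()
                + " retryAttemptNumber=" + retry.attemptNumber());
    }
}
```

Running main() prints subtasks=3 attempts=4 retryAttemptNumber=1: three first attempts plus one retry of subtask 1, each reachable by its own ID.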
Method Summary
- Map<String,OptionalFailure<Accumulator<?,?>>> aggregateUserAccumulators()
  Merges all accumulator results from the tasks previously executed in the Executions.
- void attachJobGraph(List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
- void cancel()
- void enableCheckpointing(CheckpointCoordinatorConfiguration chkConfig, List<MasterTriggerRestoreHook<?>> masterHooks, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, CheckpointStorage checkpointStorage, CheckpointStatsTracker statsTracker, CheckpointsCleaner checkpointsCleaner, String changelogStorage)
- void failJob(Throwable cause, long timestamp)
- Optional<AccessExecution> findExecution(ExecutionAttemptID attemptId)
- Optional<String> findVertexWithAttempt(ExecutionAttemptID attemptId)
- Iterable<ExecutionVertex> getAllExecutionVertices()
  Returns an iterable containing all execution vertices for this execution graph.
- Map<IntermediateDataSetID,IntermediateResult> getAllIntermediateResults()
- Map<JobVertexID,ExecutionJobVertex> getAllVertices()
  Returns a map containing all job vertices for this execution graph.
- CheckpointCoordinator getCheckpointCoordinator()
- Throwable getFailureCause()
- Configuration getJobConfiguration()
- ComponentMainThreadExecutor getJobMasterMainThreadExecutor()
- ExecutionJobVertex getJobVertex(JobVertexID id)
  Returns the job vertex for the given JobVertexID.
- KvStateLocationRegistry getKvStateLocationRegistry()
- long getNumberOfRestarts()
  Gets the number of restarts, including full restarts and fine-grained restarts.
- int getNumFinishedVertices()
- Map<ExecutionAttemptID,Execution> getRegisteredExecutions()
- ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker()
- IntermediateResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID id)
  Gets the intermediate result partition with the given partition ID, or throws an exception if the partition is not found.
- SchedulingTopology getSchedulingTopology()
- CompletableFuture<JobStatus> getTerminationFuture()
  Returns the termination future of this ExecutionGraph.
- Iterable<ExecutionJobVertex> getVerticesTopologically()
  Returns an iterable containing all job vertices for this execution graph in the order they were created.
- void incrementRestarts()
- void initFailureCause(Throwable t, long timestamp)
- default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
- void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp, Map<IntermediateDataSetID,JobVertexInputInfo> jobVertexInputInfos)
  Initializes the given execution job vertex; this mainly involves creating execution vertices according to the parallelism and connecting them to their predecessors.
- void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> vertices)
  Notifies that some job vertices have been newly initialized; the execution graph will then try to update the scheduling topology.
- void registerJobStatusListener(JobStatusListener listener)
- void setInternalTaskFailuresListener(InternalFailuresListener internalTaskFailuresListener)
- void setJsonPlan(String jsonPlan)
- void start(ComponentMainThreadExecutor jobMasterMainThreadExecutor)
- void suspend(Throwable suspensionCause)
  Suspends the current ExecutionGraph.
- boolean transitionState(JobStatus current, JobStatus newState)
- void transitionToRunning()
- void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
  Updates the accumulators during the runtime of a job.
- boolean updateState(TaskExecutionStateTransition state)
  Updates the state of one of the ExecutionVertex's Execution attempts.
- JobStatus waitUntilTerminal()
Methods inherited from interface org.apache.flink.runtime.executiongraph.AccessExecutionGraph
getAccumulatorResultsStringified, getAccumulatorsSerialized, getArchivedExecutionConfig, getChangelogStorageName, getCheckpointCoordinatorConfiguration, getCheckpointStatsSnapshot, getCheckpointStorageName, getFailureInfo, getJobID, getJobName, getJobType, getJsonPlan, getState, getStateBackendName, getStatusTimestamp, isChangelogStateBackendEnabled, isStoppable
Method Detail
-
start
void start(@Nonnull ComponentMainThreadExecutor jobMasterMainThreadExecutor)
-
getSchedulingTopology
SchedulingTopology getSchedulingTopology()
-
enableCheckpointing
void enableCheckpointing(CheckpointCoordinatorConfiguration chkConfig, List<MasterTriggerRestoreHook<?>> masterHooks, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, CheckpointStorage checkpointStorage, CheckpointStatsTracker statsTracker, CheckpointsCleaner checkpointsCleaner, String changelogStorage)
-
getCheckpointCoordinator
@Nullable CheckpointCoordinator getCheckpointCoordinator()
-
getKvStateLocationRegistry
KvStateLocationRegistry getKvStateLocationRegistry()
-
setJsonPlan
void setJsonPlan(String jsonPlan)
-
getJobConfiguration
Configuration getJobConfiguration()
-
getFailureCause
Throwable getFailureCause()
-
getVerticesTopologically
Iterable<ExecutionJobVertex> getVerticesTopologically()
Description copied from interface: AccessExecutionGraph
Returns an iterable containing all job vertices for this execution graph in the order they were created.
- Specified by:
  getVerticesTopologically in interface AccessExecutionGraph
- Returns:
  iterable containing all job vertices for this execution graph in the order they were created
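The parameter name topologicallySorted in attachJobGraph(...) implies that every job vertex appears after all of the vertices that feed it. As a generic illustration of that ordering (not the routine Flink itself uses to produce the list), here is Kahn's algorithm over a hypothetical adjacency map from vertex names to downstream vertex names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Kahn's algorithm over a hypothetical adjacency map (vertex -> downstream vertices).
class TopoSortSketch {

    static List<String> sort(Map<String, List<String>> edges) {
        // Count incoming edges for every vertex, including pure sinks.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String target : e.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // Start from sources, i.e. vertices without inputs.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((v, d) -> { if (d == 0) ready.add(v); });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String target : edges.getOrDefault(v, List.of())) {
                // A vertex becomes ready once all of its producers are placed.
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> job = new LinkedHashMap<>();
        job.put("source", List.of("map"));
        job.put("map", List.of("join"));
        job.put("other-source", List.of("join"));
        job.put("join", List.of("sink"));
        System.out.println(sort(job)); // [source, other-source, map, join, sink]
    }
}
```

Any order in which "join" comes after both of its inputs and "sink" comes last would be equally valid; the queue simply makes the result deterministic for a given insertion order.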
-
getAllExecutionVertices
Iterable<ExecutionVertex> getAllExecutionVertices()
Description copied from interface: AccessExecutionGraph
Returns an iterable containing all execution vertices for this execution graph.
- Specified by:
  getAllExecutionVertices in interface AccessExecutionGraph
- Returns:
  iterable containing all execution vertices for this execution graph
-
getJobVertex
ExecutionJobVertex getJobVertex(JobVertexID id)
Description copied from interface: AccessExecutionGraph
Returns the job vertex for the given JobVertexID.
- Specified by:
  getJobVertex in interface AccessExecutionGraph
- Parameters:
  id - ID of the job vertex to be returned
- Returns:
  job vertex for the given ID, or null if there is no such vertex
-
getAllVertices
Map<JobVertexID,ExecutionJobVertex> getAllVertices()
Description copied from interface: AccessExecutionGraph
Returns a map containing all job vertices for this execution graph.
- Specified by:
  getAllVertices in interface AccessExecutionGraph
- Returns:
  map containing all job vertices for this execution graph
-
getNumberOfRestarts
long getNumberOfRestarts()
Gets the number of restarts, including full restarts and fine-grained restarts. If a recovery is currently pending, this recovery is included in the count.
- Returns:
  The number of restarts so far
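One way to obtain the counting behavior described above, where a pending recovery is already included, is to increment the counter when a restart is decided rather than when recovery completes. The following is a minimal sketch under that assumption; only incrementRestarts() and getNumberOfRestarts() mirror this interface, while onRecoveryCompleted() and isRecoveryPending() are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical restart bookkeeping; only two method names mirror the interface.
class RestartCounterSketch {
    private final AtomicLong restarts = new AtomicLong();
    private volatile boolean recoveryPending;

    // Assumed to be called when a (full or fine-grained) restart is decided,
    // i.e. before the corresponding recovery has finished.
    void incrementRestarts() {
        restarts.incrementAndGet();
        recoveryPending = true;
    }

    // Hypothetical hook: the pending recovery has completed.
    void onRecoveryCompleted() {
        recoveryPending = false;
    }

    // A pending recovery was already counted in incrementRestarts().
    long getNumberOfRestarts() {
        return restarts.get();
    }

    boolean isRecoveryPending() {
        return recoveryPending;
    }

    public static void main(String[] args) {
        RestartCounterSketch counter = new RestartCounterSketch();
        counter.incrementRestarts(); // first restart, recovery still pending
        System.out.println(counter.getNumberOfRestarts() + " pending=" + counter.isRecoveryPending());
        counter.onRecoveryCompleted();
        counter.incrementRestarts(); // second restart
        System.out.println(counter.getNumberOfRestarts() + " pending=" + counter.isRecoveryPending());
    }
}
```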
-
getAllIntermediateResults
Map<IntermediateDataSetID,IntermediateResult> getAllIntermediateResults()
-
getResultPartitionOrThrow
IntermediateResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID id)
Gets the intermediate result partition with the given partition ID, or throws an exception if the partition is not found.
- Parameters:
  id - the ID of the intermediate result partition
- Returns:
  the intermediate result partition
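The "...OrThrow" contract is a lookup that fails loudly instead of returning null. Below is a sketch of that pattern over a plain map, with String IDs standing in for IntermediateResultPartitionID and String values standing in for partitions. The exception type used here (IllegalArgumentException) is an assumption, because the Javadoc does not name the exception the real method throws.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical "get or throw" lookup; String IDs stand in for IntermediateResultPartitionID.
class PartitionLookupSketch {
    private final Map<String, String> partitionsById;

    PartitionLookupSketch(Map<String, String> partitionsById) {
        this.partitionsById = Map.copyOf(partitionsById);
    }

    // Returns the partition, or throws if the ID is unknown (exception type assumed).
    String getResultPartitionOrThrow(String id) {
        return Optional.ofNullable(partitionsById.get(id))
                .orElseThrow(() -> new IllegalArgumentException("Unknown partition " + id));
    }

    public static void main(String[] args) {
        PartitionLookupSketch lookup =
                new PartitionLookupSketch(Map.of("p-0", "partition of map[0]"));
        System.out.println(lookup.getResultPartitionOrThrow("p-0"));
        try {
            lookup.getResultPartitionOrThrow("p-9");
        } catch (IllegalArgumentException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```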
-
aggregateUserAccumulators
Map<String,OptionalFailure<Accumulator<?,?>>> aggregateUserAccumulators()
Merges all accumulator results from the tasks previously executed in the Executions.
- Returns:
  The accumulator map
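The return type Map<String,OptionalFailure<Accumulator<?,?>>> means that each named accumulator either holds a merged value or records why no value could be produced. The simplified sketch below shows such a merge: Long sums stand in for Flink's Accumulator and a minimal Result record stands in for OptionalFailure. Two details are assumptions of the sketch rather than documented behavior: a type mismatch is the failure mode, and a failed entry stays failed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per-task accumulator maps are merged by name; a merge
// that cannot be performed is recorded as a failure instead of throwing.
class AccumulatorMergeSketch {

    // Minimal stand-in for OptionalFailure: either a value or an error message.
    record Result(Long value, String failure) {
        static Result ok(long v) { return new Result(v, null); }
        static Result failed(String why) { return new Result(null, why); }
        boolean isFailure() { return failure != null; }
    }

    static Map<String, Result> aggregate(List<Map<String, Object>> perTaskAccumulators) {
        Map<String, Result> merged = new LinkedHashMap<>();
        for (Map<String, Object> task : perTaskAccumulators) {
            for (Map.Entry<String, Object> e : task.entrySet()) {
                Result current = merged.get(e.getKey());
                if (current != null && current.isFailure()) {
                    continue; // assumed: once failed, an entry stays failed
                }
                if (!(e.getValue() instanceof Long v)) {
                    // Assumed failure mode: incompatible accumulator types.
                    merged.put(e.getKey(), Result.failed("not a Long: " + e.getValue()));
                } else {
                    merged.put(e.getKey(), Result.ok(current == null ? v : current.value() + v));
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> task0 = Map.of("records", 10L, "errors", 1L);
        Map<String, Object> task1 = Map.of("records", 15L, "errors", "oops");
        Map<String, Result> result = aggregate(List.of(task0, task1));
        System.out.println(result.get("records").value());    // 25
        System.out.println(result.get("errors").isFailure()); // true
    }
}
```

Recording the failure per entry, rather than throwing, lets the other accumulators ("records" above) still be reported.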
-
updateAccumulators
void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
Updates the accumulators during the runtime of a job. Final accumulator results are transferred through the UpdateTaskExecutionState message.
- Parameters:
  accumulatorSnapshot - The serialized Flink and user-defined accumulators
-
setInternalTaskFailuresListener
void setInternalTaskFailuresListener(InternalFailuresListener internalTaskFailuresListener)
-
attachJobGraph
void attachJobGraph(List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws JobException
- Throws:
JobException
-
transitionToRunning
void transitionToRunning()
-
cancel
void cancel()
-
suspend
void suspend(Throwable suspensionCause)
Suspends the current ExecutionGraph.
The JobStatus will be directly set to JobStatus.SUSPENDED if and only if the current state is not a terminal state. All ExecutionJobVertices will be canceled and onTerminalState() is executed.
The JobStatus.SUSPENDED state is a local terminal state: it stops the execution of the job but does not remove the job from the HA job store, so that the job can be recovered by another JobManager.
- Parameters:
  suspensionCause - Cause of the suspension
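The rule above (move to SUSPENDED only from a non-terminal state) is a guarded state transition. Below is a minimal sketch using a hypothetical, abbreviated status enum; Flink's JobStatus has more states, and the terminal/globallyTerminal flag names are assumptions of the sketch. The real method additionally cancels all ExecutionJobVertices and runs onTerminalState(), which the sketch only marks with a comment.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, abbreviated job status model; not Flink's JobStatus enum.
class SuspendSketch {

    enum Status {
        RUNNING(false, false),
        FAILED(true, true),
        CANCELED(true, true),
        FINISHED(true, true),
        // Terminal for this JobManager only: the job stays recoverable.
        SUSPENDED(true, false);

        final boolean terminal;
        final boolean globallyTerminal;

        Status(boolean terminal, boolean globallyTerminal) {
            this.terminal = terminal;
            this.globallyTerminal = globallyTerminal;
        }
    }

    private final AtomicReference<Status> state = new AtomicReference<>(Status.RUNNING);
    private volatile Throwable suspensionCause;

    // Sets SUSPENDED if and only if the current state is not terminal.
    boolean suspend(Throwable cause) {
        while (true) {
            Status current = state.get();
            if (current.terminal) {
                return false; // already terminal: nothing to do
            }
            if (state.compareAndSet(current, Status.SUSPENDED)) {
                suspensionCause = cause;
                // The real method would also cancel all vertices and run onTerminalState().
                return true;
            }
            // Another thread changed the state concurrently: re-read and retry.
        }
    }

    Status state() {
        return state.get();
    }

    Throwable suspensionCause() {
        return suspensionCause;
    }

    public static void main(String[] args) {
        SuspendSketch graph = new SuspendSketch();
        System.out.println(graph.suspend(new RuntimeException("leadership lost"))); // true
        System.out.println(graph.state() + " globallyTerminal=" + graph.state().globallyTerminal);
        System.out.println(graph.suspend(new RuntimeException("again"))); // false
    }
}
```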
-
failJob
void failJob(Throwable cause, long timestamp)
-
getTerminationFuture
CompletableFuture<JobStatus> getTerminationFuture()
Returns the termination future of this ExecutionGraph. The termination future is completed with the terminal JobStatus once the ExecutionGraph reaches this terminal state and all Executions have been terminated.
- Returns:
  Termination future of this ExecutionGraph.
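A termination future lets callers react to the end of a job without polling, and a blocking helper in the spirit of waitUntilTerminal() can be layered on top of it. The sketch below uses java.util.concurrent.CompletableFuture; the class, its onAllExecutionsTerminated(...) method, and the use of plain strings as statuses are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of a termination future and a blocking wait built on it.
class TerminationFutureSketch {
    private final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

    CompletableFuture<String> getTerminationFuture() {
        return terminationFuture;
    }

    // Completed only once the terminal status is reached and all executions are gone.
    void onAllExecutionsTerminated(String terminalStatus) {
        terminationFuture.complete(terminalStatus);
    }

    // Blocking helper in the spirit of waitUntilTerminal().
    String waitUntilTerminal() throws InterruptedException {
        try {
            return terminationFuture.get();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TerminationFutureSketch graph = new TerminationFutureSketch();
        // Non-blocking: register a callback for the terminal status.
        graph.getTerminationFuture().thenAccept(s -> System.out.println("terminated with " + s));
        // Simulate the last execution terminating on another thread.
        new Thread(() -> graph.onAllExecutionsTerminated("FINISHED")).start();
        System.out.println("waited: " + graph.waitUntilTerminal());
    }
}
```

The two printed lines may appear in either order, since the callback runs on whichever thread completes the future.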
-
waitUntilTerminal
@VisibleForTesting JobStatus waitUntilTerminal() throws InterruptedException
- Throws:
InterruptedException
-
incrementRestarts
void incrementRestarts()
-
initFailureCause
void initFailureCause(Throwable t, long timestamp)
-
updateState
boolean updateState(TaskExecutionStateTransition state)
Updates the state of one of the ExecutionVertex's Execution attempts. If the new status is "FINISHED", this also updates the accumulators.
- Parameters:
  state - The state update.
- Returns:
  True if the task update was properly applied, false if the execution attempt was not found.
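The contract of updateState(...) can be read as: look up the attempt by its ID, return false if it is unknown, and otherwise apply the new state, folding in the final accumulators when the new state is FINISHED. The sketch below follows that control flow. The StateUpdate record is a hypothetical stand-in for TaskExecutionStateTransition, and a map from string attempt IDs to states stands in for the registered executions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the updateState(...) control flow.
class UpdateStateSketch {

    enum TaskState { DEPLOYING, RUNNING, FINISHED, FAILED }

    // Stand-in for TaskExecutionStateTransition.
    record StateUpdate(String attemptId, TaskState newState, Map<String, Long> accumulators) {}

    final Map<String, TaskState> registeredExecutions = new HashMap<>();
    final Map<String, Long> finalAccumulators = new HashMap<>();

    boolean updateState(StateUpdate update) {
        if (!registeredExecutions.containsKey(update.attemptId())) {
            return false; // execution attempt not found
        }
        if (update.newState() == TaskState.FINISHED) {
            // Final accumulator values arrive together with the FINISHED update.
            update.accumulators().forEach((k, v) -> finalAccumulators.merge(k, v, Long::sum));
        }
        registeredExecutions.put(update.attemptId(), update.newState());
        return true;
    }

    public static void main(String[] args) {
        UpdateStateSketch graph = new UpdateStateSketch();
        graph.registeredExecutions.put("attempt-1", TaskState.RUNNING);
        System.out.println(graph.updateState(
                new StateUpdate("attempt-1", TaskState.FINISHED, Map.of("records", 42L)))); // true
        System.out.println(graph.updateState(
                new StateUpdate("attempt-7", TaskState.FAILED, Map.of()))); // false
        System.out.println(graph.finalAccumulators); // {records=42}
    }
}
```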
-
getRegisteredExecutions
Map<ExecutionAttemptID,Execution> getRegisteredExecutions()
-
registerJobStatusListener
void registerJobStatusListener(JobStatusListener listener)
-
getResultPartitionAvailabilityChecker
ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker()
-
getNumFinishedVertices
int getNumFinishedVertices()
-
getJobMasterMainThreadExecutor
@Nonnull ComponentMainThreadExecutor getJobMasterMainThreadExecutor()
-
initializeJobVertex
default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) throws JobException
- Throws:
JobException
-
initializeJobVertex
void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp, Map<IntermediateDataSetID,JobVertexInputInfo> jobVertexInputInfos) throws JobException
Initializes the given execution job vertex; this mainly involves creating execution vertices according to the parallelism and connecting them to their predecessors.
- Parameters:
  ejv - The execution job vertex that needs to be initialized.
  createTimestamp - The timestamp for creating execution vertices, used to initialize the first Execution.
  jobVertexInputInfos - The input infos of this job vertex.
- Throws:
  JobException
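The two steps named above (one execution vertex per parallel subtask, then wiring to predecessors) can be sketched for the simplest case. The sketch assumes all-to-all wiring from a single predecessor, where every subtask consumes from every upstream subtask. In practice the wiring depends on the edge's distribution pattern (such as all-to-all or pointwise) and on the jobVertexInputInfos argument, both of which the sketch omits. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: expand a job vertex into subtasks and wire them
// all-to-all to the subtasks of one predecessor.
class InitializeVertexSketch {

    record SubtaskRef(String vertex, int index) {
        @Override
        public String toString() { return vertex + "[" + index + "]"; }
    }

    record Subtask(SubtaskRef self, long createTimestamp, List<SubtaskRef> inputs) {}

    static List<Subtask> initialize(String vertex, int parallelism, long createTimestamp,
                                    String predecessor, int predecessorParallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            List<SubtaskRef> inputs = new ArrayList<>();
            // All-to-all (assumed): subtask i reads from every upstream subtask.
            for (int j = 0; j < predecessorParallelism; j++) {
                inputs.add(new SubtaskRef(predecessor, j));
            }
            subtasks.add(new Subtask(new SubtaskRef(vertex, i), createTimestamp, List.copyOf(inputs)));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        for (Subtask s : initialize("join", 2, System.currentTimeMillis(), "map", 3)) {
            System.out.println(s.self() + " <- " + s.inputs());
        }
        // join[0] <- [map[0], map[1], map[2]]
        // join[1] <- [map[0], map[1], map[2]]
    }
}
```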
-
notifyNewlyInitializedJobVertices
void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> vertices)
Notifies that some job vertices have been newly initialized; the execution graph will then try to update the scheduling topology.
- Parameters:
  vertices - The execution job vertices that are newly initialized.
-
findVertexWithAttempt
Optional<String> findVertexWithAttempt(ExecutionAttemptID attemptId)
-
findExecution
Optional<AccessExecution> findExecution(ExecutionAttemptID attemptId)
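findExecution(...) and findVertexWithAttempt(...) return Optional instead of null. The Javadoc does not describe them further; judging only from their signatures, they look up an execution attempt, or a String describing the vertex that owns it, by ExecutionAttemptID. The sketch below shows that Optional-based lookup pattern with a hypothetical registry keyed by string attempt IDs. It is an interpretation of the signatures, not documented behavior, and treating the returned String as the vertex name is an assumption.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical registry illustrating Optional-returning lookups by attempt ID.
class FindExecutionSketch {

    record AttemptInfo(String attemptId, String vertexName, int subtaskIndex) {}

    private final Map<String, AttemptInfo> attemptsById;

    FindExecutionSketch(Map<String, AttemptInfo> attemptsById) {
        this.attemptsById = Map.copyOf(attemptsById);
    }

    Optional<AttemptInfo> findExecution(String attemptId) {
        return Optional.ofNullable(attemptsById.get(attemptId));
    }

    // Derived lookup (assumed semantics): the name of the vertex owning the attempt.
    Optional<String> findVertexWithAttempt(String attemptId) {
        return findExecution(attemptId).map(AttemptInfo::vertexName);
    }

    public static void main(String[] args) {
        FindExecutionSketch graph = new FindExecutionSketch(
                Map.of("a-1", new AttemptInfo("a-1", "map", 0)));
        System.out.println(graph.findVertexWithAttempt("a-1").orElse("<unknown>")); // map
        System.out.println(graph.findVertexWithAttempt("a-2").orElse("<unknown>")); // <unknown>
    }
}
```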