public class ExecutionGraph extends Object implements AccessExecutionGraph
The execution graph consists of the following constructs:
The ExecutionJobVertex represents one vertex from the JobGraph (usually one operation like "map" or "join") during execution. It holds the aggregated state of all parallel subtasks. The ExecutionJobVertex is identified inside the graph by the JobVertexID, which it takes from the JobGraph's corresponding JobVertex.
The ExecutionVertex represents one parallel subtask. For each ExecutionJobVertex, there are as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by the ExecutionJobVertex and the index of the parallel subtask.
The Execution is one attempt to execute an ExecutionVertex. There may be multiple Executions for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed because it is no longer available when requested by later operations. An Execution is always identified by an ExecutionAttemptID. All messages between the JobManager and the TaskManager about deployment of tasks and updates in the task status always use the ExecutionAttemptID to address the message receiver.
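The relationship between the three constructs can be illustrated with a small stand-alone model. This is only a sketch: the classes `Operator`, `Subtask` and `Attempt` and the string-based ids are hypothetical stand-ins for ExecutionJobVertex, ExecutionVertex and Execution, and none of this is Flink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical, simplified model of the three levels; not Flink's actual classes. */
final class GraphModelSketch {

    /** Stand-in for Execution: one attempt, addressed by its own unique id. */
    static final class Attempt {
        final String attemptId = UUID.randomUUID().toString();
        final int subtaskIndex;
        final int attemptNumber;

        Attempt(int subtaskIndex, int attemptNumber) {
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
        }
    }

    /** Stand-in for ExecutionVertex: one parallel subtask with all its attempts. */
    static final class Subtask {
        final int index;
        final List<Attempt> attempts = new ArrayList<>();

        Subtask(int index) {
            this.index = index;
        }

        /** Creates a new attempt and registers it under its attempt id. */
        Attempt newAttempt(Map<String, Attempt> registry) {
            Attempt attempt = new Attempt(index, attempts.size());
            attempts.add(attempt);
            registry.put(attempt.attemptId, attempt);
            return attempt;
        }
    }

    /** Stand-in for ExecutionJobVertex: owns as many subtasks as the parallelism. */
    static final class Operator {
        final String vertexId;
        final List<Subtask> subtasks = new ArrayList<>();

        Operator(String vertexId, int parallelism) {
            this.vertexId = vertexId;
            for (int i = 0; i < parallelism; i++) {
                subtasks.add(new Subtask(i));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Attempt> registry = new HashMap<>();
        Operator map = new Operator("map", 3);
        for (Subtask subtask : map.subtasks) {
            subtask.newAttempt(registry);
        }
        // Subtask 1 fails and is retried: same subtask, new attempt id.
        Attempt retry = map.subtasks.get(1).newAttempt(registry);
        System.out.println(map.subtasks.size() + " subtasks, " + registry.size() + " attempts");
        System.out.println("retry is attempt number " + registry.get(retry.attemptId).attemptNumber);
    }
}
```

Keying the registry by attempt id rather than by subtask index mirrors the point made above: a message about a task is addressed to one specific attempt, so a late message for a failed attempt cannot be confused with its retry.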
The Execution Graph has two failover modes: global failover and local failover.
A global failover aborts the task executions for all vertices and restarts the whole data flow graph from the last completed checkpoint. Global failover is considered the "fallback strategy" that is used when a local failover is unsuccessful, or when an issue is found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
A local failover is triggered when an individual vertex execution (a task) fails. The local failover is coordinated by the FailoverStrategy. A local failover typically attempts to restart as little as possible, but as much as necessary.
Between local and global failover, the global failover always takes precedence, because it is the core mechanism that the ExecutionGraph relies on to bring back consistency. To guard that, the ExecutionGraph maintains a global modification version, which is incremented with every global failover (and other global actions, like job cancellation, or terminal failure). Local failover is always scoped by the modification version that the execution graph had when the failover was triggered. If a new global modification version is reached during local failover (meaning there is a concurrent global failover), the failover strategy has to yield before the global failover.
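The version guard described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the ExecutionGraph implementation: the class `ModVersionGuardSketch` and the methods `startGlobalFailover` and `tryLocalRestart` are hypothetical names, and `synchronized` stands in for whatever threading model the real class uses.

```java
/** Hypothetical sketch of scoping local failover by a global modification version. */
final class ModVersionGuardSketch {

    private long globalModVersion;
    private int localRestarts;

    synchronized long getGlobalModVersion() {
        return globalModVersion;
    }

    /** Any global action (global failover, cancellation, terminal failure) bumps the version. */
    synchronized long startGlobalFailover() {
        return ++globalModVersion;
    }

    /**
     * Performs a local restart only if the version is still the one observed when
     * the local failover was triggered. Returns false if it has to yield.
     */
    synchronized boolean tryLocalRestart(long versionAtTrigger) {
        if (versionAtTrigger != globalModVersion) {
            return false; // a global action happened in the meantime: yield
        }
        localRestarts++;
        return true;
    }

    synchronized int getLocalRestarts() {
        return localRestarts;
    }

    public static void main(String[] args) {
        ModVersionGuardSketch graph = new ModVersionGuardSketch();

        long seen = graph.getGlobalModVersion();
        System.out.println("undisturbed local restart: " + graph.tryLocalRestart(seen));

        long stale = graph.getGlobalModVersion();
        graph.startGlobalFailover(); // concurrent global failover
        System.out.println("local restart after global failover: " + graph.tryLocalRestart(stale));
    }
}
```

The local failover captures the version once, at trigger time, and passes it along with every action it takes. A mismatch then means a global action has superseded it, so the local action is dropped instead of racing with the global restart.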
Constructor and Description |
---|
ExecutionGraph(JobInformation jobInformation,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
Time rpcTimeout,
RestartStrategy restartStrategy,
int maxPriorAttemptsHistoryLength,
FailoverStrategy.Factory failoverStrategyFactory,
SlotProvider slotProvider,
ClassLoader userClassLoader,
BlobWriter blobWriter,
Time allocationTimeout,
PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ScheduleMode scheduleMode) |
Modifier and Type | Method and Description |
---|---|
Map<String,OptionalFailure<Accumulator<?,?>>> |
aggregateUserAccumulators()
Merges all accumulator results from the tasks previously executed in the Executions.
|
void |
attachJobGraph(List<JobVertex> topologiallySorted) |
void |
cancel() |
void |
enableCheckpointing(CheckpointCoordinatorConfiguration chkConfig,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
List<MasterTriggerRestoreHook<?>> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
CheckpointStatsTracker statsTracker) |
void |
enableNgScheduling(InternalFailuresListener internalTaskFailuresListener) |
void |
failGlobal(Throwable t)
Fails the execution graph globally.
|
void |
failJob(Throwable cause) |
StringifiedAccumulatorResult[] |
getAccumulatorResultsStringified()
Returns a stringified version of the user-defined accumulators.
|
Map<String,SerializedValue<OptionalFailure<Object>>> |
getAccumulatorsSerialized()
Gets a serialized accumulator map.
|
Iterable<ExecutionVertex> |
getAllExecutionVertices()
Returns an iterable containing all execution vertices for this execution graph.
|
Map<IntermediateDataSetID,IntermediateResult> |
getAllIntermediateResults() |
Time |
getAllocationTimeout() |
Map<JobVertexID,ExecutionJobVertex> |
getAllVertices()
Returns a map containing all job vertices for this execution graph.
|
ArchivedExecutionConfig |
getArchivedExecutionConfig()
Returns the serializable ArchivedExecutionConfig. |
BlobWriter |
getBlobWriter() |
CheckpointCoordinator |
getCheckpointCoordinator() |
CheckpointCoordinatorConfiguration |
getCheckpointCoordinatorConfiguration()
Returns the CheckpointCoordinatorConfiguration or null if checkpointing is disabled. |
CheckpointStatsSnapshot |
getCheckpointStatsSnapshot()
Returns a snapshot of the checkpoint statistics or null if checkpointing is disabled. |
FailoverStrategy |
getFailoverStrategy()
Gets the failover strategy used by the execution graph to recover from failures of tasks.
|
Throwable |
getFailureCause() |
ErrorInfo |
getFailureInfo()
Returns the exception that caused the job to fail.
|
Executor |
getFutureExecutor()
Returns the ExecutionContext associated with this ExecutionGraph.
|
long |
getGlobalModVersion()
Gets the current global modification version of the ExecutionGraph.
|
Configuration |
getJobConfiguration() |
JobID |
getJobID()
Returns the JobID for this execution graph. |
Either<SerializedValue<JobInformation>,PermanentBlobKey> |
getJobInformationOrBlobKey() |
ComponentMainThreadExecutor |
getJobMasterMainThreadExecutor() |
String |
getJobName()
Returns the job name for the execution graph.
|
ExecutionJobVertex |
getJobVertex(JobVertexID id)
Returns the job vertex for the given JobVertexID. |
String |
getJsonPlan()
Returns the job plan as a JSON string.
|
KvStateLocationRegistry |
getKvStateLocationRegistry() |
int |
getNumberOfExecutionJobVertices()
Gets the number of job vertices currently held by this execution graph.
|
long |
getNumberOfRestarts()
Gets the number of restarts, including full restarts and fine grained restarts.
|
JobMasterPartitionTracker |
getPartitionTracker() |
Map<ExecutionAttemptID,Execution> |
getRegisteredExecutions() |
RestartStrategy |
getRestartStrategy() |
ResultPartitionAvailabilityChecker |
getResultPartitionAvailabilityChecker() |
ScheduleMode |
getScheduleMode() |
SchedulingTopology |
getSchedulingTopology() |
SlotProviderStrategy |
getSlotProviderStrategy() |
JobStatus |
getState()
Returns the current JobStatus for this execution graph. |
Optional<String> |
getStateBackendName()
Returns the state backend name for this ExecutionGraph.
|
long |
getStatusTimestamp(JobStatus status)
Returns the timestamp for the given JobStatus. |
CompletableFuture<JobStatus> |
getTerminationFuture()
Returns the termination future of this ExecutionGraph. |
int |
getTotalNumberOfVertices() |
ClassLoader |
getUserClassLoader() |
Iterable<ExecutionJobVertex> |
getVerticesTopologically()
Returns an iterable containing all job vertices for this execution graph in the order they
were created.
|
void |
incrementRestarts() |
void |
initFailureCause(Throwable t) |
boolean |
isArchived()
Returns whether this execution graph was archived.
|
boolean |
isLegacyScheduling() |
boolean |
isStoppable()
Returns whether the job for this execution graph is stoppable.
|
void |
registerJobStatusListener(JobStatusListener listener) |
void |
restart(long expectedGlobalVersion) |
void |
scheduleForExecution() |
void |
scheduleOrUpdateConsumers(ResultPartitionID partitionId)
Schedules or updates consumers of the given result partition.
|
void |
setJsonPlan(String jsonPlan) |
void |
start(ComponentMainThreadExecutor jobMasterMainThreadExecutor) |
void |
suspend(Throwable suspensionCause)
Suspends the current ExecutionGraph.
|
boolean |
transitionState(JobStatus current,
JobStatus newState) |
void |
transitionToRunning() |
void |
updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
Updates the accumulators during the runtime of a job.
|
boolean |
updateState(TaskExecutionState state)
Updates the state of one of the ExecutionVertex's Execution attempts.
|
JobStatus |
waitUntilTerminal() |
public ExecutionGraph(JobInformation jobInformation, ScheduledExecutorService futureExecutor, Executor ioExecutor, Time rpcTimeout, RestartStrategy restartStrategy, int maxPriorAttemptsHistoryLength, FailoverStrategy.Factory failoverStrategyFactory, SlotProvider slotProvider, ClassLoader userClassLoader, BlobWriter blobWriter, Time allocationTimeout, PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker partitionTracker, ScheduleMode scheduleMode) throws IOException
Throws:
IOException
public void start(@Nonnull ComponentMainThreadExecutor jobMasterMainThreadExecutor)
public int getNumberOfExecutionJobVertices()
public SchedulingTopology getSchedulingTopology()
public ScheduleMode getScheduleMode()
public Time getAllocationTimeout()
@Nonnull public ComponentMainThreadExecutor getJobMasterMainThreadExecutor()
public boolean isArchived()
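Description copied from interface: AccessExecutionGraph
Returns whether this execution graph was archived.
Specified by:
isArchived in interface AccessExecutionGraph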
public Optional<String> getStateBackendName()
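Description copied from interface: AccessExecutionGraph
Returns the state backend name for this ExecutionGraph.
Specified by:
getStateBackendName in interface AccessExecutionGraph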
public void enableCheckpointing(CheckpointCoordinatorConfiguration chkConfig, List<ExecutionJobVertex> verticesToTrigger, List<ExecutionJobVertex> verticesToWaitFor, List<ExecutionJobVertex> verticesToCommitTo, List<MasterTriggerRestoreHook<?>> masterHooks, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, CheckpointStatsTracker statsTracker)
@Nullable public CheckpointCoordinator getCheckpointCoordinator()
public KvStateLocationRegistry getKvStateLocationRegistry()
public RestartStrategy getRestartStrategy()
public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration()
Description copied from interface: AccessExecutionGraph
Returns the CheckpointCoordinatorConfiguration or null if checkpointing is disabled.
Specified by:
getCheckpointCoordinatorConfiguration in interface AccessExecutionGraph
public CheckpointStatsSnapshot getCheckpointStatsSnapshot()
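Description copied from interface: AccessExecutionGraph
Returns a snapshot of the checkpoint statistics or null if checkpointing is disabled.
Specified by:
getCheckpointStatsSnapshot in interface AccessExecutionGraph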
public void setJsonPlan(String jsonPlan)
public String getJsonPlan()
Description copied from interface: AccessExecutionGraph
Returns the job plan as a JSON string.
Specified by:
getJsonPlan in interface AccessExecutionGraph
public SlotProviderStrategy getSlotProviderStrategy()
public Either<SerializedValue<JobInformation>,PermanentBlobKey> getJobInformationOrBlobKey()
public JobID getJobID()
Description copied from interface: AccessExecutionGraph
Returns the JobID for this execution graph.
Specified by:
getJobID in interface AccessExecutionGraph
public String getJobName()
Description copied from interface: AccessExecutionGraph
Returns the job name for the execution graph.
Specified by:
getJobName in interface AccessExecutionGraph
public boolean isStoppable()
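Description copied from interface: AccessExecutionGraph
Returns whether the job for this execution graph is stoppable.
Specified by:
isStoppable in interface AccessExecutionGraph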
public Configuration getJobConfiguration()
public ClassLoader getUserClassLoader()
public JobStatus getState()
Description copied from interface: AccessExecutionGraph
Returns the current JobStatus for this execution graph.
Specified by:
getState in interface AccessExecutionGraph
public Throwable getFailureCause()
public ErrorInfo getFailureInfo()
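Description copied from interface: AccessExecutionGraph
Returns the exception that caused the job to fail.
Specified by:
getFailureInfo in interface AccessExecutionGraph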
public long getNumberOfRestarts()
public ExecutionJobVertex getJobVertex(JobVertexID id)
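Description copied from interface: AccessExecutionGraph
Returns the job vertex for the given JobVertexID.
Specified by:
getJobVertex in interface AccessExecutionGraph
Parameters:
id - id of job vertex to be returned
Returns:
job vertex for the given id or null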
public Map<JobVertexID,ExecutionJobVertex> getAllVertices()
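Description copied from interface: AccessExecutionGraph
Returns a map containing all job vertices for this execution graph.
Specified by:
getAllVertices in interface AccessExecutionGraph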
public Iterable<ExecutionJobVertex> getVerticesTopologically()
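Description copied from interface: AccessExecutionGraph
Returns an iterable containing all job vertices for this execution graph in the order they were created.
Specified by:
getVerticesTopologically in interface AccessExecutionGraph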
public int getTotalNumberOfVertices()
public Map<IntermediateDataSetID,IntermediateResult> getAllIntermediateResults()
public Iterable<ExecutionVertex> getAllExecutionVertices()
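Description copied from interface: AccessExecutionGraph
Returns an iterable containing all execution vertices for this execution graph.
Specified by:
getAllExecutionVertices in interface AccessExecutionGraph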
public long getStatusTimestamp(JobStatus status)
Description copied from interface: AccessExecutionGraph
Returns the timestamp for the given JobStatus.
Specified by:
getStatusTimestamp in interface AccessExecutionGraph
Parameters:
status - status for which the timestamp should be returned
public final BlobWriter getBlobWriter()
public Executor getFutureExecutor()
public Map<String,OptionalFailure<Accumulator<?,?>>> aggregateUserAccumulators()
public Map<String,SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized()
Gets a serialized accumulator map.
Specified by:
getAccumulatorsSerialized in interface AccessExecutionGraph
public StringifiedAccumulatorResult[] getAccumulatorResultsStringified()
Returns a stringified version of the user-defined accumulators.
Specified by:
getAccumulatorResultsStringified in interface AccessExecutionGraph
public void enableNgScheduling(InternalFailuresListener internalTaskFailuresListener)
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException
Throws:
JobException
public boolean isLegacyScheduling()
public void transitionToRunning()
public void scheduleForExecution() throws JobException
Throws:
JobException
public void cancel()
public void suspend(Throwable suspensionCause)
Suspends the current ExecutionGraph.
The JobStatus will be directly set to JobStatus.SUSPENDED iff the current state is not a terminal state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed.
The JobStatus.SUSPENDED state is a local terminal state which stops the execution of the job but does not remove the job from the HA job store so that it can be recovered by another JobManager.
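The "set to SUSPENDED iff the current state is not terminal" rule can be sketched with a compare-and-set loop. This is an illustration only: the `Status` enum below is a hypothetical, reduced stand-in for JobStatus, and `SuspendSketch` does not cancel any vertices or touch an HA store.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the suspend transition; not the ExecutionGraph implementation. */
final class SuspendSketch {

    /** Reduced stand-in for JobStatus; the terminal flag is an assumption of this sketch. */
    enum Status {
        CREATED(false), RUNNING(false), FINISHED(true), CANCELED(true), FAILED(true), SUSPENDED(true);

        final boolean terminal;

        Status(boolean terminal) {
            this.terminal = terminal;
        }
    }

    private final AtomicReference<Status> state = new AtomicReference<>(Status.CREATED);

    Status getState() {
        return state.get();
    }

    /** Moves from the expected state to the next one; fails if the state changed concurrently. */
    boolean transitionState(Status expected, Status next) {
        return state.compareAndSet(expected, next);
    }

    /** Sets SUSPENDED unless a terminal state was already reached. */
    boolean suspend() {
        while (true) {
            Status current = state.get();
            if (current.terminal) {
                return false; // already terminal (including SUSPENDED): nothing to do
            }
            if (state.compareAndSet(current, Status.SUSPENDED)) {
                return true;
            }
            // lost a race with another transition: re-read and retry
        }
    }

    public static void main(String[] args) {
        SuspendSketch graph = new SuspendSketch();
        graph.transitionState(Status.CREATED, Status.RUNNING);
        System.out.println("first suspend: " + graph.suspend() + ", state " + graph.getState());
        System.out.println("second suspend: " + graph.suspend());
    }
}
```

Because SUSPENDED is itself treated as terminal in this sketch, a repeated call is a no-op, and a job that already finished, failed or was canceled is never overwritten.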
Parameters:
suspensionCause - Cause of the suspension
public void failGlobal(Throwable t)
Fails the execution graph globally.
This global failure is meant to be triggered in cases where the consistency of the execution graph's state cannot be guaranteed any more (for example when catching unexpected exceptions that indicate a bug or an unexpected call race), and where a full restart is the safe way to get consistency back.
Parameters:
t - The exception that caused the failure.
public void restart(long expectedGlobalVersion)
public ArchivedExecutionConfig getArchivedExecutionConfig()
Returns the serializable ArchivedExecutionConfig.
Specified by:
getArchivedExecutionConfig in interface AccessExecutionGraph
public CompletableFuture<JobStatus> getTerminationFuture()
Returns the termination future of this ExecutionGraph. The termination future is completed with the terminal JobStatus once the ExecutionGraph reaches this terminal state and all Executions have been terminated.
Returns:
Termination future of this ExecutionGraph.
@VisibleForTesting public JobStatus waitUntilTerminal() throws InterruptedException
Throws:
InterruptedException
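The two-condition completion described for the termination future (terminal status reached and all executions terminated) can be sketched with a plain CompletableFuture. The class `TerminationFutureSketch`, its counter and the use of a String status are assumptions made for illustration; this is not how the ExecutionGraph is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch of a termination future that needs two conditions to complete. */
final class TerminationFutureSketch {

    private final CompletableFuture<String> terminationFuture = new CompletableFuture<>();
    private int liveExecutions;
    private String terminalStatus;

    TerminationFutureSketch(int liveExecutions) {
        this.liveExecutions = liveExecutions;
    }

    CompletableFuture<String> getTerminationFuture() {
        return terminationFuture;
    }

    /** Called once per execution attempt that has fully terminated. */
    synchronized void executionTerminated() {
        liveExecutions--;
        maybeComplete();
    }

    /** Called when the job itself reaches a terminal status. */
    synchronized void reachTerminalStatus(String status) {
        terminalStatus = status;
        maybeComplete();
    }

    private void maybeComplete() {
        if (terminalStatus != null && liveExecutions == 0) {
            terminationFuture.complete(terminalStatus);
        }
    }

    /** Blocking variant, analogous in spirit to waitUntilTerminal(). */
    String waitUntilTerminal() throws InterruptedException {
        try {
            return terminationFuture.get();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TerminationFutureSketch graph = new TerminationFutureSketch(2);
        graph.getTerminationFuture().thenAccept(s -> System.out.println("terminated with " + s));

        graph.reachTerminalStatus("FINISHED"); // not complete yet: executions still live
        graph.executionTerminated();
        graph.executionTerminated();           // last execution gone: future completes
        System.out.println(graph.waitUntilTerminal());
    }
}
```

Callers that must not block can chain on the future (as with `thenAccept` in `main`), while the blocking helper fits tests, which matches the @VisibleForTesting annotation on waitUntilTerminal().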
public FailoverStrategy getFailoverStrategy()
public long getGlobalModVersion()
public void incrementRestarts()
public void initFailureCause(Throwable t)
public void failJob(Throwable cause)
public boolean updateState(TaskExecutionState state)
Updates the state of one of the ExecutionVertex's Execution attempts.
Parameters:
state - The state update.
public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) throws ExecutionGraphException
Schedules or updates consumers of the given result partition.
Parameters:
partitionId - specifying the result partition whose consumer shall be scheduled or updated
Throws:
ExecutionGraphException - if the schedule or update consumers operation could not be executed
public Map<ExecutionAttemptID,Execution> getRegisteredExecutions()
public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
Updates the accumulators during the runtime of a job.
Parameters:
accumulatorSnapshot - The serialized Flink and user-defined accumulators
public void registerJobStatusListener(JobStatusListener listener)
public JobMasterPartitionTracker getPartitionTracker()
public ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker()
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.