Package org.apache.flink.runtime.state
Class TaskStateManagerImpl
- java.lang.Object
  - org.apache.flink.runtime.state.TaskStateManagerImpl
All Implemented Interfaces:
AutoCloseable, CheckpointListener, TaskStateManager
public class TaskStateManagerImpl extends Object implements TaskStateManager
This class is the default implementation of TaskStateManager. It collaborates with the job manager (through CheckpointResponder) as well as with a task-manager-local state store. This way, client code does not have to deal with the differences between remote and local state on recovery, because this class handles both cases transparently. Reported state is tagged by clients so that this class can forward the checkpointed state to the right receiver.
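The routing described above can be illustrated with a minimal sketch. It assumes state is modeled as plain strings, and StateRoutingSketch, JobManagerSink, and LocalStore are hypothetical stand-ins for this class, the checkpoint responder, and the task-local state store; they are not Flink APIs. The snapshot tagged for acknowledgement always goes to the job manager side, while the snapshot tagged for local recovery is kept locally only when present.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of forwarding tagged state to the right receiver.
// None of these classes are Flink types.
class StateRoutingSketch {

    // Stand-in for the job manager side (cf. CheckpointResponder).
    static class JobManagerSink {
        final List<String> acknowledged = new ArrayList<>();

        void acknowledge(long checkpointId, String state) {
            acknowledged.add(checkpointId + ":" + state);
        }
    }

    // Stand-in for the task-manager-local state store.
    static class LocalStore {
        final Map<Long, String> byCheckpoint = new HashMap<>();

        void store(long checkpointId, String state) {
            if (state != null) { // local state is optional in this sketch
                byCheckpoint.put(checkpointId, state);
            }
        }
    }

    final JobManagerSink jobManager = new JobManagerSink();
    final LocalStore localStore = new LocalStore();

    // Forwards each tagged snapshot to its receiver; localState may be null.
    void report(long checkpointId, String acknowledgedState, String localState) {
        localStore.store(checkpointId, localState);
        jobManager.acknowledge(checkpointId, acknowledgedState);
    }
}
```

Callers in this model only say which copy is which; they never decide where a copy ends up, which is the transparency the class description refers to.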
Constructor Summary
TaskStateManagerImpl(JobID jobId, ExecutionAttemptID executionAttemptID, TaskLocalStateStore localStateStore, FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager, StateChangelogStorage<?> stateChangelogStorage, TaskExecutorStateChangelogStoragesManager changelogStoragesManager, JobManagerTaskRestore jobManagerTaskRestore, CheckpointResponder checkpointResponder)
TaskStateManagerImpl(JobID jobId, ExecutionAttemptID executionAttemptID, TaskLocalStateStore localStateStore, FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager, StateChangelogStorage<?> stateChangelogStorage, TaskExecutorStateChangelogStoragesManager changelogStoragesManager, JobManagerTaskRestore jobManagerTaskRestore, CheckpointResponder checkpointResponder, SequentialChannelStateReaderImpl sequentialChannelStateReader)
Method Summary
Modifier and Type | Method | Description
void | close() |
LocalRecoveryConfig | createLocalRecoveryConfig() | Returns the configuration for local recovery, i.e. the base directories for all file-based local state of the owning subtask and the general mode for local recovery.
FileMergingSnapshotManager | getFileMergingSnapshotManager() |
InflightDataRescalingDescriptor | getInputRescalingDescriptor() |
InflightDataRescalingDescriptor | getOutputRescalingDescriptor() |
Optional<Long> | getRestoreCheckpointId() | Acquires the checkpoint id to restore from.
SequentialChannelStateReader | getSequentialChannelStateReader() |
StateChangelogStorage<?> | getStateChangelogStorage() | Returns the configured state changelog storage for this task.
StateChangelogStorageView<?> | getStateChangelogStorageView(Configuration configuration, ChangelogStateHandle changelogStateHandle) | Returns the state changelog storage view of the given ChangelogStateHandle for this task.
Optional<OperatorSubtaskState> | getSubtaskJobManagerRestoredState(OperatorID operatorID) | Gets the restored state from the job manager that belongs to an operator running in the owning task.
boolean | isTaskDeployedAsFinished() | Whether all the operators of the task are finished on restore.
void | notifyCheckpointAborted(long checkpointId) | Tracks when some local state can be disposed.
void | notifyCheckpointComplete(long checkpointId) | Tracks when local state can be confirmed and disposed.
PrioritizedOperatorSubtaskState | prioritizedOperatorState(OperatorID operatorID) | Returns the means to restore previously reported state of an operator running in the owning task.
void | reportIncompleteTaskStateSnapshots(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) | Reports the stats for state snapshots of an aborted checkpoint.
void | reportInitializationMetrics(SubTaskInitializationMetrics subTaskInitializationMetrics) |
void | reportTaskStateSnapshots(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, TaskStateSnapshot acknowledgedState, TaskStateSnapshot localState) | Reports the state snapshots for the operator instances running in the owning task.
Constructor Detail
TaskStateManagerImpl
public TaskStateManagerImpl(@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore localStateStore, @Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager, @Nullable StateChangelogStorage<?> stateChangelogStorage, @Nonnull TaskExecutorStateChangelogStoragesManager changelogStoragesManager, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder)
TaskStateManagerImpl
public TaskStateManagerImpl(@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore localStateStore, @Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager, @Nullable StateChangelogStorage<?> stateChangelogStorage, @Nonnull TaskExecutorStateChangelogStoragesManager changelogStoragesManager, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder, @Nonnull SequentialChannelStateReaderImpl sequentialChannelStateReader)
Method Detail
reportInitializationMetrics
public void reportInitializationMetrics(SubTaskInitializationMetrics subTaskInitializationMetrics)
- Specified by:
reportInitializationMetrics in interface TaskStateManager
reportTaskStateSnapshots
public void reportTaskStateSnapshots(@Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, @Nullable TaskStateSnapshot acknowledgedState, @Nullable TaskStateSnapshot localState)
Description copied from interface: TaskStateManager
Report the state snapshots for the operator instances running in the owning task.
- Specified by:
reportTaskStateSnapshots in interface TaskStateManager
- Parameters:
checkpointMetaData - meta data from the checkpoint request.
checkpointMetrics - task level metrics for the checkpoint.
acknowledgedState - the reported states to acknowledge to the job manager.
localState - the reported states for local recovery.
reportIncompleteTaskStateSnapshots
public void reportIncompleteTaskStateSnapshots(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics)
Description copied from interface: TaskStateManager
Report the stats for state snapshots of an aborted checkpoint.
- Specified by:
reportIncompleteTaskStateSnapshots in interface TaskStateManager
- Parameters:
checkpointMetaData - meta data from the checkpoint request.
checkpointMetrics - task level metrics for the checkpoint.
getInputRescalingDescriptor
public InflightDataRescalingDescriptor getInputRescalingDescriptor()
- Specified by:
getInputRescalingDescriptor in interface TaskStateManager
getOutputRescalingDescriptor
public InflightDataRescalingDescriptor getOutputRescalingDescriptor()
- Specified by:
getOutputRescalingDescriptor in interface TaskStateManager
isTaskDeployedAsFinished
public boolean isTaskDeployedAsFinished()
Description copied from interface: TaskStateManager
Whether all the operators of the task are finished on restore.
- Specified by:
isTaskDeployedAsFinished in interface TaskStateManager
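As a rough illustration of "all operators finished on restore", the sketch below checks a per-operator flag. It assumes the restored operator states are available as a list that is null when the task starts without restore information; FinishedOnRestoreSketch and RestoredOperator are hypothetical types, not Flink's actual data structures.

```java
import java.util.List;

// Hypothetical sketch: a task counts as "deployed as finished" only if there is
// restore information and every restored operator is flagged as finished.
class FinishedOnRestoreSketch {

    static final class RestoredOperator {
        final String operatorId;
        final boolean finished;

        RestoredOperator(String operatorId, boolean finished) {
            this.operatorId = operatorId;
            this.finished = finished;
        }
    }

    // restoredOperators is null (or empty) when there is nothing to restore;
    // this sketch treats both cases as "not deployed as finished".
    static boolean isTaskDeployedAsFinished(List<RestoredOperator> restoredOperators) {
        if (restoredOperators == null || restoredOperators.isEmpty()) {
            return false;
        }
        return restoredOperators.stream().allMatch(op -> op.finished);
    }
}
```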
getRestoreCheckpointId
public Optional<Long> getRestoreCheckpointId()
Description copied from interface: TaskStateManager
Acquires the checkpoint id to restore from.
- Specified by:
getRestoreCheckpointId in interface TaskStateManager
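The Optional<Long> return type separates "restore from checkpoint N" from "nothing to restore". A minimal sketch of that contract, assuming restore information is simply absent (null) when the task starts fresh; RestoreCheckpointIdSketch and RestoreInfo are hypothetical stand-ins for the job manager's restore data, not Flink classes.

```java
import java.util.Optional;

// Hypothetical model of the Optional contract behind getRestoreCheckpointId().
class RestoreCheckpointIdSketch {

    static final class RestoreInfo {
        final long restoreCheckpointId;

        RestoreInfo(long restoreCheckpointId) {
            this.restoreCheckpointId = restoreCheckpointId;
        }
    }

    private final RestoreInfo restoreInfo; // null when there is nothing to restore

    RestoreCheckpointIdSketch(RestoreInfo restoreInfo) {
        this.restoreInfo = restoreInfo;
    }

    Optional<Long> getRestoreCheckpointId() {
        // Empty means "fresh start": no checkpoint to restore from.
        return Optional.ofNullable(restoreInfo).map(info -> info.restoreCheckpointId);
    }
}
```

A caller can then branch with ifPresent or isPresent instead of comparing against a sentinel id.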
prioritizedOperatorState
public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID)
Description copied from interface: TaskStateManager
Returns means to restore previously reported state of an operator running in the owning task.
- Specified by:
prioritizedOperatorState in interface TaskStateManager
- Parameters:
operatorID - the id of the operator for which we request state.
- Returns:
Previous state for the operator. The previous state can be empty if the operator had no previous state.
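The idea of prioritized restore can be sketched in a simplified form: candidate copies of one operator's state are ordered so that a task-local copy is tried before the job manager's copy, and the result is empty when no copy exists or none loads. PrioritizedRestoreSketch, its methods, and the string-valued state are hypothetical; this mirrors the concept behind PrioritizedOperatorSubtaskState but is not its API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical model of prioritized restore alternatives for one operator.
class PrioritizedRestoreSketch {

    // Orders the available copies: local first, job manager copy as fallback.
    // Either argument may be null when that copy does not exist.
    static List<String> alternatives(String localCopy, String jobManagerCopy) {
        List<String> ordered = new ArrayList<>();
        if (localCopy != null) {
            ordered.add(localCopy); // preferred: no remote read needed
        }
        if (jobManagerCopy != null) {
            ordered.add(jobManagerCopy); // fallback copy
        }
        return Collections.unmodifiableList(ordered);
    }

    // Returns the first alternative that loads successfully, or empty when
    // no candidate exists or none of them could be loaded.
    static Optional<String> restore(List<String> ordered, Predicate<String> loads) {
        for (String candidate : ordered) {
            if (loads.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```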
getSubtaskJobManagerRestoredState
public Optional<OperatorSubtaskState> getSubtaskJobManagerRestoredState(OperatorID operatorID)
Description copied from interface: TaskStateManager
Gets the restored state from the job manager that belongs to an operator running in the owning task.
- Specified by:
getSubtaskJobManagerRestoredState in interface TaskStateManager
- Parameters:
operatorID - the id of the operator for which we request state.
- Returns:
the subtask restored state from the job manager.
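The per-operator lookup can be pictured as a map access wrapped in Optional. In this sketch, operator ids and states are plain strings, the map is a hypothetical stand-in for the task-level snapshot received from the job manager, and a null map means there is nothing to restore; JobManagerRestoredStateSketch is not a Flink class.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of looking up one operator's restored state.
class JobManagerRestoredStateSketch {

    private final Map<String, String> restoredStateByOperator;

    // A null map stands for "no restore information from the job manager".
    JobManagerRestoredStateSketch(Map<String, String> restoredStateByOperator) {
        this.restoredStateByOperator = restoredStateByOperator == null
                ? Collections.<String, String>emptyMap()
                : restoredStateByOperator;
    }

    // Empty both when nothing is restored and when this operator has no entry.
    Optional<String> getSubtaskJobManagerRestoredState(String operatorId) {
        return Optional.ofNullable(restoredStateByOperator.get(operatorId));
    }
}
```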
createLocalRecoveryConfig
@Nonnull public LocalRecoveryConfig createLocalRecoveryConfig()
Description copied from interface: TaskStateManager
Returns the configuration for local recovery, i.e. the base directories for all file-based local state of the owning subtask and the general mode for local recovery.
- Specified by:
createLocalRecoveryConfig in interface TaskStateManager
getSequentialChannelStateReader
public SequentialChannelStateReader getSequentialChannelStateReader()
- Specified by:
getSequentialChannelStateReader in interface TaskStateManager
getStateChangelogStorage
@Nullable public StateChangelogStorage<?> getStateChangelogStorage()
Description copied from interface: TaskStateManager
Returns the configured state changelog storage for this task.
- Specified by:
getStateChangelogStorage in interface TaskStateManager
getStateChangelogStorageView
@Nullable public StateChangelogStorageView<?> getStateChangelogStorageView(Configuration configuration, ChangelogStateHandle changelogStateHandle)
Description copied from interface: TaskStateManager
Returns the state changelog storage view of the given ChangelogStateHandle for this task.
- Specified by:
getStateChangelogStorageView in interface TaskStateManager
getFileMergingSnapshotManager
@Nullable public FileMergingSnapshotManager getFileMergingSnapshotManager()
- Specified by:
getFileMergingSnapshotManager in interface TaskStateManager
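Several getters above are annotated @Nullable (getStateChangelogStorage, getStateChangelogStorageView, getFileMergingSnapshotManager). The caller-side sketch below assumes a null result means the feature is not set up for this task; the Supplier stands in for the real getter, and NullableGetterSketch and its returned strings are purely illustrative.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical caller-side handling of a @Nullable getter result.
class NullableGetterSketch {

    // Reads a null result as "feature not configured" instead of dereferencing it.
    static <T> String describe(String feature, Supplier<T> nullableGetter) {
        return Optional.ofNullable(nullableGetter.get())
                .map(value -> feature + " enabled")
                .orElse(feature + " disabled");
    }
}
```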
notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId) throws Exception
Tracks when local state can be confirmed and disposed.
- Specified by:
notifyCheckpointComplete in interface CheckpointListener
- Parameters:
checkpointId - the ID of the checkpoint that has been completed.
- Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId)
Tracks when some local state can be disposed.
- Specified by:
notifyCheckpointAborted in interface CheckpointListener
- Parameters:
checkpointId - the ID of the checkpoint that has been aborted.
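The two notifications can be modeled as bookkeeping over local snapshots keyed by checkpoint id. The policy here is an assumption for illustration: completing checkpoint N makes every strictly older local snapshot obsolete, while aborting N discards only N's own snapshot. LocalStateTracker is a hypothetical class, not Flink's local state store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of local-state disposal driven by checkpoint notifications.
class LocalStateTracker {

    // Retained local snapshots, ordered by ascending checkpoint id.
    private final TreeMap<Long, String> retained = new TreeMap<>();
    // Snapshots released so far, kept so the behavior can be inspected.
    final List<String> disposed = new ArrayList<>();

    void store(long checkpointId, String localState) {
        retained.put(checkpointId, localState);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // headMap is a live view of all strictly older checkpoints.
        Map<Long, String> older = retained.headMap(checkpointId);
        disposed.addAll(older.values());
        older.clear(); // clearing the view also removes the entries from 'retained'
    }

    void notifyCheckpointAborted(long checkpointId) {
        String state = retained.remove(checkpointId);
        if (state != null) {
            disposed.add(state);
        }
    }

    boolean isRetained(long checkpointId) {
        return retained.containsKey(checkpointId);
    }
}
```

Keeping the completed checkpoint's own snapshot in this model is what allows a later local recovery from it; only copies that can no longer be the restore target are dropped.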
close
public void close() throws Exception
- Specified by:
close in interface AutoCloseable
- Throws:
Exception
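Because the class implements AutoCloseable and close() may throw a checked Exception, an owner can scope it with try-with-resources. The sketch below uses a hypothetical ClosableManagerSketch in place of the real manager, since constructing TaskStateManagerImpl requires many runtime collaborators; it only shows that close() runs after the body, even on failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical AutoCloseable stand-in used to illustrate try-with-resources.
class ClosableManagerSketch implements AutoCloseable {

    // Records lifecycle events so the ordering can be checked.
    static final List<String> events = new ArrayList<>();

    ClosableManagerSketch() {
        events.add("opened");
    }

    void work() {
        events.add("working");
    }

    @Override
    public void close() throws Exception {
        events.add("closed"); // release owned resources here
    }

    static void runTask() throws Exception {
        // close() is invoked automatically when the block exits, even if work() throws.
        try (ClosableManagerSketch manager = new ClosableManagerSketch()) {
            manager.work();
        }
    }
}
```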