@Public public class CheckpointConfig extends Object implements Serializable
Modifier and Type | Class and Description |
---|---|
static class |
CheckpointConfig.ExternalizedCheckpointCleanup
Cleanup behaviour for externalized checkpoints when the job is cancelled.
|
Modifier and Type | Field and Description |
---|---|
static int |
DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
Deprecated.
This field is no longer used. Please use
ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA instead. |
static int |
DEFAULT_MAX_CONCURRENT_CHECKPOINTS
Deprecated.
This field is no longer used. Please use
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS instead. |
static long |
DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS
Deprecated.
This field is no longer used. Please use
ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS instead. |
static CheckpointingMode |
DEFAULT_MODE
Deprecated.
|
static long |
DEFAULT_TIMEOUT
Deprecated.
This field is no longer used. Please use
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT instead. |
static int |
UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER
Deprecated.
This field is no longer used.
|
Constructor and Description |
---|
CheckpointConfig() |
CheckpointConfig(CheckpointConfig checkpointConfig)
Creates a deep copy of the provided
CheckpointConfig. |
Modifier and Type | Method and Description |
---|---|
void |
configure(ReadableConfig configuration)
Sets all relevant options contained in the
ReadableConfig, such as ExecutionCheckpointingOptions.CHECKPOINTING_MODE. |
void |
disableCheckpointing()
Disables checkpointing.
|
void |
enableApproximateLocalRecovery(boolean enabled)
Enables the approximate local recovery mode.
|
void |
enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup cleanupMode)
Deprecated.
|
void |
enableUnalignedCheckpoints()
Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
|
void |
enableUnalignedCheckpoints(boolean enabled)
Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
|
java.time.Duration |
getAlignedCheckpointTimeout() |
java.time.Duration |
getAlignmentTimeout()
Deprecated.
Use
getAlignedCheckpointTimeout() instead. |
long |
getCheckpointIdOfIgnoredInFlightData() |
CheckpointingMode |
getCheckpointingMode()
Gets the checkpointing mode (exactly-once vs. at-least-once).
|
long |
getCheckpointInterval()
Gets the interval in which checkpoints are periodically scheduled.
|
CheckpointStorage |
getCheckpointStorage() |
long |
getCheckpointTimeout()
Gets the maximum time that a checkpoint may take before being discarded.
|
CheckpointConfig.ExternalizedCheckpointCleanup |
getExternalizedCheckpointCleanup()
Returns the cleanup behaviour for externalized checkpoints.
|
int |
getMaxConcurrentCheckpoints()
Gets the maximum number of checkpoint attempts that may be in progress at the same time.
|
int |
getMaxSubtasksPerChannelStateFile() |
long |
getMinPauseBetweenCheckpoints()
Gets the minimal pause between checkpointing attempts.
|
int |
getTolerableCheckpointFailureNumber()
Get the defined number of consecutive checkpoint failures that will be tolerated, before the
whole job is failed over.
|
boolean |
isApproximateLocalRecoveryEnabled()
Returns whether approximate local recovery is enabled.
|
boolean |
isCheckpointingEnabled()
Checks whether checkpointing is enabled.
|
boolean |
isExternalizedCheckpointsEnabled()
Returns whether checkpoints should be persisted externally.
|
boolean |
isFailOnCheckpointingErrors()
Deprecated.
|
boolean |
isForceCheckpointing()
Deprecated.
This will be removed once iterations properly participate in checkpointing.
|
boolean |
isForceUnalignedCheckpoints()
Checks whether unaligned checkpoints are forced, despite iteration feedback.
|
boolean |
isUnalignedCheckpointsEnabled()
Returns whether unaligned checkpoints are enabled.
|
void |
setAlignedCheckpointTimeout(java.time.Duration alignedCheckpointTimeout)
Only relevant if
ExecutionCheckpointingOptions.ENABLE_UNALIGNED is enabled. |
void |
setAlignmentTimeout(java.time.Duration alignmentTimeout)
Deprecated.
Use
setAlignedCheckpointTimeout(Duration) instead. |
void |
setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFlightData)
Sets the checkpoint ID for which in-flight data will be ignored by all operators when
recovering from this checkpoint.
|
void |
setCheckpointingMode(CheckpointingMode checkpointingMode)
Sets the checkpointing mode (exactly-once vs. at-least-once).
|
void |
setCheckpointInterval(long checkpointInterval)
Sets the interval in which checkpoints are periodically scheduled.
|
void |
setCheckpointStorage(CheckpointStorage storage)
CheckpointStorage defines how
StateBackends checkpoint their state for fault
tolerance in streaming applications. |
void |
setCheckpointStorage(Path checkpointDirectory)
Configures the application to write out checkpoint snapshots to the configured directory.
|
void |
setCheckpointStorage(String checkpointDirectory)
Configures the application to write out checkpoint snapshots to the configured directory.
|
void |
setCheckpointStorage(URI checkpointDirectory)
Configures the application to write out checkpoint snapshots to the configured directory.
|
void |
setCheckpointTimeout(long checkpointTimeout)
Sets the maximum time that a checkpoint may take before being discarded.
|
void |
setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup cleanupMode)
Sets the mode for externalized checkpoint clean-up.
|
void |
setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors)
Deprecated.
|
void |
setForceCheckpointing(boolean forceCheckpointing)
Deprecated.
This will be removed once iterations properly participate in checkpointing.
|
void |
setForceUnalignedCheckpoints(boolean forceUnalignedCheckpoints)
Sets whether unaligned checkpoints are forced, despite currently non-checkpointable
iteration feedback or custom partitioners.
|
void |
setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints)
Sets the maximum number of checkpoint attempts that may be in progress at the same time.
|
void |
setMaxSubtasksPerChannelStateFile(int maxSubtasksPerChannelStateFile)
The number of subtasks to share the same channel state file.
|
void |
setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints)
Sets the minimal pause between checkpointing attempts.
|
void |
setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber)
This defines how many consecutive checkpoint failures will be tolerated, before the whole job
is failed over.
|
Configuration |
toConfiguration() |
@Deprecated public static final CheckpointingMode DEFAULT_MODE
@Deprecated public static final long DEFAULT_TIMEOUT
Deprecated. This field is no longer used. Please use ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT instead.
@Deprecated public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS
Deprecated. This field is no longer used. Please use ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS instead.
@Deprecated public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS
Deprecated. This field is no longer used. Please use ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS instead.
@Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER
Deprecated. This field is no longer used.
@Deprecated public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
Deprecated. This field is no longer used. Please use ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA instead.
public CheckpointConfig(CheckpointConfig checkpointConfig)
Creates a deep copy of the provided CheckpointConfig.
checkpointConfig - the config to copy.
public CheckpointConfig()
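public void disableCheckpointing()
Disables checkpointing.
public boolean isCheckpointingEnabled()
Checks whether checkpointing is enabled.
public CheckpointingMode getCheckpointingMode()
Gets the checkpointing mode (exactly-once vs. at-least-once).
public void setCheckpointingMode(CheckpointingMode checkpointingMode)
Sets the checkpointing mode (exactly-once vs. at-least-once).
checkpointingMode - The checkpointing mode.
public long getCheckpointInterval()
Gets the interval in which checkpoints are periodically scheduled.
This setting defines the base interval. Checkpoint triggering may be delayed by the settings getMaxConcurrentCheckpoints() and getMinPauseBetweenCheckpoints().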
public void setCheckpointInterval(long checkpointInterval)
Sets the interval in which checkpoints are periodically scheduled.
This setting defines the base interval. Checkpoint triggering may be delayed by the settings setMaxConcurrentCheckpoints(int) and setMinPauseBetweenCheckpoints(long).
checkpointInterval - The checkpoint interval, in milliseconds.
public long getCheckpointTimeout()
Gets the maximum time that a checkpoint may take before being discarded.
public void setCheckpointTimeout(long checkpointTimeout)
Sets the maximum time that a checkpoint may take before being discarded.
checkpointTimeout - The checkpoint timeout, in milliseconds.
public long getMinPauseBetweenCheckpoints()
Gets the minimal pause between checkpointing attempts (see also getMaxConcurrentCheckpoints()).
public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints)
Sets the minimal pause between checkpointing attempts (see also setMaxConcurrentCheckpoints(int)).
If the maximum number of concurrent checkpoints is set to one, this setting effectively ensures that a minimum amount of time passes during which no checkpoint is in progress at all.
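The interaction between the base interval and the minimal pause can be illustrated with a small model. This is a simplified sketch, not Flink's scheduler: it assumes at most one concurrent checkpoint, and the class and method names (`CheckpointScheduleSketch`, `earliestNextTrigger`) are hypothetical.

```java
/** Simplified model of checkpoint trigger timing; not Flink's actual scheduler. */
final class CheckpointScheduleSketch {

    /**
     * Returns the earliest time (in ms) at which the next checkpoint may be triggered,
     * assuming at most one checkpoint is in progress at a time.
     */
    static long earliestNextTrigger(
            long lastTriggerMs, long lastCompletionMs, long intervalMs, long minPauseMs) {
        long byInterval = lastTriggerMs + intervalMs;     // base interval since the last trigger
        long byMinPause = lastCompletionMs + minPauseMs;  // pause since the last completion
        return Math.max(byInterval, byMinPause);
    }

    public static void main(String[] args) {
        // Triggered at t=0, completed at t=800, interval 1000 ms, minimal pause 500 ms:
        // the pause (800 + 500) is later than the interval (0 + 1000), so it wins.
        System.out.println(earliestNextTrigger(0L, 800L, 1000L, 500L));
    }
}
```

In this model a slow checkpoint pushes the next trigger back, while a fast one leaves the base interval as the binding constraint.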
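minPauseBetweenCheckpoints - The minimal pause before the next checkpoint is triggered.
public int getMaxConcurrentCheckpoints()
Gets the maximum number of checkpoint attempts that may be in progress at the same time.
public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints)
Sets the maximum number of checkpoint attempts that may be in progress at the same time.
maxConcurrentCheckpoints - The maximum number of concurrent checkpoint attempts.
@Deprecated @PublicEvolving public boolean isForceCheckpointing()
Deprecated. This will be removed once iterations properly participate in checkpointing.
@Deprecated @PublicEvolving public void setForceCheckpointing(boolean forceCheckpointing)
Deprecated. This will be removed once iterations properly participate in checkpointing.
forceCheckpointing - The flag to force checkpointing.
@PublicEvolving public boolean isForceUnalignedCheckpoints()
Checks whether unaligned checkpoints are forced, despite iteration feedback.
@PublicEvolving public void setForceUnalignedCheckpoints(boolean forceUnalignedCheckpoints)
Sets whether unaligned checkpoints are forced, despite currently non-checkpointable iteration feedback or custom partitioners.
forceUnalignedCheckpoints - The flag to force unaligned checkpoints.
@Deprecated public boolean isFailOnCheckpointingErrors()
Deprecated. Use getTolerableCheckpointFailureNumber().
@Deprecated public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors)
Deprecated. Use setTolerableCheckpointFailureNumber(int).
setTolerableCheckpointFailureNumber(int) always overrules this deprecated method if the two conflict.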
public int getTolerableCheckpointFailureNumber()
Gets the defined number of consecutive checkpoint failures that will be tolerated before the whole job is failed over.
If ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER has not been configured, this method returns 0, which means the checkpoint failure manager does not tolerate any declined checkpoint failure.
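The meaning of "consecutive" failures can be made concrete with a small counter. This is a hedged sketch of the documented semantics only, not Flink's checkpoint failure manager; the class `FailureToleranceSketch` and its methods are hypothetical, and the reset-on-success behaviour is an assumption drawn from the word "consecutive".

```java
/** Illustrative counter for tolerated consecutive checkpoint failures; not Flink code. */
final class FailureToleranceSketch {
    private final int tolerableFailures;
    private int consecutiveFailures;

    FailureToleranceSketch(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a completed checkpoint, which resets the consecutive-failure count. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    /** Records a failed checkpoint; returns true if the job should now fail over. */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }
}
```

With a tolerance of 0, the first failure already returns true, which matches the default described above.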
public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber)
This defines how many consecutive checkpoint failures will be tolerated before the whole job is failed over.
@PublicEvolving public void setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup cleanupMode)
Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled automatically unless the mode is set to CheckpointConfig.ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS.
Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status JobStatus.FAILED or JobStatus.SUSPENDED). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.
The CheckpointConfig.ExternalizedCheckpointCleanup mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation, you have to handle checkpoint clean-up manually when you cancel the job as well (terminating with job status JobStatus.CANCELED).
The target directory for externalized checkpoints is configured via CheckpointingOptions.CHECKPOINTS_DIRECTORY.
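The clean-up responsibilities described above can be summarised as a small decision function. This is an illustrative model rather than Flink code: the enums below are local stand-ins, the constant names `DELETE_ON_CANCELLATION` and `RETAIN_ON_CANCELLATION` are assumed here and not listed on this page, and treating `NO_EXTERNALIZED_CHECKPOINTS` as "nothing to clean up" is likewise an assumption.

```java
/** Illustrative model of who cleans up an externalized checkpoint; not Flink code. */
final class ExternalizedCleanupSketch {
    // Local stand-ins; the first two constant names are assumptions.
    enum Cleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION, NO_EXTERNALIZED_CHECKPOINTS }

    enum Status { FAILED, SUSPENDED, CANCELED }

    /** Returns true if the user must delete the checkpoint state manually. */
    static boolean needsManualCleanup(Cleanup cleanup, Status status) {
        if (cleanup == Cleanup.NO_EXTERNALIZED_CHECKPOINTS) {
            return false; // assumed: no externalized checkpoint is retained in this mode
        }
        switch (status) {
            case FAILED:
            case SUSPENDED:
                return true; // never cleaned up automatically on failure or suspension
            case CANCELED:
                return cleanup == Cleanup.RETAIN_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("Unknown status: " + status);
        }
    }
}
```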
cleanupMode - Externalized checkpoint clean-up behaviour.
@PublicEvolving @Deprecated public void enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup cleanupMode)
Deprecated. Use setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup) instead.
Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled automatically unless the mode is set to CheckpointConfig.ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS.
Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status JobStatus.FAILED or JobStatus.SUSPENDED). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.
The CheckpointConfig.ExternalizedCheckpointCleanup mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation, you have to handle checkpoint clean-up manually when you cancel the job as well (terminating with job status JobStatus.CANCELED).
The target directory for externalized checkpoints is configured via CheckpointingOptions.CHECKPOINTS_DIRECTORY.
cleanupMode - Externalized checkpoint clean-up behaviour.
@PublicEvolving public boolean isExternalizedCheckpointsEnabled()
Returns whether checkpoints should be persisted externally.
Returns: true if checkpoints should be externalized.
@PublicEvolving public void enableUnalignedCheckpoints(boolean enabled)
Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
Unaligned checkpoints can only be enabled if ExecutionCheckpointingOptions.CHECKPOINTING_MODE is CheckpointingMode.EXACTLY_ONCE.
enabled - Flag to indicate whether unaligned checkpoints are enabled.
@PublicEvolving public void enableUnalignedCheckpoints()
Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
Unaligned checkpoints can only be enabled if ExecutionCheckpointingOptions.CHECKPOINTING_MODE is CheckpointingMode.EXACTLY_ONCE.
@PublicEvolving public boolean isUnalignedCheckpointsEnabled()
Returns whether unaligned checkpoints are enabled.
Returns: true if unaligned checkpoints are enabled.
@Deprecated @PublicEvolving public void setAlignmentTimeout(java.time.Duration alignmentTimeout)
Deprecated. Use setAlignedCheckpointTimeout(Duration) instead.
Only relevant if isUnalignedCheckpointsEnabled() returns true.
If ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT has a value equal to 0, checkpoints will always start unaligned.
If ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT has a value greater than 0, checkpoints will start aligned. If, during checkpointing, the checkpoint start delay exceeds ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignment will time out and the checkpoint will continue as an unaligned checkpoint.
@Deprecated @PublicEvolving public java.time.Duration getAlignmentTimeout()
Deprecated. Use getAlignedCheckpointTimeout() instead.
Returns: the alignment timeout, as configured via setAlignmentTimeout(Duration) or ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.
@PublicEvolving public java.time.Duration getAlignedCheckpointTimeout()
Returns: the aligned checkpoint timeout, as configured via setAlignedCheckpointTimeout(Duration) or ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.
@PublicEvolving public void setAlignedCheckpointTimeout(java.time.Duration alignedCheckpointTimeout)
Only relevant if ExecutionCheckpointingOptions.ENABLE_UNALIGNED is enabled.
If ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT has a value equal to 0, checkpoints will always start unaligned.
If ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT has a value greater than 0, checkpoints will start aligned. If, during checkpointing, the checkpoint start delay exceeds ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignment will time out and the checkpoint will continue as an unaligned checkpoint.
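The two timeout cases can be written down as a short decision function. This is a sketch of the rule as stated here, not Flink's implementation; it assumes unaligned checkpoints are enabled and the timeout is non-negative, and the names `AlignedTimeoutSketch`, `startMode`, and `modeAfterDelay` are hypothetical.

```java
import java.time.Duration;

/** Illustrative model of the aligned checkpoint timeout rule; not Flink code. */
final class AlignedTimeoutSketch {
    enum Mode { ALIGNED, UNALIGNED }

    /** Mode in which a checkpoint starts: a zero timeout means unaligned from the start. */
    static Mode startMode(Duration alignedCheckpointTimeout) {
        return alignedCheckpointTimeout.isZero() ? Mode.UNALIGNED : Mode.ALIGNED;
    }

    /** Mode once the checkpoint start has been delayed by the given amount. */
    static Mode modeAfterDelay(Duration alignedCheckpointTimeout, Duration startDelay) {
        if (startMode(alignedCheckpointTimeout) == Mode.UNALIGNED) {
            return Mode.UNALIGNED;
        }
        // Alignment times out only when the delay exceeds the configured timeout.
        return startDelay.compareTo(alignedCheckpointTimeout) > 0 ? Mode.UNALIGNED : Mode.ALIGNED;
    }
}
```

For example, with a 30-second timeout a 10-second start delay keeps the checkpoint aligned, while a 31-second delay switches it to unaligned.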
@PublicEvolving public int getMaxSubtasksPerChannelStateFile()
Returns: the number of subtasks that share the same channel state file, as configured via setMaxSubtasksPerChannelStateFile(int) or ExecutionCheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE.
@PublicEvolving public void setMaxSubtasksPerChannelStateFile(int maxSubtasksPerChannelStateFile)
The number of subtasks to share the same channel state file. If ExecutionCheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE has a value equal to 1, each subtask will create a new channel state file.
@Experimental public boolean isApproximateLocalRecoveryEnabled()
Returns whether approximate local recovery is enabled.
Returns: true if approximate local recovery is enabled.
@Experimental public void enableApproximateLocalRecovery(boolean enabled)
Enables the approximate local recovery mode.
In this recovery mode, when a task fails, the failed task and all tasks downstream of it restart.
Notice that:
1. Approximate recovery may lead to data loss. The data that took the failed task from its state at the last completed checkpoint to its state at the time of failure is lost.
2. In the next version, we will support restarting only the set of failed tasks. In this version, we only support downstream restarts when a task fails.
3. It is only an internal feature for now.
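The set of tasks restarted under this mode can be pictured as a reachability query on the job graph. The following is an illustrative sketch only, not Flink's failover strategy: the job graph is modelled as a plain map from a task name to its direct downstream tasks, and `DownstreamRestartSketch` and `tasksToRestart` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative computation of a downstream restart set; not Flink code. */
final class DownstreamRestartSketch {

    /** Returns the failed task plus every task reachable from it via downstream edges. */
    static Set<String> tasksToRestart(Map<String, List<String>> downstream, String failedTask) {
        Set<String> restart = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>();
        pending.push(failedTask);
        while (!pending.isEmpty()) {
            String task = pending.pop();
            if (restart.add(task)) { // visit each task only once
                for (String next : downstream.getOrDefault(task, List.of())) {
                    pending.push(next);
                }
            }
        }
        return restart;
    }
}
```

In a pipeline `source -> map -> sink`, a failure in `map` would restart `map` and `sink` under this model, while `source` keeps running.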
enabled - Flag to indicate whether approximate local recovery is enabled.
@PublicEvolving public CheckpointConfig.ExternalizedCheckpointCleanup getExternalizedCheckpointCleanup()
Returns the cleanup behaviour for externalized checkpoints.
Returns: The cleanup behaviour for externalized checkpoints, or null if none is configured.
@PublicEvolving public void setCheckpointStorage(CheckpointStorage storage)
CheckpointStorage defines how StateBackends checkpoint their state for fault tolerance in streaming applications. Various implementations store their checkpoints in different fashions and have different requirements and availability guarantees.
For example, JobManagerCheckpointStorage stores checkpoints in the memory of the JobManager. It is lightweight and without additional dependencies but is not highly available and only supports small state sizes. This checkpoint storage policy is convenient for local testing and development.
FileSystemCheckpointStorage stores checkpoints in a filesystem. For systems like HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes, on the order of many terabytes, while providing a highly available foundation for stateful applications. This checkpoint storage policy is recommended for most production deployments.
storage - The checkpoint storage policy.
@PublicEvolving public void setCheckpointStorage(String checkpointDirectory)
Configures the application to write out checkpoint snapshots to the configured directory. See FileSystemCheckpointStorage for more details on checkpointing to a file system.
checkpointDirectory - The path to write checkpoint metadata to.
See also: setCheckpointStorage(CheckpointStorage)
@PublicEvolving public void setCheckpointStorage(URI checkpointDirectory)
Configures the application to write out checkpoint snapshots to the configured directory. See FileSystemCheckpointStorage for more details on checkpointing to a file system.
checkpointDirectory - The path to write checkpoint metadata to.
See also: setCheckpointStorage(CheckpointStorage)
@PublicEvolving public void setCheckpointStorage(Path checkpointDirectory)
Configures the application to write out checkpoint snapshots to the configured directory. See FileSystemCheckpointStorage for more details on checkpointing to a file system.
checkpointDirectory - The path to write checkpoint metadata to.
See also: setCheckpointStorage(String)
@Nullable @PublicEvolving public CheckpointStorage getCheckpointStorage()
Returns: The CheckpointStorage that has been configured for the job, or null if none has been set.
See also: setCheckpointStorage(CheckpointStorage)
@PublicEvolving public void setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFlightData)
Sets the checkpoint ID for which in-flight data will be ignored by all operators when recovering from this checkpoint.
checkpointIdOfIgnoredInFlightData - Checkpoint ID for which in-flight data should be ignored.
See also: setCheckpointIdOfIgnoredInFlightData(long)
@PublicEvolving public long getCheckpointIdOfIgnoredInFlightData()
See also: setCheckpointIdOfIgnoredInFlightData(long)
public void configure(ReadableConfig configuration)
Sets all relevant options contained in the ReadableConfig, such as ExecutionCheckpointingOptions.CHECKPOINTING_MODE.
It will change the value of a setting only if a corresponding option was set in the configuration. If a key is not present, the current value of a field will remain untouched.
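The "only overwrite what is present" behaviour can be illustrated with a plain map standing in for the configuration. This is a minimal sketch of the pattern, not the actual configure implementation: the class `ConfigureSketch`, the keys `interval-ms` and `timeout-ms`, and the initial field values are all hypothetical.

```java
import java.util.Map;

/** Illustrative "set only if present" configuration pattern; not Flink code. */
final class ConfigureSketch {
    // Arbitrary starting values, chosen only for the example.
    long checkpointIntervalMs = 60_000L;
    long checkpointTimeoutMs = 600_000L;

    /** Applies only the keys that are present; absent keys leave the fields untouched. */
    void configure(Map<String, String> configuration) {
        String interval = configuration.get("interval-ms"); // hypothetical key
        if (interval != null) {
            checkpointIntervalMs = Long.parseLong(interval);
        }
        String timeout = configuration.get("timeout-ms"); // hypothetical key
        if (timeout != null) {
            checkpointTimeoutMs = Long.parseLong(timeout);
        }
    }
}
```

Calling `configure` with only `timeout-ms` set changes the timeout and leaves the interval at its previous value.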
configuration - a configuration to read the values from
@Internal public Configuration toConfiguration()
Returns: A copy of the internal configuration. Note that it is missing all options that are stored as plain Java fields in CheckpointConfig, for example storage.
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.