Checkpointing
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Checkpointing #

Flink 中的每个方法或算子都能够是有状态的(阅读 working with state 了解更多)。 状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。 为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。

容错文档 中介绍了 Flink 流计算容错机制内部的技术原理。

前提条件 #

Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:

  • 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
  • 存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。

开启与配置 Checkpoint #

默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironmentenableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

Checkpoint 其他的属性包括:

  • Checkpoint 存储: 你可以设置检查点快照的持久化位置。默认情况下,Flink将使用JobManager的堆。建议在生产部署中改为使用持久性文件系统。 有关作业范围和集群范围配置的可用选项的更多详细信息,请参阅Checkpoint 存储

  • 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。

  • checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。

  • checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。

    往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。

    注意这个值也意味着并发 checkpoint 的数目是

  • checkpoint 可容忍连续失败次数:该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。 可容忍的checkpoint失败仅适用于下列情形:Job Manager的IOException,TaskManager做checkpoint时异步部分的失败, checkpoint超时等。TaskManager做checkpoint时同步部分的失败会直接触发作业fail over。其它的checkpoint失败(如一个checkpoint被另一个checkpoint包含)会被忽略掉。

  • 并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。

    该选项不能和 “checkpoints 间的最小时间"同时使用。

  • externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 保留 checkpoints 的部署文档

  • 非对齐 checkpoints: 你可以启用非对齐 checkpoints 以在背压时大大减少创建checkpoint的时间。这仅适用于精确一次(exactly-once)checkpoints 并且只有一个并发检查点。

  • 部分任务结束的 checkpoints: 默认情况下,即使DAG的部分已经处理完它们的所有记录,Flink也会继续执行 checkpoints。 请参阅重要注意事项以了解详细信息。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);

// 高级选项:

// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointRetention(
        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);

// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000)

// 高级选项:

// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)

// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)

// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointRetention(
  ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)

// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()
env = StreamExecutionEnvironment.get_execution_environment()

# 每 1000ms 开始一次 checkpoint
env.enable_checkpointing(1000)

# 高级选项:

# 设置模式为精确一次 (这是默认值)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# 确认 checkpoints 之间的时间会进行 500 ms
env.get_checkpoint_config().set_min_pause_between_checkpoints(500)

# Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.get_checkpoint_config().set_checkpoint_timeout(60000)

# 允许两个连续的 checkpoint 错误
env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(2)

# 同一时间只允许一个 checkpoint 进行
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.get_checkpoint_config().enable_externalized_checkpoints(
    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
    
# 开启实验性的 unaligned checkpoints
env.get_checkpoint_config().enable_unaligned_checkpoints()

相关的配置选项 #

更多的属性与默认值能在 Flink 配置文件 中设置(完整教程请阅读 配置)。

Key Default Type Description
execution.checkpointing.aligned-checkpoint-timeout
0 ms Duration Only relevant if execution.checkpointing.unaligned.enabled is enabled.

If timeout is 0, checkpoints will always start unaligned.

If timeout has a positive value, checkpoints will start aligned. If during checkpointing, checkpoint start delay exceeds this timeout, alignment will timeout and checkpoint barrier will start working as unaligned checkpoint.
execution.checkpointing.checkpoints-after-tasks-finish
true Boolean Feature toggle for enabling checkpointing even if some of tasks have finished. Before you enable it, please take a look at the important considerations
execution.checkpointing.cleaner.parallel-mode
true Boolean Option whether to discard a checkpoint's states in parallel using the ExecutorService passed into the cleaner
execution.checkpointing.create-subdir
true Boolean Whether to create sub-directories named by job id under the 'execution.checkpointing.dir' to store the data files and meta data of checkpoints. The default value is true to enable user could run several jobs with the same checkpoint directory at the same time. If this value is set to false, pay attention not to run several jobs with the same directory simultaneously.
WARNING: This is an advanced configuration. If set to false, users must ensure that no multiple jobs are run with the same checkpoint directory, and that no files exist other than those necessary for the restoration of the current job when starting a new job.
execution.checkpointing.data-inline-threshold
20 kb MemorySize The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.
execution.checkpointing.dir
(none) String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'execution.checkpointing.storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.
execution.checkpointing.externalized-checkpoint-retention
NO_EXTERNALIZED_CHECKPOINTS

Enum

Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status JobStatus#FAILED or JobStatus#SUSPENDED). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.

The mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).

The target directory for externalized checkpoints is configured via execution.checkpointing.dir.

Possible values:
  • "DELETE_ON_CANCELLATION": Checkpoint state is only kept when the owning job fails. It is deleted if the job is cancelled.
  • "RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is cancelled or fails.
  • "NO_EXTERNALIZED_CHECKPOINTS": Externalized checkpoints are disabled.
execution.checkpointing.file-merging.across-checkpoint-boundary
false Boolean Only relevant if execution.checkpointing.file-merging.enabled is enabled.
Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.
execution.checkpointing.file-merging.enabled
false Boolean Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. This is an experimental feature under evaluation, make sure you're aware of the possible effects of enabling it.
execution.checkpointing.file-merging.max-file-size
32 mb MemorySize Max size of a physical file for merged checkpoints.
execution.checkpointing.file-merging.max-space-amplification
2.0 Float Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The more space amplification is, the more waste of space will be. This configs a space amplification above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f means disabling the space control.
execution.checkpointing.file-merging.pool-blocking
false Boolean Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until the file is returned.
execution.checkpointing.incremental
false Boolean Option whether to create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option.
execution.checkpointing.interval
(none) Duration Gets the interval in which checkpoints are periodically scheduled.

This setting defines the base interval. Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints, execution.checkpointing.min-pause and execution.checkpointing.interval-during-backlog
execution.checkpointing.interval-during-backlog
(none) Duration If it is not null and any source reports isProcessingBacklog=true, it is the interval in which checkpoints are periodically scheduled.

Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints and execution.checkpointing.min-pause.

Note: if it is not null, the value must either be 0, which means the checkpoint is disabled during backlog, or be larger than or equal to execution.checkpointing.interval.
execution.checkpointing.local-backup.dirs
(none) String The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. If not configured it will default to <WORKING_DIR>/localState. The <WORKING_DIR> can be configured via process.taskmanager.working-dir
execution.checkpointing.local-backup.enabled
false Boolean This option configures local backup for the state backend, which indicates whether to make backup checkpoint on local disk. If not configured, fallback to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).
execution.checkpointing.max-concurrent-checkpoints
1 Integer The maximum number of checkpoint attempts that may be in progress at the same time. If this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or expire.
execution.checkpointing.min-pause
0 ms Duration The minimal pause between checkpointing attempts. This setting defines how soon thecheckpoint coordinator may trigger another checkpoint after it becomes possible to triggeranother checkpoint with respect to the maximum number of concurrent checkpoints(see execution.checkpointing.max-concurrent-checkpoints).

If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure that a minimum amount of time passes where no checkpoint is in progress at all.
execution.checkpointing.mode
EXACTLY_ONCE

Enum

The checkpointing mode (exactly-once vs. at-least-once).

Possible values:
  • "EXACTLY_ONCE"
  • "AT_LEAST_ONCE"
execution.checkpointing.num-retained
1 Integer The maximum number of completed checkpoints to retain.
execution.checkpointing.savepoint-dir
(none) String The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).
execution.checkpointing.storage
(none) String The checkpoint storage implementation to be used to checkpoint state.
The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'jobmanager' and 'filesystem'.
'execution.checkpointing.storage' and 'execution.checkpointing.dir' are usually combined to configure the checkpoint location. By default, the checkpoint meta data and actual program state will be stored in the JobManager's memory directly. When 'execution.checkpointing.storage' is set to 'jobmanager', if 'execution.checkpointing.dir' is configured, the meta data of checkpoints will be persisted to the path specified by 'execution.checkpointing.dir'. Otherwise, the meta data will be stored in the JobManager's memory. When 'execution.checkpointing.storage' is set to 'filesystem', a valid path must be configured to 'execution.checkpointing.dir', and the checkpoint meta data and actual program state will both be persisted to the path.
execution.checkpointing.timeout
10 min Duration The maximum time that a checkpoint may take before being discarded.
execution.checkpointing.tolerable-failed-checkpoints
0 Integer The tolerable checkpoint consecutive failure number. If set to 0, that means we do not tolerance any checkpoint failure. This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers and checkpoint expiration due to a timeout. Failures originating from the sync phase on the Task Managers are always forcing failover of an affected task. Other types of checkpoint failures (such as checkpoint being subsumed) are being ignored.
execution.checkpointing.unaligned.enabled
false Boolean Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.

Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.

Unaligned checkpoints can only be enabled if execution.checkpointing.mode is EXACTLY_ONCE and if execution.checkpointing.max-concurrent-checkpoints is 1
execution.checkpointing.unaligned.forced
false Boolean Forces unaligned checkpoints, particularly allowing them for iterative jobs.
execution.checkpointing.unaligned.interruptible-timers.enabled
false Boolean Allows unaligned checkpoints to skip timers that are currently being fired. For this feature to be enabled, it must be also supported by the operator. Currently this is supported by all TableStreamOperators and CepOperator.
execution.checkpointing.unaligned.max-subtasks-per-channel-state-file
5 Integer Defines the maximum number of subtasks that share the same channel state file. It can reduce the number of small files when enable unaligned checkpoint. Each subtask will create a new channel state file when this is configured to 1.
execution.checkpointing.write-buffer-size
4096 Integer The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option 'execution.checkpointing.data-inline-threshold'.

Back to top

选择一个 State Backend #

Flink 的 checkpointing 机制 会将 timer 以及 stateful 的 operator 进行快照,然后存储下来, 包括连接器(connectors),窗口(windows)以及任何用户自定义的状态。 Checkpoint 存储在哪里取决于所配置的 State Backend(比如 JobManager memory、 file system、 database)。

默认情况下,状态是保持在 TaskManagers 的内存中,checkpoint 保存在 JobManager 的内存中。为了合适地持久化大体量状态, Flink 支持各种各样的途径去存储 checkpoint 状态到其他的 state backends 上。可以通过如下代码块来配置:

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
env.configure(config);

阅读 state backends 来查看在 job 范围和集群范围上可用的 state backends 与选项的更多细节。

迭代作业中的状态和 checkpoint #

Flink 现在为没有迭代(iterations)的作业提供一致性的处理保证。在迭代作业上开启 checkpoint 会导致异常。为了在迭代程序中强制进行 checkpoint,用户需要在开启 checkpoint 时设置一个特殊的标志: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)

请注意在环形边上游走的记录(以及与之相关的状态变化)在故障时会丢失。

部分任务结束后的 Checkpoint #

从版本 1.14 开始 Flink 支持在部分任务结束后继续进行Checkpoint。 如果一部分数据源是有限数据集,那么就可以出现这种情况。 从版本 1.15 开始,这一特性被默认打开。如果想要关闭这一功能,可以执行:

Configuration config = new Configuration();
config.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

在这种情况下,结束的任务不会参与 Checkpoint 的过程。在实现自定义的算子或者 UDF (用户自定义函数)时需要考虑这一点。

为了支持部分任务结束后的 Checkpoint 操作,我们调整了 任务的生命周期 并且引入了 StreamOperator#finish 方法。 在这一方法中,用户需要写出所有缓冲区中的数据。在 finish 方法调用后的 checkpoint 中,这一任务一定不能再有缓冲区中的数据,因为在 finish() 后没有办法来输出这些数据。 在大部分情况下,finish() 后这一任务的状态为空,唯一的例外是如果其中某些算子中包含外部系统事务的句柄(例如为了实现恰好一次语义), 在这种情况下,在 finish() 后进行的 checkpoint 操作应该保留这些句柄,并且在结束 checkpoint(即任务退出前所等待的 checkpoint)时提交。 一个可以参考的例子是满足恰好一次语义的 sink 接口与 TwoPhaseCommitSinkFunction

对 operator state 的影响 #

在部分 Task 结束后的checkpoint中,Flink 对 UnionListState 进行了特殊的处理。 UnionListState 一般用于实现对外部系统读取位置的一个全局视图(例如,用于记录所有 Kafka 分区的读取偏移)。 如果我们在算子的某个并发调用 close() 方法后丢弃它的状态,我们就会丢失它所分配的分区的偏移量信息。 为了解决这一问题,对于使用 UnionListState 的算子我们只允许在它的并发都在运行或都已结束的时候才能进行 checkpoint 操作。

ListState 一般不会用于类似的场景,但是用户仍然需要注意在调用 close() 方法后进行的 checkpoint 会丢弃算子的状态并且 这些状态在算子重启后不可用。

任何支持并发修改操作的算子也可以支持部分并发实例结束后的恢复操作。从这种类型的快照中恢复等价于将算子的并发改为正在运行的并发实例数。

任务结束前等待最后一次 Checkpoint #

为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 finish() 方法后等待下一次 checkpoint 成功后退出。 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。 极端情况下,如果 checkpoint 的周期被设置为 Long.MAX_VALUE,那么任务永远不会结束,因为下一次 checkpoint 不会进行。

统一的 checkpoint 文件合并机制 (实验性功能) #

Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 checkpoint 文件创建删除的次数, 有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 state.checkpoints.file-merging.enabled 设置为 true 来开启该机制。 注意,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 state.checkpoints.file-merging.max-space-amplification 来控制文件放大的上限。

该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope state 提供 subtask 级别的合并;对 private scope state 提供 TaskManager 级别的合并,可以通过 state.checkpoints.file-merging.max-subtasks-per-file 选项配置单个文件允许写入的最大 subtask 数目。

统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 state.checkpoints.file-merging.across-checkpoint-boundarytrue 开启。

该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件; 而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 state.checkpoints.file-merging.pool-blockingtrue 选择 Blocking 模式,设置为 false 选择 Non-blocking 模式。

Back to top