Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution.
See Checkpointing for how to enable and configure checkpoints for your program.
Checkpoints are by default not persisted externally and are only used to resume a job from failures. They are deleted when a program is cancelled. You can, however, configure periodic checkpoints to be persisted externally similarly to savepoints. These externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails.
// Retain the externalized checkpoint files when the job is cancelled
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
The ExternalizedCheckpointCleanup mode configures what happens with externalized checkpoints when you cancel the job:
- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: Retain the externalized checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.
- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the externalized checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
Similarly to savepoints, an externalized checkpoint consists of a meta data file and, depending on the state backend, some additional data files. The target directory for the externalized checkpoint’s meta data is determined from the configuration key state.checkpoints.dir, which currently can only be set via the configuration files:
state.checkpoints.dir: hdfs:///checkpoints/
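With this setting, the meta data for each checkpoint is written below a job-specific subdirectory of the configured path. The layout sketched below is illustrative; the job ID and checkpoint number are placeholders:

hdfs:///checkpoints/<job-id>/chk-<checkpoint-number>/_metadata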
This directory will then contain the checkpoint meta data required to restore the checkpoint. For the MemoryStateBackend, this meta data file will be self-contained and no further files are needed. FsStateBackend and RocksDBStateBackend write separate data files and only write the paths to these files into the meta data file. These data files are stored at the path given to the state backend during construction:
// Data files are written to this path; only their locations go into the meta data file
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
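For comparison, a sketch of the same construction with the FsStateBackend; the path is illustrative:

env.setStateBackend(new FsStateBackend("hdfs:///checkpoints-data/"));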
Externalized checkpoints have a few differences from savepoints. They

- use a state backend specific (low-level) data format,
- may be incremental,
- do not support Flink specific features like rescaling.
A job may be resumed from an externalized checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the JobManager needs to have access to the data files it refers to (see Directory Structure above).
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
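For example, assuming the state.checkpoints.dir configured above, a resume could look like the following; the job ID, checkpoint number, and jar name are hypothetical:

$ bin/flink run -s hdfs:///checkpoints/7d9c1e2f3a4b5c6d7e8f9a0b1c2d3e4f/chk-42/_metadata my-job.jar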