Checkpoint

CheckpointConfig

Configuration that captures all checkpointing related settings.

DEFAULT_MODE:

The default checkpoint mode: exactly once.

DEFAULT_TIMEOUT:

The default timeout of a checkpoint attempt: 10 minutes.

DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS:

The default minimum pause to be made between checkpoints: none.

DEFAULT_MAX_CONCURRENT_CHECKPOINTS:

The default limit of concurrently happening checkpoints: one.

CheckpointConfig.DEFAULT_MODE

CheckpointConfig.DEFAULT_TIMEOUT

CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS

CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS

CheckpointConfig.is_checkpointing_enabled()

Checks whether checkpointing is enabled.

CheckpointConfig.get_checkpointing_mode()

Gets the checkpointing mode (exactly-once vs. at-least-once).

CheckpointConfig.set_checkpointing_mode(...)

Sets the checkpointing mode (CheckpointingMode.EXACTLY_ONCE vs. CheckpointingMode.AT_LEAST_ONCE).

CheckpointConfig.get_checkpoint_interval()

Gets the interval in which checkpoints are periodically scheduled.

CheckpointConfig.set_checkpoint_interval(...)

Sets the interval in which checkpoints are periodically scheduled.

CheckpointConfig.get_checkpoint_timeout()

Gets the maximum time that a checkpoint may take before being discarded.

CheckpointConfig.set_checkpoint_timeout(...)

Sets the maximum time that a checkpoint may take before being discarded.

CheckpointConfig.get_min_pause_between_checkpoints()

Gets the minimal pause between checkpointing attempts.

CheckpointConfig.set_min_pause_between_checkpoints(...)

Sets the minimal pause between checkpointing attempts.

CheckpointConfig.get_max_concurrent_checkpoints()

Gets the maximum number of checkpoint attempts that may be in progress at the same time.

CheckpointConfig.set_max_concurrent_checkpoints(...)

Sets the maximum number of checkpoint attempts that may be in progress at the same time.
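The setters listed above are usually applied together. The following is a minimal sketch rather than code from the Flink sources: the helper names `apply_basic_checkpoint_settings` and `build_environment` are hypothetical, the values are arbitrary, and durations are assumed to be given in milliseconds. The PyFlink import is deferred into `build_environment` so that the helper can be read and exercised without a Flink installation.

```python
def apply_basic_checkpoint_settings(checkpoint_config, mode):
    """Apply a conservative set of checkpoint settings (hypothetical helper).

    `checkpoint_config` is expected to behave like CheckpointConfig.
    Durations are assumed to be milliseconds.
    """
    checkpoint_config.set_checkpointing_mode(mode)
    # Schedule a checkpoint every 60 seconds.
    checkpoint_config.set_checkpoint_interval(60_000)
    # Discard a checkpoint that takes longer than 10 minutes.
    checkpoint_config.set_checkpoint_timeout(10 * 60_000)
    # Wait at least 30 seconds between checkpoint attempts.
    checkpoint_config.set_min_pause_between_checkpoints(30_000)
    # Allow only one checkpoint attempt in progress at a time.
    checkpoint_config.set_max_concurrent_checkpoints(1)
    return checkpoint_config


def build_environment():
    """Wire the helper into a job; requires the apache-flink package."""
    from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    apply_basic_checkpoint_settings(
        env.get_checkpoint_config(), CheckpointingMode.EXACTLY_ONCE
    )
    return env
```

Calling `build_environment()` returns an environment whose CheckpointConfig carries these values; the matching getters can be used to read them back.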

CheckpointConfig.is_fail_on_checkpointing_errors()

This determines the behaviour of tasks if there is an error in their local checkpointing.

CheckpointConfig.set_fail_on_checkpointing_errors(...)

Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure.

CheckpointConfig.get_tolerable_checkpoint_failure_number()

Gets the defined number of consecutive checkpoint failures that will be tolerated before the whole job is failed over.

CheckpointConfig.set_tolerable_checkpoint_failure_number(...)

This defines how many consecutive checkpoint failures will be tolerated, before the whole job is failed over.
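As a small illustration of the failure tolerance setting, the sketch below sets the limit and reads it back. The helper name `tolerate_checkpoint_failures`, the default of 3, and the rejection of negative values are assumptions made for this example, not documented behaviour.

```python
def tolerate_checkpoint_failures(checkpoint_config, max_consecutive_failures=3):
    """Set how many consecutive checkpoint failures are tolerated (hypothetical helper)."""
    # Assumption for this sketch: a negative limit is treated as a caller error.
    if max_consecutive_failures < 0:
        raise ValueError("max_consecutive_failures must be non-negative")
    checkpoint_config.set_tolerable_checkpoint_failure_number(max_consecutive_failures)
    # Read the value back so the caller sees what the config now holds.
    return checkpoint_config.get_tolerable_checkpoint_failure_number()
```

With a real job, `checkpoint_config` would come from `env.get_checkpoint_config()` on a StreamExecutionEnvironment.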

CheckpointConfig.enable_externalized_checkpoints(...)

Sets the mode for externalized checkpoint clean-up.

CheckpointConfig.set_externalized_checkpoint_cleanup(...)

Sets the mode for externalized checkpoint clean-up.

CheckpointConfig.is_externalized_checkpoints_enabled()

Returns whether checkpoints should be persisted externally.

CheckpointConfig.get_externalized_checkpoint_cleanup()

Returns the cleanup behaviour for externalized checkpoints.
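A common use of the externalized checkpoint methods is to keep checkpoints after a job is cancelled. The sketch below assumes that `ExternalizedCheckpointCleanup` exposes a `RETAIN_ON_CANCELLATION` member and that it can be imported from `pyflink.datastream`; the helper names are hypothetical.

```python
def set_cleanup_mode(checkpoint_config, cleanup_mode):
    """Choose what happens to externalized checkpoints on cancellation (hypothetical helper)."""
    checkpoint_config.set_externalized_checkpoint_cleanup(cleanup_mode)
    return checkpoint_config


def retain_on_cancellation_example():
    """Requires the apache-flink package."""
    from pyflink.datastream import (
        ExternalizedCheckpointCleanup,
        StreamExecutionEnvironment,
    )

    env = StreamExecutionEnvironment.get_execution_environment()
    config = set_cleanup_mode(
        env.get_checkpoint_config(),
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,
    )
    # The getter reports the cleanup behaviour that is now configured.
    return config.get_externalized_checkpoint_cleanup()
```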

CheckpointConfig.is_unaligned_checkpoints_enabled()

Returns whether unaligned checkpoints are enabled.

CheckpointConfig.enable_unaligned_checkpoints([...])

Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.

CheckpointConfig.disable_unaligned_checkpoints()

Disables unaligned checkpoints.

CheckpointConfig.set_alignment_timeout(...)

Sets the alignment timeout. Only relevant if unaligned checkpoints are enabled (see enable_unaligned_checkpoints()).

CheckpointConfig.get_alignment_timeout()

Returns the alignment timeout, as configured via set_alignment_timeout() or org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT.

CheckpointConfig.set_force_unaligned_checkpoints([...])

Sets whether unaligned checkpoints are forced, despite currently non-checkpointable iteration feedback or custom partitioners.

CheckpointConfig.is_force_unaligned_checkpoints()

Checks whether unaligned checkpoints are forced, despite iteration feedback or custom partitioners.
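The unaligned checkpoint methods above can be combined as in the following sketch. It assumes that `set_alignment_timeout` accepts a `pyflink.common.Duration`; the helper names and the 30 second value are hypothetical.

```python
def enable_unaligned_with_timeout(checkpoint_config, alignment_timeout):
    """Turn on unaligned checkpoints, then set the alignment timeout (hypothetical helper)."""
    # Enable first: the timeout is only relevant once unaligned checkpoints are on.
    checkpoint_config.enable_unaligned_checkpoints()
    checkpoint_config.set_alignment_timeout(alignment_timeout)
    return checkpoint_config


def unaligned_example():
    """Requires the apache-flink package."""
    from pyflink.common import Duration
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    config = enable_unaligned_with_timeout(
        env.get_checkpoint_config(), Duration.of_seconds(30)
    )
    return config.is_unaligned_checkpoints_enabled()
```

`disable_unaligned_checkpoints()` reverses the first call; forcing via `set_force_unaligned_checkpoints` is left out of this sketch.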

CheckpointConfig.set_checkpoint_storage(storage)

Checkpoint storage defines how state backends checkpoint their state for fault tolerance in streaming applications.

CheckpointConfig.set_checkpoint_storage_dir(...)

Configures the application to write out checkpoint snapshots to the configured directory.

CheckpointConfig.get_checkpoint_storage()

The checkpoint storage that has been configured for the Job, or None if none has been set.
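For the directory-based variant, a sketch such as the following may help. The helper name `use_checkpoint_directory` is hypothetical, and the check for an explicit URI scheme is an assumption of this example, added because paths such as `file:///...` or `hdfs://...` are the usual form.

```python
def use_checkpoint_directory(checkpoint_config, directory_uri):
    """Write checkpoint snapshots to a directory (hypothetical helper)."""
    # Assumption for this sketch: insist on an explicit scheme such as file:// or hdfs://.
    if "://" not in directory_uri:
        raise ValueError("directory_uri should include a scheme, e.g. file:///...")
    checkpoint_config.set_checkpoint_storage_dir(directory_uri)
    # Returns the storage now configured for the job (None if nothing was set).
    return checkpoint_config.get_checkpoint_storage()
```

In a job this would be called as `use_checkpoint_directory(env.get_checkpoint_config(), "file:///tmp/flink-checkpoints")`, where the path is a placeholder.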

ExternalizedCheckpointCleanup(value)

Cleanup behaviour for externalized checkpoints when the job is cancelled.

CheckpointStorage

Checkpoint storage defines how StateBackends store their state for fault tolerance in streaming applications. Various implementations store their checkpoints in different fashions and have different requirements and availability guarantees.

For example, JobManagerCheckpointStorage stores checkpoints in the memory of the JobManager. It is lightweight and has no additional dependencies, but it is not scalable and only supports small state sizes. This checkpoint storage policy is convenient for local testing and development.

FileSystemCheckpointStorage stores checkpoints in a filesystem. For systems like HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes, on the order of many terabytes, while providing a highly available foundation for streaming applications. This checkpoint storage policy is recommended for most production deployments.

Raw Bytes Storage

The CheckpointStorage creates services for raw bytes storage.

The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that simply stores bytes in a fault-tolerant fashion. This service is used by the JobManager to store checkpoint and recovery metadata and is typically also used by the keyed and operator state backends to store checkpoint state.

Serializability

Implementations need to be serializable (java.io.Serializable), because they are distributed across parallel processes (for distributed execution) together with the streaming application code.

Because of that, CheckpointStorage implementations are meant to be like _factories_ that create the proper state stores that provide access to the persistent layer. That way, the storage policy can be very lightweight (containing only configuration), which makes it easier to serialize.

Thread Safety

Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating streams concurrently.
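The factory idea from the Serializability and Thread Safety notes can be illustrated with a conceptual sketch in plain Python. This is not Flink's implementation (the real contract is defined on the Java side): `StorageSpec` and `ByteStore` are hypothetical names, JSON stands in for java.io.Serializable, and an in-memory dictionary stands in for the persistent layer.

```python
import json
import threading
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class StorageSpec:
    """Hypothetical storage policy: configuration only, so it is cheap to ship."""

    max_blob_bytes: int

    def to_json(self):
        # Only plain configuration is serialized, never open resources.
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, text):
        return cls(**json.loads(text))

    def create_store(self):
        """Factory method: build the actual byte store where it is needed."""
        return ByteStore(self.max_blob_bytes)


class ByteStore:
    """Hypothetical in-memory byte store that several threads may use at once."""

    def __init__(self, max_blob_bytes):
        self._max_blob_bytes = max_blob_bytes
        self._lock = threading.Lock()
        self._blobs = {}

    def write(self, key, payload):
        if len(payload) > self._max_blob_bytes:
            raise ValueError("payload exceeds max_blob_bytes")
        # The lock guards the shared dictionary against concurrent writers.
        with self._lock:
            self._blobs[key] = bytes(payload)

    def read(self, key):
        with self._lock:
            return self._blobs[key]
```

The spec can be serialized, sent to another process, restored with `StorageSpec.from_json`, and only then asked to create the store, which mirrors the split between a lightweight policy object and the stores it creates.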

JobManagerCheckpointStorage([...])

This CheckpointStorage checkpoints state directly to the JobManager's memory (hence the name), but savepoints will be persisted to a file system.

FileSystemCheckpointStorage([...])

FileSystemCheckpointStorage checkpoints state as files to a filesystem.

CustomCheckpointStorage(...)

A wrapper of customized Java checkpoint storage.
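To show how the storage classes relate to `CheckpointConfig.set_checkpoint_storage`, here is a minimal sketch. The helper name `install_checkpoint_storage` is hypothetical, and it assumes that both classes can be imported from `pyflink.datastream.checkpoint_storage`, that `JobManagerCheckpointStorage` can be constructed without arguments, and that `FileSystemCheckpointStorage` accepts the checkpoint path as its first argument.

```python
def install_checkpoint_storage(checkpoint_config, checkpoint_dir=None):
    """Pick a checkpoint storage and register it (hypothetical helper).

    Without a directory, fall back to JobManager memory, which suits local
    testing; with a directory, checkpoint to the filesystem.
    Requires the apache-flink package.
    """
    from pyflink.datastream.checkpoint_storage import (
        FileSystemCheckpointStorage,
        JobManagerCheckpointStorage,
    )

    if checkpoint_dir is None:
        storage = JobManagerCheckpointStorage()
    else:
        storage = FileSystemCheckpointStorage(checkpoint_dir)
    checkpoint_config.set_checkpoint_storage(storage)
    return storage
```

For example, `install_checkpoint_storage(env.get_checkpoint_config(), "file:///tmp/flink-checkpoints")` would select the filesystem variant; the path is a placeholder.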