################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from enum import Enum
from typing import Optional
from pyflink.common import Duration
from pyflink.datastream.checkpoint_storage import CheckpointStorage, _from_j_checkpoint_storage
from pyflink.datastream.checkpointing_mode import CheckpointingMode
from pyflink.java_gateway import get_gateway
__all__ = ['CheckpointConfig', 'ExternalizedCheckpointCleanup']
class CheckpointConfig(object):
    """
    Configuration that captures all checkpointing related settings.

    This class is a thin Python wrapper: every method delegates to the wrapped Java
    checkpoint configuration object, and every setter returns ``self`` so that calls can be
    chained.

    :data:`DEFAULT_MODE`:

    The default checkpoint mode: exactly once.

    :data:`DEFAULT_TIMEOUT`:

    The default timeout of a checkpoint attempt: 10 minutes.

    :data:`DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS`:

    The default minimum pause to be made between checkpoints: none.

    :data:`DEFAULT_MAX_CONCURRENT_CHECKPOINTS`:

    The default limit of concurrently happening checkpoints: one.
    """

    DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE

    # 10 minutes, expressed in milliseconds.
    DEFAULT_TIMEOUT = 10 * 60 * 1000

    DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0

    DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1

    def __init__(self, j_checkpoint_config):
        """
        Wraps the given Java checkpoint configuration object.

        :param j_checkpoint_config: The Java-side checkpoint configuration object that all
                                    methods of this class delegate to.
        """
        self._j_checkpoint_config = j_checkpoint_config

    def is_checkpointing_enabled(self) -> bool:
        """
        Checks whether checkpointing is enabled.

        :return: True if checkpointing is enabled, false otherwise.
        """
        return self._j_checkpoint_config.isCheckpointingEnabled()

    def get_checkpointing_mode(self) -> CheckpointingMode:
        """
        Gets the checkpointing mode (exactly-once vs. at-least-once).

        .. seealso:: :func:`set_checkpointing_mode`

        :return: The :class:`CheckpointingMode`.
        """
        return CheckpointingMode._from_j_checkpointing_mode(
            self._j_checkpoint_config.getCheckpointingMode())

    def set_checkpointing_mode(self, checkpointing_mode: CheckpointingMode) -> 'CheckpointConfig':
        """
        Sets the checkpointing mode (:data:`CheckpointingMode.EXACTLY_ONCE` vs.
        :data:`CheckpointingMode.AT_LEAST_ONCE`).

        Example:
        ::

            >>> config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)

        :param checkpointing_mode: The :class:`CheckpointingMode`.
        :return: This object.
        """
        self._j_checkpoint_config.setCheckpointingMode(
            CheckpointingMode._to_j_checkpointing_mode(checkpointing_mode))
        return self

    def get_checkpoint_interval(self) -> int:
        """
        Gets the interval in which checkpoints are periodically scheduled.

        This setting defines the base interval. Checkpoint triggering may be delayed by the settings
        :func:`get_max_concurrent_checkpoints` and :func:`get_min_pause_between_checkpoints`.

        :return: The checkpoint interval, in milliseconds.
        """
        return self._j_checkpoint_config.getCheckpointInterval()

    def set_checkpoint_interval(self, checkpoint_interval: int) -> 'CheckpointConfig':
        """
        Sets the interval in which checkpoints are periodically scheduled.

        This setting defines the base interval. Checkpoint triggering may be delayed by the settings
        :func:`set_max_concurrent_checkpoints` and :func:`set_min_pause_between_checkpoints`.

        :param checkpoint_interval: The checkpoint interval, in milliseconds.
        :return: This object.
        """
        self._j_checkpoint_config.setCheckpointInterval(checkpoint_interval)
        return self

    def get_checkpoint_timeout(self) -> int:
        """
        Gets the maximum time that a checkpoint may take before being discarded.

        :return: The checkpoint timeout, in milliseconds.
        """
        return self._j_checkpoint_config.getCheckpointTimeout()

    def set_checkpoint_timeout(self, checkpoint_timeout: int) -> 'CheckpointConfig':
        """
        Sets the maximum time that a checkpoint may take before being discarded.

        :param checkpoint_timeout: The checkpoint timeout, in milliseconds.
        :return: This object.
        """
        self._j_checkpoint_config.setCheckpointTimeout(checkpoint_timeout)
        return self

    def get_min_pause_between_checkpoints(self) -> int:
        """
        Gets the minimal pause between checkpointing attempts. This setting defines how soon the
        checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
        another checkpoint with respect to the maximum number of concurrent checkpoints
        (see :func:`get_max_concurrent_checkpoints`).

        :return: The minimal pause before the next checkpoint is triggered.
        """
        return self._j_checkpoint_config.getMinPauseBetweenCheckpoints()

    def set_min_pause_between_checkpoints(self,
                                          min_pause_between_checkpoints: int) -> 'CheckpointConfig':
        """
        Sets the minimal pause between checkpointing attempts. This setting defines how soon the
        checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
        another checkpoint with respect to the maximum number of concurrent checkpoints
        (see :func:`set_max_concurrent_checkpoints`).

        If the maximum number of concurrent checkpoints is set to one, this setting makes
        effectively sure that a minimum amount of time passes where no checkpoint is in progress
        at all.

        :param min_pause_between_checkpoints: The minimal pause before the next checkpoint is
                                              triggered.
        :return: This object.
        """
        self._j_checkpoint_config.setMinPauseBetweenCheckpoints(min_pause_between_checkpoints)
        return self

    def get_max_concurrent_checkpoints(self) -> int:
        """
        Gets the maximum number of checkpoint attempts that may be in progress at the same time.
        If this value is *n*, then no checkpoints will be triggered while *n* checkpoint attempts
        are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt
        would need to finish or expire.

        :return: The maximum number of concurrent checkpoint attempts.
        """
        return self._j_checkpoint_config.getMaxConcurrentCheckpoints()

    def set_max_concurrent_checkpoints(self, max_concurrent_checkpoints: int) -> 'CheckpointConfig':
        """
        Sets the maximum number of checkpoint attempts that may be in progress at the same time.
        If this value is *n*, then no checkpoints will be triggered while *n* checkpoint attempts
        are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt
        would need to finish or expire.

        :param max_concurrent_checkpoints: The maximum number of concurrent checkpoint attempts.
        :return: This object.
        """
        self._j_checkpoint_config.setMaxConcurrentCheckpoints(max_concurrent_checkpoints)
        return self

    def is_fail_on_checkpointing_errors(self) -> bool:
        """
        This determines the behaviour of tasks if there is an error in their local checkpointing.
        If this returns true, tasks will fail as a reaction. If this returns false, task will only
        decline the failed checkpoint.

        :return: ``True`` if failing on checkpointing errors, false otherwise.
        """
        return self._j_checkpoint_config.isFailOnCheckpointingErrors()

    def set_fail_on_checkpointing_errors(self,
                                         fail_on_checkpointing_errors: bool) -> 'CheckpointConfig':
        """
        Sets the expected behaviour for tasks in case that they encounter an error in their
        checkpointing procedure. If this is set to true, the task will fail on checkpointing error.
        If this is set to false, the task will only decline the checkpoint and continue running.
        The default is true.

        Example:
        ::

            >>> config.set_fail_on_checkpointing_errors(False)

        :param fail_on_checkpointing_errors: ``True`` if failing on checkpointing errors,
                                             false otherwise.
        :return: This object.
        """
        self._j_checkpoint_config.setFailOnCheckpointingErrors(fail_on_checkpointing_errors)
        return self

    def get_tolerable_checkpoint_failure_number(self) -> int:
        """
        Get the defined number of consecutive checkpoint failures that will be tolerated, before the
        whole job is failed over.

        :return: The maximum number of tolerated checkpoint failures.
        """
        return self._j_checkpoint_config.getTolerableCheckpointFailureNumber()

    def set_tolerable_checkpoint_failure_number(self,
                                                tolerable_checkpoint_failure_number: int
                                                ) -> 'CheckpointConfig':
        """
        This defines how many consecutive checkpoint failures will be tolerated, before the whole
        job is failed over. The default value is `0`, which means no checkpoint failures will be
        tolerated, and the job will fail on first reported checkpoint failure.

        Example:
        ::

            >>> config.set_tolerable_checkpoint_failure_number(2)

        :param tolerable_checkpoint_failure_number: The maximum number of tolerated checkpoint
                                                    failures.
        :return: This object.
        """
        self._j_checkpoint_config.setTolerableCheckpointFailureNumber(
            tolerable_checkpoint_failure_number)
        return self

    def enable_externalized_checkpoints(
            self,
            cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig':
        """
        Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
        automatically unless the mode is set to
        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.

        Externalized checkpoints write their meta data out to persistent storage and are **not**
        automatically cleaned up when the owning job fails or is suspended (terminating with job
        status ``FAILED`` or ``SUSPENDED``). In this case, you have to manually clean up the
        checkpoint state, both the meta data and actual program state.

        The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint
        should be cleaned up on job cancellation. If you choose to retain externalized checkpoints
        on cancellation you have to handle checkpoint clean-up manually when you cancel the job as
        well (terminating with job status ``CANCELED``).

        The target directory for externalized checkpoints is configured via
        ``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``.

        Example:
        ::

            >>> config.enable_externalized_checkpoints(
            ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be
                             :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
                             :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
                             :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
        :return: This object.

        .. note:: Deprecated in 1.15. Use :func:`set_externalized_checkpoint_cleanup` instead.
        """
        self._j_checkpoint_config.enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
        return self

    def set_externalized_checkpoint_cleanup(
            self,
            cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig':
        """
        Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
        automatically unless the mode is set to
        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.

        Externalized checkpoints write their meta data out to persistent storage and are **not**
        automatically cleaned up when the owning job fails or is suspended (terminating with job
        status ``FAILED`` or ``SUSPENDED``). In this case, you have to manually clean up the
        checkpoint state, both the meta data and actual program state.

        The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint
        should be cleaned up on job cancellation. If you choose to retain externalized checkpoints
        on cancellation you have to handle checkpoint clean-up manually when you cancel the job as
        well (terminating with job status ``CANCELED``).

        The target directory for externalized checkpoints is configured via
        ``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``.

        Example:
        ::

            >>> config.set_externalized_checkpoint_cleanup(
            ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be
                             :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
                             :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
                             :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
        :return: This object.
        """
        self._j_checkpoint_config.setExternalizedCheckpointCleanup(
            ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
        return self

    def is_externalized_checkpoints_enabled(self) -> bool:
        """
        Returns whether checkpoints should be persisted externally.

        :return: ``True`` if checkpoints should be externalized, false otherwise.
        """
        return self._j_checkpoint_config.isExternalizedCheckpointsEnabled()

    def get_externalized_checkpoint_cleanup(self) -> Optional['ExternalizedCheckpointCleanup']:
        """
        Returns the cleanup behaviour for externalized checkpoints.

        :return: The cleanup behaviour for externalized checkpoints or ``None`` if none is
                 configured.
        """
        cleanup_mode = self._j_checkpoint_config.getExternalizedCheckpointCleanup()
        if cleanup_mode is None:
            return None
        else:
            return ExternalizedCheckpointCleanup._from_j_externalized_checkpoint_cleanup(
                cleanup_mode)

    def is_unaligned_checkpoints_enabled(self) -> bool:
        """
        Returns whether unaligned checkpoints are enabled.

        :return: ``True`` if unaligned checkpoints are enabled.
        """
        return self._j_checkpoint_config.isUnalignedCheckpointsEnabled()

    def enable_unaligned_checkpoints(self, enabled: bool = True) -> 'CheckpointConfig':
        """
        Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.

        Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which
        allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes
        independent of the current throughput as checkpoint barriers are effectively not embedded
        into the stream of data anymore.

        Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is
        :data:`CheckpointingMode.EXACTLY_ONCE`.

        :param enabled: ``True`` if checkpoints should be taken in unaligned mode.
        :return: This object.
        """
        self._j_checkpoint_config.enableUnalignedCheckpoints(enabled)
        return self

    def disable_unaligned_checkpoints(self) -> 'CheckpointConfig':
        """
        Disables unaligned checkpoints. This is a shorthand for calling
        :func:`enable_unaligned_checkpoints` with ``False``.

        Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which
        allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes
        independent of the current throughput as checkpoint barriers are effectively not embedded
        into the stream of data anymore.

        :return: This object.
        """
        self.enable_unaligned_checkpoints(False)
        return self

    def set_alignment_timeout(self, alignment_timeout: Duration) -> 'CheckpointConfig':
        """
        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.

        If ``alignment_timeout`` has value equal to ``0``, checkpoints will always start unaligned.
        If ``alignment_timeout`` has value greater than ``0``, checkpoints will start aligned. If
        during checkpointing, checkpoint start delay exceeds this ``alignment_timeout``, alignment
        will timeout and checkpoint will start working as unaligned checkpoint.

        :param alignment_timeout: The duration until the aligned checkpoint will be converted into
                                  an unaligned checkpoint.
        :return: This object.
        """
        self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
        return self

    def get_alignment_timeout(self) -> 'Duration':
        """
        Returns the alignment timeout, as configured via :func:`set_alignment_timeout` or
        ``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.

        :return: the alignment timeout.
        """
        return Duration(self._j_checkpoint_config.getAlignmentTimeout())

    def set_force_unaligned_checkpoints(
            self,
            force_unaligned_checkpoints: bool = True) -> 'CheckpointConfig':
        """
        Sets whether unaligned checkpoints are forced, despite currently non-checkpointable
        iteration feedback or custom partitioners.

        :param force_unaligned_checkpoints: The flag to force unaligned checkpoints.
        :return: This object.
        """
        self._j_checkpoint_config.setForceUnalignedCheckpoints(force_unaligned_checkpoints)
        return self

    def is_force_unaligned_checkpoints(self) -> 'bool':
        """
        Checks whether unaligned checkpoints are forced, despite iteration feedback or custom
        partitioners.

        :return: True, if unaligned checkpoints are forced, false otherwise.
        """
        return self._j_checkpoint_config.isForceUnalignedCheckpoints()

    def set_checkpoint_storage(self, storage: CheckpointStorage) -> 'CheckpointConfig':
        """
        Checkpoint storage defines how state backends checkpoint their state for fault
        tolerance in streaming applications. Various implementations store their checkpoints
        in different fashions and have different requirements and availability guarantees.

        For example, `JobManagerCheckpointStorage` stores checkpoints in the memory of the
        JobManager. It is lightweight and without additional dependencies but is not highly
        available and only supports small state sizes. This checkpoint storage policy is convenient
        for local testing and development.

        The `FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like
        HDFS, NFS Drives, S3, and GCS, this storage policy supports large state size, in the
        magnitude of many terabytes while providing a highly available foundation for stateful
        applications. This checkpoint storage policy is recommended for most production deployments.

        :param storage: The :class:`CheckpointStorage` to use.
        :return: This object.
        """
        self._j_checkpoint_config.setCheckpointStorage(storage._j_checkpoint_storage)
        return self

    def set_checkpoint_storage_dir(self, checkpoint_path: str) -> 'CheckpointConfig':
        """
        Configures the application to write out checkpoint snapshots to the configured directory.
        See `FileSystemCheckpointStorage` for more details on checkpointing to a file system.

        :param checkpoint_path: The directory to write checkpoint snapshots to.
        :return: This object.
        """
        self._j_checkpoint_config.setCheckpointStorage(checkpoint_path)
        return self

    def get_checkpoint_storage(self) -> Optional[CheckpointStorage]:
        """
        The checkpoint storage that has been configured for the Job, or None if
        none has been set.

        :return: The configured :class:`CheckpointStorage`, or ``None``.
        """
        j_storage = self._j_checkpoint_config.getCheckpointStorage()
        if j_storage is None:
            return None
        else:
            return _from_j_checkpoint_storage(j_storage)
class ExternalizedCheckpointCleanup(Enum):
    """
    Cleanup behaviour for externalized checkpoints when the job is cancelled.

    :data:`DELETE_ON_CANCELLATION`:

    Delete externalized checkpoints on job cancellation.

    All checkpoint state will be deleted when you cancel the owning
    job, both the meta data and actual program state. Therefore, you
    cannot resume from externalized checkpoints after the job has been
    cancelled.

    Note that checkpoint state is always kept if the job terminates
    with state ``FAILED``.

    :data:`RETAIN_ON_CANCELLATION`:

    Retain externalized checkpoints on job cancellation.

    All checkpoint state is kept when you cancel the owning job. You
    have to manually delete both the checkpoint meta data and actual
    program state after cancelling the job.

    Note that checkpoint state is always kept if the job terminates
    with state ``FAILED``.

    :data:`NO_EXTERNALIZED_CHECKPOINTS`:

    Externalized checkpoints are disabled completely.
    """

    DELETE_ON_CANCELLATION = 0

    RETAIN_ON_CANCELLATION = 1

    NO_EXTERNALIZED_CHECKPOINTS = 2

    @staticmethod
    def _from_j_externalized_checkpoint_cleanup(j_cleanup_mode) \
            -> 'ExternalizedCheckpointCleanup':
        """
        Converts a Java-side cleanup mode into the Python enum member of the same name.

        :param j_cleanup_mode: An object whose ``name()`` method returns the member name.
        :return: The matching :class:`ExternalizedCheckpointCleanup` member.
        :raises KeyError: If no member with the returned name exists.
        """
        return ExternalizedCheckpointCleanup[j_cleanup_mode.name()]

    def _to_j_externalized_checkpoint_cleanup(self):
        """
        Converts this member into the Java-side enum constant of the same name.

        The constant is looked up by ``self.name`` on the Java class
        ``org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup``
        through the gateway.

        :return: The Java enum constant named like this member.
        """
        gateway = get_gateway()
        JExternalizedCheckpointCleanup = \
            gateway.jvm.org.apache.flink.streaming.api.environment.CheckpointConfig \
            .ExternalizedCheckpointCleanup
        return getattr(JExternalizedCheckpointCleanup, self.name)