# Source code for pyflink.datastream.checkpoint_config

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from enum import Enum
from typing import Optional

from pyflink.common import Duration
from pyflink.datastream.checkpoint_storage import CheckpointStorage, _from_j_checkpoint_storage
from pyflink.datastream.checkpointing_mode import CheckpointingMode
from pyflink.java_gateway import get_gateway

__all__ = ['CheckpointConfig', 'ExternalizedCheckpointCleanup']


class CheckpointConfig(object):
    """
    Configuration that captures all checkpointing related settings.

    This is a thin Python wrapper: every getter and setter delegates to the wrapped JVM-side
    checkpoint configuration object. Setters return ``self`` so that calls can be chained.

    :data:`DEFAULT_MODE`:

    The default checkpoint mode: exactly once.

    :data:`DEFAULT_TIMEOUT`:

    The default timeout of a checkpoint attempt: 10 minutes (expressed in milliseconds).

    :data:`DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS`:

    The default minimum pause to be made between checkpoints: none.

    :data:`DEFAULT_MAX_CONCURRENT_CHECKPOINTS`:

    The default limit of concurrently happening checkpoints: one.
    """

    DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE

    # 10 minutes, in milliseconds (the unit used by the checkpoint timeout setters/getters).
    DEFAULT_TIMEOUT = 10 * 60 * 1000

    DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0

    DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1

    def __init__(self, j_checkpoint_config):
        """
        :param j_checkpoint_config: The JVM-side checkpoint configuration object that all
                                    methods of this wrapper delegate to.
        """
        self._j_checkpoint_config = j_checkpoint_config

    def is_checkpointing_enabled(self) -> bool:
        """
        Checks whether checkpointing is enabled.

        :return: True if checkpointing is enabled, false otherwise.
        """
        return self._j_checkpoint_config.isCheckpointingEnabled()

    def get_checkpointing_mode(self) -> CheckpointingMode:
        """
        Gets the checkpointing mode (exactly-once vs. at-least-once).

        .. seealso:: :func:`set_checkpointing_mode`

        :return: The :class:`CheckpointingMode`.
        """
        return CheckpointingMode._from_j_checkpointing_mode(
            self._j_checkpoint_config.getCheckpointingMode())

    def set_checkpointing_mode(self, checkpointing_mode: CheckpointingMode) -> 'CheckpointConfig':
        """
        Sets the checkpointing mode (:data:`CheckpointingMode.EXACTLY_ONCE` vs.
        :data:`CheckpointingMode.AT_LEAST_ONCE`).

        Example:
        ::

            >>> config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)

        :param checkpointing_mode: The :class:`CheckpointingMode`.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setCheckpointingMode(
            CheckpointingMode._to_j_checkpointing_mode(checkpointing_mode))
        return self

    def get_checkpoint_interval(self) -> int:
        """
        Gets the interval in which checkpoints are periodically scheduled.

        This setting defines the base interval. Checkpoint triggering may be delayed by the settings
        :func:`get_max_concurrent_checkpoints` and :func:`get_min_pause_between_checkpoints`.

        :return: The checkpoint interval, in milliseconds.
        """
        return self._j_checkpoint_config.getCheckpointInterval()

    def set_checkpoint_interval(self, checkpoint_interval: int) -> 'CheckpointConfig':
        """
        Sets the interval in which checkpoints are periodically scheduled.

        This setting defines the base interval. Checkpoint triggering may be delayed by the settings
        :func:`set_max_concurrent_checkpoints` and :func:`set_min_pause_between_checkpoints`.

        :param checkpoint_interval: The checkpoint interval, in milliseconds.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setCheckpointInterval(checkpoint_interval)
        return self

    def get_checkpoint_timeout(self) -> int:
        """
        Gets the maximum time that a checkpoint may take before being discarded.

        :return: The checkpoint timeout, in milliseconds.
        """
        return self._j_checkpoint_config.getCheckpointTimeout()

    def set_checkpoint_timeout(self, checkpoint_timeout: int) -> 'CheckpointConfig':
        """
        Sets the maximum time that a checkpoint may take before being discarded.

        :param checkpoint_timeout: The checkpoint timeout, in milliseconds.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setCheckpointTimeout(checkpoint_timeout)
        return self

    def get_min_pause_between_checkpoints(self) -> int:
        """
        Gets the minimal pause between checkpointing attempts.

        This setting defines how soon the checkpoint coordinator may trigger another checkpoint
        after it becomes possible to trigger another checkpoint with respect to the maximum number
        of concurrent checkpoints (see :func:`get_max_concurrent_checkpoints`).

        :return: The minimal pause before the next checkpoint is triggered.
        """
        return self._j_checkpoint_config.getMinPauseBetweenCheckpoints()

    def set_min_pause_between_checkpoints(self,
                                          min_pause_between_checkpoints: int) \
            -> 'CheckpointConfig':
        """
        Sets the minimal pause between checkpointing attempts.

        This setting defines how soon the checkpoint coordinator may trigger another checkpoint
        after it becomes possible to trigger another checkpoint with respect to the maximum number
        of concurrent checkpoints (see :func:`set_max_concurrent_checkpoints`).

        If the maximum number of concurrent checkpoints is set to one, this setting effectively
        ensures that a minimum amount of time passes where no checkpoint is in progress at all.

        :param min_pause_between_checkpoints: The minimal pause before the next checkpoint is
                                              triggered.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setMinPauseBetweenCheckpoints(min_pause_between_checkpoints)
        return self

    def get_max_concurrent_checkpoints(self) -> int:
        """
        Gets the maximum number of checkpoint attempts that may be in progress at the same time.

        If this value is *n*, then no checkpoints will be triggered while *n* checkpoint attempts
        are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt
        would need to finish or expire.

        :return: The maximum number of concurrent checkpoint attempts.
        """
        return self._j_checkpoint_config.getMaxConcurrentCheckpoints()

    def set_max_concurrent_checkpoints(self, max_concurrent_checkpoints: int) -> 'CheckpointConfig':
        """
        Sets the maximum number of checkpoint attempts that may be in progress at the same time.

        If this value is *n*, then no checkpoints will be triggered while *n* checkpoint attempts
        are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt
        would need to finish or expire.

        :param max_concurrent_checkpoints: The maximum number of concurrent checkpoint attempts.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setMaxConcurrentCheckpoints(max_concurrent_checkpoints)
        return self

    def is_fail_on_checkpointing_errors(self) -> bool:
        """
        This determines the behaviour of tasks if there is an error in their local checkpointing.
        If this returns true, tasks will fail as a reaction. If this returns false, tasks will
        only decline the failed checkpoint.

        :return: ``True`` if failing on checkpointing errors, false otherwise.
        """
        return self._j_checkpoint_config.isFailOnCheckpointingErrors()

    def set_fail_on_checkpointing_errors(self,
                                         fail_on_checkpointing_errors: bool) -> 'CheckpointConfig':
        """
        Sets the expected behaviour for tasks in case that they encounter an error in their
        checkpointing procedure. If this is set to true, the task will fail on checkpointing error.
        If this is set to false, the task will only decline the checkpoint and continue running.
        The default is true.

        Example:
        ::

            >>> config.set_fail_on_checkpointing_errors(False)

        :param fail_on_checkpointing_errors: ``True`` if failing on checkpointing errors,
                                             false otherwise.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setFailOnCheckpointingErrors(fail_on_checkpointing_errors)
        return self

    def get_tolerable_checkpoint_failure_number(self) -> int:
        """
        Get the defined number of consecutive checkpoint failures that will be tolerated, before
        the whole job is failed over.

        :return: The maximum number of tolerated checkpoint failures.
        """
        return self._j_checkpoint_config.getTolerableCheckpointFailureNumber()

    def set_tolerable_checkpoint_failure_number(self,
                                                tolerable_checkpoint_failure_number: int
                                                ) -> 'CheckpointConfig':
        """
        This defines how many consecutive checkpoint failures will be tolerated, before the whole
        job is failed over. The default value is `0`, which means no checkpoint failures will be
        tolerated, and the job will fail on first reported checkpoint failure.

        Example:
        ::

            >>> config.set_tolerable_checkpoint_failure_number(2)

        :param tolerable_checkpoint_failure_number: The maximum number of tolerated checkpoint
                                                    failures.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setTolerableCheckpointFailureNumber(
            tolerable_checkpoint_failure_number)
        return self

    def enable_externalized_checkpoints(
            self,
            cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig':
        """
        Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
        automatically unless the mode is set to
        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.

        Externalized checkpoints write their meta data out to persistent storage and are **not**
        automatically cleaned up when the owning job fails or is suspended (terminating with job
        status ``FAILED`` or ``SUSPENDED``). In this case, you have to manually clean up the
        checkpoint state, both the meta data and actual program state.

        The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint
        should be cleaned up on job cancellation. If you choose to retain externalized checkpoints
        on cancellation you have to handle checkpoint clean-up manually when you cancel the job as
        well (terminating with job status ``CANCELED``).

        The target directory for externalized checkpoints is configured via
        ``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``.

        Example:
        ::

            >>> config.enable_externalized_checkpoints(
            ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be
                             :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
                             :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
                             :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
        :return: This :class:`CheckpointConfig`, to allow method chaining.

        .. note:: Deprecated in 1.15. Use :func:`set_externalized_checkpoint_cleanup` instead.
        """
        self._j_checkpoint_config.enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
        return self

    def set_externalized_checkpoint_cleanup(
            self,
            cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig':
        """
        Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
        automatically unless the mode is set to
        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.

        Externalized checkpoints write their meta data out to persistent storage and are **not**
        automatically cleaned up when the owning job fails or is suspended (terminating with job
        status ``FAILED`` or ``SUSPENDED``). In this case, you have to manually clean up the
        checkpoint state, both the meta data and actual program state.

        The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint
        should be cleaned up on job cancellation. If you choose to retain externalized checkpoints
        on cancellation you have to handle checkpoint clean-up manually when you cancel the job as
        well (terminating with job status ``CANCELED``).

        The target directory for externalized checkpoints is configured via
        ``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``.

        Example:
        ::

            >>> config.set_externalized_checkpoint_cleanup(
            ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be
                             :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
                             :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
                             :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setExternalizedCheckpointCleanup(
            ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
        return self

    def is_externalized_checkpoints_enabled(self) -> bool:
        """
        Returns whether checkpoints should be persisted externally.

        :return: ``True`` if checkpoints should be externalized, false otherwise.
        """
        return self._j_checkpoint_config.isExternalizedCheckpointsEnabled()

    def get_externalized_checkpoint_cleanup(self) -> Optional['ExternalizedCheckpointCleanup']:
        """
        Returns the cleanup behaviour for externalized checkpoints.

        :return: The cleanup behaviour for externalized checkpoints or ``None`` if none is
                 configured.
        """
        cleanup_mode = self._j_checkpoint_config.getExternalizedCheckpointCleanup()
        if cleanup_mode is None:
            return None
        return ExternalizedCheckpointCleanup._from_j_externalized_checkpoint_cleanup(
            cleanup_mode)

    def is_unaligned_checkpoints_enabled(self) -> bool:
        """
        Returns whether unaligned checkpoints are enabled.

        :return: ``True`` if unaligned checkpoints are enabled.
        """
        return self._j_checkpoint_config.isUnalignedCheckpointsEnabled()

    def enable_unaligned_checkpoints(self, enabled: bool = True) -> 'CheckpointConfig':
        """
        Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.

        Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which
        allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes
        independent of the current throughput as checkpoint barriers are effectively not embedded
        into the stream of data anymore.

        Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is
        :data:`CheckpointingMode.EXACTLY_ONCE`.

        :param enabled: ``True`` if checkpoints should be taken in unaligned mode, ``False`` to
                        disable unaligned checkpoints.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.enableUnalignedCheckpoints(enabled)
        return self

    def disable_unaligned_checkpoints(self) -> 'CheckpointConfig':
        """
        Disables unaligned checkpoints, so checkpoints are taken in aligned mode.

        This is equivalent to calling :func:`enable_unaligned_checkpoints` with ``False``.

        .. seealso:: :func:`enable_unaligned_checkpoints`

        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self.enable_unaligned_checkpoints(False)
        return self

    def set_alignment_timeout(self, alignment_timeout: Duration) -> 'CheckpointConfig':
        """
        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.

        If ``alignment_timeout`` has value equal to ``0``, checkpoints will always start unaligned.

        If ``alignment_timeout`` has value greater than ``0``, checkpoints will start aligned. If
        during checkpointing, checkpoint start delay exceeds this ``alignment_timeout``, alignment
        will timeout and checkpoint will start working as unaligned checkpoint.

        :param alignment_timeout: The duration until the aligned checkpoint will be converted into
                                  an unaligned checkpoint.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
        return self

    def get_alignment_timeout(self) -> 'Duration':
        """
        Returns the alignment timeout, as configured via :func:`set_alignment_timeout` or
        ``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.

        :return: the alignment timeout.
        """
        return Duration(self._j_checkpoint_config.getAlignmentTimeout())

    def set_force_unaligned_checkpoints(
            self,
            force_unaligned_checkpoints: bool = True) -> 'CheckpointConfig':
        """
        Sets whether unaligned checkpoints are forced, despite currently non-checkpointable
        iteration feedback or custom partitioners.

        :param force_unaligned_checkpoints: The flag to force unaligned checkpoints.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setForceUnalignedCheckpoints(force_unaligned_checkpoints)
        return self

    def is_force_unaligned_checkpoints(self) -> bool:
        """
        Checks whether unaligned checkpoints are forced, despite iteration feedback or custom
        partitioners.

        :return: True, if unaligned checkpoints are forced, false otherwise.
        """
        return self._j_checkpoint_config.isForceUnalignedCheckpoints()

    def set_checkpoint_storage(self, storage: CheckpointStorage) -> 'CheckpointConfig':
        """
        Checkpoint storage defines how state backends checkpoint their state for fault tolerance
        in streaming applications. Various implementations store their checkpoints in different
        fashions and have different requirements and availability guarantees.

        For example, `JobManagerCheckpointStorage` stores checkpoints in the memory of the
        JobManager. It is lightweight and without additional dependencies but is not highly
        available and only supports small state sizes. This checkpoint storage policy is convenient
        for local testing and development.

        The `FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like HDFS,
        NFS drives, S3, and GCS, this storage policy supports large state size, in the magnitude of
        many terabytes while providing a highly available foundation for stateful applications.
        This checkpoint storage policy is recommended for most production deployments.

        :param storage: The :class:`CheckpointStorage` to use.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        self._j_checkpoint_config.setCheckpointStorage(storage._j_checkpoint_storage)
        return self

    def set_checkpoint_storage_dir(self, checkpoint_path: str) -> 'CheckpointConfig':
        """
        Configures the application to write out checkpoint snapshots to the configured directory.
        See `FileSystemCheckpointStorage` for more details on checkpointing to a file system.

        :param checkpoint_path: The checkpoint directory path.
        :return: This :class:`CheckpointConfig`, to allow method chaining.
        """
        # The raw path string is handed to the same JVM setter used by set_checkpoint_storage;
        # presumably the JVM side resolves this to its directory-path overload — confirm.
        self._j_checkpoint_config.setCheckpointStorage(checkpoint_path)
        return self

    def get_checkpoint_storage(self) -> Optional[CheckpointStorage]:
        """
        The checkpoint storage that has been configured for the Job, or None if none has been set.

        :return: The configured :class:`CheckpointStorage`, or ``None``.
        """
        j_storage = self._j_checkpoint_config.getCheckpointStorage()
        if j_storage is None:
            return None
        return _from_j_checkpoint_storage(j_storage)
class ExternalizedCheckpointCleanup(Enum):
    """
    Cleanup behaviour for externalized checkpoints when the job is cancelled.

    :data:`DELETE_ON_CANCELLATION`:

    Delete externalized checkpoints on job cancellation.

    All checkpoint state will be deleted when you cancel the owning
    job, both the meta data and actual program state. Therefore, you
    cannot resume from externalized checkpoints after the job has been
    cancelled.

    Note that checkpoint state is always kept if the job terminates
    with state ``FAILED``.

    :data:`RETAIN_ON_CANCELLATION`:

    Retain externalized checkpoints on job cancellation.

    All checkpoint state is kept when you cancel the owning job. You
    have to manually delete both the checkpoint meta data and actual
    program state after cancelling the job.

    Note that checkpoint state is always kept if the job terminates
    with state ``FAILED``.

    :data:`NO_EXTERNALIZED_CHECKPOINTS`:

    Externalized checkpoints are disabled completely.
    """

    DELETE_ON_CANCELLATION = 0

    RETAIN_ON_CANCELLATION = 1

    NO_EXTERNALIZED_CHECKPOINTS = 2

    @staticmethod
    def _from_j_externalized_checkpoint_cleanup(j_cleanup_mode) \
            -> 'ExternalizedCheckpointCleanup':
        """
        Converts a JVM-side cleanup enum constant into the Python member with the same name.

        :param j_cleanup_mode: A JVM enum constant exposing a ``name()`` method.
        :return: The matching :class:`ExternalizedCheckpointCleanup` member.
        :raises KeyError: If the name does not match any member of this enum.
        """
        return ExternalizedCheckpointCleanup[j_cleanup_mode.name()]

    def _to_j_externalized_checkpoint_cleanup(self):
        """
        Converts this member into the JVM-side
        ``org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup``
        constant of the same name, looked up through the Java gateway.

        :return: The JVM enum constant.
        """
        gateway = get_gateway()
        JExternalizedCheckpointCleanup = \
            gateway.jvm.org.apache.flink.streaming.api.environment.CheckpointConfig \
            .ExternalizedCheckpointCleanup
        # Python and JVM members share names, so a name-based lookup maps them one-to-one.
        return getattr(JExternalizedCheckpointCleanup, self.name)