Source code for pyflink.common.restart_strategy

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABCMeta
from datetime import timedelta
from typing import Optional

from py4j.java_gateway import get_java_class

from pyflink.java_gateway import get_gateway
from pyflink.util.utils import to_j_flink_time, from_j_flink_time

__all__ = ['RestartStrategies', 'RestartStrategyConfiguration']


[docs]class RestartStrategyConfiguration(object, metaclass=ABCMeta): """ Abstract configuration for restart strategies. """ def __init__(self, j_restart_strategy_configuration): self._j_restart_strategy_configuration = j_restart_strategy_configuration
[docs] def get_description(self) -> str: """ Returns a description which is shown in the web interface. :return: Description of the restart strategy. """ return self._j_restart_strategy_configuration.getDescription()
def __eq__(self, other): return isinstance(other, self.__class__) and \ self._j_restart_strategy_configuration == \ other._j_restart_strategy_configuration def __hash__(self): return self._j_restart_strategy_configuration.hashCode()
[docs]class RestartStrategies(object): """ This class defines methods to generate RestartStrategyConfigurations. These configurations are used to create RestartStrategies at runtime. The RestartStrategyConfigurations are used to decouple the core module from the runtime module. """
[docs] class NoRestartStrategyConfiguration(RestartStrategyConfiguration): """ Configuration representing no restart strategy. """ def __init__(self, j_restart_strategy=None): if j_restart_strategy is None: gateway = get_gateway() self._j_restart_strategy_configuration = \ gateway.jvm.RestartStrategies.NoRestartStrategyConfiguration() super(RestartStrategies.NoRestartStrategyConfiguration, self)\ .__init__(self._j_restart_strategy_configuration) else: super(RestartStrategies.NoRestartStrategyConfiguration, self) \ .__init__(j_restart_strategy)
[docs] class FixedDelayRestartStrategyConfiguration(RestartStrategyConfiguration): """ Configuration representing a fixed delay restart strategy. """ def __init__(self, restart_attempts=None, delay_between_attempts_interval=None, j_restart_strategy=None): if j_restart_strategy is None: if not isinstance(delay_between_attempts_interval, (timedelta, int)): raise TypeError("The delay_between_attempts_interval 'failure_interval' " "only supports integer and datetime.timedelta, current input " "type is %s." % type(delay_between_attempts_interval)) gateway = get_gateway() self._j_restart_strategy_configuration = \ gateway.jvm.RestartStrategies\ .fixedDelayRestart( restart_attempts, to_j_flink_time(delay_between_attempts_interval)) super(RestartStrategies.FixedDelayRestartStrategyConfiguration, self)\ .__init__(self._j_restart_strategy_configuration) else: super(RestartStrategies.FixedDelayRestartStrategyConfiguration, self) \ .__init__(j_restart_strategy)
[docs] def get_restart_attempts(self) -> int: return self._j_restart_strategy_configuration.getRestartAttempts()
[docs] def get_delay_between_attempts_interval(self) -> timedelta: return from_j_flink_time( self._j_restart_strategy_configuration.getDelayBetweenAttemptsInterval())
[docs] class FailureRateRestartStrategyConfiguration(RestartStrategyConfiguration): """ Configuration representing a failure rate restart strategy. """ def __init__(self, max_failure_rate=None, failure_interval=None, delay_between_attempts_interval=None, j_restart_strategy=None): if j_restart_strategy is None: if not isinstance(failure_interval, (timedelta, int)): raise TypeError("The parameter 'failure_interval' " "only supports integer and datetime.timedelta, current input " "type is %s." % type(failure_interval)) if not isinstance(delay_between_attempts_interval, (timedelta, int)): raise TypeError("The delay_between_attempts_interval 'failure_interval' " "only supports integer and datetime.timedelta, current input " "type is %s." % type(delay_between_attempts_interval)) gateway = get_gateway() self._j_restart_strategy_configuration = \ gateway.jvm.RestartStrategies\ .FailureRateRestartStrategyConfiguration(max_failure_rate, to_j_flink_time(failure_interval), to_j_flink_time( delay_between_attempts_interval)) super(RestartStrategies.FailureRateRestartStrategyConfiguration, self)\ .__init__(self._j_restart_strategy_configuration) else: super(RestartStrategies.FailureRateRestartStrategyConfiguration, self)\ .__init__(j_restart_strategy)
[docs] def get_max_failure_rate(self) -> int: return self._j_restart_strategy_configuration.getMaxFailureRate()
[docs] def get_failure_interval(self) -> timedelta: return from_j_flink_time(self._j_restart_strategy_configuration.getFailureInterval())
[docs] def get_delay_between_attempts_interval(self) -> timedelta: return from_j_flink_time(self._j_restart_strategy_configuration .getDelayBetweenAttemptsInterval())
[docs] class FallbackRestartStrategyConfiguration(RestartStrategyConfiguration): """ Restart strategy configuration that could be used by jobs to use cluster level restart strategy. Useful especially when one has a custom implementation of restart strategy set via flink-conf.yaml. """ def __init__(self, j_restart_strategy=None): if j_restart_strategy is None: gateway = get_gateway() self._j_restart_strategy_configuration = \ gateway.jvm.RestartStrategies.FallbackRestartStrategyConfiguration() super(RestartStrategies.FallbackRestartStrategyConfiguration, self)\ .__init__(self._j_restart_strategy_configuration) else: super(RestartStrategies.FallbackRestartStrategyConfiguration, self)\ .__init__(j_restart_strategy)
@staticmethod def _from_j_restart_strategy(j_restart_strategy) -> Optional[RestartStrategyConfiguration]: if j_restart_strategy is None: return None gateway = get_gateway() NoRestartStrategyConfiguration = gateway.jvm.RestartStrategies\ .NoRestartStrategyConfiguration FixedDelayRestartStrategyConfiguration = gateway.jvm.RestartStrategies\ .FixedDelayRestartStrategyConfiguration FailureRateRestartStrategyConfiguration = gateway.jvm.RestartStrategies\ .FailureRateRestartStrategyConfiguration FallbackRestartStrategyConfiguration = gateway.jvm.RestartStrategies\ .FallbackRestartStrategyConfiguration clz = j_restart_strategy.getClass() if clz.getName() == get_java_class(NoRestartStrategyConfiguration).getName(): return RestartStrategies.NoRestartStrategyConfiguration( j_restart_strategy=j_restart_strategy) elif clz.getName() == get_java_class(FixedDelayRestartStrategyConfiguration).getName(): return RestartStrategies.FixedDelayRestartStrategyConfiguration( j_restart_strategy=j_restart_strategy) elif clz.getName() == get_java_class(FailureRateRestartStrategyConfiguration).getName(): return RestartStrategies.FailureRateRestartStrategyConfiguration( j_restart_strategy=j_restart_strategy) elif clz.getName() == get_java_class(FallbackRestartStrategyConfiguration).getName(): return RestartStrategies.FallbackRestartStrategyConfiguration( j_restart_strategy=j_restart_strategy) else: raise Exception("Unsupported java RestartStrategyConfiguration: %s" % clz.getName())
[docs] @staticmethod def no_restart() -> 'NoRestartStrategyConfiguration': """ Generates NoRestartStrategyConfiguration. :return: The :class:`NoRestartStrategyConfiguration`. """ return RestartStrategies.NoRestartStrategyConfiguration()
[docs] @staticmethod def fall_back_restart() -> 'FallbackRestartStrategyConfiguration': return RestartStrategies.FallbackRestartStrategyConfiguration()
[docs] @staticmethod def fixed_delay_restart(restart_attempts: int, delay_between_attempts: int) -> \ 'FixedDelayRestartStrategyConfiguration': """ Generates a FixedDelayRestartStrategyConfiguration. :param restart_attempts: Number of restart attempts for the FixedDelayRestartStrategy. :param delay_between_attempts: Delay in-between restart attempts for the FixedDelayRestartStrategy, the input could be integer value in milliseconds or datetime.timedelta object. :return: The :class:`FixedDelayRestartStrategyConfiguration`. """ return RestartStrategies.FixedDelayRestartStrategyConfiguration(restart_attempts, delay_between_attempts)
[docs] @staticmethod def failure_rate_restart(failure_rate: int, failure_interval: int, delay_interval: int) -> \ 'FailureRateRestartStrategyConfiguration': """ Generates a FailureRateRestartStrategyConfiguration. :param failure_rate: Maximum number of restarts in given interval ``failure_interval`` before failing a job. :param failure_interval: Time interval for failures, the input could be integer value in milliseconds or datetime.timedelta object. :param delay_interval: Delay in-between restart attempts, the input could be integer value in milliseconds or datetime.timedelta object. """ return RestartStrategies.FailureRateRestartStrategyConfiguration(failure_rate, failure_interval, delay_interval)