################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABCMeta
from datetime import timedelta
from typing import Optional
from py4j.java_gateway import get_java_class
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_j_flink_time, from_j_flink_time
__all__ = ['RestartStrategies', 'RestartStrategyConfiguration']
class RestartStrategyConfiguration(object, metaclass=ABCMeta):
    """
    Abstract configuration for restart strategies.

    Instances wrap a Java-side restart strategy configuration object; the description,
    equality and hashing are all delegated to that wrapped object.
    """

    def __init__(self, j_restart_strategy_configuration):
        """
        :param j_restart_strategy_configuration: The Java restart strategy configuration
                                                 object wrapped by this instance.
        """
        self._j_restart_strategy_configuration = j_restart_strategy_configuration

    def get_description(self) -> str:
        """
        Returns a description which is shown in the web interface.

        :return: Description of the restart strategy.
        """
        return self._j_restart_strategy_configuration.getDescription()

    def __eq__(self, other):
        # Equal only if `other` is an instance of this object's class (or a subclass of it)
        # and the wrapped Java objects compare equal.
        return isinstance(other, self.__class__) and \
            self._j_restart_strategy_configuration == \
            other._j_restart_strategy_configuration

    def __hash__(self):
        # Defined together with __eq__; uses the wrapped Java object's hashCode().
        return self._j_restart_strategy_configuration.hashCode()
class RestartStrategies(object):
    """
    This class defines methods to generate RestartStrategyConfigurations. These configurations are
    used to create RestartStrategies at runtime.

    The RestartStrategyConfigurations are used to decouple the core module from the runtime module.
    """

    @staticmethod
    def _check_interval(name, value):
        """
        Ensures that an interval argument is an integer or a datetime.timedelta.

        :param name: Name of the parameter being validated, used in the error message.
        :param value: The value to validate.
        :raises TypeError: If ``value`` is neither an ``int`` nor a ``datetime.timedelta``.
        """
        if not isinstance(value, (timedelta, int)):
            raise TypeError("The parameter '%s' "
                            "only supports integer and datetime.timedelta, current input "
                            "type is %s." % (name, type(value)))

    class NoRestartStrategyConfiguration(RestartStrategyConfiguration):
        """
        Configuration representing no restart strategy.
        """

        def __init__(self, j_restart_strategy=None):
            """
            :param j_restart_strategy: An existing Java configuration object to wrap. If None,
                                       a new Java NoRestartStrategyConfiguration is created.
            """
            if j_restart_strategy is None:
                gateway = get_gateway()
                j_restart_strategy = \
                    gateway.jvm.RestartStrategies.NoRestartStrategyConfiguration()
            super().__init__(j_restart_strategy)

    class FixedDelayRestartStrategyConfiguration(RestartStrategyConfiguration):
        """
        Configuration representing a fixed delay restart strategy.
        """

        def __init__(self, restart_attempts=None, delay_between_attempts_interval=None,
                     j_restart_strategy=None):
            """
            :param restart_attempts: Number of restart attempts. Only used when
                                     ``j_restart_strategy`` is None.
            :param delay_between_attempts_interval: Delay in-between restart attempts, either an
                                                    integer or a datetime.timedelta. Only used
                                                    when ``j_restart_strategy`` is None.
            :param j_restart_strategy: An existing Java configuration object to wrap. If given,
                                       the other arguments are ignored.
            :raises TypeError: If a new configuration is requested and
                               ``delay_between_attempts_interval`` has an unsupported type.
            """
            if j_restart_strategy is None:
                # Validate before touching the gateway so bad input fails fast.
                RestartStrategies._check_interval(
                    'delay_between_attempts_interval', delay_between_attempts_interval)
                gateway = get_gateway()
                j_restart_strategy = gateway.jvm.RestartStrategies.fixedDelayRestart(
                    restart_attempts, to_j_flink_time(delay_between_attempts_interval))
            super().__init__(j_restart_strategy)

        def get_restart_attempts(self) -> int:
            """
            :return: The result of ``getRestartAttempts()`` on the wrapped Java configuration.
            """
            return self._j_restart_strategy_configuration.getRestartAttempts()

        def get_delay_between_attempts_interval(self) -> timedelta:
            """
            :return: The wrapped Java configuration's delay between attempts, converted with
                     ``from_j_flink_time``.
            """
            return from_j_flink_time(
                self._j_restart_strategy_configuration.getDelayBetweenAttemptsInterval())

    class FailureRateRestartStrategyConfiguration(RestartStrategyConfiguration):
        """
        Configuration representing a failure rate restart strategy.
        """

        def __init__(self,
                     max_failure_rate=None,
                     failure_interval=None,
                     delay_between_attempts_interval=None,
                     j_restart_strategy=None):
            """
            :param max_failure_rate: Maximum failure rate passed to the Java configuration. Only
                                     used when ``j_restart_strategy`` is None.
            :param failure_interval: Time interval for failures, either an integer or a
                                     datetime.timedelta. Only used when ``j_restart_strategy``
                                     is None.
            :param delay_between_attempts_interval: Delay in-between restart attempts, either an
                                                    integer or a datetime.timedelta. Only used
                                                    when ``j_restart_strategy`` is None.
            :param j_restart_strategy: An existing Java configuration object to wrap. If given,
                                       the other arguments are ignored.
            :raises TypeError: If a new configuration is requested and one of the interval
                               arguments has an unsupported type.
            """
            if j_restart_strategy is None:
                # Validate both intervals before touching the gateway so bad input fails fast.
                RestartStrategies._check_interval('failure_interval', failure_interval)
                RestartStrategies._check_interval(
                    'delay_between_attempts_interval', delay_between_attempts_interval)
                gateway = get_gateway()
                j_restart_strategy = \
                    gateway.jvm.RestartStrategies.FailureRateRestartStrategyConfiguration(
                        max_failure_rate,
                        to_j_flink_time(failure_interval),
                        to_j_flink_time(delay_between_attempts_interval))
            super().__init__(j_restart_strategy)

        def get_max_failure_rate(self) -> int:
            """
            :return: The result of ``getMaxFailureRate()`` on the wrapped Java configuration.
            """
            return self._j_restart_strategy_configuration.getMaxFailureRate()

        def get_failure_interval(self) -> timedelta:
            """
            :return: The wrapped Java configuration's failure interval, converted with
                     ``from_j_flink_time``.
            """
            return from_j_flink_time(self._j_restart_strategy_configuration.getFailureInterval())

        def get_delay_between_attempts_interval(self) -> timedelta:
            """
            :return: The wrapped Java configuration's delay between attempts, converted with
                     ``from_j_flink_time``.
            """
            return from_j_flink_time(
                self._j_restart_strategy_configuration.getDelayBetweenAttemptsInterval())

    class FallbackRestartStrategyConfiguration(RestartStrategyConfiguration):
        """
        Restart strategy configuration that could be used by jobs to use cluster level restart
        strategy. Useful especially when one has a custom implementation of restart strategy set via
        flink-conf.yaml.
        """

        def __init__(self, j_restart_strategy=None):
            """
            :param j_restart_strategy: An existing Java configuration object to wrap. If None,
                                       a new Java FallbackRestartStrategyConfiguration is created.
            """
            if j_restart_strategy is None:
                gateway = get_gateway()
                j_restart_strategy = \
                    gateway.jvm.RestartStrategies.FallbackRestartStrategyConfiguration()
            super().__init__(j_restart_strategy)

    @staticmethod
    def _from_j_restart_strategy(j_restart_strategy) -> Optional[RestartStrategyConfiguration]:
        """
        Wraps a Java restart strategy configuration in the matching Python class.

        :param j_restart_strategy: The Java restart strategy configuration object, or None.
        :return: The Python wrapper around ``j_restart_strategy``, or None if it is None.
        :raises Exception: If the Java class of ``j_restart_strategy`` is not one of the four
                           supported configuration classes.
        """
        if j_restart_strategy is None:
            return None
        gateway = get_gateway()
        j_strategies = gateway.jvm.RestartStrategies
        # Pairs of (Java configuration class, Python wrapper class), checked in order.
        wrappers = (
            (j_strategies.NoRestartStrategyConfiguration,
             RestartStrategies.NoRestartStrategyConfiguration),
            (j_strategies.FixedDelayRestartStrategyConfiguration,
             RestartStrategies.FixedDelayRestartStrategyConfiguration),
            (j_strategies.FailureRateRestartStrategyConfiguration,
             RestartStrategies.FailureRateRestartStrategyConfiguration),
            (j_strategies.FallbackRestartStrategyConfiguration,
             RestartStrategies.FallbackRestartStrategyConfiguration),
        )
        # Read the class name once; the match is on the exact class name.
        j_class_name = j_restart_strategy.getClass().getName()
        for j_class, wrapper_class in wrappers:
            if j_class_name == get_java_class(j_class).getName():
                return wrapper_class(j_restart_strategy=j_restart_strategy)
        raise Exception("Unsupported java RestartStrategyConfiguration: %s" % j_class_name)

    @staticmethod
    def no_restart() -> 'RestartStrategies.NoRestartStrategyConfiguration':
        """
        Generates NoRestartStrategyConfiguration.

        :return: The :class:`NoRestartStrategyConfiguration`.
        """
        return RestartStrategies.NoRestartStrategyConfiguration()

    @staticmethod
    def fall_back_restart() -> 'RestartStrategies.FallbackRestartStrategyConfiguration':
        """
        Generates a FallbackRestartStrategyConfiguration.

        :return: The :class:`FallbackRestartStrategyConfiguration`.
        """
        return RestartStrategies.FallbackRestartStrategyConfiguration()

    @staticmethod
    def fixed_delay_restart(restart_attempts: int, delay_between_attempts: int) -> \
            'RestartStrategies.FixedDelayRestartStrategyConfiguration':
        """
        Generates a FixedDelayRestartStrategyConfiguration.

        :param restart_attempts: Number of restart attempts for the FixedDelayRestartStrategy.
        :param delay_between_attempts: Delay in-between restart attempts for the
                                       FixedDelayRestartStrategy, the input could be integer value
                                       in milliseconds or datetime.timedelta object.
        :return: The :class:`FixedDelayRestartStrategyConfiguration`.
        """
        return RestartStrategies.FixedDelayRestartStrategyConfiguration(restart_attempts,
                                                                        delay_between_attempts)

    @staticmethod
    def failure_rate_restart(failure_rate: int, failure_interval: int, delay_interval: int) -> \
            'RestartStrategies.FailureRateRestartStrategyConfiguration':
        """
        Generates a FailureRateRestartStrategyConfiguration.

        :param failure_rate: Maximum number of restarts in given interval ``failure_interval``
                             before failing a job.
        :param failure_interval: Time interval for failures, the input could be integer value
                                 in milliseconds or datetime.timedelta object.
        :param delay_interval: Delay in-between restart attempts, the input could be integer value
                               in milliseconds or datetime.timedelta object.
        :return: The :class:`FailureRateRestartStrategyConfiguration`.
        """
        return RestartStrategies.FailureRateRestartStrategyConfiguration(failure_rate,
                                                                         failure_interval,
                                                                         delay_interval)