################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABC, abstractmethod
from enum import Enum
from typing import TypeVar, Generic, Iterable, List, Iterator, Dict, Tuple, Optional
from pyflink.common.time import Duration, Time
from pyflink.common.typeinfo import TypeInformation, Types
__all__ = [
'ValueStateDescriptor',
'ValueState',
'ListStateDescriptor',
'ListState',
'MapStateDescriptor',
'MapState',
'ReducingStateDescriptor',
'ReducingState',
'AggregatingStateDescriptor',
'AggregatingState',
'ReadOnlyBroadcastState',
'BroadcastState',
'StateTtlConfig',
'OperatorStateStore',
]
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
IN = TypeVar('IN')
OUT = TypeVar('OUT')
class OperatorStateStore(ABC):
"""
Interface for getting operator states. Currently, only :class:`~state.BroadcastState` is
supported.
.. versionadded:: 1.16.0
"""
[docs] @abstractmethod
def get_broadcast_state(self, state_descriptor: 'MapStateDescriptor') -> 'BroadcastState':
"""
Fetches the :class:`~state.BroadcastState` described by :class:`~state.MapStateDescriptor`,
which has read/write access to the broadcast operator state.
"""
pass
class State(ABC):
"""
Interface that different types of partitioned state must implement.
"""
@abstractmethod
def clear(self) -> None:
"""
Removes the value mapped under the current key.
"""
pass
[docs]class ValueState(State, Generic[T]):
"""
:class:`State` interface for partitioned single-value state. The value can be retrieved or
updated.
The state is accessed and modified by user functions, and checkpointed consistently by the
system as part of the distributed snapshots.
"""
@abstractmethod
def value(self) -> T:
"""
Returns the current value for the state. When the state is not partitioned the returned
value is the same for all inputs in a given operator instance. If state partitioning is
applied, the value returned depends on the current operator input, as the operator
maintains an independent state for each partition.
"""
pass
@abstractmethod
def update(self, value: T) -> None:
"""
Updates the operator state accessible by :func:`value` to the given value. The next time
:func:`value` is called (for the same state partition) the returned state will represent
the updated value. When a partitioned state is updated with null, the state for the current
key will be removed and the default value is returned on the next access.
"""
pass
[docs]class AppendingState(State, Generic[IN, OUT]):
"""
Base interface for partitioned state that supports adding elements and inspecting the current
state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
This state is accessed and modified by user functions, and checkpointed consistently by the
system as part of the distributed snapshots.
The state is only accessible by functions applied on a KeyedStream. The key is automatically
supplied by the system, so the function always sees the value mapped to the key of the current
element. That way, the system can handle stream and state partitioning consistently together.
"""
@abstractmethod
def get(self) -> OUT:
"""
Returns the elements under the current key.
"""
pass
@abstractmethod
def add(self, value: IN) -> None:
"""
Adding the given value to the tail of this list state.
"""
pass
[docs]class MergingState(AppendingState[IN, OUT]):
"""
Extension of AppendingState that allows merging of state. That is, two instance of MergingState
can be combined into a single instance that contains all the information of the two merged
states.
"""
pass
[docs]class ReducingState(MergingState[T, T]):
"""
:class:`State` interface for reducing state. Elements can be added to the state, they will be
combined using a reduce function. The current state can be inspected.
The state is accessed and modified by user functions, and checkpointed consistently by the
system as part of the distributed snapshots.
The state is only accessible by functions applied on a KeyedStream. The key is automatically
supplied by the system, so the function always sees the value mapped to the key of the current
element. That way, the system can handle stream and state partitioning consistently together.
"""
pass
[docs]class AggregatingState(MergingState[IN, OUT]):
"""
:class:`State` interface for aggregating state, based on an
:class:`~pyflink.datastream.functions.AggregateFunction`. Elements that are added to this type
of state will be eagerly pre-aggregated using a given AggregateFunction.
The state holds internally always the accumulator type of the AggregateFunction. When
accessing the result of the state, the function's
:func:`~pyflink.datastream.functions.AggregateFunction.get_result` method.
The state is accessed and modified by user functions, and checkpointed consistently by the
system as part of the distributed snapshots.
The state is only accessible by functions applied on a KeyedStream. The key is automatically
supplied by the system, so the function always sees the value mapped to the key of the current
element. That way, the system can handle stream and state partitioning consistently together.
"""
pass
[docs]class ListState(MergingState[T, Iterable[T]]):
"""
:class:`State` interface for partitioned list state in Operations.
The state is accessed and modified by user functions, and checkpointed consistently
by the system as part of the distributed snapshots.
Currently only keyed list state is supported.
When it is a keyed list state, the state key is automatically supplied by the system, so the
user function always sees the value mapped to the key of the current element. That way, the
system can handle stream and state partitioning consistently together.
"""
@abstractmethod
def update(self, values: List[T]) -> None:
"""
Updating existing values to the given list of values.
"""
pass
@abstractmethod
def add_all(self, values: List[T]) -> None:
"""
Adding the given values to the tail of this list state.
"""
pass
def __iter__(self) -> Iterator[T]:
return iter(self.get())
[docs]class MapState(State, Generic[K, V]):
"""
:class:`State` interface for partitioned key-value state. The key-value pair can be added,
updated and retrieved.
The state is accessed and modified by user functions, and checkpointed consistently by the
system as part of the distributed snapshots.
The state key is automatically supplied by the system, so the function always sees the value
mapped to the key of the current element. That way, the system can handle stream and state
partitioning consistently together.
"""
@abstractmethod
def get(self, key: K) -> V:
"""
Returns the current value associated with the given key.
"""
pass
@abstractmethod
def put(self, key: K, value: V) -> None:
"""
Associates a new value with the given key.
"""
pass
@abstractmethod
def put_all(self, dict_value: Dict[K, V]) -> None:
"""
Copies all of the mappings from the given map into the state.
"""
pass
@abstractmethod
def remove(self, key: K) -> None:
"""
Deletes the mapping of the given key.
"""
pass
@abstractmethod
def contains(self, key: K) -> bool:
"""
Returns whether there exists the given mapping.
"""
pass
@abstractmethod
def items(self) -> Iterable[Tuple[K, V]]:
"""
Returns all the mappings in the state.
"""
pass
@abstractmethod
def keys(self) -> Iterable[K]:
"""
Returns all the keys in the state.
"""
pass
@abstractmethod
def values(self) -> Iterable[V]:
"""
Returns all the values in the state.
"""
pass
@abstractmethod
def is_empty(self) -> bool:
"""
Returns true if this state contains no key-value mappings, otherwise false.
"""
pass
def __getitem__(self, key: K) -> V:
return self.get(key)
def __setitem__(self, key: K, value: V) -> None:
self.put(key, value)
def __delitem__(self, key: K) -> None:
self.remove(key)
def __contains__(self, key: K) -> bool:
return self.contains(key)
def __iter__(self) -> Iterator[K]:
return iter(self.keys())
[docs]class ReadOnlyBroadcastState(State, Generic[K, V]):
"""
A read-only view of the :class:`BroadcastState`.
Although read-only, the user code should not modify the value returned by the :meth:`get` or the
items returned by :meth:`items`, as this can lead to inconsistent states. The reason for this is
that we do not create extra copies of the elements for performance reasons.
"""
@abstractmethod
def get(self, key: K) -> V:
"""
Returns the current value associated with the given key.
"""
pass
@abstractmethod
def contains(self, key: K) -> bool:
"""
Returns whether there exists the given mapping.
"""
pass
@abstractmethod
def items(self) -> Iterable[Tuple[K, V]]:
"""
Returns all the mappings in the state.
"""
pass
@abstractmethod
def keys(self) -> Iterable[K]:
"""
Returns all the keys in the state.
"""
pass
@abstractmethod
def values(self) -> Iterable[V]:
"""
Returns all the values in the state.
"""
pass
@abstractmethod
def is_empty(self) -> bool:
"""
Returns true if this state contains no key-value mappings, otherwise false.
"""
pass
def __getitem__(self, key: K) -> V:
return self.get(key)
def __contains__(self, key: K) -> bool:
return self.contains(key)
def __iter__(self) -> Iterator[K]:
return iter(self.keys())
[docs]class BroadcastState(ReadOnlyBroadcastState[K, V]):
"""
A type of state that can be created to store the state of a :class:`BroadcastStream`. This state
assumes that the same elements are sent to all instances of an operator.
CAUTION: the user has to guarantee that all task instances store the same elements in
this type of state.
Each operator instance individually maintains and stores elements in the broadcast state. The
fact that the incoming stream is a broadcast one guarantees that all instances see all the
elements. Upon recovery or re-scaling, the same state is given to each of the instances.
To avoid hotspots, each task reads its previous partition, and if there are more tasks (scale up
), then the new instances read from the old instances in a round-robin fashion. This is why each
instance has to guarantee that it stores the same elements as the rest. If not, upon recovery or
rescaling you may have unpredictable redistribution of the partitions, thus unpredictable
results.
"""
@abstractmethod
def put(self, key: K, value: V) -> None:
"""
Associates a new value with the given key.
"""
pass
@abstractmethod
def put_all(self, dict_value: Dict[K, V]) -> None:
"""
Copies all of the mappings from the given map into the state.
"""
pass
@abstractmethod
def remove(self, key: K) -> None:
"""
Deletes the mapping of the given key.
"""
pass
def __setitem__(self, key: K, value: V) -> None:
self.put(key, value)
def __delitem__(self, key: K) -> None:
self.remove(key)
class StateDescriptor(ABC):
"""
Base class for state descriptors. A StateDescriptor is used for creating partitioned State in
stateful operations.
"""
def __init__(self, name: str, type_info: TypeInformation):
"""
Constructor for StateDescriptor.
:param name: The name of the state
:param type_info: The type information of the value.
"""
self.name = name
self.type_info = type_info
self._ttl_config = None # type: Optional[StateTtlConfig]
def get_name(self) -> str:
"""
Get the name of the state.
:return: The name of the state.
"""
return self.name
def enable_time_to_live(self, ttl_config: 'StateTtlConfig'):
"""
Configures optional activation of state time-to-live (TTL).
State user value will expire, become unavailable and be cleaned up in storage depending on
configured StateTtlConfig.
:param ttl_config: Configuration of state TTL
"""
self._ttl_config = ttl_config
[docs]class ValueStateDescriptor(StateDescriptor):
"""
StateDescriptor for ValueState. This can be used to create partitioned value state using
RuntimeContext.get_state(ValueStateDescriptor).
"""
def __init__(self, name: str, value_type_info: TypeInformation):
"""
Constructor of the ValueStateDescriptor.
:param name: The name of the state.
:param value_type_info: the type information of the state.
"""
super(ValueStateDescriptor, self).__init__(name, value_type_info)
[docs]class ListStateDescriptor(StateDescriptor):
"""
StateDescriptor for ListState. This can be used to create state where the type is a list that
can be appended and iterated over.
"""
def __init__(self, name: str, elem_type_info: TypeInformation):
"""
Constructor of the ListStateDescriptor.
:param name: The name of the state.
:param elem_type_info: the type information of the state element.
"""
super(ListStateDescriptor, self).__init__(name, Types.LIST(elem_type_info))
[docs]class MapStateDescriptor(StateDescriptor):
"""
StateDescriptor for MapState. This can be used to create state where the type is a map that can
be updated and iterated over.
"""
def __init__(self, name: str, key_type_info: TypeInformation, value_type_info: TypeInformation):
"""
Constructor of the MapStateDescriptor.
:param name: The name of the state.
:param key_type_info: The type information of the key.
:param value_type_info: the type information of the value.
"""
super(MapStateDescriptor, self).__init__(name, Types.MAP(key_type_info, value_type_info))
[docs]class ReducingStateDescriptor(StateDescriptor):
"""
StateDescriptor for ReducingState. This can be used to create partitioned reducing state using
RuntimeContext.get_reducing_state(ReducingStateDescriptor).
"""
def __init__(self,
name: str,
reduce_function,
type_info: TypeInformation):
"""
Constructor of the ReducingStateDescriptor.
:param name: The name of the state.
:param reduce_function: The ReduceFunction used to aggregate the state.
:param type_info: The type of the values in the state.
"""
super(ReducingStateDescriptor, self).__init__(name, type_info)
from pyflink.datastream.functions import ReduceFunction, ReduceFunctionWrapper
if not isinstance(reduce_function, ReduceFunction):
if callable(reduce_function):
reduce_function = ReduceFunctionWrapper(reduce_function) # type: ignore
else:
raise TypeError("The input must be a ReduceFunction or a callable function!")
self._reduce_function = reduce_function
def get_reduce_function(self):
return self._reduce_function
[docs]class AggregatingStateDescriptor(StateDescriptor):
"""
A StateDescriptor for AggregatingState.
The type internally stored in the state is the type of the Accumulator of the
:func:`~pyflink.datastream.functions.AggregateFunction`.
"""
def __init__(self,
name: str,
agg_function,
state_type_info):
super(AggregatingStateDescriptor, self).__init__(name, state_type_info)
from pyflink.datastream.functions import AggregateFunction
if not isinstance(agg_function, AggregateFunction):
raise TypeError("The input must be a pyflink.datastream.functions.AggregateFunction!")
self._agg_function = agg_function
def get_agg_function(self):
return self._agg_function
class StateTtlConfig(object):
[docs] class UpdateType(Enum):
"""
This option value configures when to update last access timestamp which prolongs state TTL.
"""
Disabled = 0
"""
TTL is disabled. State does not expire.
"""
OnCreateAndWrite = 1
"""
Last access timestamp is initialised when state is created and updated on every write
operation.
"""
OnReadAndWrite = 2
"""
The same as OnCreateAndWrite but also updated on read.
"""
def _to_proto(self):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
return getattr(StateDescriptor.StateTTLConfig.UpdateType, self.name)
@staticmethod
def _from_proto(proto):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
update_type_name = StateDescriptor.StateTTLConfig.UpdateType.Name(proto)
return StateTtlConfig.UpdateType[update_type_name]
[docs] class StateVisibility(Enum):
"""
This option configures whether expired user value can be returned or not.
"""
ReturnExpiredIfNotCleanedUp = 0
"""
Return expired user value if it is not cleaned up yet.
"""
NeverReturnExpired = 1
"""
Never return expired user value.
"""
def _to_proto(self):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
return getattr(StateDescriptor.StateTTLConfig.StateVisibility, self.name)
@staticmethod
def _from_proto(proto):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
state_visibility_name = StateDescriptor.StateTTLConfig.StateVisibility.Name(proto)
return StateTtlConfig.StateVisibility[state_visibility_name]
[docs] class TtlTimeCharacteristic(Enum):
"""
This option configures time scale to use for ttl.
"""
ProcessingTime = 0
"""
Processing time
"""
def _to_proto(self):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
return getattr(StateDescriptor.StateTTLConfig.TtlTimeCharacteristic, self.name)
@staticmethod
def _from_proto(proto):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
ttl_time_characteristic_name = \
StateDescriptor.StateTTLConfig.TtlTimeCharacteristic.Name(proto)
return StateTtlConfig.TtlTimeCharacteristic[ttl_time_characteristic_name]
def __init__(self,
update_type: UpdateType,
state_visibility: StateVisibility,
ttl_time_characteristic: TtlTimeCharacteristic,
ttl: Time,
cleanup_strategies: 'StateTtlConfig.CleanupStrategies'):
self._update_type = update_type
self._state_visibility = state_visibility
self._ttl_time_characteristic = ttl_time_characteristic
self._ttl = ttl
self._cleanup_strategies = cleanup_strategies
[docs] @staticmethod
def new_builder(ttl: Time):
return StateTtlConfig.Builder(ttl)
[docs] def get_update_type(self) -> 'StateTtlConfig.UpdateType':
return self._update_type
[docs] def get_state_visibility(self) -> 'StateTtlConfig.StateVisibility':
return self._state_visibility
[docs] def get_ttl(self) -> Time:
return self._ttl
[docs] def get_ttl_time_characteristic(self) -> 'StateTtlConfig.TtlTimeCharacteristic':
return self._ttl_time_characteristic
[docs] def is_enabled(self) -> bool:
return self._update_type.value != StateTtlConfig.UpdateType.Disabled.value
[docs] def get_cleanup_strategies(self) -> 'StateTtlConfig.CleanupStrategies':
return self._cleanup_strategies
def _to_proto(self):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
state_ttl_config = StateDescriptor.StateTTLConfig()
state_ttl_config.update_type = self._update_type._to_proto()
state_ttl_config.state_visibility = self._state_visibility._to_proto()
state_ttl_config.ttl_time_characteristic = self._ttl_time_characteristic._to_proto()
state_ttl_config.ttl = self._ttl.to_milliseconds()
state_ttl_config.cleanup_strategies.CopyFrom(self._cleanup_strategies._to_proto())
return state_ttl_config
@staticmethod
def _from_proto(proto):
update_type = StateTtlConfig.UpdateType._from_proto(proto.update_type)
state_visibility = StateTtlConfig.StateVisibility._from_proto(proto.state_visibility)
ttl_time_characteristic = \
StateTtlConfig.TtlTimeCharacteristic._from_proto(proto.ttl_time_characteristic)
ttl = Time.milliseconds(proto.ttl)
cleanup_strategies = StateTtlConfig.CleanupStrategies._from_proto(proto.cleanup_strategies)
builder = StateTtlConfig.new_builder(ttl) \
.set_update_type(update_type) \
.set_state_visibility(state_visibility) \
.set_ttl_time_characteristic(ttl_time_characteristic)
builder._strategies = cleanup_strategies._strategies
builder._is_cleanup_in_background = cleanup_strategies._is_cleanup_in_background
return builder.build()
def __repr__(self):
return "StateTtlConfig<" \
"update_type={}," \
" state_visibility={}," \
"ttl_time_characteristic ={}," \
"ttl={}>".format(self._update_type,
self._state_visibility,
self._ttl_time_characteristic,
self._ttl)
class Builder(object):
"""
Builder for the StateTtlConfig.
"""
def __init__(self, ttl: Time):
self._ttl = ttl
self._update_type = StateTtlConfig.UpdateType.OnCreateAndWrite
self._state_visibility = StateTtlConfig.StateVisibility.NeverReturnExpired
self._ttl_time_characteristic = StateTtlConfig.TtlTimeCharacteristic.ProcessingTime
self._is_cleanup_in_background = True
self._strategies = {} # type: Dict
def set_update_type(self,
update_type: 'StateTtlConfig.UpdateType') -> 'StateTtlConfig.Builder':
"""
Sets the ttl update type.
:param update_type: The ttl update type configures when to update last access timestamp
which prolongs state TTL.
"""
self._update_type = update_type
return self
def update_ttl_on_create_and_write(self) -> 'StateTtlConfig.Builder':
return self.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
def update_ttl_on_read_and_write(self) -> 'StateTtlConfig.Builder':
return self.set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
def set_state_visibility(
self,
state_visibility: 'StateTtlConfig.StateVisibility') -> 'StateTtlConfig.Builder':
"""
Sets the state visibility.
:param state_visibility: The state visibility configures whether expired user value can
be returned or not.
"""
self._state_visibility = state_visibility
return self
def return_expired_if_not_cleaned_up(self) -> 'StateTtlConfig.Builder':
return self.set_state_visibility(
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
def never_return_expired(self) -> 'StateTtlConfig.Builder':
return self.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
def set_ttl_time_characteristic(
self,
ttl_time_characteristic: 'StateTtlConfig.TtlTimeCharacteristic') \
-> 'StateTtlConfig.Builder':
"""
Sets the time characteristic.
:param ttl_time_characteristic: The time characteristic configures time scale to use for
ttl.
"""
self._ttl_time_characteristic = ttl_time_characteristic
return self
def use_processing_time(self) -> 'StateTtlConfig.Builder':
return self.set_ttl_time_characteristic(
StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
def cleanup_full_snapshot(self) -> 'StateTtlConfig.Builder':
"""
Cleanup expired state in full snapshot on checkpoint.
"""
self._strategies[
StateTtlConfig.CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT] = \
StateTtlConfig.CleanupStrategies.EMPTY_STRATEGY
return self
def cleanup_incrementally(self,
cleanup_size: int,
run_cleanup_for_every_record) -> 'StateTtlConfig.Builder':
"""
Cleanup expired state incrementally cleanup local state.
Upon every state access this cleanup strategy checks a bunch of state keys for
expiration and cleans up expired ones. It keeps a lazy iterator through all keys with
relaxed consistency if backend supports it. This way all keys should be regularly
checked and cleaned eventually over time if any state is constantly being accessed.
Additionally to the incremental cleanup upon state access, it can also run per every
record. Caution: if there are a lot of registered states using this option, they all
will be iterated for every record to check if there is something to cleanup.
if no access happens to this state or no records are processed in case of
run_cleanup_for_every_record, expired state will persist.
Time spent for the incremental cleanup increases record processing latency.
Note:
At the moment incremental cleanup is implemented only for Heap state backend.
Setting it for RocksDB will have no effect.
Note:
If heap state backend is used with synchronous snapshotting, the global iterator keeps a
copy of all keys while iterating because of its specific implementation which does not
support concurrent modifications. Enabling of this feature will increase memory
consumption then. Asynchronous snapshotting does not have this problem.
:param cleanup_size: max number of keys pulled from queue for clean up upon state touch
for any key
:param run_cleanup_for_every_record: run incremental cleanup per each processed record
"""
self._strategies[StateTtlConfig.CleanupStrategies.Strategies.INCREMENTAL_CLEANUP] = \
StateTtlConfig.CleanupStrategies.IncrementalCleanupStrategy(
cleanup_size, run_cleanup_for_every_record)
return self
def cleanup_in_rocksdb_compact_filter(
self,
query_time_after_num_entries,
periodic_compaction_time=None) -> \
'StateTtlConfig.Builder':
"""
Cleanup expired state while Rocksdb compaction is running.
RocksDB compaction filter will query current timestamp, used to check expiration, from
Flink every time after processing {@code queryTimeAfterNumEntries} number of state
entries. Updating the timestamp more often can improve cleanup speed but it decreases
compaction performance because it uses JNI call from native code.
Periodic compaction could speed up expired state entries cleanup, especially for state
entries rarely accessed. Files older than this value will be picked up for compaction,
and re-written to the same level as they were before. It makes sure a file goes through
compaction filters periodically.
:param query_time_after_num_entries: number of state entries to process by compaction
filter before updating current timestamp
:param periodic_compaction_time: periodic compaction which could
speed up expired state cleanup. 0 means turning off periodic compaction.
:return:
"""
self._strategies[
StateTtlConfig.CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER] = \
StateTtlConfig.CleanupStrategies.RocksdbCompactFilterCleanupStrategy(
query_time_after_num_entries,
periodic_compaction_time if periodic_compaction_time else Duration.of_days(30))
return self
def disable_cleanup_in_background(self) -> 'StateTtlConfig.Builder':
"""
Disable default cleanup of expired state in background (enabled by default).
If some specific cleanup is configured, e.g. :func:`cleanup_incrementally` or
:func:`cleanup_in_rocksdb_compact_filter`, this setting does not disable it.
"""
self._is_cleanup_in_background = False
return self
def set_ttl(self, ttl: Time) -> 'StateTtlConfig.Builder':
"""
Sets the ttl time.
:param ttl: The ttl time.
"""
self._ttl = ttl
return self
def build(self) -> 'StateTtlConfig':
return StateTtlConfig(
self._update_type,
self._state_visibility,
self._ttl_time_characteristic,
self._ttl,
StateTtlConfig.CleanupStrategies(self._strategies, self._is_cleanup_in_background)
)
[docs] class CleanupStrategies(object):
"""
TTL cleanup strategies.
This class configures when to cleanup expired state with TTL. By default, state is always
cleaned up on explicit read access if found expired. Currently cleanup of state full
snapshot can be additionally activated.
"""
[docs] class Strategies(Enum):
"""
Fixed strategies ordinals in strategies config field.
"""
FULL_STATE_SCAN_SNAPSHOT = 0
INCREMENTAL_CLEANUP = 1
ROCKSDB_COMPACTION_FILTER = 2
def _to_proto(self):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
return getattr(
StateDescriptor.StateTTLConfig.CleanupStrategies.Strategies, self.name)
@staticmethod
def _from_proto(proto):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
strategies_name = \
StateDescriptor.StateTTLConfig.CleanupStrategies.Strategies.Name(proto)
return StateTtlConfig.CleanupStrategies.Strategies[strategies_name]
[docs] class CleanupStrategy(ABC):
"""
Base interface for cleanup strategies configurations.
"""
pass
[docs] class EmptyCleanupStrategy(CleanupStrategy):
pass
[docs] class IncrementalCleanupStrategy(CleanupStrategy):
"""
Configuration of cleanup strategy while taking the full snapshot.
"""
def __init__(self, cleanup_size: int, run_cleanup_for_every_record: bool):
self._cleanup_size = cleanup_size
self._run_cleanup_for_every_record = run_cleanup_for_every_record
def get_cleanup_size(self) -> int:
return self._cleanup_size
def run_cleanup_for_every_record(self) -> bool:
return self._run_cleanup_for_every_record
[docs] class RocksdbCompactFilterCleanupStrategy(CleanupStrategy):
"""
Configuration of cleanup strategy using custom compaction filter in RocksDB.
"""
def __init__(self,
query_time_after_num_entries: int,
periodic_compaction_time=None):
self._query_time_after_num_entries = query_time_after_num_entries
self._periodic_compaction_time = periodic_compaction_time \
if periodic_compaction_time else Duration.of_days(30)
def get_query_time_after_num_entries(self) -> int:
return self._query_time_after_num_entries
def get_periodic_compaction_time(self) -> Duration:
return self._periodic_compaction_time
EMPTY_STRATEGY = EmptyCleanupStrategy()
def __init__(self,
strategies: Dict[Strategies, CleanupStrategy],
is_cleanup_in_background: bool):
self._strategies = strategies
self._is_cleanup_in_background = is_cleanup_in_background
def is_cleanup_in_background(self) -> bool:
return self._is_cleanup_in_background
def in_full_snapshot(self) -> bool:
return (StateTtlConfig.CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT in
self._strategies)
def get_incremental_cleanup_strategy(self) \
-> 'StateTtlConfig.CleanupStrategies.IncrementalCleanupStrategy':
if self._is_cleanup_in_background:
default_strategy = \
StateTtlConfig.CleanupStrategies.IncrementalCleanupStrategy(5, False)
else:
default_strategy = None
return self._strategies.get( # type: ignore
StateTtlConfig.CleanupStrategies.Strategies.INCREMENTAL_CLEANUP,
default_strategy)
def get_rocksdb_compact_filter_cleanup_strategy(self) \
-> 'StateTtlConfig.CleanupStrategies.RocksdbCompactFilterCleanupStrategy':
if self._is_cleanup_in_background:
default_strategy = \
StateTtlConfig.CleanupStrategies.RocksdbCompactFilterCleanupStrategy(1000)
else:
default_strategy = None
return self._strategies.get( # type: ignore
StateTtlConfig.CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
default_strategy)
def _to_proto(self):
from pyflink.fn_execution.flink_fn_execution_pb2 import StateDescriptor
DescriptorCleanupStrategies = StateDescriptor.StateTTLConfig.CleanupStrategies
CleanupStrategies = StateTtlConfig.CleanupStrategies
cleanup_strategies = StateDescriptor.StateTTLConfig.CleanupStrategies()
cleanup_strategies.is_cleanup_in_background = self._is_cleanup_in_background
for k, v in self._strategies.items():
cleanup_strategy = cleanup_strategies.strategies.add()
cleanup_strategy.strategy = k._to_proto()
if isinstance(v, CleanupStrategies.EmptyCleanupStrategy):
empty_strategy = DescriptorCleanupStrategies.EmptyCleanupStrategy.EMPTY_STRATEGY
cleanup_strategy.empty_strategy = empty_strategy
elif isinstance(v, CleanupStrategies.IncrementalCleanupStrategy):
incremental_cleanup_strategy = \
DescriptorCleanupStrategies.IncrementalCleanupStrategy()
incremental_cleanup_strategy.cleanup_size = v._cleanup_size
incremental_cleanup_strategy.run_cleanup_for_every_record = \
v._run_cleanup_for_every_record
cleanup_strategy.incremental_cleanup_strategy.CopyFrom(
incremental_cleanup_strategy)
elif isinstance(v, CleanupStrategies.RocksdbCompactFilterCleanupStrategy):
rocksdb_compact_filter_cleanup_strategy = \
DescriptorCleanupStrategies.RocksdbCompactFilterCleanupStrategy()
rocksdb_compact_filter_cleanup_strategy.query_time_after_num_entries = \
v._query_time_after_num_entries
cleanup_strategy.rocksdb_compact_filter_cleanup_strategy.CopyFrom(
rocksdb_compact_filter_cleanup_strategy)
return cleanup_strategies
@staticmethod
def _from_proto(proto):
CleanupStrategies = StateTtlConfig.CleanupStrategies
strategies = {}
is_cleanup_in_background = proto.is_cleanup_in_background
for strategy_entry in proto.strategies:
strategy = CleanupStrategies.Strategies._from_proto(strategy_entry.strategy)
if strategy_entry.HasField('empty_strategy'):
strategies[strategy] = CleanupStrategies.EmptyCleanupStrategy
elif strategy_entry.HasField('incremental_cleanup_strategy'):
incremental_cleanup_strategy = strategy_entry.incremental_cleanup_strategy
strategies[strategy] = CleanupStrategies.IncrementalCleanupStrategy(
incremental_cleanup_strategy.cleanup_size,
incremental_cleanup_strategy.run_cleanup_for_every_record)
elif strategy_entry.HasField('rocksdb_compact_filter_cleanup_strategy'):
rocksdb_compact_filter_cleanup_strategy = \
strategy_entry.rocksdb_compact_filter_cleanup_strategy
strategies[strategy] = CleanupStrategies.RocksdbCompactFilterCleanupStrategy(
rocksdb_compact_filter_cleanup_strategy.query_time_after_num_entries)
return CleanupStrategies(strategies, is_cleanup_in_background)