Source code for pyflink.table.data_view

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Iterable, List, Any, Iterator, Dict, Tuple

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

__all__ = ['DataView', 'ListView', 'MapView']


class DataView(ABC):
    """
    A DataView is a collection type that can be used in the accumulator of an user defined
    :class:`pyflink.table.AggregateFunction`. Depending on the context in which the function
    is used, a DataView can be backed by a normal collection or a state backend.
    """

    @abstractmethod
    def clear(self) -> None:
        """
        Clears the DataView and removes all data.
        """
        pass


[docs]class ListView(DataView, Generic[T]): """ A :class:`DataView` that provides list-like functionality in the accumulator of an AggregateFunction when large amounts of data are expected. """ def __init__(self): self._list = [] def get(self) -> Iterable[T]: """ Returns an iterable of this list view. """ return self._list def add(self, value: T) -> None: """ Adds the given value to this list view. """ self._list.append(value) def add_all(self, values: List[T]) -> None: """ Adds all of the elements of the specified list to this list view. """ self._list.extend(values) def clear(self) -> None: self._list = [] def __eq__(self, other: Any) -> bool: if isinstance(other, ListView): iter_obj = other.get() self_iterator = iter(self) for value in iter_obj: try: self_value = next(self_iterator) except StopIteration: # this list view is shorter than another one return False if self_value != value: # the elements are not the same. return False try: next(self_iterator) except StopIteration: # the length of this list view is the same as another one return True else: # this list view is longer than another one return False else: # the object is not a ListView return False def __hash__(self) -> int: return hash(self._list) def __iter__(self) -> Iterator[T]: return iter(self.get())
[docs]class MapView(Generic[K, V]): """ A :class:`DataView` that provides dict-like functionality in the accumulator of an AggregateFunction when large amounts of data are expected. """ def __init__(self): self._dict = dict() def get(self, key: K) -> V: """ Return the value for the specified key. """ return self._dict[key] def put(self, key: K, value: V) -> None: """ Inserts a value for the given key into the map view. If the map view already contains a value for the key, the existing value is overwritten. """ self._dict[key] = value def put_all(self, dict_value: Dict[K, V]) -> None: """ Inserts all mappings from the specified map to this map view. """ self._dict.update(dict_value) def remove(self, key: K) -> None: """ Deletes the value for the given key. """ del self._dict[key] def contains(self, key: K) -> bool: """ Checks if the map view contains a value for a given key. """ return key in self._dict def items(self) -> Iterable[Tuple[K, V]]: """ Returns all entries of the map view. """ return self._dict.items() def keys(self) -> Iterable[K]: """ Returns all the keys in the map view. """ return self._dict.keys() def values(self) -> Iterable[V]: """ Returns all the values in the map view. """ return self._dict.values() def is_empty(self) -> bool: """ Returns true if the map view contains no key-value mappings, otherwise false. """ return len(self._dict) == 0 def clear(self) -> None: """ Removes all entries of this map. """ self._dict.clear() def __eq__(self, other: Any) -> bool: if other is None: return False if other.__class__ == MapView: return self._dict == other._dict else: # comparing the content of state backed map view is too expensive return other is self def __getitem__(self, key: K) -> V: return self.get(key) def __setitem__(self, key: K, value: V) -> None: self.put(key, value) def __delitem__(self, key: K) -> None: self.remove(key) def __contains__(self, key: K) -> bool: return self.contains(key) def __iter__(self) -> Iterator[K]: return iter(self.keys())