# Source code for pyflink.datastream.window

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
from abc import ABC, abstractmethod
from enum import Enum
from io import BytesIO
from typing import TypeVar, Generic, Iterable, Collection

from pyflink.common.serializer import TypeSerializer
from pyflink.datastream.functions import RuntimeContext, InternalWindowFunction
from pyflink.datastream.state import StateDescriptor, State
from pyflink.metrics import MetricGroup

__all__ = ['Window',
           'TimeWindow',
           'CountWindow',
           'WindowAssigner',
           'MergingWindowAssigner',
           'TriggerResult',
           'Trigger',
           'TimeWindowSerializer',
           'CountWindowSerializer']

MAX_LONG_VALUE = sys.maxsize


def long_to_int_with_bit_mixing(x: int) -> int:
    """
    Mixes the bits of a 64-bit integer and folds the result into a signed 32-bit int.

    The constants and shift amounts are those of the SplitMix64 finalizer, which is defined
    on 64-bit wrapping arithmetic. Python integers never overflow, so the 64-bit wraparound,
    the logical (unsigned) right shifts and the final narrowing to 32 bits are emulated
    explicitly with masks. Without them the intermediate values grow without bound and the
    "int" result would not be an int-sized value.

    :param x: The value to mix; interpreted as a 64-bit two's-complement long.
    :return: The mixed value as a signed 32-bit integer in ``[-2**31, 2**31)``.
    """
    mask64 = 0xFFFFFFFFFFFFFFFF
    # Reinterpret as an unsigned 64-bit pattern so ``>>`` acts as a logical shift
    # (negative inputs would otherwise be shifted arithmetically).
    x &= mask64
    x = ((x ^ (x >> 30)) * 0xbf58476d1ce4e5b9) & mask64
    x = ((x ^ (x >> 27)) * 0x94d049bb133111eb) & mask64
    x ^= x >> 31
    # Keep the low 32 bits and reinterpret them as a signed 32-bit integer.
    x &= 0xFFFFFFFF
    return x - (1 << 32) if x >= (1 << 31) else x


def mod_inverse(x: int) -> int:
    """
    Computes the multiplicative inverse of ``x`` modulo 2**32.

    The Newton iteration below only produces an inverse under fixed-width (2**32)
    wraparound arithmetic. Python integers never overflow, so every step is masked to
    32 bits. Without the masks the value grows to roughly ``x ** 31`` and is not an
    inverse at all.

    :param x: The value to invert. Only its low 32 bits are used, and it must be odd for an
              inverse to exist. For even ``x`` the result is a bounded but meaningless value.
    :return: The inverse as a signed 32-bit integer, i.e. ``(x * result) % 2**32 == 1``
             for odd ``x``.
    """
    mask32 = 0xFFFFFFFF
    x &= mask32
    # For odd x, x**4 == 1 (mod 2**4), so x**3 is already x's inverse modulo 2**4.
    inverse = (x * x * x) & mask32
    # Each Newton step doubles the number of correct low bits: 4 -> 8 -> 16 -> 32.
    for _ in range(3):
        inverse = (inverse * (2 - x * inverse)) & mask32
    # Reinterpret the 32-bit pattern as a signed integer.
    return inverse - (1 << 32) if inverse >= (1 << 31) else inverse


class Window(ABC):
    """
    A finite bucket into which elements are grouped.

    Every window exposes a maximum timestamp. Once time has progressed past it,
    all of the elements that belong to the window are known to have arrived.
    """

    @abstractmethod
    def max_timestamp(self) -> int:
        """
        :return: The largest timestamp that still belongs to this window.
        """
class TimeWindow(Window):
    """
    Window that represents a time interval from ``start`` (inclusive) to ``end`` (exclusive).
    """

    def __init__(self, start: int, end: int):
        super(TimeWindow, self).__init__()
        self.start = start
        self.end = end

    def max_timestamp(self) -> int:
        """
        :return: The largest timestamp that still belongs to this window, i.e. ``end - 1``.
        """
        return self.end - 1

    def intersects(self, other: 'TimeWindow') -> bool:
        """
        Returns True if this window intersects the given window.

        Both bounds are compared inclusively, so windows that merely touch
        (``self.end == other.start``) are also reported as intersecting.
        """
        return self.start <= other.end and self.end >= other.start

    def cover(self, other: 'TimeWindow') -> 'TimeWindow':
        """
        Returns the minimal window that covers both this window and the given window.
        """
        return TimeWindow(min(self.start, other.start), max(self.end, other.end))

    @staticmethod
    def get_window_start_with_offset(timestamp: int, offset: int, window_size: int):
        """
        Method to get the window start for a timestamp.

        :param timestamp: epoch millisecond to get the window start.
        :param offset: The offset which window start would be shifted by.
        :param window_size: The size of the generated windows.
        :return: window start
        """
        # Python's % returns a non-negative remainder for a positive window_size, so this also
        # yields the correct start for timestamps that lie before the offset.
        return timestamp - (timestamp - offset + window_size) % window_size

    def __hash__(self):
        # ``(end << 1) + 1`` is always odd and is mixed through mod_inverse before ``start``
        # is added.
        return self.start + mod_inverse((self.end << 1) + 1)

    def __eq__(self, other):
        return self.__class__ == other.__class__ and self.end == other.end \
            and self.start == other.start

    def __lt__(self, other: 'TimeWindow'):
        """
        Orders windows by ``start`` first and then by ``end``.

        :raises Exception: If ``other`` is not a :class:`TimeWindow`.
        """
        if not isinstance(other, TimeWindow):
            raise Exception("Does not support comparison with non-TimeWindow %s" % other)

        return self.start == other.start and self.end < other.end or self.start < other.start

    def __le__(self, other: 'TimeWindow'):
        """
        Returns True if this window is equal to ``other`` or sorts before it.

        The previous ``eq and lt`` combination could never be true, since a window cannot
        be both equal to and strictly less than another one.
        """
        return self.__eq__(other) or self.__lt__(other)

    def __repr__(self):
        return "TimeWindow(start={}, end={})".format(self.start, self.end)
class CountWindow(Window):
    """
    A window identified purely by a numeric id, used for count-based windowing.

    Every count window receives a unique id, which lets a CountWindow serve as the
    namespace part of state so that data can be attached to each individual window.
    """

    def __init__(self, id: int):
        super().__init__()
        self.id = id

    def max_timestamp(self) -> int:
        # Count windows are not bounded in time, so the largest "long" value is reported.
        return MAX_LONG_VALUE

    def __hash__(self):
        return long_to_int_with_bit_mixing(self.id)

    def __eq__(self, other):
        if self.__class__ != other.__class__:
            return False
        return self.id == other.id

    def __repr__(self):
        return "CountWindow(id={})".format(self.id)
class TimeWindowSerializer(TypeSerializer[TimeWindow]):
    """
    :class:`TypeSerializer` for :class:`TimeWindow` that delegates to a lazily created
    ``TimeWindowCoderImpl``.
    """

    def __init__(self):
        # The underlying coder is created on first use (see _lazy_coder).
        self._underlying_coder = None

    def serialize(self, element: TimeWindow, stream: BytesIO) -> None:
        """
        Writes the nested encoding of ``element`` to ``stream``.
        """
        stream.write(self._lazy_coder().encode_nested(element))

    def deserialize(self, stream: BytesIO) -> TimeWindow:
        """
        Reads one :class:`TimeWindow` from ``stream``.

        Reads a fixed 16-byte record. Presumably this is the window start and end as two
        8-byte longs; confirm against TimeWindowCoderImpl.
        """
        return self._lazy_coder().decode_nested(stream.read(16))

    def _lazy_coder(self):
        """Returns the underlying coder, creating it on first access."""
        if self._underlying_coder is None:
            self._underlying_coder = self._get_coder()
        return self._underlying_coder

    def _get_coder(self):
        try:
            from pyflink.fn_execution import coder_impl_fast as coder_impl
        except Exception:
            # The Cython-accelerated coders are optional, so fall back to the pure-Python
            # implementation if they cannot be loaded. ``Exception`` instead of a bare
            # ``except`` lets KeyboardInterrupt / SystemExit propagate.
            from pyflink.fn_execution.beam import beam_coder_impl_slow as coder_impl
        return coder_impl.TimeWindowCoderImpl()
class CountWindowSerializer(TypeSerializer[CountWindow]):
    """
    :class:`TypeSerializer` for :class:`CountWindow` that delegates to a lazily created
    ``CountWindowCoderImpl``.
    """

    def __init__(self):
        # The underlying coder is created on first use (see _lazy_coder).
        self._underlying_coder = None

    def serialize(self, element: CountWindow, stream: BytesIO) -> None:
        """
        Writes the nested encoding of ``element`` to ``stream``.
        """
        stream.write(self._lazy_coder().encode_nested(element))

    def deserialize(self, stream: BytesIO) -> CountWindow:
        """
        Reads one :class:`CountWindow` from ``stream``.

        Reads a fixed 8-byte record. Presumably this is the window id as one 8-byte long;
        confirm against CountWindowCoderImpl.
        """
        return self._lazy_coder().decode_nested(stream.read(8))

    def _lazy_coder(self):
        """Returns the underlying coder, creating it on first access."""
        if self._underlying_coder is None:
            self._underlying_coder = self._get_coder()
        return self._underlying_coder

    def _get_coder(self):
        try:
            from pyflink.fn_execution import coder_impl_fast as coder_impl
        except Exception:
            # The Cython-accelerated coders are optional, so fall back to the pure-Python
            # implementation if they cannot be loaded. ``Exception`` instead of a bare
            # ``except`` lets KeyboardInterrupt / SystemExit propagate.
            from pyflink.fn_execution.beam import beam_coder_impl_slow as coder_impl
        return coder_impl.CountWindowCoderImpl()
# Generic type parameters. T and W are the element and window types used by Trigger and
# WindowAssigner; W2 is the window type of MergingWindowAssigner.MergeCallback.
T, W, W2 = TypeVar('T'), TypeVar('W'), TypeVar('W2')
IN, OUT, KEY = TypeVar('IN'), TypeVar('OUT'), TypeVar('KEY')
class TriggerResult(Enum):
    """
    Result type for trigger methods. It determines what happens to the window: whether the
    window function should be called and/or whether the window should be discarded.

    If a :class:`Trigger` returns TriggerResult.FIRE or TriggerResult.FIRE_AND_PURGE but the
    window does not contain any data, the window function will not be invoked, i.e. no data
    will be produced for the window.

    - CONTINUE: No action is taken on the window.
    - FIRE_AND_PURGE: Evaluates the window function, emits the 'window result' and then
      purges the window.
    - FIRE: On FIRE, the window is evaluated and results are emitted. The window is not
      purged though, all elements are retained.
    - PURGE: All elements in the window are cleared and the window is discarded, without
      evaluating the window function or emitting any elements.
    """

    # Every member's value is a ``(fire, purge)`` pair of flags.
    CONTINUE = (False, False)
    FIRE_AND_PURGE = (True, True)
    FIRE = (True, False)
    PURGE = (False, True)

    def is_fire(self) -> bool:
        """:return: True if the window function should be evaluated."""
        fire, _ = self.value
        return fire

    def is_purge(self) -> bool:
        """:return: True if the window contents should be cleared."""
        _, purge = self.value
        return purge
class Trigger(ABC, Generic[T, W]):
    """
    A Trigger determines when a pane of a window should be evaluated to emit the results for
    that part of the window.

    A pane is the bucket of elements that have the same key (assigned by the KeySelector) and
    the same Window. An element can be in multiple panes if it was assigned to multiple
    windows by the WindowAssigner. These panes all have their own instance of the Trigger.

    Triggers must not maintain state internally since they can be re-created or reused for
    different keys. All necessary state should be persisted using the state abstraction
    available on the TriggerContext.

    When used with a MergingWindowAssigner the Trigger must return true from
    :func:`can_merge` and :func:`on_merge` must be properly implemented.
    """

    class TriggerContext(ABC):
        """
        A context object that is given to :class:`Trigger` methods to allow them to register
        timer callbacks and deal with state.
        """

        @abstractmethod
        def get_current_processing_time(self) -> int:
            """
            :return: The current processing time.
            """
            pass

        @abstractmethod
        def get_metric_group(self) -> MetricGroup:
            """
            Returns the metric group for this :class:`Trigger`. This is the same metric group
            that would be returned from
            :func:`~pyflink.datastream.functions.RuntimeContext.get_metric_group` in a user
            function.

            :return: The metric group.
            """
            pass

        @abstractmethod
        def get_current_watermark(self) -> int:
            """
            :return: The current watermark time.
            """
            pass

        @abstractmethod
        def register_processing_time_timer(self, time: int) -> None:
            """
            Register a system time callback. When the current system time passes the specified
            time :func:`~Trigger.on_processing_time` is called with the time specified here.

            :param time: The time at which to invoke :func:`~Trigger.on_processing_time`.
            """
            pass

        @abstractmethod
        def register_event_time_timer(self, time: int) -> None:
            """
            Register an event-time callback. When the current watermark passes the specified
            time :func:`~Trigger.on_event_time` is called with the time specified here.

            :param time: The watermark at which to invoke :func:`~Trigger.on_event_time`.
            """
            pass

        @abstractmethod
        def delete_processing_time_timer(self, time: int) -> None:
            """
            Delete the processing time trigger for the given time.
            """
            pass

        @abstractmethod
        def delete_event_time_timer(self, time: int) -> None:
            """
            Delete the event-time trigger for the given time.
            """
            pass

        @abstractmethod
        def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State:
            """
            Retrieves a :class:`State` object that can be used to interact with fault-tolerant
            state that is scoped to the window and key of the current trigger invocation.

            :param state_descriptor: The StateDescriptor that contains the name and type of the
                                     state that is being accessed.
            :return: The partitioned state object.
            """
            pass

    class OnMergeContext(TriggerContext):
        """
        Extension of :class:`TriggerContext` that is given to :func:`~Trigger.on_merge`.
        """

        @abstractmethod
        def merge_partitioned_state(self, state_descriptor: StateDescriptor) -> None:
            """
            Requests that the partitioned state described by ``state_descriptor`` be merged
            for the windows taking part in the current merge. The merge itself is performed
            by the runtime's implementation of this context.

            :param state_descriptor: The StateDescriptor of the state to merge.
            """
            pass

    @abstractmethod
    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: W,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called for every element that gets added to a pane. The result of this will determine
        whether the pane is evaluated to emit results.

        :param element: The element that arrived.
        :param timestamp: The timestamp of the element that arrived.
        :param window: The window to which the element is being added.
        :param ctx: A context object that can be used to register timer callbacks.
        """
        pass

    @abstractmethod
    def on_processing_time(self,
                           time: int,
                           window: W,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called when a processing-time timer that was set using the trigger context fires.

        :param time: The timestamp at which the timer fired.
        :param window: The window for which the timer fired.
        :param ctx: A context object that can be used to register timer callbacks.
        """
        pass

    @abstractmethod
    def on_event_time(self, time: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called when an event-time timer that was set using the trigger context fires.

        :param time: The timestamp at which the timer fired.
        :param window: The window for which the timer fired.
        :param ctx: A context object that can be used to register timer callbacks.
        """
        pass

    def can_merge(self) -> bool:
        """
        .. note:: If this returns true you must properly implement :func:`~Trigger.on_merge`

        :return: True if this trigger supports merging of trigger state and can therefore be
                 used with a MergingWindowAssigner. The default implementation returns False.
        """
        return False

    @abstractmethod
    def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
        """
        Called when several windows have been merged into one window by the
        :class:`WindowAssigner`.

        :param window: The new window that results from the merge.
        :param ctx: A context object that can be used to register timer callbacks and access
                    state.
        """
        pass

    @abstractmethod
    def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
        """
        Clears any state that the trigger might still hold for the given window. This is
        called when a window is purged. Timers set using
        :func:`~TriggerContext.register_event_time_timer` and
        :func:`~TriggerContext.register_processing_time_timer` should be deleted here as well
        as state acquired using :func:`~TriggerContext.get_partitioned_state`.
        """
        pass
class WindowAssigner(ABC, Generic[T, W]):
    """
    Assigns zero or more :class:`Window` instances to each element.

    Inside a window operation, elements are grouped both by their key (when one is
    available) and by the windows they were assigned to. All elements sharing the same key
    and window form a pane. When a :class:`Trigger` decides that a given pane should fire,
    the WindowFunction is applied to produce the output elements for that pane.
    """

    class WindowAssignerContext(ABC):
        """
        Context handed to a :class:`WindowAssigner` so that it can query the current
        processing time.
        """

        @abstractmethod
        def get_current_processing_time(self) -> int:
            """
            :return: The current processing time.
            """

        @abstractmethod
        def get_runtime_context(self) -> RuntimeContext:
            """
            :return: The current runtime context.
            """

    @abstractmethod
    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[W]:
        """
        :param element: The element that needs windows assigned to it.
        :param timestamp: The element's timestamp.
        :param context: The :class:`WindowAssignerContext` the assigner runs in.
        :return: The collection of windows the element belongs to.
        """

    @abstractmethod
    def get_default_trigger(self, env) -> Trigger[T, W]:
        """
        :param env: The StreamExecutionEnvironment used to compile the DataStream job.
        :return: The trigger used by default together with this :class:`WindowAssigner`.
        """

    @abstractmethod
    def get_window_serializer(self) -> TypeSerializer[W]:
        """
        :return: A :class:`TypeSerializer` able to serialize the windows produced by this
                 :class:`WindowAssigner`.
        """

    @abstractmethod
    def is_event_time(self) -> bool:
        """
        :return: True when windows are assigned based on event time, False otherwise.
        """
class MergingWindowAssigner(WindowAssigner[T, W]):
    """
    A :class:`WindowAssigner` whose windows may be merged with one another.
    """

    class MergeCallback(ABC, Generic[W2]):
        """
        Callback passed to :func:`~MergingWindowAssigner.merge_windows` through which the
        assigner announces which windows are to be merged.
        """

        @abstractmethod
        def merge(self, to_be_merged: Iterable[W2], merge_result: W2) -> None:
            """
            Declares that the given windows are merged into the result window.

            :param to_be_merged: The windows that are combined into a single window.
            :param merge_result: The window produced by the merge.
            """

    @abstractmethod
    def merge_windows(self,
                      windows: Iterable[W],
                      callback: 'MergingWindowAssigner.MergeCallback[W]') -> None:
        """
        Decides which of the given windows, if any, should be merged.

        :param windows: The candidate windows.
        :param callback: Callback to invoke for every group of windows that should be merged.
        """
class WindowOperationDescriptor:
    """
    Plain holder that bundles the components describing a window operation: the window
    assigner, trigger, allowed lateness, window state descriptor, window serializer and
    internal window function. All constructor arguments are stored unchanged as attributes
    of the same name.
    """

    def __init__(self,
                 assigner: WindowAssigner,
                 trigger: Trigger,
                 allowed_lateness: int,
                 window_state_descriptor: StateDescriptor,
                 window_serializer: TypeSerializer,
                 internal_window_function: InternalWindowFunction):
        self.assigner = assigner
        self.trigger = trigger
        self.allowed_lateness = allowed_lateness
        self.window_state_descriptor = window_state_descriptor
        self.internal_window_function = internal_window_function
        self.window_serializer = window_serializer