################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import math
from abc import ABC, abstractmethod
from enum import Enum
from io import BytesIO
from typing import TypeVar, Generic, Iterable, Collection, Any, cast, Optional
from pyflink.common import Time, Types
from pyflink.common.constants import MAX_LONG_VALUE, MIN_LONG_VALUE
from pyflink.common.serializer import TypeSerializer
from pyflink.datastream.functions import RuntimeContext, InternalWindowFunction, ReduceFunction
from pyflink.datastream.output_tag import OutputTag
from pyflink.datastream.state import StateDescriptor, ReducingStateDescriptor, \
ValueStateDescriptor, State, ReducingState
from pyflink.metrics import MetricGroup
__all__ = [
    # window types
    'Window',
    'TimeWindow',
    'CountWindow',
    'GlobalWindow',
    # window assigners
    'WindowAssigner',
    'TumblingProcessingTimeWindows',
    'TumblingEventTimeWindows',
    'SlidingProcessingTimeWindows',
    'SlidingEventTimeWindows',
    'ProcessingTimeSessionWindows',
    'EventTimeSessionWindows',
    'DynamicProcessingTimeSessionWindows',
    'DynamicEventTimeSessionWindows',
    'GlobalWindows',
    'MergingWindowAssigner',
    'CountTumblingWindowAssigner',
    'CountSlidingWindowAssigner',
    # triggers
    'TriggerResult',
    'Trigger',
    'EventTimeTrigger',
    'ProcessingTimeTrigger',
    'ContinuousEventTimeTrigger',
    'ContinuousProcessingTimeTrigger',
    'NeverTrigger',
    'PurgingTrigger',
    'CountTrigger',
    # serializers and helpers
    'TimeWindowSerializer',
    'CountWindowSerializer',
    'GlobalWindowSerializer',
    'SessionWindowTimeGapExtractor',
]
def long_to_int_with_bit_mixing(x: int) -> int:
    """
    Scramble the bits of ``x`` so that neighbouring inputs yield well-spread hash values.

    Two xor-shift/multiply rounds are applied, followed by a final xor-shift. Python integers are
    unbounded, so the products are never truncated and the result may be much wider than 64 bits;
    it is used as a hash value (see ``CountWindow.__hash__``), where that is harmless.

    :param x: The value to mix.
    :return: The mixed value.
    """
    mixed = x
    for shift, multiplier in ((30, 0xbf58476d1ce4e5b9), (27, 0x94d049bb133111eb)):
        mixed = (mixed ^ (mixed >> shift)) * multiplier
    return mixed ^ (mixed >> 31)
def mod_inverse(x: int) -> int:
    """
    Return the multiplicative inverse of ``x`` modulo 2**32.

    Only odd numbers are invertible modulo a power of two; ``TimeWindow.__hash__`` passes
    ``(end << 1) + 1``, which is always odd. For odd ``x`` the cube ``x ** 3`` is already the
    inverse modulo 2**4 (because ``x ** 4 == 1 (mod 16)``), and each Newton step
    ``inverse *= 2 - x * inverse`` doubles the number of correct low bits: 4 -> 8 -> 16 -> 32.

    Every intermediate value is reduced modulo 2**32. Python integers never overflow, so without
    the reduction the value grows to roughly 31 times the bit length of ``x`` (over a thousand
    bits for millisecond timestamps), and that big-int arithmetic would run on every hash.

    :param x: An odd integer (may be negative).
    :return: ``y`` in ``[0, 2**32)`` such that ``(x * y) % 2**32 == 1``.
    """
    mask = 0xFFFFFFFF
    # The inverse modulo 2**32 depends only on the low 32 bits of x.
    x &= mask
    inverse = (x * x * x) & mask
    # Newton iteration; ``& mask`` also folds negative intermediates back into range.
    inverse = (inverse * (2 - x * inverse)) & mask
    inverse = (inverse * (2 - x * inverse)) & mask
    inverse = (inverse * (2 - x * inverse)) & mask
    return inverse
class Window(ABC):
    """
    Abstract base for a finite bucket ("window") of elements.

    Each window exposes a maximum timestamp, which means that, at some point, every element that
    belongs to the window will have arrived.
    """

    @abstractmethod
    def max_timestamp(self) -> int:
        """
        :return: The largest timestamp that still belongs to this window.
        """
class TimeWindow(Window):
    """
    Window that represents a time interval from start (inclusive) to end (exclusive).
    """

    def __init__(self, start: int, end: int):
        super(TimeWindow, self).__init__()
        self.start = start
        self.end = end

    def max_timestamp(self) -> int:
        """
        :return: The largest timestamp that still belongs to this window, i.e. ``end - 1``.
        """
        return self.end - 1

    def intersects(self, other: 'TimeWindow') -> bool:
        """
        Returns True if this window intersects the given window.

        Both bounds are compared inclusively, so two windows that merely touch (the end of one
        equals the start of the other) are also reported as intersecting.
        """
        return self.start <= other.end and self.end >= other.start

    def cover(self, other: 'TimeWindow') -> 'TimeWindow':
        """
        Returns the minimal window that covers both this window and the given window.
        """
        return TimeWindow(min(self.start, other.start), max(self.end, other.end))

    @staticmethod
    def get_window_start_with_offset(timestamp: int, offset: int, window_size: int):
        """
        Method to get the window start for a timestamp.

        :param timestamp: epoch millisecond to get the window start.
        :param offset: The offset which window start would be shifted by.
        :param window_size: The size of the generated windows.
        :return: window start
        """
        # Python's ``%`` yields a non-negative remainder for a positive divisor, so this is also
        # correct for timestamps that lie before ``offset`` (assumes window_size > 0).
        return timestamp - (timestamp - offset + window_size) % window_size

    @staticmethod
    def merge_windows(windows: Iterable['TimeWindow'],
                      callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
        """
        Merge overlapping :class:`TimeWindow`.

        The windows are sorted (by start, then end) and swept once. Each run of windows that
        intersect the growing merge result is collapsed into the window covering the run, and
        ``callback.merge`` is invoked only for runs that contain more than one window.

        :param windows: The window candidates.
        :param callback: Receives ``(windows_to_merge, merge_result)`` for every actual merge.
        """
        merged = []
        current_merge = None
        current_merge_set = set()

        for candidate in sorted(windows):
            if current_merge is None:
                current_merge = candidate
                current_merge_set.add(candidate)
            elif current_merge.intersects(candidate):
                current_merge = current_merge.cover(candidate)
                current_merge_set.add(candidate)
            else:
                # The run ended: remember it and start a new one from this candidate.
                merged.append((current_merge, current_merge_set))
                current_merge = candidate
                current_merge_set = {candidate}

        if current_merge is not None:
            merged.append((current_merge, current_merge_set))

        for merge_key, merge_set in merged:
            if len(merge_set) > 1:
                callback.merge(merge_set, merge_key)

    def __hash__(self):
        return self.start + mod_inverse((self.end << 1) + 1)

    def __eq__(self, other):
        return self.__class__ == other.__class__ and self.end == other.end \
            and self.start == other.start

    def __lt__(self, other: 'TimeWindow'):
        """
        Orders windows by start, then by end.

        :raises Exception: If ``other`` is not a :class:`TimeWindow`.
        """
        if not isinstance(other, TimeWindow):
            raise Exception("Does not support comparison with non-TimeWindow %s" % other)
        return (self.start, self.end) < (other.start, other.end)

    def __le__(self, other: 'TimeWindow'):
        # "Less than or equal" holds when either check holds. Joining them with ``and`` would
        # make ``<=`` always False, because a window cannot be both equal to and less than
        # another window.
        return self.__eq__(other) or self.__lt__(other)

    def __repr__(self):
        return "TimeWindow(start={}, end={})".format(self.start, self.end)
class CountWindow(Window):
    """
    A window used for count-based windowing, identified only by a numeric id.

    Every count window is assigned a unique id. Because of that, a CountWindow can act as the
    namespace part of state, so data can be attached to each distinct CountWindow.
    """

    def __init__(self, id: int):
        super(CountWindow, self).__init__()
        self.id = id

    def max_timestamp(self) -> int:
        # A count window has no time bound, so report the largest representable long.
        return MAX_LONG_VALUE

    def __hash__(self):
        return long_to_int_with_bit_mixing(self.id)

    def __eq__(self, other):
        if self.__class__ != other.__class__:
            return False
        return self.id == other.id

    def __repr__(self):
        return f"CountWindow(id={self.id})"
class GlobalWindow(Window):
    """
    The default window into which GlobalWindows places all data.

    All GlobalWindow instances compare equal to each other and share the same hash.
    """

    def __init__(self):
        super(GlobalWindow, self).__init__()

    @staticmethod
    def get() -> 'GlobalWindow':
        """
        :return: A GlobalWindow (a new instance that is equal to every other GlobalWindow).
        """
        return GlobalWindow()

    def max_timestamp(self) -> int:
        # The global window never ends.
        return MAX_LONG_VALUE

    def __eq__(self, other):
        return other.__class__ == self.__class__

    def __hash__(self):
        # Constant hash, consistent with every instance being equal.
        return 0

    def __repr__(self):
        return "GlobalWindow"
class TimeWindowSerializer(TypeSerializer[TimeWindow]):
    """
    The serializer used to write the TimeWindow type.

    Encoding and decoding are delegated to the implementation of ``coders.TimeWindowCoder``,
    which is created lazily on first use and then reused.
    """

    def __init__(self):
        self._underlying_coder = None

    def _impl(self):
        # Create the coder implementation on first use and cache it on the instance.
        if self._underlying_coder is None:
            self._underlying_coder = self._get_coder().get_impl()
        return self._underlying_coder

    def serialize(self, element: TimeWindow, stream: BytesIO) -> None:
        stream.write(self._impl().encode(element))

    def deserialize(self, stream: BytesIO) -> TimeWindow:
        # Reads a fixed 16 bytes; presumably start and end encoded as two 8-byte values
        # (NOTE(review): confirm against TimeWindowCoder).
        return self._impl().decode(stream.read(16))

    def _get_coder(self):
        from pyflink.fn_execution import coders
        return coders.TimeWindowCoder()
class CountWindowSerializer(TypeSerializer[CountWindow]):
    """
    The serializer used to write the CountWindow type.

    Encoding and decoding are delegated to the implementation of ``coders.CountWindowCoder``,
    which is created lazily on first use and then reused.
    """

    def __init__(self):
        self._underlying_coder = None

    def _impl(self):
        # Create the coder implementation on first use and cache it on the instance.
        if self._underlying_coder is None:
            self._underlying_coder = self._get_coder().get_impl()
        return self._underlying_coder

    def serialize(self, element: CountWindow, stream: BytesIO) -> None:
        stream.write(self._impl().encode(element))

    def deserialize(self, stream: BytesIO) -> CountWindow:
        # Reads a fixed 8 bytes; presumably the window id as one 8-byte value
        # (NOTE(review): confirm against CountWindowCoder).
        return self._impl().decode(stream.read(8))

    def _get_coder(self):
        from pyflink.fn_execution import coders
        return coders.CountWindowCoder()
class GlobalWindowSerializer(TypeSerializer[GlobalWindow]):
    """
    A TypeSerializer for GlobalWindow.

    Encoding and decoding are delegated to the implementation of ``coders.GlobalWindowCoder``,
    which is created lazily on first use and then reused.
    """

    def __init__(self):
        self._underlying_coder = None

    def _impl(self):
        # Create the coder implementation on first use and cache it on the instance.
        if self._underlying_coder is None:
            self._underlying_coder = self._get_coder().get_impl()
        return self._underlying_coder

    def serialize(self, element: GlobalWindow, stream: BytesIO) -> None:
        stream.write(self._impl().encode(element))

    def deserialize(self, stream: BytesIO) -> GlobalWindow:
        # NOTE(review): always reads 8 bytes; confirm this matches the number of bytes that
        # GlobalWindowCoder writes, otherwise bytes of the next record would be consumed.
        return self._impl().decode(stream.read(8))

    def _get_coder(self):
        from pyflink.fn_execution import coders
        return coders.GlobalWindowCoder()
# Type variables that parameterize the generic classes of this module
# (e.g. element type T and window type W in Trigger / WindowAssigner).
T, W, W2 = TypeVar('T'), TypeVar('W'), TypeVar('W2')
IN, OUT, KEY = TypeVar('IN'), TypeVar('OUT'), TypeVar('KEY')
class TriggerResult(Enum):
    """
    Result type for trigger methods. It decides what happens to the window, for example whether
    the window function should be called or whether the window should be discarded.

    If a :class:`Trigger` returns TriggerResult.FIRE or TriggerResult.FIRE_AND_PURGE but the window
    does not contain any data, the window function is not invoked, i.e. no data is produced for
    the window.

    Each member's value is a ``(fire, purge)`` pair of booleans:

    - CONTINUE: No action is taken on the window.
    - FIRE_AND_PURGE: The window function is evaluated and the window result is emitted; the
      window is then purged.
    - FIRE: The window function is evaluated and the results are emitted. The window is not
      purged; all elements are retained.
    - PURGE: All elements in the window are cleared and the window is discarded, without
      evaluating the window function or emitting any elements.
    """
    CONTINUE = (False, False)
    FIRE_AND_PURGE = (True, True)
    FIRE = (True, False)
    PURGE = (False, True)

    def is_fire(self) -> bool:
        """
        :return: True if the window function should be evaluated.
        """
        fire, _ = self.value
        return fire

    def is_purge(self) -> bool:
        """
        :return: True if the window contents should be cleared.
        """
        _, purge = self.value
        return purge
class Trigger(ABC, Generic[T, W]):
    """
    A Trigger determines when a pane of a window should be evaluated to emit the results for that
    part of the window.

    A pane is the bucket of elements that have the same key (assigned by the KeySelector) and the
    same Window. An element can be in multiple panes if it was assigned to multiple windows by the
    WindowAssigner. These panes all have their own instance of the Trigger.

    Triggers must not maintain state internally since they can be re-created or reused for
    different keys. All necessary state should be persisted using the state abstraction available
    on the TriggerContext.

    When used with a MergingWindowAssigner the Trigger must return True from :func:`can_merge` and
    :func:`on_merge` must be properly implemented.
    """

    class TriggerContext(ABC):
        """
        A context object that is given to :class:`Trigger` methods to allow them to register timer
        callbacks and deal with state.
        """

        @abstractmethod
        def get_current_processing_time(self) -> int:
            """
            :return: The current processing time.
            """
            pass

        @abstractmethod
        def get_metric_group(self) -> MetricGroup:
            """
            Returns the metric group for this :class:`Trigger`. This is the same metric group that
            would be returned from
            :func:`~pyflink.datastream.functions.RuntimeContext.get_metric_group` in a user
            function.

            :return: The metric group.
            """
            pass

        @abstractmethod
        def get_current_watermark(self) -> int:
            """
            :return: The current watermark time.
            """
            pass

        @abstractmethod
        def register_processing_time_timer(self, time: int) -> None:
            """
            Register a system time callback. When the current system time passes the specified
            time :func:`~Trigger.on_processing_time` is called with the time specified here.

            :param time: The time at which to invoke :func:`~Trigger.on_processing_time`.
            """
            pass

        @abstractmethod
        def register_event_time_timer(self, time: int) -> None:
            """
            Register an event-time callback. When the current watermark passes the specified time
            :func:`~Trigger.on_event_time` is called with the time specified here.

            :param time: The watermark at which to invoke :func:`~Trigger.on_event_time`.
            """
            pass

        @abstractmethod
        def delete_processing_time_timer(self, time: int) -> None:
            """
            Delete the processing time trigger for the given time.

            :param time: The time of the timer to delete.
            """
            pass

        @abstractmethod
        def delete_event_time_timer(self, time: int) -> None:
            """
            Delete the event-time trigger for the given time.

            :param time: The time of the timer to delete.
            """
            pass

        @abstractmethod
        def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State:
            """
            Retrieves a :class:`State` object that can be used to interact with fault-tolerant
            state that is scoped to the window and key of the current trigger invocation.

            :param state_descriptor: The StateDescriptor that contains the name and type of the
                                     state that is being accessed.
            :return: The partitioned state object.
            """
            pass

    class OnMergeContext(TriggerContext):
        """
        Extension of :class:`TriggerContext` that is given to :func:`~Trigger.on_merge`.
        """

        @abstractmethod
        def merge_partitioned_state(self, state_descriptor: StateDescriptor) -> None:
            """
            Merges the partitioned state described by ``state_descriptor`` of the windows being
            merged, so that it can afterwards be read through
            :func:`~Trigger.TriggerContext.get_partitioned_state` for the merged window.

            :param state_descriptor: The StateDescriptor of the state to merge.
            """
            pass

    @abstractmethod
    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: W,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called for every element that gets added to a pane. The result of this will determine
        whether the pane is evaluated to emit results.

        :param element: The element that arrived.
        :param timestamp: The timestamp of the element that arrived.
        :param window: The window to which the element is being added.
        :param ctx: A context object that can be used to register timer callbacks.
        :return: The :class:`TriggerResult` deciding what happens to the window.
        """
        pass

    @abstractmethod
    def on_processing_time(self,
                           time: int,
                           window: W,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called when a processing-time timer that was set using the trigger context fires.

        :param time: The timestamp at which the timer fired.
        :param window: The window for which the timer fired.
        :param ctx: A context object that can be used to register timer callbacks.
        :return: The :class:`TriggerResult` deciding what happens to the window.
        """
        pass

    @abstractmethod
    def on_event_time(self, time: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called when an event-time timer that was set using the trigger context fires.

        :param time: The timestamp at which the timer fired.
        :param window: The window for which the timer fired.
        :param ctx: A context object that can be used to register timer callbacks.
        :return: The :class:`TriggerResult` deciding what happens to the window.
        """
        pass

    def can_merge(self) -> bool:
        """
        .. note:: If this returns True you must properly implement :func:`~Trigger.on_merge`

        :return: True if this trigger supports merging of trigger state and can therefore be used
                 with a MergingWindowAssigner. The default implementation returns False.
        """
        return False

    @abstractmethod
    def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
        """
        Called when several windows have been merged into one window by the
        :class:`WindowAssigner`.

        :param window: The new window that results from the merge.
        :param ctx: A context object that can be used to register timer callbacks and access state.
        """
        pass

    @abstractmethod
    def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
        """
        Clears any state that the trigger might still hold for the given window. This is called
        when a window is purged. Timers set using
        :func:`~TriggerContext.register_event_time_timer` and
        :func:`~TriggerContext.register_processing_time_timer` should be deleted here as well as
        state acquired using :func:`~TriggerContext.get_partitioned_state`.
        """
        pass
class WindowAssigner(ABC, Generic[T, W]):
    """
    Assigns zero or more :class:`Window` instances to each element.

    Within a window operation, elements are grouped by their key (when one is available) and by
    the windows they were assigned to. The elements sharing a key and a window form a pane. Once a
    :class:`Trigger` decides that a pane should fire, the WindowFunction is applied to that pane
    to produce its output elements.
    """

    class WindowAssignerContext(ABC):
        """
        Context handed to a :class:`WindowAssigner`, allowing it to query the current processing
        time and to access the runtime context.
        """

        @abstractmethod
        def get_current_processing_time(self) -> int:
            """
            :return: The current processing time.
            """

        @abstractmethod
        def get_runtime_context(self) -> RuntimeContext:
            """
            :return: The current runtime context.
            """

    @abstractmethod
    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[W]:
        """
        :param element: The element that needs windows.
        :param timestamp: The timestamp of that element.
        :param context: The :class:`WindowAssignerContext` the assigner operates in.
        :return: The windows the element should be assigned to.
        """

    @abstractmethod
    def get_default_trigger(self, env) -> Trigger[T, W]:
        """
        :param env: The StreamExecutionEnvironment used to compile the DataStream job.
        :return: The trigger used by default together with this :class:`WindowAssigner`.
        """

    @abstractmethod
    def get_window_serializer(self) -> TypeSerializer[W]:
        """
        :return: A TypeSerializer for the windows produced by this WindowAssigner.
        """

    @abstractmethod
    def is_event_time(self) -> bool:
        """
        :return: True when windows are assigned based on event time, False otherwise.
        """
class MergingWindowAssigner(WindowAssigner[T, W]):
    """
    A :class:`WindowAssigner` whose windows can be merged with each other.
    """

    class MergeCallback(ABC, Generic[W2]):
        """
        Callback passed to :func:`~MergingWindowAssigner.merge_windows`, through which the
        assigner reports which windows must be merged.
        """

        @abstractmethod
        def merge(self, to_be_merged: Iterable[W2], merge_result: W2) -> None:
            """
            Declares that the given windows are to be merged into the result window.

            :param to_be_merged: The windows that should become one window.
            :param merge_result: The window they are merged into.
            """

    @abstractmethod
    def merge_windows(self,
                      windows: Iterable[W],
                      callback: 'MergingWindowAssigner.MergeCallback[W]') -> None:
        """
        Decides which of the given windows, if any, should be merged.

        :param windows: The candidate windows.
        :param callback: Invoked to report each group of windows that should be merged.
        """
class WindowOperationDescriptor(object):
    """
    Holds the parts that make up a window operation (assigner, trigger, lateness settings, window
    state, window serializer and the internal window function), and derives the operator's
    name and description from them.
    """

    def __init__(self,
                 assigner: WindowAssigner,
                 trigger: Trigger,
                 allowed_lateness: int,
                 late_data_output_tag: Optional[OutputTag],
                 window_state_descriptor: StateDescriptor,
                 window_serializer: TypeSerializer,
                 internal_window_function: InternalWindowFunction):
        self.assigner = assigner
        self.trigger = trigger
        self.allowed_lateness = allowed_lateness
        self.late_data_output_tag = late_data_output_tag
        self.window_state_descriptor = window_state_descriptor
        self.window_serializer = window_serializer
        self.internal_window_function = internal_window_function

    def generate_op_name(self):
        """
        :return: The class name of the window assigner.
        """
        return type(self.assigner).__name__

    def generate_op_desc(self, windowed_stream_type, func_desc):
        """
        :return: ``"<windowed_stream_type>(<assigner>, <trigger class name>, <func_desc>)"``.
        """
        parts = (self.assigner, type(self.trigger).__name__, func_desc)
        return "%s(%s)" % (windowed_stream_type, ", ".join(str(part) for part in parts))
class EventTimeTrigger(Trigger[T, TimeWindow]):
    """
    Fires once the watermark passes the end of the window the pane belongs to.
    """

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        window_end = window.max_timestamp()
        if window_end > ctx.get_current_watermark():
            # Wait for the watermark to reach the end of the window.
            ctx.register_event_time_timer(window_end)
            return TriggerResult.CONTINUE
        # The watermark is already past the window: fire right away.
        return TriggerResult.FIRE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Processing time is irrelevant for this trigger.
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Only the timer registered for the end of the window causes a fire.
        return TriggerResult.FIRE if time == window.max_timestamp() else TriggerResult.CONTINUE

    def can_merge(self) -> bool:
        return True

    def on_merge(self,
                 window: TimeWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        # Re-register the end-of-window timer for the merged window unless the watermark has
        # already passed it.
        merged_end = window.max_timestamp()
        if ctx.get_current_watermark() < merged_end:
            ctx.register_event_time_timer(merged_end)

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        ctx.delete_event_time_timer(window.max_timestamp())

    @staticmethod
    def create() -> 'EventTimeTrigger':
        """
        :return: A new EventTimeTrigger.
        """
        return EventTimeTrigger()
class ContinuousEventTimeTrigger(Trigger[T, TimeWindow]):
    """
    A Trigger that continuously fires based on a given time interval. This fires based on
    watermarks.

    The next scheduled fire timestamp is kept in a reducing state named "fire-time" that combines
    values with :class:`Min`. The trigger also fires when the watermark reaches the end of the
    window.
    """

    def __init__(self, interval: int):
        """
        :param interval: The firing interval; :func:`of` passes ``Time.to_milliseconds()``.
        """
        self.interval = interval
        # Pass a ReduceFunction *instance*. ``Min.reduce`` is an instance method, so handing over
        # the ``Min`` class itself leaves no usable reducer for combining two fire timestamps
        # (which happens e.g. when window state is merged).
        self.state_desc = ReducingStateDescriptor("fire-time", Min(), Types.LONG())
        # Not used by this class (state is always fetched from the trigger context); kept so
        # code that reads the attribute keeps working.
        self.fire_timestamp_state = None

    @staticmethod
    def of(interval: Time) -> 'ContinuousEventTimeTrigger':
        """
        :param interval: The firing interval.
        :return: A new ContinuousEventTimeTrigger firing every ``interval``.
        """
        return ContinuousEventTimeTrigger(interval.to_milliseconds())

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        if window.max_timestamp() <= ctx.get_current_watermark():
            # if the watermark is already past the window fire immediately
            return TriggerResult.FIRE
        else:
            ctx.register_event_time_timer(window.max_timestamp())

        fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
        if fire_timestamp_state.get() is None:
            # First element for this pane: schedule the first periodic fire, aligned to the
            # interval.
            self.register_next_fire_timestamp(timestamp - (timestamp % self.interval), window, ctx,
                                              fire_timestamp_state)

        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        if time == window.max_timestamp():
            return TriggerResult.FIRE

        fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
        fire_timestamp = fire_timestamp_state.get()
        if fire_timestamp is not None and fire_timestamp == time:
            # The periodic timer fired: schedule the next one, then fire.
            fire_timestamp_state.clear()
            self.register_next_fire_timestamp(time, window, ctx, fire_timestamp_state)
            return TriggerResult.FIRE

        return TriggerResult.CONTINUE

    def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None:
        ctx.merge_partitioned_state(self.state_desc)
        # Register a timer for the merged window's next fire timestamp, if any.
        next_fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)).get()
        if next_fire_timestamp is not None:
            ctx.register_event_time_timer(next_fire_timestamp)

    def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None:
        fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
        timestamp = fire_timestamp.get()
        if timestamp is not None:
            ctx.delete_event_time_timer(timestamp)
            fire_timestamp.clear()

    def can_merge(self) -> bool:
        return True

    def register_next_fire_timestamp(self,
                                     time: int,
                                     window: TimeWindow,
                                     ctx: 'Trigger.TriggerContext',
                                     fire_timestamp_state: ReducingState):
        """
        Stores and registers the next fire time: ``time + interval``, capped at the end of the
        window.
        """
        next_fire_timestamp = min(time + self.interval, window.max_timestamp())
        fire_timestamp_state.add(next_fire_timestamp)
        ctx.register_event_time_timer(next_fire_timestamp)
class Min(ReduceFunction):
    """
    ReduceFunction that keeps the smaller of two values. The continuous triggers in this module
    use it to combine pending fire timestamps.
    """

    def reduce(self, value1, value2):
        # Same tie-breaking as the builtin ``min``: the first value wins unless the second one
        # is strictly smaller.
        return value2 if value2 < value1 else value1
class ProcessingTimeTrigger(Trigger[T, TimeWindow]):
    """
    Fires once the current system time passes the end of the window the pane belongs to.
    """

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Arrange to be called back when processing time reaches the end of the window.
        ctx.register_processing_time_timer(window.max_timestamp())
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Any processing-time timer for this window is the end-of-window timer.
        return TriggerResult.FIRE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def can_merge(self) -> bool:
        return True

    def on_merge(self,
                 window: TimeWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        # Register the end-of-window timer for the merged window unless that time has passed.
        merged_end = window.max_timestamp()
        if ctx.get_current_processing_time() < merged_end:
            ctx.register_processing_time_timer(merged_end)

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        ctx.delete_processing_time_timer(window.max_timestamp())

    @staticmethod
    def create() -> 'ProcessingTimeTrigger':
        """
        :return: A new ProcessingTimeTrigger.
        """
        return ProcessingTimeTrigger()
class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]):
    """
    A Trigger that continuously fires based on a given time interval as measured by the clock of
    the machine on which the job is running.

    The next scheduled fire timestamp is kept in a reducing state named "fire-time" that combines
    values with :class:`Min`.
    """

    def __init__(self, interval: int):
        """
        :param interval: The firing interval; :func:`of` passes ``Time.to_milliseconds()``.
        """
        self.interval = interval
        # Pass a ReduceFunction *instance*. ``Min.reduce`` is an instance method, so handing over
        # the ``Min`` class itself leaves no usable reducer for combining two fire timestamps
        # (which happens e.g. when window state is merged).
        self.state_desc = ReducingStateDescriptor("fire-time", Min(), Types.LONG())
        # Not used by this class (state is always fetched from the trigger context); kept so
        # code that reads the attribute keeps working.
        self.fire_timestamp_state = None

    @staticmethod
    def of(interval: Time) -> 'ContinuousProcessingTimeTrigger':
        """
        :param interval: The firing interval.
        :return: A new ContinuousProcessingTimeTrigger firing every ``interval``.
        """
        return ContinuousProcessingTimeTrigger(interval.to_milliseconds())

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
        # Scheduling is based on the current processing time, not on the element's timestamp.
        now = ctx.get_current_processing_time()
        if fire_timestamp_state.get() is None:
            # First element for this pane: schedule the first periodic fire, aligned to the
            # interval.
            self.register_next_fire_timestamp(now - (now % self.interval), window, ctx,
                                              fire_timestamp_state)
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
        if fire_timestamp_state.get() == time:
            # The periodic timer fired: schedule the next one, then fire.
            fire_timestamp_state.clear()
            self.register_next_fire_timestamp(time, window, ctx, fire_timestamp_state)
            return TriggerResult.FIRE
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None:
        # The states of the old windows are lost after this call.
        ctx.merge_partitioned_state(self.state_desc)
        # Register a timer for the merged window's next fire timestamp, if any.
        next_fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)).get()
        if next_fire_timestamp is not None:
            ctx.register_processing_time_timer(next_fire_timestamp)

    def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None:
        fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
        timestamp = fire_timestamp_state.get()
        if timestamp is not None:
            ctx.delete_processing_time_timer(timestamp)
            fire_timestamp_state.clear()

    def can_merge(self) -> bool:
        return True

    def register_next_fire_timestamp(self,
                                     time: int,
                                     window: TimeWindow,
                                     ctx: 'Trigger.TriggerContext',
                                     fire_timestamp_state: ReducingState):
        """
        Stores and registers the next fire time: ``time + interval``, capped at the end of the
        window.
        """
        next_fire_timestamp = min(time + self.interval, window.max_timestamp())
        fire_timestamp_state.add(next_fire_timestamp)
        ctx.register_processing_time_timer(next_fire_timestamp)
class PurgingTrigger(Trigger[T, Window]):
    """
    Wraps another Trigger and turns it into a purging one: whenever the nested trigger fires,
    this trigger returns FIRE_AND_PURGE instead.
    """

    def __init__(self, nested_trigger: Trigger[T, Window]):
        self.nested_trigger = nested_trigger

    @staticmethod
    def of(nested_trigger: Trigger[T, Window]) -> 'PurgingTrigger':
        """
        :param nested_trigger: The trigger to wrap.
        :return: A PurgingTrigger around ``nested_trigger``.
        """
        return PurgingTrigger(nested_trigger)

    @staticmethod
    def _purge_on_fire(result: TriggerResult) -> TriggerResult:
        # Upgrade any firing result to FIRE_AND_PURGE; pass everything else through unchanged.
        return TriggerResult.FIRE_AND_PURGE if result.is_fire() is True else result

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: Window,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return self._purge_on_fire(
            self.nested_trigger.on_element(element, timestamp, window, ctx))

    def on_event_time(self,
                      time: int,
                      window: Window,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return self._purge_on_fire(self.nested_trigger.on_event_time(time, window, ctx))

    def on_processing_time(self,
                           time: int,
                           window: Window,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return self._purge_on_fire(self.nested_trigger.on_processing_time(time, window, ctx))

    def clear(self,
              window: Window,
              ctx: 'Trigger.TriggerContext') -> None:
        self.nested_trigger.clear(window, ctx)

    def can_merge(self) -> bool:
        return self.nested_trigger.can_merge()

    def on_merge(self,
                 window: Window,
                 ctx: 'Trigger.OnMergeContext') -> None:
        self.nested_trigger.on_merge(window, ctx)
class CountTrigger(Trigger[T, CountWindow]):
    """
    Fires once the number of elements in a pane reaches the given count.
    """

    def __init__(self, window_size: int):
        """
        :param window_size: The element count at which the trigger fires.
        """
        self._window_size = window_size
        # Running element count per pane, summed by the reducing state.
        self._count_state_descriptor = ReducingStateDescriptor(
            "count", lambda a, b: a + b, Types.LONG())

    @staticmethod
    def of(window_size: int) -> 'CountTrigger':
        """
        :param window_size: The element count at which the trigger fires.
        :return: A new CountTrigger.
        """
        return CountTrigger(window_size)

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: CountWindow,
                   ctx: Trigger.TriggerContext) -> TriggerResult:
        count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
        count_state.add(1)
        if count_state.get() < self._window_size:
            # Not enough elements yet.
            return TriggerResult.CONTINUE
        # Reset the counter and fire; the window contents are retained (no purge).
        count_state.clear()
        return TriggerResult.FIRE

    def on_processing_time(self,
                           time: int,
                           window: CountWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: CountWindow,
                      ctx: Trigger.TriggerContext) -> TriggerResult:
        return TriggerResult.CONTINUE

    def can_merge(self) -> bool:
        return True

    def on_merge(self, window: CountWindow, ctx: Trigger.OnMergeContext) -> None:
        ctx.merge_partitioned_state(self._count_state_descriptor)

    def clear(self, window: CountWindow, ctx: Trigger.TriggerContext) -> None:
        ctx.get_partitioned_state(self._count_state_descriptor).clear()
class NeverTrigger(Trigger[T, GlobalWindow]):
    """
    A trigger that never fires; it is the default Trigger for GlobalWindows.
    """

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: GlobalWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Elements never cause a fire.
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: GlobalWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Processing-time timers never cause a fire.
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: GlobalWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Event-time timers never cause a fire.
        return TriggerResult.CONTINUE

    def on_merge(self,
                 window: GlobalWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        """No state or timers to merge."""

    def clear(self,
              window: GlobalWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        """No state or timers to clear."""
class CountTumblingWindowAssigner(WindowAssigner[T, CountWindow]):
    """
    Groups elements into fixed-size, non-overlapping windows based on how many elements have been
    seen so far.
    """

    def __init__(self, window_size: int):
        """
        :param window_size: The number of elements per window.
        """
        self._window_size = window_size
        # Number of elements assigned so far, kept in state obtained from the runtime context.
        self._count_descriptor = ValueStateDescriptor('tumble-count-assigner', Types.LONG())

    @staticmethod
    def of(window_size: int) -> 'CountTumblingWindowAssigner':
        """
        :param window_size: The number of elements per window.
        :return: A new CountTumblingWindowAssigner.
        """
        return CountTumblingWindowAssigner(window_size)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[CountWindow]:
        count_state = context.get_runtime_context().get_state(self._count_descriptor)
        stored = count_state.value()
        position = 0 if stored is None else stored
        count_state.update(position + 1)
        # Elements 0..size-1 go to window 0, size..2*size-1 to window 1, and so on.
        return [CountWindow(position // self._window_size)]

    def get_default_trigger(self, env) -> Trigger[T, CountWindow]:
        return CountTrigger(self._window_size)

    def get_window_serializer(self) -> TypeSerializer[CountWindow]:
        return CountWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self) -> str:
        return "CountTumblingWindowAssigner(%s)" % self._window_size
class CountSlidingWindowAssigner(WindowAssigner[T, CountWindow]):
    """
    Groups elements into sliding windows based on how many elements have been seen so far.
    Windows may overlap.
    """

    def __init__(self, window_size: int, window_slide: int):
        """
        :param window_size: The number of elements per window.
        :param window_slide: The number of elements between the starts of consecutive windows.
        """
        self._window_size = window_size
        self._window_slide = window_slide
        # Number of elements assigned so far, kept in state obtained from the runtime context.
        self._count_descriptor = ValueStateDescriptor('slide-count-assigner', Types.LONG())

    @staticmethod
    def of(window_size: int, window_slide: int) -> 'CountSlidingWindowAssigner':
        """
        :param window_size: The number of elements per window.
        :param window_slide: The number of elements between the starts of consecutive windows.
        :return: A new CountSlidingWindowAssigner.
        """
        return CountSlidingWindowAssigner(window_size, window_slide)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[CountWindow]:
        count_state = context.get_runtime_context().get_state(self._count_descriptor)
        stored = count_state.value()
        position = 0 if stored is None else stored
        count_state.update(position + 1)

        # Window k covers positions [k * slide, k * slide + size - 1]. Start from the newest
        # window whose start is not after ``position`` and walk back towards older windows
        # until one no longer covers it.
        window_id = position // self._window_slide
        start = window_id * self._window_slide
        end = start + self._window_size - 1
        windows = []
        while window_id >= 0 and start <= position <= end:
            windows.append(CountWindow(window_id))
            window_id -= 1
            start -= self._window_slide
            end -= self._window_slide
        return windows

    def get_default_trigger(self, env) -> Trigger[T, CountWindow]:
        return CountTrigger(self._window_size)

    def get_window_serializer(self) -> TypeSerializer[CountWindow]:
        return CountWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self):
        return "CountSlidingWindowAssigner(%s, %s)" % (self._window_size, self._window_slide)
class TumblingProcessingTimeWindows(WindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into windows based on the current system time of
    the machine the operation is running on. Windows cannot overlap.

    For example, in order to window into windows of 1 minute:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))

    An optional offset shifts the window boundaries: ``of(Time.minutes(1), Time.seconds(10))``
    produces 1-minute windows that start 10 seconds past each full minute.
    """

    def __init__(self, size: int, offset: int):
        """
        :param size: The window size in milliseconds.
        :param offset: The window start offset in milliseconds.
        :raises Exception: If ``abs(offset) >= size`` (this also rejects ``size <= 0``).
        """
        if abs(offset) >= size:
            raise Exception("TumblingProcessingTimeWindows parameters must satisfy "
                            "abs(offset) < size")
        self._size = size
        self._offset = offset

    @staticmethod
    def of(size: Time, offset: Optional[Time] = None) -> 'TumblingProcessingTimeWindows':
        """
        Creates a new :class:`TumblingProcessingTimeWindows` :class:`WindowAssigner` that assigns
        elements to time windows based on the current processing time and the offset.

        For example, if you want to window a stream by hour, but have each window begin at the
        15th minute of the hour, you can use ``of(Time.hours(1), Time.minutes(15))``; you will
        then get time windows starting at 0:15:00, 1:15:00, 2:15:00, etc.

        Likewise, if you live somewhere that is not using UTC±00:00 time, such as China which
        uses UTC+08:00, and you want time windows with a size of one day that begin at every
        00:00:00 of local time, you may use ``of(Time.days(1), Time.hours(-8))``. The offset is
        Time.hours(-8) since UTC+08:00 is 8 hours earlier than UTC time.

        :param size: The size of the generated windows.
        :param offset: The offset which window start would be shifted by. Defaults to no offset.
        :return: The time policy.
        :raises Exception: If ``abs(offset) >= size``.
        """
        offset_millis = 0 if offset is None else offset.to_milliseconds()
        return TumblingProcessingTimeWindows(size.to_milliseconds(), offset_millis)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """
        Assigns the element to the single window containing the current processing time.

        The element timestamp is ignored; only ``context.get_current_processing_time()`` is used.
        """
        current_processing_time = context.get_current_processing_time()
        start = TimeWindow.get_window_start_with_offset(current_processing_time, self._offset,
                                                        self._size)
        return [TimeWindow(start, start + self._size)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return ProcessingTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self):
        return "TumblingProcessingTimeWindows(%s, %s)" % (self._size, self._offset)
class TumblingEventTimeWindows(WindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into windows based on the timestamp of the
    elements. Windows cannot overlap.

    For example, in order to window into windows of 1 minute:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    """

    def __init__(self, size: int, offset: int):
        """
        :param size: The window size in milliseconds.
        :param offset: The window start offset in milliseconds.
        :raises Exception: If ``abs(offset) >= size`` (this also rejects ``size <= 0``).
        """
        if abs(offset) >= size:
            raise Exception("TumblingEventTimeWindows parameters must satisfy abs(offset) < size")
        self._size = size
        self._offset = offset

    @staticmethod
    def of(size: Time, offset: Optional[Time] = None) -> 'TumblingEventTimeWindows':
        """
        Creates a new :class:`TumblingEventTimeWindows` :class:`WindowAssigner` that assigns
        elements to time windows based on the element timestamp and the offset.

        For example, ``of(Time.hours(1), Time.minutes(15))`` produces hourly windows starting
        at 0:15:00, 1:15:00, 2:15:00, etc.

        :param size: The size of the generated windows.
        :param offset: The offset which window start would be shifted by. Defaults to no offset;
                       must satisfy ``abs(offset) < size``.
        :return: The time policy.
        :raises Exception: If ``abs(offset) >= size``.
        """
        offset_millis = 0 if offset is None else offset.to_milliseconds()
        return TumblingEventTimeWindows(size.to_milliseconds(), offset_millis)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """
        Assigns the element to the single window containing its timestamp.

        :raises Exception: If the timestamp is ``MIN_LONG_VALUE``, the "no timestamp" marker.
        """
        # Guard clause: MIN_LONG_VALUE means the record carries no usable event timestamp.
        if timestamp <= MIN_LONG_VALUE:
            raise Exception("Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). "
                            + "Is the time characteristic set to 'ProcessingTime', "
                            + "or did you forget to call "
                            + "'data_stream.assign_timestamps_and_watermarks(...)'?")
        start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size)
        return [TimeWindow(start, start + self._size)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return EventTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return True

    def __repr__(self):
        return "TumblingEventTimeWindows(%s, %s)" % (self._size, self._offset)
class SlidingProcessingTimeWindows(WindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into sliding windows based on the current system
    time of the machine the operation is running on. Windows can possibly overlap.

    For example, in order to window into windows of 1 minute, every 10 seconds:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
    """

    def __init__(self, size: int, slide: int, offset: int):
        """
        :param size: The window size in milliseconds.
        :param slide: The slide interval in milliseconds.
        :param offset: The window start offset in milliseconds.
        :raises Exception: If ``abs(offset) >= slide`` (this also rejects ``slide <= 0``) or
                           ``size <= 0``.
        """
        if abs(offset) >= slide or size <= 0:
            raise Exception("SlidingProcessingTimeWindows parameters must satisfy "
                            + "abs(offset) < slide and size > 0")
        self._size = size
        self._slide = slide
        self._offset = offset
        self._pane_size = math.gcd(size, slide)

    @staticmethod
    def of(size: Time, slide: Time,
           offset: Optional[Time] = None) -> 'SlidingProcessingTimeWindows':
        """
        Creates a new :class:`SlidingProcessingTimeWindows` :class:`WindowAssigner` that assigns
        elements to time windows based on the current processing time and the offset.

        For example, if you want hourly windows that begin at the 15th minute of each hour, you
        can use ``of(Time.hours(1), Time.hours(1), Time.minutes(15))``; you will then get time
        windows starting at 0:15:00, 1:15:00, 2:15:00, etc.

        Likewise, if you live somewhere that is not using UTC±00:00 time, such as China which
        uses UTC+08:00, and you want daily windows that begin at every 00:00:00 of local time,
        you may use ``of(Time.days(1), Time.days(1), Time.hours(-8))``. The offset is
        Time.hours(-8) since UTC+08:00 is 8 hours earlier than UTC time.

        :param size: The size of the generated windows.
        :param slide: The slide interval of the generated windows.
        :param offset: The offset which window start would be shifted by. Defaults to no offset;
                       must satisfy ``abs(offset) < slide``.
        :return: The time policy.
        :raises Exception: If ``abs(offset) >= slide`` or ``size <= 0``.
        """
        offset_millis = 0 if offset is None else offset.to_milliseconds()
        return SlidingProcessingTimeWindows(size.to_milliseconds(), slide.to_milliseconds(),
                                            offset_millis)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """
        Assigns the element to every window that contains the current processing time.

        The element timestamp is ignored; only ``context.get_current_processing_time()`` is used.
        """
        current_processing_time = context.get_current_processing_time()
        last_start = TimeWindow.get_window_start_with_offset(
            current_processing_time, self._offset, self._slide)
        # Walk window starts backwards from the latest one; every start in
        # (now - size, now] yields a window containing now.
        return [TimeWindow(start, start + self._size)
                for start in range(last_start,
                                   current_processing_time - self._size, -self._slide)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return ProcessingTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self) -> str:
        return "SlidingProcessingTimeWindows(%s, %s, %s)" % (self._size, self._slide, self._offset)
class SlidingEventTimeWindows(WindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into sliding windows based on the timestamp of the
    elements. Windows can possibly overlap.

    For example, in order to window into windows of 1 minute, every 10 seconds:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
    """

    def __init__(self, size: int, slide: int, offset: int):
        """
        :param size: The window size in milliseconds.
        :param slide: The slide interval in milliseconds.
        :param offset: The window start offset in milliseconds.
        :raises Exception: If ``abs(offset) >= slide`` (this also rejects ``slide <= 0``) or
                           ``size <= 0``.
        """
        if abs(offset) >= slide or size <= 0:
            raise Exception("SlidingEventTimeWindows parameters must satisfy "
                            + "abs(offset) < slide and size > 0")
        self._size = size
        self._slide = slide
        self._offset = offset
        self._pane_size = math.gcd(size, slide)

    @staticmethod
    def of(size: Time, slide: Time, offset: Optional[Time] = None) -> 'SlidingEventTimeWindows':
        """
        Creates a new :class:`SlidingEventTimeWindows` :class:`WindowAssigner` that assigns elements
        to time windows based on the element timestamp and the offset.

        For example, if you want hourly windows that begin at the 15th minute of each hour, you
        can use ``of(Time.hours(1), Time.hours(1), Time.minutes(15))``; you will then get time
        windows starting at 0:15:00, 1:15:00, 2:15:00, etc.

        Likewise, if you live somewhere that is not using UTC±00:00 time, such as China which
        uses UTC+08:00, and you want daily windows that begin at every 00:00:00 of local time,
        you may use ``of(Time.days(1), Time.days(1), Time.hours(-8))``. The offset is
        Time.hours(-8) since UTC+08:00 is 8 hours earlier than UTC time.

        :param size: The size of the generated windows.
        :param slide: The slide interval of the generated windows.
        :param offset: The offset which window start would be shifted by. Defaults to no offset;
                       must satisfy ``abs(offset) < slide``.
        :return: The time policy.
        :raises Exception: If ``abs(offset) >= slide`` or ``size <= 0``.
        """
        offset_millis = 0 if offset is None else offset.to_milliseconds()
        return SlidingEventTimeWindows(size.to_milliseconds(), slide.to_milliseconds(),
                                       offset_millis)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """
        Assigns the element to every window that contains its timestamp.

        :raises Exception: If the timestamp is ``MIN_LONG_VALUE``, the "no timestamp" marker.
        """
        # Guard clause: MIN_LONG_VALUE means the record carries no usable event timestamp.
        if timestamp <= MIN_LONG_VALUE:
            raise Exception("Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). "
                            + "Is the time characteristic set to 'ProcessingTime', "
                            + "or did you forget to call "
                            + "'data_stream.assign_timestamps_and_watermarks(...)'?")
        last_start = TimeWindow.get_window_start_with_offset(timestamp,
                                                             self._offset, self._slide)
        # Walk window starts backwards from the latest one; every start in
        # (timestamp - size, timestamp] yields a window containing the timestamp.
        return [TimeWindow(start, start + self._size)
                for start in range(last_start, timestamp - self._size, -self._slide)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return EventTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return True

    def __repr__(self) -> str:
        return "SlidingEventTimeWindows(%s, %s, %s)" % (self._size, self._slide, self._offset)
class ProcessingTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into sessions based on the current processing
    time. Windows cannot overlap.

    For example, in order to window into sessions with a session gap of 1 minute:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(ProcessingTimeSessionWindows.with_gap(Time.minutes(1)))
    """

    def __init__(self, session_gap: int):
        """
        :param session_gap: The session gap in milliseconds. Must be positive.
        :raises Exception: If ``session_gap <= 0``.
        """
        if session_gap <= 0:
            raise Exception("ProcessingTimeSessionWindows parameters must satisfy 0 < size")
        self._session_gap = session_gap

    @staticmethod
    def with_gap(size: Time) -> 'ProcessingTimeSessionWindows':
        """
        Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on
        the current processing time.

        :param size: The session timeout, i.e. the time gap between sessions.
        :return: The policy.
        """
        return ProcessingTimeSessionWindows(size.to_milliseconds())

    @staticmethod
    def with_dynamic_gap(
            extractor: SessionWindowTimeGapExtractor) -> 'DynamicProcessingTimeSessionWindows':
        """
        Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on
        the current processing time, with a session gap extracted from each element.

        :param extractor: The extractor to use to extract the time gap from the input elements.
        :return: The policy.
        """
        return DynamicProcessingTimeSessionWindows(extractor)

    def merge_windows(self,
                      windows: Iterable[TimeWindow],
                      callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
        TimeWindow.merge_windows(windows, callback)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """
        Assigns the element to a window ``[now, now + session_gap)``, where ``now`` is the
        current processing time; the element timestamp is ignored.
        """
        current_processing_time = context.get_current_processing_time()
        return [TimeWindow(current_processing_time,
                           current_processing_time + self._session_gap)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return ProcessingTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self):
        return "ProcessingTimeSessionWindows(%s)" % self._session_gap
class EventTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]):
    """
    A :class:`WindowAssigner` that windows elements into sessions based on the timestamp of the
    elements. Windows cannot overlap.

    For example, in order to window into sessions with a session gap of 1 minute:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(EventTimeSessionWindows.with_gap(Time.minutes(1)))
    """

    def __init__(self, session_gap: int):
        """
        :param session_gap: The session gap in milliseconds. Must be positive.
        :raises Exception: If ``session_gap <= 0``.
        """
        if session_gap <= 0:
            raise Exception("EventTimeSessionWindows parameters must satisfy 0 < size")
        self._session_gap = session_gap

    @staticmethod
    def with_gap(size: Time) -> 'EventTimeSessionWindows':
        """
        Creates a new SessionWindows WindowAssigner that assigns elements to sessions
        based on the element timestamp.

        :param size: The session timeout, i.e. the time gap between sessions.
        :return: The policy.
        """
        return EventTimeSessionWindows(size.to_milliseconds())

    @staticmethod
    def with_dynamic_gap(
            extractor: SessionWindowTimeGapExtractor) -> 'DynamicEventTimeSessionWindows':
        """
        Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on
        the element timestamp, with a session gap extracted from each element.

        :param extractor: The extractor to use to extract the time gap from the input elements.
        :return: The policy.
        """
        return DynamicEventTimeSessionWindows(extractor)

    def merge_windows(self,
                      windows: Iterable[TimeWindow],
                      callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
        TimeWindow.merge_windows(windows, callback)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """Assigns the element to the window ``[timestamp, timestamp + session_gap)``."""
        return [TimeWindow(timestamp, timestamp + self._session_gap)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return EventTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return True

    def __repr__(self):
        return "EventTimeSessionWindows(%s)" % self._session_gap
class DynamicProcessingTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into sessions based on the current processing
    time, using a session gap extracted from each element. Windows cannot overlap.

    For example, in order to window into windows with a dynamic time gap:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(DynamicProcessingTimeSessionWindows.with_dynamic_gap(extractor))
    """

    def __init__(self,
                 session_window_time_gap_extractor: SessionWindowTimeGapExtractor):
        """
        :param session_window_time_gap_extractor: Extracts the session gap (in milliseconds)
                                                  from each input element.
        """
        # Last valid gap returned by the extractor; only used by __repr__.
        self._session_gap = 0
        self._session_window_time_gap_extractor = session_window_time_gap_extractor

    @staticmethod
    def with_dynamic_gap(
            extractor: SessionWindowTimeGapExtractor) -> 'DynamicProcessingTimeSessionWindows':
        """
        Creates a new SessionWindows WindowAssigner that assigns elements to sessions based
        on the current processing time, with a session gap extracted from each element.

        :param extractor: The extractor to use to extract the time gap from the input elements.
        :return: The policy.
        """
        return DynamicProcessingTimeSessionWindows(extractor)

    def merge_windows(self,
                      windows: Iterable[TimeWindow],
                      callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
        TimeWindow.merge_windows(windows, callback)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """
        Assigns the element to ``[now, now + gap)``, where ``now`` is the current processing
        time and ``gap`` is extracted from the element; the element timestamp is ignored.

        :raises Exception: If the extracted gap is not positive.
        """
        current_processing_time = context.get_current_processing_time()
        session_gap = self._session_window_time_gap_extractor.extract(element)
        if session_gap <= 0:
            raise Exception("Dynamic session time gap must satisfy 0 < gap")
        # Record the gap only once it is known to be valid, so a rejected element does not
        # leave an invalid value behind in the (shared) assigner instance.
        self._session_gap = session_gap
        return [TimeWindow(current_processing_time, current_processing_time + session_gap)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return ProcessingTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self):
        return "DynamicProcessingTimeSessionWindows(%s)" % self._session_gap
class DynamicEventTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]):
    """
    A :class:`WindowAssigner` that windows elements into sessions based on the timestamp of the
    elements, using a session gap extracted from each element. Windows cannot overlap.

    For example, in order to window into windows with a dynamic time gap:
    ::

        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(DynamicEventTimeSessionWindows.with_dynamic_gap(extractor))
    """

    def __init__(self,
                 session_window_time_gap_extractor: SessionWindowTimeGapExtractor):
        """
        :param session_window_time_gap_extractor: Extracts the session gap (in milliseconds)
                                                  from each input element.
        """
        # Last valid gap returned by the extractor; only used by __repr__.
        self._session_gap = 0
        self._session_window_time_gap_extractor = session_window_time_gap_extractor

    @staticmethod
    def with_dynamic_gap(
            extractor: SessionWindowTimeGapExtractor) -> 'DynamicEventTimeSessionWindows':
        """
        Creates a new SessionWindows WindowAssigner that assigns elements to sessions
        based on the element timestamp, with a session gap extracted from each element.

        :param extractor: The extractor to use to extract the time gap from the input elements.
        :return: The policy.
        """
        return DynamicEventTimeSessionWindows(extractor)

    def merge_windows(self,
                      windows: Iterable[TimeWindow],
                      callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
        TimeWindow.merge_windows(windows, callback)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]:
        """
        Assigns the element to ``[timestamp, timestamp + gap)``, where ``gap`` is extracted
        from the element.

        :raises Exception: If the extracted gap is not positive.
        """
        session_gap = self._session_window_time_gap_extractor.extract(element)
        if session_gap <= 0:
            raise Exception("Dynamic session time gap must satisfy 0 < gap")
        # Record the gap only once it is known to be valid, so a rejected element does not
        # leave an invalid value behind in the (shared) assigner instance.
        self._session_gap = session_gap
        return [TimeWindow(timestamp, timestamp + session_gap)]

    def get_default_trigger(self, env) -> Trigger[T, TimeWindow]:
        return EventTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return True

    def __repr__(self):
        return "DynamicEventTimeSessionWindows(%s)" % self._session_gap
class GlobalWindows(WindowAssigner[T, GlobalWindow]):
    """
    A WindowAssigner that puts every element into the one shared GlobalWindow.

    Uses processing-time semantics (``is_event_time`` is False) and defaults to a
    ``NeverTrigger``.
    """

    def __init__(self) -> None:
        super(GlobalWindows, self).__init__()

    @staticmethod
    def create() -> 'GlobalWindows':
        """
        Builds a GlobalWindows WindowAssigner, which maps all elements onto the same
        GlobalWindow.
        """
        return GlobalWindows()

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[GlobalWindow]:
        # Element, timestamp and context are irrelevant: there is only one window.
        only_window = GlobalWindow.get()
        return [only_window]

    def get_default_trigger(self, env) -> Trigger[T, GlobalWindow]:
        return NeverTrigger()

    def get_window_serializer(self) -> TypeSerializer[GlobalWindow]:
        return GlobalWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self) -> str:
        return "GlobalWindows()"