Source code for pyflink.common.watermark_strategy

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc

from typing import Any, Optional

from pyflink.common.time import Duration
from pyflink.java_gateway import get_gateway


class WatermarkStrategy(object):
    """
    The WatermarkStrategy defines how to generate Watermarks in the stream sources. The
    WatermarkStrategy is a builder/factory for the WatermarkGenerator that generates the
    watermarks and the TimestampAssigner which assigns the internal timestamp of a record.

    The convenience methods, for example for_bounded_out_of_orderness(Duration), create a
    WatermarkStrategy for common built in strategies.
    """

    def __init__(self, j_watermark_strategy):
        """
        Wraps a Java-side WatermarkStrategy object.

        :param j_watermark_strategy: The wrapped Java WatermarkStrategy, typically obtained via
                                     ``get_gateway()`` as the factory methods of this class do.
        """
        self._j_watermark_strategy = j_watermark_strategy
        # Python-side TimestampAssigner configured through with_timestamp_assigner(); None means
        # no custom assigner has been set.
        self._timestamp_assigner = None

    def with_timestamp_assigner(self, timestamp_assigner: 'TimestampAssigner') -> \
            'WatermarkStrategy':
        """
        Configures this WatermarkStrategy to use the given TimestampAssigner (an implementation
        of the TimestampAssigner interface) for extracting record timestamps.

        Note: unlike the other ``with_*`` methods, this updates the strategy in place and returns
        the same instance, so code that ignores the return value keeps working. The assigner is
        carried over to strategies derived afterwards via with_idleness() or
        with_watermark_alignment().

        Example:
        ::

            >>> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \\
            ...     .with_timestamp_assigner(MyTimestampAssigner())

        :param timestamp_assigner: The given TimestampAssigner.
        :return: This WatermarkStrategy, now configured with the TimestampAssigner.
        """
        self._timestamp_assigner = timestamp_assigner
        return self

    def with_idleness(self, idle_timeout: 'Duration') -> 'WatermarkStrategy':
        """
        Creates a new enriched WatermarkStrategy that also does idleness detection in the created
        WatermarkGenerator. A TimestampAssigner configured on this strategy is kept.

        Example:
        ::

            >>> WatermarkStrategy \\
            ...     .for_bounded_out_of_orderness(Duration.of_seconds(20)) \\
            ...     .with_idleness(Duration.of_minutes(1))

        :param idle_timeout: The idle timeout.
        :return: A new WatermarkStrategy with idle detection configured.
        """
        return self._derive(self._j_watermark_strategy.withIdleness(idle_timeout._j_duration))

    def with_watermark_alignment(self, watermark_group: str,
                                 max_allowed_watermark_drift: 'Duration',
                                 update_interval: Optional['Duration'] = None) \
            -> 'WatermarkStrategy':
        """
        Creates a new :class:`WatermarkStrategy` that configures the maximum watermark drift from
        other sources/tasks/partitions in the same watermark group. The group may contain
        completely independent sources (e.g. File and Kafka).

        Once configured Flink will "pause" consuming from a source/task/partition that is ahead
        of the emitted watermark in the group by more than the maxAllowedWatermarkDrift.

        A TimestampAssigner configured on this strategy is kept.

        Example:
        ::

            >>> WatermarkStrategy \\
            ...     .for_bounded_out_of_orderness(Duration.of_seconds(20)) \\
            ...     .with_watermark_alignment("alignment-group-1", Duration.of_seconds(20),
            ...         Duration.of_seconds(1))

        :param watermark_group: A group of sources to align watermarks.
        :param max_allowed_watermark_drift: Maximal drift, before we pause consuming from the
                                            source/task/partition.
        :param update_interval: How often tasks should notify coordinator about the current
                                watermark and how often the coordinator should announce the
                                maximal aligned watermark. If None, the default update interval
                                (1000ms) is used.
        :return: A new WatermarkStrategy with watermark alignment configured.

        .. versionadded:: 1.16.0
        """
        j_args = [watermark_group, max_allowed_watermark_drift._j_duration]
        if update_interval is not None:
            # Omitting the interval selects the Java overload that applies its default interval.
            j_args.append(update_interval._j_duration)
        return self._derive(self._j_watermark_strategy.withWatermarkAlignment(*j_args))

    def _derive(self, j_watermark_strategy) -> 'WatermarkStrategy':
        """Wraps a Java strategy derived from this one, keeping the Python TimestampAssigner."""
        derived = WatermarkStrategy(j_watermark_strategy)
        # Without this, a chained with_idleness()/with_watermark_alignment() call would silently
        # discard an assigner set earlier by with_timestamp_assigner().
        derived._timestamp_assigner = self._timestamp_assigner
        return derived

    @staticmethod
    def _j_watermark_strategy_class():
        """Returns the Java org.apache.flink.api.common.eventtime.WatermarkStrategy class."""
        return get_gateway().jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy

    @staticmethod
    def for_monotonous_timestamps() -> 'WatermarkStrategy':
        """
        Creates a watermark strategy for situations with monotonously ascending timestamps.

        The watermarks are generated periodically and tightly follow the latest timestamp in the
        data. The delay introduced by this strategy is mainly the periodic interval in which the
        watermarks are generated.
        """
        return WatermarkStrategy(
            WatermarkStrategy._j_watermark_strategy_class().forMonotonousTimestamps())

    @staticmethod
    def for_bounded_out_of_orderness(max_out_of_orderness: 'Duration') -> 'WatermarkStrategy':
        """
        Creates a watermark strategy for situations where records are out of order, but you can
        place an upper bound on how far the events are out of order. An out-of-order bound B
        means that once an event with timestamp T was encountered, no events older than
        (T - B) will follow any more.

        :param max_out_of_orderness: The maximum expected out-of-orderness bound B.
        """
        return WatermarkStrategy(
            WatermarkStrategy._j_watermark_strategy_class().forBoundedOutOfOrderness(
                max_out_of_orderness._j_duration))

    @staticmethod
    def no_watermarks() -> 'WatermarkStrategy':
        """
        Creates a watermark strategy that generates no watermarks at all. This may be useful in
        scenarios that do pure processing-time based stream processing.

        .. versionadded:: 1.16.0
        """
        return WatermarkStrategy(WatermarkStrategy._j_watermark_strategy_class().noWatermarks())
class TimestampAssigner(abc.ABC):
    """
    A TimestampAssigner assigns event time timestamps to elements. These timestamps are used by
    all functions that operate on event time, for example event time windows.

    Timestamps can be an arbitrary int value, but all built-in implementations represent it as the
    milliseconds since the Epoch (midnight, January 1, 1970 UTC), i.e. the value
    ``int(time.time() * 1000)`` would produce (``time.time()`` itself returns seconds as a float).
    """

    @abc.abstractmethod
    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        """
        Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
        any particular time zone or calendar.

        The method is passed the previously assigned timestamp of the element.
        That previous timestamp may have been assigned from a previous assigner. If the element did
        not carry a timestamp before, this value is negative (presumably the JVM's minimum long
        value, -2**63; confirm before relying on the exact value).

        :param value: The element that the timestamp will be assigned to.
        :param record_timestamp: The current internal timestamp of the element, or a negative value
                                 if no timestamp has been assigned yet.
        :return: The new timestamp, in milliseconds since the Epoch.
        """
        pass
class AssignerWithPeriodicWatermarksWrapper(object):
    """
    Python handle around an AssignerWithPeriodicWatermarks object.

    An AssignerWithPeriodicWatermarks assigns event time timestamps to elements and generates
    low watermarks that signal event time progress within the stream. Functions and operators
    that work on event time (for example event time windows) rely on these timestamps and
    watermarks.
    """

    def __init__(self, j_assigner_with_periodic_watermarks):
        """
        Stores a reference to the wrapped assigner.

        :param j_assigner_with_periodic_watermarks: The wrapped assigner object (presumably
            JVM-side, per the ``j_`` naming convention used in this module).
        """
        wrapped_assigner = j_assigner_with_periodic_watermarks
        self._j_assigner_with_periodic_watermarks = wrapped_assigner