Window

Tumble Window

Tumbling windows are consecutive, non-overlapping windows of a specified fixed length. For example, a tumbling window with a size of 5 minutes groups elements into 5-minute intervals.
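To make the assignment rule concrete, here is a pure-Python sketch (not PyFlink API) of how a timestamp maps to its tumbling window. It assumes integer millisecond timestamps and windows aligned to the epoch with zero offset. The helper names `tumbling_window_start` and `assign_tumbling` are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def tumbling_window_start(ts_ms: int, size_ms: int) -> int:
    """Start of the tumbling window containing ts_ms (zero offset assumed)."""
    # Python's % is a floor modulo, so this also works for negative timestamps.
    return ts_ms - ts_ms % size_ms


def assign_tumbling(timestamps: List[int], size_ms: int) -> Dict[Tuple[int, int], List[int]]:
    """Group timestamps into [start, end) tumbling windows."""
    windows: Dict[Tuple[int, int], List[int]] = defaultdict(list)
    for ts in timestamps:
        start = tumbling_window_start(ts, size_ms)
        windows[(start, start + size_ms)].append(ts)
    return dict(windows)


MINUTE = 60_000
events = [0, 1 * MINUTE, 4 * MINUTE + 59_999, 5 * MINUTE, 12 * MINUTE]
for (start, end), members in sorted(assign_tumbling(events, 5 * MINUTE).items()):
    # Print window bounds in minutes and how many events fell into each window.
    print(start // MINUTE, end // MINUTE, len(members))
```

Each timestamp lands in exactly one window, which is what makes tumbling windows non-overlapping.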

Example:

>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Tumble
>>> Tumble.over(lit(10).minutes) \
...       .on(col("rowtime")) \
...       .alias("w")

Tumble.over(size)

Creates a tumbling window.

TumbleWithSize.on(time_field)

Specifies the time attribute on which rows are grouped.

TumbleWithSizeOnTime.alias(alias)

Assigns an alias for this window that the following group_by() and select() clauses can refer to.

Sliding Window

Sliding windows have a fixed size and slide by a specified slide interval. If the slide interval is smaller than the window size, sliding windows overlap, so an element can be assigned to multiple windows.

For example, a sliding window with a size of 15 minutes and a slide interval of 5 minutes groups elements into 15-minute windows and is evaluated every 5 minutes. Each element is contained in three consecutive window evaluations.
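The following pure-Python sketch (a hypothetical helper, not PyFlink API) enumerates the sliding windows that contain a given timestamp, assuming integer timestamps in minutes and zero offset. It starts from the latest window start at or before the timestamp and steps back one slide at a time while the window still covers the timestamp.

```python
from typing import List, Tuple


def sliding_windows_for(ts: int, size: int, slide: int) -> List[Tuple[int, int]]:
    """Return every [start, end) sliding window that contains ts (zero offset)."""
    windows = []
    # Latest window start at or before ts (floor to a multiple of the slide).
    start = ts - ts % slide
    # Step back one slide at a time while the window [start, start + size) still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(sliding_windows_for(7, size=15, slide=5))
# [(-5, 10), (0, 15), (5, 20)]
```

With a 15-minute size and a 5-minute slide, the timestamp 7 falls into three windows, matching the description above. If the slide were larger than the size, some timestamps would belong to no window at all.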

Example:

>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Slide
>>> Slide.over(lit(10).minutes) \
...      .every(lit(5).minutes) \
...      .on(col("rowtime")) \
...      .alias("w")

Slide.over(size)

Creates a sliding window.

SlideWithSize.every(slide)

Specifies the window's slide as a time or row-count interval.

SlideWithSizeAndSlide.on(time_field)

Specifies the time attribute on which rows are grouped.

SlideWithSizeAndSlideOnTime.alias(alias)

Assigns an alias for this window that the following group_by() and select() clauses can refer to.

Session Window

The boundaries of session windows are defined by intervals of inactivity, i.e., a session window closes if no event appears for a defined gap period.
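A minimal pure-Python sketch of session assignment, assuming integer timestamps in minutes: each event opens a window `[ts, ts + gap)`, and overlapping windows merge into one session. The helper name is hypothetical. The sketch does not try to match Flink's exact handling of an event that arrives precisely `gap` after the previous one.

```python
from typing import List, Tuple


def session_windows(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Merge per-event [ts, ts + gap) windows into [start, end) sessions."""
    sessions: List[Tuple[int, int]] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # The event arrives before the current session times out: extend it.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            # First event, or inactivity of at least `gap`: start a new session.
            sessions.append((ts, ts + gap))
    return sessions


print(session_windows([0, 3, 8, 30, 35, 60], gap=10))
# [(0, 18), (30, 45), (60, 70)]
```

Unlike tumbling and sliding windows, session lengths are not fixed. Each session ends `gap` after its last event.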

Example:

>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Session
>>> Session.with_gap(lit(10).minutes) \
...        .on(col("rowtime")) \
...        .alias("w")

Session.with_gap(gap)

Creates a session window.

SessionWithGap.on(time_field)

Specifies the time attribute on which rows are grouped.

SessionWithGapOnTime.alias(alias)

Assigns an alias for this window that the following group_by() and select() clauses can refer to.

Over Window

Similar to SQL, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.
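The pure-Python sketch below computes an over window aggregate of this kind: `sum(value)` over rows partitioned by `a`, ordered by `rowtime`, with an unbounded preceding range. The row schema and helper name are hypothetical, and this is not PyFlink API. It follows standard SQL RANGE semantics, where rows sharing the same `rowtime` are peers and receive the same running total.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int, int]  # hypothetical schema: (a, rowtime, value)


def unbounded_range_sum(rows: List[Row]) -> List[Tuple[str, int, int, int]]:
    """Append sum(value) OVER (PARTITION BY a ORDER BY rowtime
    RANGE UNBOUNDED PRECEDING) to every row."""
    partitions: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[0]].append(row)

    out = []
    for part in partitions.values():
        part.sort(key=lambda r: r[1])  # stable sort by rowtime
        running = 0
        i = 0
        while i < len(part):
            # Add every peer sharing this rowtime before emitting any of them.
            j = i
            while j < len(part) and part[j][1] == part[i][1]:
                running += part[j][2]
                j += 1
            for a, t, v in part[i:j]:
                out.append((a, t, v, running))
            i = j
    return out


rows = [("x", 1, 10), ("y", 1, 5), ("x", 2, 1), ("x", 2, 2), ("y", 4, 7), ("x", 5, 3)]
for r in unbounded_range_sum(rows):
    print(r)
```

Both `x` rows at `rowtime` 2 report 13, because a RANGE frame treats them as peers. A row-count frame would give them 11 and 13 instead.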

Example:

>>> from pyflink.table.expressions import col, UNBOUNDED_RANGE
>>> from pyflink.table.window import Over
>>> Over.partition_by(col("a")) \
...     .order_by(col("rowtime")) \
...     .preceding(UNBOUNDED_RANGE) \
...     .alias("w")

Over.order_by(order_by)

Specifies the time attribute on which rows are ordered.

Over.partition_by(*partition_by)

Partitions the elements on some partition keys.

OverWindowPartitionedOrdered.alias(alias)

Assigns an alias for this over window that the following select() clause can refer to.

OverWindowPartitionedOrdered.preceding(preceding)

Sets the preceding offset (based on time or row-count intervals) for the over window.
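When the preceding offset is a row count N, each row's frame is the current row plus up to N earlier rows of the same partition. A pure-Python sketch, assuming the rows are already partitioned and ordered, with a hypothetical helper name (not PyFlink API):

```python
from collections import deque
from typing import Deque, List


def rows_preceding_sum(values: List[int], preceding: int) -> List[int]:
    """Sum over the current row and up to `preceding` rows before it."""
    frame: Deque[int] = deque()
    total = 0
    out = []
    for v in values:
        frame.append(v)
        total += v
        # Drop the oldest row once the frame exceeds `preceding` + current row.
        if len(frame) > preceding + 1:
            total -= frame.popleft()
        out.append(total)
    return out


print(rows_preceding_sum([1, 2, 3, 4, 5], preceding=2))
# [1, 3, 6, 9, 12]
```

Keeping a running total and subtracting the row that leaves the frame makes each step constant time, which avoids re-summing the whole frame for every row.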

OverWindowPartitionedOrderedPreceding.alias(alias)

Assigns an alias for this window that the following select() clause can refer to.

OverWindowPartitionedOrderedPreceding.following(...)

Sets the following offset (based on time or row-count intervals) for the over window.

OverWindowPartitioned.order_by(order_by)

Specifies the time attribute on which rows are ordered.