Windows
Windows on Keyed Data Streams
Flink offers a variety of methods for defining windows on a KeyedStream. All of these group elements per key, i.e., each window contains elements with the same key value.
Basic Window Constructs
Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows for common use cases. See first if your use case can be served by the pre-defined windows below before moving to defining your own windows.
Transformation | Description |
---|---|
Tumbling time window KeyedStream → WindowedStream | Defines a window of 5 seconds that "tumbles". Elements are grouped according to their timestamp into groups of 5 seconds' duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). |
Sliding time window KeyedStream → WindowedStream | Defines a window of 5 seconds that "slides" by 1 second. Elements are grouped according to their timestamp into groups of 5 seconds' duration, and an element can belong to more than one window (since windows overlap by at most 4 seconds). The notion of time is specified by the selected TimeCharacteristic (see time). |
Tumbling count window KeyedStream → WindowedStream | Defines a window of 1000 elements that "tumbles". Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and every element belongs to exactly one window. |
Sliding count window KeyedStream → WindowedStream | Defines a window of 1000 elements that "slides" every 100 elements. Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and an element can belong to more than one window (as windows overlap by at most 900 elements). |
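To make the "exactly one window" property of tumbling time windows concrete, here is a minimal plain-Java sketch (it does not use the Flink API). It assumes windows are aligned to timestamp 0, timestamps are non-negative milliseconds, and a window covers its start but not its end; the class and method names are hypothetical.

```java
// Illustration only: maps a timestamp to the single tumbling window that contains it.
final class TumblingWindowSketch {

    /** Start of the tumbling window of the given size that contains the timestamp (timestamp >= 0). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the table above
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because the start is derived from the timestamp alone, two elements fall into the same window exactly when they share the same start value.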
Advanced Window Constructs
The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, below is a window definition where windows hold the elements of the last 5 seconds and slide every 1 second, but the window function is executed only once 100 elements have been added to the window, and every time execution is triggered, 10 elements are retained in the window:
keyedStream
    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));
The general recipe for building a custom window is to specify (1) a WindowAssigner, (2) a Trigger (optionally), and (3) an Evictor (optionally).
The WindowAssigner defines how incoming elements are assigned to windows. A window is a logical group of elements that has a begin-value and an end-value, corresponding to a begin-time and an end-time. Elements whose timestamp (according to the notion of time described above) falls within these values are part of the window.
For example, the SlidingEventTimeWindows assigner in the code above defines windows of size 5 seconds with a slide of 1 second. Assume that time starts from 0 and is measured in milliseconds. Then the first six windows, which overlap, are [0, 5000), [1000, 6000), [2000, 7000), [3000, 8000), [4000, 9000), and [5000, 10000); the end-value itself is not part of the window. Each incoming element is assigned to windows according to its timestamp. For example, an element with timestamp 2000 is assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your own window types by extending the WindowAssigner class.
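The assignment rule for sliding windows can be sketched in plain Java. This is an illustration under stated assumptions, not Flink's WindowAssigner code: time starts at 0 (so windows with a negative start are skipped), a window covers its start but not its end, and the names `SlidingWindowSketch` and `windowStarts` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: lists the sliding windows that contain a given timestamp.
final class SlidingWindowSketch {

    /**
     * Returns, in ascending order, the start times of all windows [start, start + size)
     * that contain the timestamp. Windows that would start before 0 are skipped.
     */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size && start >= 0; start -= slide) {
            starts.add(0, start); // prepend so the result is ascending
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 5 seconds, slide 1 second, timestamps in milliseconds.
        System.out.println(windowStarts(2_000L, 5_000L, 1_000L));
        System.out.println(windowStarts(7_500L, 5_000L, 1_000L));
    }
}
```

For timestamp 2000 the sketch yields the starts 0, 1000, and 2000, i.e., the first three windows of the example. Away from the start of time, an element belongs to size / slide windows (five in this configuration).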
Transformation | Description |
---|---|
Global window KeyedStream → WindowedStream | All incoming elements of a given key are assigned to the same window. The window does not have a default trigger, so it never fires unless a trigger is explicitly specified. |
Tumbling time windows KeyedStream → WindowedStream | Incoming elements are assigned to a window of a certain size (for example, 1 second) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. This assigner comes with a default trigger that fires for a window when a watermark with a value higher than its end-value is received. |
Sliding time windows KeyedStream → WindowedStream | Incoming elements are assigned to a window of a certain size (for example, 5 seconds) based on their timestamp. Windows "slide" by the provided value (for example, 1 second), and hence overlap. This assigner comes with a default trigger that fires for a window when a watermark with a value higher than its end-value is received. |
Tumbling processing time windows KeyedStream → WindowedStream | Incoming elements are assigned to a window of a certain size (for example, 1 second) based on the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window. This assigner comes with a default trigger that fires for a window when the current processing time exceeds its end-value. |
Sliding processing time windows KeyedStream → WindowedStream | Incoming elements are assigned to a window of a certain size (for example, 5 seconds) based on the current processing time. Windows "slide" by the provided value (for example, 1 second), and hence overlap. This assigner comes with a default trigger that fires for a window when the current processing time exceeds its end-value. |
The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated (“fires”) for each window. If no trigger is specified, the default trigger of the window type is used (it is part of the definition of the WindowAssigner). If the default triggers do not fit the application, Flink comes bundled with a set of triggers to choose from. You can also write your own trigger by implementing the Trigger interface. Note that specifying a trigger overrides the default trigger of the window assigner.
Transformation | Description |
---|---|
Processing time trigger | A window is fired when the current processing time exceeds its end-value. The elements of the triggered window are henceforth discarded. |
Watermark trigger | A window is fired when a watermark with a value that exceeds the window's end-value has been received. The elements of the triggered window are henceforth discarded. |
Continuous processing time trigger | A window is periodically considered for being fired (for example, every 5 seconds). The window is actually fired only when the current processing time exceeds its end-value. The elements of the triggered window are retained. |
Continuous watermark time trigger | A window is periodically considered for being fired (for example, every 5 seconds). The window is actually fired only when a watermark with a value that exceeds the window's end-value has been received. The elements of the triggered window are retained. |
Count trigger | A window is fired once a certain number of elements (for example, 1000) has been added to it. The elements of the triggered window are retained. |
Purging trigger | Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering. |
Delta trigger | A window is fired when the delta between the last added element and a reference element (initially the first element inserted in the window), as computed by a `DeltaFunction`, exceeds a threshold (for example, 5000). |
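The difference between a trigger that retains the window elements and one that purges them is easiest to see on a small run. The following plain-Java sketch simulates a single window with a count-based trigger. It is an illustration only, with hypothetical names, and assumes the trigger fires after every `count` additions.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: count-based firing with and without purging the window contents.
final class CountTriggerSketch {

    /**
     * Feeds the elements 1..total into one window and fires after every `count` additions.
     * Returns the window contents handed to the window function at each firing.
     */
    static List<List<Integer>> run(int total, int count, boolean purgeOnFire) {
        List<List<Integer>> firings = new ArrayList<>();
        List<Integer> window = new ArrayList<>();
        int sinceLastFire = 0;
        for (int element = 1; element <= total; element++) {
            window.add(element);
            sinceLastFire++;
            if (sinceLastFire == count) {
                firings.add(new ArrayList<>(window)); // snapshot seen by the function
                sinceLastFire = 0;
                if (purgeOnFire) {
                    window.clear(); // purging: discard the elements after firing
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        System.out.println("retain: " + run(6, 3, false));
        System.out.println("purge:  " + run(6, 3, true));
    }
}
```

With retained elements the second firing sees all six elements; with purging it sees only the three that arrived since the previous firing.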
After the trigger fires, and before the function (e.g., sum, count) is applied to the window contents, an optional Evictor removes some elements from the beginning of the window; the remaining elements are then passed on to the function. Flink comes bundled with a set of evictors. You can write your own evictor by implementing the Evictor interface.
Transformation | Description |
---|---|
Time evictor | Evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second). |
Count evictor | Retain 1000 elements from the end of the window backwards, evicting all others. |
Delta evictor | Starting from the beginning of the window, evict elements until an element with value lower than the value of the last element is found (by a threshold and a DeltaFunction). |
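The order of events described above (the trigger fires, the evictor removes elements from the beginning of the window, the function runs on what remains) can be sketched in plain Java. The sketch is an illustration with hypothetical names, not Flink code. It uses a count-based trigger and a count-based evictor, with sum as the window function, and assumes that evicted elements are dropped from the window for good.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration only: trigger -> evictor -> window function, for a single window.
final class TriggerEvictorSketch {

    /**
     * Feeds the elements 1..total into one window. After every `triggerCount` additions
     * the window fires: all but the last `retainCount` elements are evicted, and the sum
     * of the remaining elements is emitted. Returns the emitted sums.
     */
    static List<Long> run(int total, int triggerCount, int retainCount) {
        List<Long> results = new ArrayList<>();
        Deque<Integer> window = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int element = 1; element <= total; element++) {
            window.addLast(element);
            sinceLastFire++;
            if (sinceLastFire == triggerCount) {       // the trigger fires
                while (window.size() > retainCount) {  // the evictor drops the oldest elements
                    window.removeFirst();
                }
                long sum = 0;
                for (int value : window) {             // the function sees only what is left
                    sum += value;
                }
                results.add(sum);
                sinceLastFire = 0;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Fire every 100 elements and keep the last 10, as in the advanced example.
        System.out.println(run(250, 100, 10));
    }
}
```

With these parameters the function is applied to the elements 91..100 at the first firing and 191..200 at the second, so each result covers only the ten most recent elements.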
Recipes for Building Windows
The mechanism of window assigner, trigger, and evictor is very powerful, and it allows you to define many different kinds of windows. Flink’s basic window constructs are, in fact, syntactic sugar on top of the general mechanism. Below is how some common types of windows can be constructed using the general mechanism:
Window type | Definition |
---|---|
Tumbling count window | |
Sliding count window | |
Tumbling event time window | |
Sliding event time window | |
Tumbling processing time window | |
Sliding processing time window | |
Windows on Unkeyed Data Streams
You can also define windows on regular (non-keyed) data streams using the windowAll
transformation. These
windowed data streams have all the capabilities of keyed windowed data streams, but are evaluated at a single
task (and hence at a single computing node). The syntax for defining triggers and evictors is exactly the
same:
nonKeyedStream
    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));
Basic window definitions are also available for windows on non-keyed streams:
Transformation | Description |
---|---|
Tumbling time window all DataStream → AllWindowedStream | Defines a window of 5 seconds that "tumbles". Elements are grouped according to their timestamp into groups of 5 seconds' duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment. |
Sliding time window all DataStream → AllWindowedStream | Defines a window of 5 seconds that "slides" by 1 second. Elements are grouped according to their timestamp into groups of 5 seconds' duration, and an element can belong to more than one window (since windows overlap by at most 4 seconds). The notion of time used is controlled by the StreamExecutionEnvironment. |
Tumbling count window all DataStream → AllWindowedStream | Defines a window of 1000 elements that "tumbles". Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and every element belongs to exactly one window. |
Sliding count window all DataStream → AllWindowedStream | Defines a window of 1000 elements that "slides" every 100 elements. Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and an element can belong to more than one window (as windows overlap by at most 900 elements). |