pyflink.datastream.data_stream.WindowedStream.aggregate

WindowedStream.aggregate(aggregate_function: pyflink.datastream.functions.AggregateFunction, window_function: Optional[Union[pyflink.datastream.functions.WindowFunction, pyflink.datastream.functions.ProcessWindowFunction]] = None, accumulator_type: Optional[pyflink.common.typeinfo.TypeInformation] = None, output_type: Optional[pyflink.common.typeinfo.TypeInformation] = None) → pyflink.datastream.data_stream.DataStream

Applies the given window function to each window. The window function is called for each evaluation of the window for each key individually. The output of the window function is interpreted as a regular non-windowed stream.

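Arriving data is incrementally aggregated using the given aggregate function, so only the accumulator is kept per window rather than every element. This means that the window function typically has only a single value to process when called: the aggregated result for that window. If no window function is given, the aggregated result is emitted as is.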
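The incremental aggregation described above can be illustrated without a Flink runtime. The following is a minimal plain-Python sketch, not PyFlink code: AverageSketch is a hypothetical stand-in that only mirrors the four AggregateFunction methods, and simulate_window is an assumed helper that imitates how elements of one window for one key might be folded into an accumulator.

```python
from typing import Iterable, List, Tuple

# Accumulator layout assumed for this sketch: (running sum, element count).
Accumulator = Tuple[int, int]


class AverageSketch:
    """Plain-Python stand-in with the same four methods as an AggregateFunction."""

    def create_accumulator(self) -> Accumulator:
        return 0, 0

    def add(self, value: Tuple[str, int], accumulator: Accumulator) -> Accumulator:
        return accumulator[0] + value[1], accumulator[1] + 1

    def get_result(self, accumulator: Accumulator) -> float:
        return accumulator[0] / accumulator[1]

    def merge(self, a: Accumulator, b: Accumulator) -> Accumulator:
        return a[0] + b[0], a[1] + b[1]


def simulate_window(agg: AverageSketch,
                    elements: Iterable[Tuple[str, int]]) -> List[float]:
    """Hypothetical driver: fold elements one by one, then emit one result."""
    acc = agg.create_accumulator()
    for element in elements:
        # Only the accumulator is kept; the element itself is not stored.
        acc = agg.add(element, acc)
    # A window function, if supplied, would receive this one-element list.
    return [agg.get_result(acc)]


window_input = simulate_window(AverageSketch(), [("a", 1), ("a", 2), ("a", 6)])
print(window_input)  # [3.0]
```

The merge method combines two partial accumulators into one. In this sketch, merging (3, 2) with (6, 1) gives (9, 3), the same accumulator that adding all three elements in sequence produces.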

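Example:

>>> from typing import Tuple
>>> from pyflink.common import Time, Types
>>> from pyflink.datastream.functions import AggregateFunction
>>> from pyflink.datastream.window import TumblingEventTimeWindows
>>> class AverageAggregate(AggregateFunction):
...     # The accumulator is a (running sum, element count) pair.
...     def create_accumulator(self) -> Tuple[int, int]:
...         return 0, 0
...
...     def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) \
...             -> Tuple[int, int]:
...         return accumulator[0] + value[1], accumulator[1] + 1
...
...     def get_result(self, accumulator: Tuple[int, int]) -> float:
...         return accumulator[0] / accumulator[1]
...
...     def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
...         return a[0] + b[0], a[1] + b[1]
>>> # ds is assumed to be an existing DataStream of (str, int) tuples.
>>> # Key by the string field so that the int field is averaged per key.
>>> ds.key_by(lambda x: x[0]) \
...     .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
...     .aggregate(AverageAggregate(),
...                accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
...                output_type=Types.DOUBLE())

Parameters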
  • aggregate_function – The aggregation function that is used for incremental aggregation.

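  • window_function – Optional window function (a WindowFunction or ProcessWindowFunction) that is applied to the aggregated result of each window. Defaults to None.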

  • accumulator_type – Type information for the internal accumulator type of the aggregation function.

  • output_type – Type information for the result type of the window function.

Returns

The data stream that is the result of applying the window function to the window.

New in version 1.16.0.
