pyflink.table.Table.flat_aggregate#
- Table.flat_aggregate(func: Union[pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedAggregateFunctionWrapper]) pyflink.table.table.FlatAggregateTable [source]#
Perform a global flat_aggregate without group_by. flat_aggregate takes a
TableAggregateFunction
which returns multiple rows. Use a selection after the flat_aggregate.Example:
>>> table_agg = udtaf(MyTableAggregateFunction()) >>> tab.flat_aggregate(table_agg(col('a')).alias("a", "b")).select(col('a'), col('b')) >>> # take all the columns as inputs >>> class Top2(TableAggregateFunction): ... def emit_value(self, accumulator): ... yield Row(accumulator[0]) ... yield Row(accumulator[1]) ... ... def create_accumulator(self): ... return [None, None] ... ... def accumulate(self, accumulator, *args): ... args[0] # type: Row ... if args[0][0] is not None: ... if accumulator[0] is None or args[0][0] > accumulator[0]: ... accumulator[1] = accumulator[0] ... accumulator[0] = args[0][0] ... elif accumulator[1] is None or args[0][0] > accumulator[1]: ... accumulator[1] = args[0][0] ... ... def get_accumulator_type(self): ... return DataTypes.ARRAY(DataTypes.BIGINT()) ... ... def get_result_type(self): ... return DataTypes.ROW( ... [DataTypes.FIELD("a", DataTypes.BIGINT())]) >>> top2 = udtaf(Top2()) >>> tab.flat_aggregate(top2.alias("a", "b")).select(col('a'), col('b'))
- Parameters
func – user-defined table aggregate function.
- Returns
The result table.
New in version 1.13.0.