pyflink.table.GroupedTable.flat_aggregate
- GroupedTable.flat_aggregate(func: Union[pyflink.table.expression.Expression, pyflink.table.udf.UserDefinedAggregateFunctionWrapper]) → pyflink.table.table.FlatAggregateTable
Performs a flat_aggregate operation on a grouped table. flat_aggregate takes a
TableAggregateFunction, which can return multiple rows for each group. Use a
selection after flat_aggregate.
Example:
>>> table_agg = udtaf(MyTableAggregateFunction())
>>> tab.group_by(col('c')).flat_aggregate(table_agg(col('a')).alias("a")).select(
...     col('c'), col('a'))
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
...     def emit_value(self, accumulator):
...         yield Row(accumulator[0])
...         yield Row(accumulator[1])
...
...     def create_accumulator(self):
...         return [None, None]
...
...     def accumulate(self, accumulator, *args):
...         args[0] # type: Row
...         if args[0][0] is not None:
...             if accumulator[0] is None or args[0][0] > accumulator[0]:
...                 accumulator[1] = accumulator[0]
...                 accumulator[0] = args[0][0]
...             elif accumulator[1] is None or args[0][0] > accumulator[1]:
...                 accumulator[1] = args[0][0]
...
...     def get_accumulator_type(self):
...         return DataTypes.ARRAY(DataTypes.BIGINT())
...
...     def get_result_type(self):
...         return DataTypes.ROW(
...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
>>> tab.group_by(col('c')) \
...     .flat_aggregate(top2.alias("a", "b")) \
...     .select(col('a'), col('b'))
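To illustrate what "returns multiple rows" means for each group, here is a minimal plain-Python sketch of the Top2 logic. It does not use PyFlink at all. The helper name simulate_flat_aggregate and the sample rows are hypothetical and exist only for illustration; the accumulator update rule mirrors the accumulate method in the example, and the real operator's execution model may differ.

```python
from collections import defaultdict


def create_accumulator():
    # Two slots: the largest and the second-largest value seen so far.
    return [None, None]


def accumulate(accumulator, value):
    # Same update rule as Top2.accumulate in the example, applied to one value.
    if value is None:
        return
    if accumulator[0] is None or value > accumulator[0]:
        accumulator[1] = accumulator[0]
        accumulator[0] = value
    elif accumulator[1] is None or value > accumulator[1]:
        accumulator[1] = value


def emit_value(accumulator):
    # A table aggregate emits several rows per group, here always two.
    yield (accumulator[0],)
    yield (accumulator[1],)


def simulate_flat_aggregate(rows):
    """Hypothetical stand-in for group_by('c').flat_aggregate(top2).

    rows is an iterable of (a, c) pairs; the result is a list of (c, a) rows.
    """
    accumulators = defaultdict(create_accumulator)
    for a, c in rows:
        accumulate(accumulators[c], a)
    result = []
    for key in sorted(accumulators):
        for (a,) in emit_value(accumulators[key]):
            result.append((key, a))
    return result


rows = [(1, "x"), (5, "x"), (3, "x"), (7, "y")]
output = simulate_flat_aggregate(rows)
print(output)
```

In this sketch each group yields two output rows, and a group with fewer than two non-null inputs yields None in the unfilled slot, because emit_value always yields both accumulator entries.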
- Parameters
func – user-defined table aggregate function.
- Returns
The result table.
New in version 1.13.0.