Interface SupportsAggregatePushDown


  • @PublicEvolving
    public interface SupportsAggregatePushDown
    Enables pushing down local aggregates into a ScanTableSource.

    Given the following example inventory table:

    
     CREATE TABLE inventory (
       id INT,
       name STRING,
       amount INT,
       price DOUBLE,
       type STRING
     )
     

    And the following simple aggregate SQL query:

    
     SELECT
       SUM(amount),
       MAX(price),
       AVG(price),
       COUNT(1),
       name,
       type
     FROM inventory
     GROUP BY name, type
     

    In the example above, SUM(amount), MAX(price), AVG(price) and COUNT(1) are the aggregate functions, and GROUP BY name, type defines the grouping set. By default, if this interface is not implemented, local aggregates are applied in a subsequent operation after the source. Without local aggregate push down, the optimized plan is the following:

    
     Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name, type])
     +- HashAggregate(groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_AVG(sum$2, count$3) AS EXPR$2, Final_COUNT(count1$4) AS EXPR$3])
        +- Exchange(distribution=[hash[name, type]])
           +- LocalHashAggregate(groupBy=[name, type], select=[name, type, Partial_SUM(amount) AS sum$0, Partial_MAX(price) AS max$1, Partial_AVG(price) AS (sum$2, count$3), Partial_COUNT(*) AS count1$4])
              +- TableSourceScan(table=[[inventory, project=[name, type, amount, price]]], fields=[name, type, amount, price])
     
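    The plan above is ordinary two-phase aggregation: LocalHashAggregate computes partial results (Partial_*) before the Exchange, and HashAggregate merges them (Final_*) after it. The following is a minimal plain-Java sketch of that idea under stated assumptions: it uses no Flink classes, and the Row record, Partial class and TwoPhaseAggSketch names are hypothetical. AVG is carried as a (sum, count) pair, as in the plan, so that partial results from different partitions can be merged correctly.

```java
import java.util.*;

// Hypothetical row type mirroring the inventory columns (not Flink's Row class).
record Row(int id, String name, int amount, double price, String type) {}

// Partial state for one (name, type) group: SUM(amount), MAX(price),
// AVG(price) kept as (sum, count), and COUNT(1).
final class Partial {
    long sumAmount;
    double maxPrice = Double.NEGATIVE_INFINITY;
    double sumPrice;
    long countPrice;
    long count1;

    // Local phase: fold one input row into the partial state (Partial_*).
    void accumulate(Row r) {
        sumAmount += r.amount();
        maxPrice = Math.max(maxPrice, r.price());
        sumPrice += r.price();
        countPrice++;
        count1++;
    }

    // Final phase: merge another partial state into this one (Final_*).
    void merge(Partial o) {
        sumAmount += o.sumAmount;
        maxPrice = Math.max(maxPrice, o.maxPrice);
        sumPrice += o.sumPrice;
        countPrice += o.countPrice;
        count1 += o.count1;
    }

    // AVG is only computed after all partial (sum, count) pairs are merged.
    double avgPrice() { return sumPrice / countPrice; }
}

final class TwoPhaseAggSketch {
    // Local aggregation over one partition, keyed by the grouping columns.
    static Map<List<String>, Partial> local(List<Row> partition) {
        Map<List<String>, Partial> out = new HashMap<>();
        for (Row r : partition) {
            out.computeIfAbsent(List.of(r.name(), r.type()), k -> new Partial()).accumulate(r);
        }
        return out;
    }

    // Final aggregation: merge partial states of the same group across partitions.
    static Map<List<String>, Partial> finalMerge(List<Map<List<String>, Partial>> partials) {
        Map<List<String>, Partial> out = new HashMap<>();
        for (Map<List<String>, Partial> p : partials) {
            p.forEach((key, state) -> out.computeIfAbsent(key, k -> new Partial()).merge(state));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> p1 = List.of(new Row(1, "apple", 3, 2.0, "fruit"), new Row(2, "apple", 1, 4.0, "fruit"));
        List<Row> p2 = List.of(new Row(3, "apple", 6, 3.0, "fruit"));
        Partial g = finalMerge(List.of(local(p1), local(p2))).get(List.of("apple", "fruit"));
        System.out.println(g.sumAmount + " " + g.maxPrice + " " + g.avgPrice() + " " + g.count1);
    }
}
```

    Running main prints 10 4.0 3.0 3 for the single (apple, fruit) group split across two partitions.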

    For efficiency, a source can push the local aggregates further down into the underlying database or storage system to reduce network and computing overhead. The aggregate functions and grouping sets are passed in the order defined by the query. If the underlying database or storage system supports aggregation, the source can return the aggregated values directly. With local aggregate push down, the optimized plan changes to the following pattern:

    
     Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, name, type])
     +- HashAggregate(groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1, Final_AVG(sum$2, count$3) AS EXPR$2, Final_COUNT(count1$4) AS EXPR$3])
        +- Exchange(distribution=[hash[name, type]])
           +- TableSourceScan(table=[[inventory, project=[name, type, amount, price], aggregates=[grouping=[name,type], aggFunctions=[IntSumAggFunction(amount),DoubleMaxAggFunction(price),DoubleSum0AggFunction(price),CountAggFunction(price),Count1AggFunction()]]]], fields=[name, type, sum$0, max$1, sum$2, count$3, count1$4])
     

    The original LocalHashAggregate has been removed and pushed down into the TableSourceScan. The output data type of the TableSourceScan has changed accordingly: it now consists of the grouping keys followed by the outputs of the aggregate functions.
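    The field list of the new TableSourceScan (name, type, sum$0, max$1, sum$2, count$3, count1$4) can be derived mechanically from the grouping keys and the aggregate calls. The sketch below is hypothetical: the AggKind enum covers only the four functions in the example, and the prefix$index naming simply mimics the plan output rather than reproducing Flink's planner logic. As in the plan, AVG contributes two fields, a partial sum and a partial count.

```java
import java.util.*;

// Hypothetical aggregate kinds, limited to the functions in the example query.
enum AggKind { SUM, MAX, AVG, COUNT1 }

final class OutputLayoutSketch {
    // Builds the output field list: grouping keys first, then aggregate outputs
    // in query order. AVG is split into a partial sum and a partial count.
    static List<String> layout(List<String> groupingKeys, List<AggKind> aggs) {
        List<String> fields = new ArrayList<>(groupingKeys);
        int i = 0; // running index used in the "$i" suffix, as in the plan above
        for (AggKind agg : aggs) {
            switch (agg) {
                case SUM -> fields.add("sum$" + i++);
                case MAX -> fields.add("max$" + i++);
                case AVG -> {
                    fields.add("sum$" + i++);
                    fields.add("count$" + i++);
                }
                case COUNT1 -> fields.add("count1$" + i++);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(layout(List.of("name", "type"),
                List.of(AggKind.SUM, AggKind.MAX, AggKind.AVG, AggKind.COUNT1)));
    }
}
```

    Running main prints [name, type, sum$0, max$1, sum$2, count$3, count1$4], the same order the source must use for its returned rows.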

    Due to the complexity of aggregation, aggregate push down currently does not support the following more complex statements:

    • complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS.
    • expressions inside the aggregate function call, such as sum(a * b).
    • aggregations with ordering.
    • aggregations with filter.
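    The restrictions above amount to a pre-check on the shape of the aggregate query. A minimal sketch over a hypothetical, simplified query model (the GroupingKind enum, AggCall record and PushDownEligibility class are illustrative, not Flink planner types): if any condition fails, the local aggregate stays in Flink, as in the FILTER example that follows.

```java
import java.util.*;

// Hypothetical, simplified model of the query shape; not Flink planner classes.
enum GroupingKind { SIMPLE, ROLLUP, CUBE, GROUPING_SETS }

record AggCall(String function, List<String> argColumns,
               boolean argIsExpression, boolean hasOrdering, boolean hasFilter) {}

final class PushDownEligibility {
    // Returns true only when the query avoids every unsupported construct listed above.
    static boolean eligible(GroupingKind grouping, List<AggCall> calls) {
        if (grouping != GroupingKind.SIMPLE) {
            return false; // ROLLUP, CUBE, GROUPING SETS
        }
        for (AggCall c : calls) {
            if (c.argIsExpression() || c.hasOrdering() || c.hasFilter()) {
                return false; // e.g. SUM(a * b), ordered or FILTERed aggregates
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AggCall sum = new AggCall("SUM", List.of("amount"), false, false, false);
        AggCall filteredSum = new AggCall("SUM", List.of("amount"), false, false, true);
        System.out.println(eligible(GroupingKind.SIMPLE, List.of(sum)));         // true
        System.out.println(eligible(GroupingKind.SIMPLE, List.of(filteredSum))); // false
        System.out.println(eligible(GroupingKind.CUBE, List.of(sum)));           // false
    }
}
```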

    For example, given the inventory table above and the following test SQL:

    
     SELECT
       SUM(amount) FILTER(WHERE amount > 0),
       name,
       type
     FROM inventory
     GROUP BY name, type
     

    Because the SUM aggregate function has a FILTER clause, the local aggregate is not pushed down in this scenario, and the optimized plan is as shown below:

    
     Calc(select=[EXPR$0, name, type])
     +- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0])
        +- Exchange(distribution=[hash[name, type]])
           +- LocalHashAggregate(groupBy=[name, type], select=[name, type, Partial_SUM(amount) FILTER $f3 AS sum$0])
              +- Calc(select=[name, type, amount, IS TRUE(>(amount, 0)) AS $f3])
                 +- TableSourceScan(table=[[inventory, project=[name, type, amount]]], fields=[name, type, amount])
     

    Note: The local aggregate push down strategy is all or nothing: the aggregates can only be pushed down if all aggregate functions are supported.
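    From the source's point of view, the all-or-nothing rule means a single unsupported function rejects the whole request. A minimal sketch, assuming the aggregates can be identified by plain function names and that the storage system's capabilities are the hypothetical SUPPORTED set; the real method receives AggregateExpression objects instead, and AllOrNothingSource is an illustrative name. One reasonable design choice, shown here, is to record the aggregates only after the check succeeds, so a rejected request leaves the source unchanged.

```java
import java.util.*;

// Hypothetical source-side state; function names stand in for the
// aggregate expressions Flink actually passes.
final class AllOrNothingSource {
    // Hypothetical capability list of the underlying storage system.
    private static final Set<String> SUPPORTED = Set.of("SUM", "MAX", "MIN", "COUNT");

    private List<String> pushedAggregates = List.of();

    // Accepts the aggregates only if every one of them is supported; otherwise
    // leaves the source state untouched and reports that nothing was pushed down.
    boolean tryPush(List<String> functions) {
        if (!SUPPORTED.containsAll(functions)) {
            return false;
        }
        pushedAggregates = List.copyOf(functions);
        return true;
    }

    List<String> pushedAggregates() { return pushedAggregates; }

    public static void main(String[] args) {
        AllOrNothingSource source = new AllOrNothingSource();
        System.out.println(source.tryPush(List.of("SUM", "STDDEV_POP")));   // false
        System.out.println(source.pushedAggregates());                      // []
        System.out.println(source.tryPush(List.of("SUM", "MAX", "COUNT"))); // true
    }
}
```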

    Regardless of whether this interface is implemented, a final aggregation is always applied in a subsequent operation after the source.

    • Method Detail

      • applyAggregates

        boolean applyAggregates(List<int[]> groupingSets,
                                List<AggregateExpression> aggregateExpressions,
                                DataType producedDataType)
        Provides a list of aggregate expressions and the grouping keys. The source should pick all the aggregates or nothing and return whether all the aggregates have been pushed down into the source.

        Note: Use the passed producedDataType instead of TableSchema.toPhysicalRowDataType() to describe the final output data type when creating TypeInformation. The projection of grouping keys and aggregate values is already reflected in the given output data type. The passed data type follows the pattern grouping keys + aggregate function results, and the downstream storage needs to organize the returned aggregate data strictly in this order.

        Parameters:
        groupingSets - a list of grouping sets, each given as an array of field indices. In the example mentioned in SupportsAggregatePushDown, this method would receive groupingSets of List([1, 4]), which is equivalent to List(["name", "type"]).
        aggregateExpressions - a list containing all of the aggregates. The source should check whether all of the aggregate functions can be processed by the downstream system. The applying strategy is all or nothing.
        producedDataType - the final output type of the source.
        Returns:
        true if all the aggregates have been pushed down into source, false otherwise.
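        Because groupingSets holds field indices, the source has to map them back to its columns. A small sketch, assuming the source knows its physical columns in declaration order (the COLUMNS array below mirrors the inventory table and is an assumption); the GroupingSetResolver class is hypothetical.

```java
import java.util.*;

final class GroupingSetResolver {
    // Physical columns of the inventory table in declaration order (assumed).
    static final String[] COLUMNS = {"id", "name", "amount", "price", "type"};

    // Resolves each grouping set (an int[] of field indices) into column names.
    static List<List<String>> resolve(List<int[]> groupingSets) {
        List<List<String>> result = new ArrayList<>();
        for (int[] set : groupingSets) {
            List<String> names = new ArrayList<>();
            for (int index : set) {
                names.add(COLUMNS[index]);
            }
            result.add(names);
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> groupingSets = List.of(new int[] {1, 4});
        System.out.println(resolve(groupingSets)); // [[name, type]]
    }
}
```

        For the example query, resolve(List.of(new int[] {1, 4})) yields [[name, type]].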