Interface AggregateFunction<IN, ACC, OUT>

  • Type Parameters:
    IN - The type of the values that are aggregated (input values)
    ACC - The type of the accumulator (intermediate aggregate state).
    OUT - The type of the aggregated result
    All Superinterfaces:
    Function, Serializable
    All Known Implementing Classes:
    RichAggregateFunction

    @PublicEvolving
    public interface AggregateFunction<IN, ACC, OUT>
    extends Function, Serializable
    The AggregateFunction is a flexible aggregation function, characterized by the following features:
    • The aggregates may use different types for input values, intermediate aggregates, and result type, to support a wide range of aggregation types.
    • Support for distributive aggregations: Different intermediate aggregates can be merged together, to allow for pre-aggregation/final-aggregation optimizations.

    The AggregateFunction's intermediate aggregate (the in-progress aggregation state) is called the accumulator. Values are added to the accumulator, and final aggregates are obtained by finalizing the accumulator state. This supports aggregation functions whose intermediate state differs from both the input values and the final result type, for example an average, which typically keeps a count and a sum. Merging intermediate aggregates (partial aggregates) means merging their accumulators.
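
    The pre-aggregation/final-aggregation idea can be sketched without a cluster. The block below is a minimal illustration, not Flink code: SimpleAggregateFunction is a local stand-in that mirrors the four methods of this interface so the sketch compiles on its own, and Range, Spread and PartialAggregationDemo are hypothetical names introduced only for this example. It shows that aggregating two partitions separately and merging the partial accumulators gives the same result as a single pass.

```java
import java.util.Arrays;
import java.util.List;

// Local stand-in mirroring the four methods of AggregateFunction, so the
// sketch compiles without Flink on the classpath (an assumption of this example).
interface SimpleAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical accumulator: smallest and largest value seen so far.
class Range {
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;
}

// Input is Integer, intermediate state is Range, result is the spread (max - min).
class Spread implements SimpleAggregateFunction<Integer, Range, Integer> {
    @Override
    public Range createAccumulator() {
        return new Range();
    }

    @Override
    public Range add(Integer value, Range acc) {
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
        return acc;
    }

    @Override
    public Integer getResult(Range acc) {
        return acc.max - acc.min;
    }

    @Override
    public Range merge(Range a, Range b) {
        // Reuses 'a' as the merge target.
        a.min = Math.min(a.min, b.min);
        a.max = Math.max(a.max, b.max);
        return a;
    }
}

class PartialAggregationDemo {
    // Folds all values into one fresh accumulator.
    static Range accumulate(Spread fn, List<Integer> values) {
        Range acc = fn.createAccumulator();
        for (Integer v : values) {
            acc = fn.add(v, acc);
        }
        return acc;
    }

    // Aggregates everything in a single pass.
    static int singlePass(List<Integer> values) {
        Spread fn = new Spread();
        return fn.getResult(accumulate(fn, values));
    }

    // Pre-aggregates two partitions separately, then merges the partial accumulators.
    static int twoPhase(List<Integer> left, List<Integer> right) {
        Spread fn = new Spread();
        Range merged = fn.merge(accumulate(fn, left), accumulate(fn, right));
        return fn.getResult(merged);
    }

    public static void main(String[] args) {
        System.out.println(singlePass(Arrays.asList(3, 9, 4, 1)));             // prints 8
        System.out.println(twoPhase(Arrays.asList(3, 9), Arrays.asList(4, 1))); // prints 8
    }
}
```

    The two results agree only because min and max can be combined from partial results; an aggregation whose merge cannot reconstruct the full state from its parts would not be safe to pre-aggregate this way.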

    The AggregateFunction itself is stateless. To allow a single AggregateFunction instance to maintain multiple aggregates (such as one aggregate per key), the AggregateFunction creates a new accumulator whenever a new aggregation is started.
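
    To make the "one stateless function, many accumulators" point concrete, here is a rough sketch of how a caller might keep one accumulator per key. It is not how Flink's runtime is implemented: SimpleAggregateFunction is a local stand-in for this interface, and KeyedAggregator and CountFunction are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the four methods of AggregateFunction, so the
// sketch compiles without Flink on the classpath (an assumption of this example).
interface SimpleAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Counts elements; holds no fields, so the function instance itself is stateless.
class CountFunction implements SimpleAggregateFunction<String, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(String value, Long acc) {
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

// Hypothetical caller: one shared function instance, one accumulator per key.
class KeyedAggregator<K, IN, ACC, OUT> {
    private final SimpleAggregateFunction<IN, ACC, OUT> function;
    private final Map<K, ACC> accumulators = new HashMap<>();

    KeyedAggregator(SimpleAggregateFunction<IN, ACC, OUT> function) {
        this.function = function;
    }

    // Starts a new aggregate the first time a key is seen, then adds the value.
    void add(K key, IN value) {
        ACC acc = accumulators.get(key);
        if (acc == null) {
            acc = function.createAccumulator();
        }
        accumulators.put(key, function.add(value, acc));
    }

    // Result for a key; an unseen key yields the result of an empty accumulator.
    OUT result(K key) {
        ACC acc = accumulators.get(key);
        if (acc == null) {
            acc = function.createAccumulator();
        }
        return function.getResult(acc);
    }

    public static void main(String[] args) {
        KeyedAggregator<String, String, Long, Long> agg =
                new KeyedAggregator<>(new CountFunction());
        agg.add("a", "x");
        agg.add("b", "y");
        agg.add("a", "z");
        System.out.println(agg.result("a")); // prints 2
        System.out.println(agg.result("b")); // prints 1
    }
}
```

    All per-key state lives in the map of accumulators, which is why the same function instance can serve any number of keys.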

    Aggregation functions must be Serializable because they are sent around between distributed processes during distributed execution.

    Example: Average and Weighted Average

    
     // the accumulator, which holds the state of the in-flight aggregate
     public class AverageAccumulator {
         long count;
         long sum;
     }
    
     // implementation of an aggregation function for an 'average'
     public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    
         public AverageAccumulator createAccumulator() {
             return new AverageAccumulator();
         }
    
         public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
             a.count += b.count;
             a.sum += b.sum;
             return a;
         }
    
         public AverageAccumulator add(Integer value, AverageAccumulator acc) {
             acc.sum += value;
             acc.count++;
             return acc;
         }
    
         public Double getResult(AverageAccumulator acc) {
             return acc.sum / (double) acc.count;
         }
     }
    
     // implementation of a weighted average
     // this reuses the same accumulator type as the aggregate function for 'average'
     public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
    
         public AverageAccumulator createAccumulator() {
             return new AverageAccumulator();
         }
    
         public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
             a.count += b.count;
             a.sum += b.sum;
             return a;
         }
    
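         public AverageAccumulator add(Datum value, AverageAccumulator acc) {
             // weighted average = sum(value * weight) / sum(weight);
             // Datum is assumed to expose numeric getValue() and getWeight()
             acc.count += value.getWeight();
             acc.sum += value.getValue() * value.getWeight();
             return acc;
         }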
    
         public Double getResult(AverageAccumulator acc) {
             return acc.sum / (double) acc.count;
         }
     }
     
    • Method Summary

      Modifier and Type | Method | Description
      ACC | add(IN value, ACC accumulator) | Adds the given input value to the given accumulator, returning the new accumulator value.
      ACC | createAccumulator() | Creates a new accumulator, starting a new aggregate.
      OUT | getResult(ACC accumulator) | Gets the result of the aggregation from the accumulator.
      ACC | merge(ACC a, ACC b) | Merges two accumulators, returning an accumulator with the merged state.
    • Method Detail

      • createAccumulator

        ACC createAccumulator()
        Creates a new accumulator, starting a new aggregate.

        The new accumulator typically carries no meaningful result until at least one value has been added via add(IN, ACC).

        The accumulator is the state of a running aggregation. When a program has multiple aggregates in progress (such as one per key and window), the state kept for each key and window is the size of one accumulator.

        Returns:
        A new accumulator, corresponding to an empty aggregate.
      • add

        ACC add(IN value,
                ACC accumulator)
        Adds the given input value to the given accumulator, returning the new accumulator value.

        For efficiency, the input accumulator may be modified and returned.

        Parameters:
        value - The value to add
        accumulator - The accumulator to add the value to
        Returns:
        The accumulator with the updated state
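
        Because add may either modify the given accumulator or return a different one, callers should always continue with the returned value. The sketch below illustrates both styles under stated assumptions: SimpleAggregateFunction is a local stand-in for this interface, and Counter, MutableSum, ImmutableSum and AddContractDemo are hypothetical names for this example only.

```java
// Local stand-in mirroring the four methods of AggregateFunction, so the
// sketch compiles without Flink on the classpath (an assumption of this example).
interface SimpleAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Mutable accumulator holding a running total.
class Counter {
    long total;
}

// Style 1: add() updates the accumulator in place and returns the same object.
class MutableSum implements SimpleAggregateFunction<Integer, Counter, Long> {
    @Override
    public Counter createAccumulator() {
        return new Counter();
    }

    @Override
    public Counter add(Integer value, Counter acc) {
        acc.total += value;
        return acc;
    }

    @Override
    public Long getResult(Counter acc) {
        return acc.total;
    }

    @Override
    public Counter merge(Counter a, Counter b) {
        a.total += b.total;
        return a;
    }
}

// Style 2: the accumulator is an immutable Long, so add() returns a new value.
class ImmutableSum implements SimpleAggregateFunction<Integer, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Integer value, Long acc) {
        return acc + value;
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

class AddContractDemo {
    // Continues with the value returned by add(), which is correct for both styles.
    static <ACC> long sum(SimpleAggregateFunction<Integer, ACC, Long> fn, int... values) {
        ACC acc = fn.createAccumulator();
        for (int v : values) {
            acc = fn.add(v, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(sum(new MutableSum(), 1, 2, 3));   // prints 6
        System.out.println(sum(new ImmutableSum(), 1, 2, 3)); // prints 6
    }
}
```

        A caller that ignored the return value would still work with MutableSum but would silently lose every update with ImmutableSum.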
      • getResult

        OUT getResult(ACC accumulator)
        Gets the result of the aggregation from the accumulator.
        Parameters:
        accumulator - The accumulator of the aggregation
        Returns:
        The final aggregation result.
      • merge

        ACC merge(ACC a,
                  ACC b)
        Merges two accumulators, returning an accumulator with the merged state.

        This function may reuse any of the given accumulators as the target for the merge and return that. The assumption is that the given accumulators will not be used any more after having been passed to this function.

        Parameters:
        a - An accumulator to merge
        b - Another accumulator to merge
        Returns:
        The accumulator with the merged state
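
        The freedom to reuse either input can matter when accumulators are large. As a hedged illustration (not taken from Flink), the sketch below counts distinct strings with a set-based accumulator and merges by adding the smaller set into the larger one, returning whichever input was chosen as the target. SimpleAggregateFunction is a local stand-in for this interface and DistinctCount is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;

// Local stand-in mirroring the four methods of AggregateFunction, so the
// sketch compiles without Flink on the classpath (an assumption of this example).
interface SimpleAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Counts distinct input strings; the accumulator is the set of values seen so far.
class DistinctCount implements SimpleAggregateFunction<String, Set<String>, Integer> {
    @Override
    public Set<String> createAccumulator() {
        return new HashSet<>();
    }

    @Override
    public Set<String> add(String value, Set<String> acc) {
        acc.add(value);
        return acc;
    }

    @Override
    public Integer getResult(Set<String> acc) {
        return acc.size();
    }

    @Override
    public Set<String> merge(Set<String> a, Set<String> b) {
        // Reuse the larger input as the target so fewer elements are copied.
        Set<String> target = a.size() >= b.size() ? a : b;
        Set<String> other = (target == a) ? b : a;
        target.addAll(other);
        return target;
    }

    public static void main(String[] args) {
        DistinctCount fn = new DistinctCount();
        Set<String> big = fn.add("x", fn.add("y", fn.createAccumulator()));
        Set<String> small = fn.add("y", fn.createAccumulator());

        // After this call, only 'merged' should be used; 'big' and 'small' are spent.
        Set<String> merged = fn.merge(small, big);
        System.out.println(fn.getResult(merged)); // prints 2
    }
}
```

        Since the returned accumulator may be either argument, the caller cannot assume which input was modified and should treat both as consumed.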