Interface Aggregator<T extends Value>

  • Type Parameters:
    T - The type of the aggregated value.
    All Superinterfaces:
    Serializable
    All Known Implementing Classes:
    BulkIterationBase.TerminationCriterionAggregator, DoubleSumAggregator, LongSumAggregator

    @PublicEvolving
    public interface Aggregator<T extends Value>
    extends Serializable
    Aggregators are a means of aggregating values across parallel instances of a function. Aggregators collect simple statistics (such as the number of processed elements) about the actual work performed in a function. Aggregators are specific to iterations and are commonly used to check the convergence of an iteration by using a ConvergenceCriterion. In contrast to the Accumulator (whose result is available at the end of a job), aggregators are computed once per iteration superstep. Their value can be used to check for convergence at the end of the iteration superstep, and it can be accessed in the next iteration superstep.
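    The per-superstep semantics can be illustrated with a small simulation. This is a minimal sketch under stated assumptions, not Flink code: `CountingAggregator`, `LongSum`, and `SuperstepDemo` are hypothetical names, plain `long` values stand in for Flink's `Value` types, and parallel instances are modeled as array slots processed one after another. Each instance pre-aggregates locally, the partial results are combined into one global aggregate per superstep, and the loop stops once that aggregate reaches zero, which plays the role of a toy convergence criterion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Flink's Aggregator<T extends Value>:
// it aggregates plain long values instead of a Value type.
interface CountingAggregator {
    void aggregate(long element);
    long getAggregate();
    void reset();
}

final class LongSum implements CountingAggregator {
    private long sum;

    @Override public void aggregate(long element) { sum += element; }
    @Override public long getAggregate() { return sum; }
    @Override public void reset() { sum = 0L; }
}

final class SuperstepDemo {
    // Each superstep, every simulated parallel instance halves its value and counts
    // whether the value changed. Returns the global aggregate of every executed superstep.
    static List<Long> run(long[] values, int maxSupersteps) {
        List<Long> history = new ArrayList<>();
        LongSum global = new LongSum();
        LongSum[] local = new LongSum[values.length];
        for (int i = 0; i < values.length; i++) {
            local[i] = new LongSum();
        }
        for (int superstep = 1; superstep <= maxSupersteps; superstep++) {
            global.reset(); // the aggregate is computed afresh for each superstep
            for (int i = 0; i < values.length; i++) { // one pass per simulated instance
                local[i].reset();
                long next = values[i] / 2;
                if (next != values[i]) {
                    local[i].aggregate(1);
                }
                values[i] = next;
                global.aggregate(local[i].getAggregate()); // combine the partial aggregates
            }
            long aggregate = global.getAggregate();
            history.add(aggregate); // this value would be readable in the next superstep
            if (aggregate == 0) { // convergence check at the end of the superstep
                break;
            }
        }
        return history;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[] {8L, 3L}, 100)); // prints [2, 2, 1, 1, 0]
    }
}
```

    For the input `{8, 3}`, the number of changed values shrinks from superstep to superstep until one superstep changes nothing, at which point the simulated iteration stops.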

    Aggregators must be registered with the iteration inside which the functions that use them run. In the Java API, the method is "IterativeDataSet.registerAggregator(...)", or "IterativeDataSet.registerAggregationConvergenceCriterion(...)" when the aggregator is used together with a convergence criterion. Aggregators are always registered under a name, and that name is used to access the aggregator at runtime from within a function. The following code snippet shows a typical case: it counts, across all parallel instances, how many elements a function filters out.

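     import org.apache.flink.api.common.aggregators.LongSumAggregator;
     import org.apache.flink.api.common.functions.OpenContext;
     import org.apache.flink.api.common.functions.RichFilterFunction;
     import org.apache.flink.api.java.DataSet;
     import org.apache.flink.api.java.operators.IterativeDataSet;

     // the user-defined function; a rich function is needed to reach the iteration runtime context
     public class MyFilter extends RichFilterFunction<Double> {
         private LongSumAggregator agg;
    
         @Override
         public void open(OpenContext openContext) {
             // look up the aggregator by the name it was registered under
             agg = getIterationRuntimeContext().getIterationAggregator("numFiltered");
         }
    
         @Override
         public boolean filter(Double value) {
             if (value > 1000000.0) {
                 // count the element that is filtered out
                 agg.aggregate(1);
                 return false;
             }
    
             return true;
         }
     }
    
     // the iteration where the aggregator is registered (an aggregator instance is passed)
     IterativeDataSet<Double> iteration = input.iterate(100).registerAggregator("numFiltered", new LongSumAggregator());
     ...
     DataSet<Double> filtered = someIntermediateResult.filter(new MyFilter());
     ...
     DataSet<Double> result = iteration.closeWith(filtered);
     ...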
     

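    The name-based registration and lookup described above can be modeled without Flink. This is a hypothetical sketch, not Flink's implementation: `AggregatorRegistry` stands in for registering an aggregator at the iteration and for looking it up at runtime, `CountAggregator` and `CountingFilter` are illustrative names (the latter mirrors `MyFilter`), and rejecting a duplicate name is a choice made for this sketch only. A single function instance is modeled, so no combining across parallel instances happens here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified aggregator over plain long values (not Flink's API).
interface LongAggregator {
    void aggregate(long element);
    long getAggregate();
    void reset();
}

final class CountAggregator implements LongAggregator {
    private long count;

    @Override public void aggregate(long element) { count += element; }
    @Override public long getAggregate() { return count; }
    @Override public void reset() { count = 0L; }
}

// Hypothetical name-based registry standing in for the iteration's aggregator registration.
final class AggregatorRegistry {
    private final Map<String, LongAggregator> byName = new HashMap<>();

    void register(String name, LongAggregator aggregator) {
        // Design choice of this sketch: each name may be registered only once.
        if (byName.putIfAbsent(name, aggregator) != null) {
            throw new IllegalArgumentException("Aggregator already registered: " + name);
        }
    }

    LongAggregator lookup(String name) {
        LongAggregator aggregator = byName.get(name);
        if (aggregator == null) {
            throw new IllegalStateException("No aggregator registered under: " + name);
        }
        return aggregator;
    }
}

// Mirrors the MyFilter example: drops large values and counts them under "numFiltered".
final class CountingFilter {
    private LongAggregator agg;

    void open(AggregatorRegistry registry) {
        agg = registry.lookup("numFiltered"); // resolved by name at runtime
    }

    boolean filter(double value) {
        if (value > 1_000_000.0) {
            agg.aggregate(1);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        AggregatorRegistry registry = new AggregatorRegistry();
        registry.register("numFiltered", new CountAggregator());
        CountingFilter filter = new CountingFilter();
        filter.open(registry);
        for (double v : new double[] {5.0, 2_000_000.0, 3_000_000.0}) {
            filter.filter(v);
        }
        System.out.println(registry.lookup("numFiltered").getAggregate()); // prints 2
    }
}
```

    Because the function only knows the name, the same filter class can be reused with any aggregator registered under "numFiltered".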
    Aggregators must be distributive: an aggregator must be able to pre-aggregate values, and it must be able to aggregate these pre-aggregated values to form the final aggregate. Many aggregation functions fulfill this condition (sum, min, max), and others can be brought into that form: count can be expressed as a sum over values of one, and average can be expressed through a sum and a count.
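    Bringing average into distributive form can be sketched as follows. The sketch assumes a hypothetical generic interface `DistributiveAggregator<T>` that mirrors the three methods of `Aggregator` with plain Java types; `SumCount`, `AverageAggregator`, and `AverageDemo` are illustrative names. Each raw value enters as a (value, 1) pair, so a single `aggregate` method handles both raw values and pre-aggregated partial results.

```java
// Hypothetical generic stand-in for Flink's Aggregator<T extends Value>, using plain Java types.
interface DistributiveAggregator<T> {
    void aggregate(T element);
    T getAggregate();
    void reset();
}

// A partial average: a (sum, count) pair. Raw values and pre-aggregates share this form.
final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    static SumCount of(double value) {
        return new SumCount(value, 1L); // count expressed as a sum over ones
    }

    double average() {
        return count == 0 ? Double.NaN : sum / count;
    }
}

final class AverageAggregator implements DistributiveAggregator<SumCount> {
    private double sum;
    private long count;

    @Override public void aggregate(SumCount element) {
        sum += element.sum;
        count += element.count;
    }
    @Override public SumCount getAggregate() { return new SumCount(sum, count); }
    @Override public void reset() { sum = 0.0; count = 0L; }
}

final class AverageDemo {
    // Each inner array plays the role of one parallel instance's input.
    static double average(double[][] partitions) {
        AverageAggregator global = new AverageAggregator();
        for (double[] partition : partitions) {
            AverageAggregator local = new AverageAggregator();
            for (double v : partition) {
                local.aggregate(SumCount.of(v)); // pre-aggregate locally
            }
            global.aggregate(local.getAggregate()); // aggregate the pre-aggregated values
        }
        return global.getAggregate().average();
    }

    public static void main(String[] args) {
        System.out.println(average(new double[][] {{1.0, 2.0, 3.0}, {4.0}, {}})); // prints 2.5
    }
}
```

    Averaging the per-partition averages instead (2.0 and 4.0) would give 3.0 rather than the correct 2.5, which is why the (sum, count) pair, not the average itself, is what gets pre-aggregated.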

    • Method Detail

      • getAggregate

        T getAggregate()
        Gets the aggregator's current aggregate.
        Returns:
        The aggregator's current aggregate.
      • aggregate

        void aggregate(T element)
        Aggregates the given element. In the case of a sum aggregator, this method adds the given value to the sum.
        Parameters:
        element - The element to aggregate.
      • reset

        void reset()
        Resets the internal state of the aggregator. This must bring the aggregator into the same state as if it was newly initialized.
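    The method contract above, in particular that `reset()` must return the aggregator to the state of a newly initialized one, can be illustrated with a max aggregator. This is a hypothetical sketch: `SimpleAggregator` and `LongMaxAggregator` are illustrative names, not Flink classes, and plain `Long` replaces Flink's `Value` types. A real implementation of `Aggregator` would also have to be `Serializable`.

```java
// Hypothetical stand-in mirroring the three methods of Aggregator<T>, with plain Java types.
interface SimpleAggregator<T> {
    T getAggregate();
    void aggregate(T element);
    void reset();
}

// Max aggregator sketch. Long.MIN_VALUE is the identity element of max, so a newly
// constructed instance and a reset instance are in the same state.
final class LongMaxAggregator implements SimpleAggregator<Long> {
    private long max = Long.MIN_VALUE;

    @Override public Long getAggregate() { return max; }
    @Override public void aggregate(Long element) { max = Math.max(max, element); }
    @Override public void reset() { max = Long.MIN_VALUE; }

    public static void main(String[] args) {
        LongMaxAggregator agg = new LongMaxAggregator();
        agg.aggregate(3L);
        agg.aggregate(-7L);
        agg.aggregate(12L);
        System.out.println(agg.getAggregate()); // prints 12
        agg.reset();
        // after reset, the state matches a fresh instance
        System.out.println(agg.getAggregate().equals(new LongMaxAggregator().getAggregate())); // prints true
    }
}
```

    Initializing and resetting to the identity element keeps merging correct: combining a reset partial aggregate into another leaves that other aggregate unchanged.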