Interface AggregateFunction<IN,ACC,OUT>
- Type Parameters:
IN - The type of the values that are aggregated (input values)
ACC - The type of the accumulator (intermediate aggregate state)
OUT - The type of the aggregated result
- All Superinterfaces:
Function, Serializable
- All Known Implementing Classes:
RichAggregateFunction
@PublicEvolving public interface AggregateFunction<IN,ACC,OUT> extends Function, Serializable
The AggregateFunction is a flexible aggregation function, characterized by the following features:
- The aggregates may use different types for input values, intermediate aggregates, and results, to support a wide range of aggregation types.
- Support for distributive aggregations: different intermediate aggregates can be merged, which allows pre-aggregation/final-aggregation optimizations.

The AggregateFunction's intermediate aggregate (in-progress aggregation state) is called the accumulator. Values are added to the accumulator, and final aggregates are obtained by finalizing the accumulator state. This supports aggregation functions whose intermediate state must differ from both the aggregated values and the final result type, for example an average (which typically keeps a count and a sum). Merging intermediate aggregates (partial aggregates) means merging the accumulators.

The AggregateFunction itself is stateless. To allow a single AggregateFunction instance to maintain multiple aggregates (such as one aggregate per key), the AggregateFunction creates a new accumulator whenever a new aggregation is started.
Aggregation functions must be Serializable because they are sent between distributed processes during distributed execution.

Example: Average and Weighted Average
    // the accumulator, which holds the state of the in-flight aggregate
    public class AverageAccumulator {
        long count;
        long sum;
    }

    // implementation of an aggregation function for an 'average'
    public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {

        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }

        public AverageAccumulator add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
            return acc;
        }

        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
    }

    // implementation of a weighted average
    // this reuses the same accumulator type as the aggregate function for 'average'
    // (Datum is not defined on this page; it is assumed to expose integral
    // getValue() and getWeight() accessors)
    public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {

        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }

        public AverageAccumulator add(Datum value, AverageAccumulator acc) {
            // count holds the total weight, sum holds the weighted sum of values
            acc.count += value.getWeight();
            acc.sum += value.getValue() * value.getWeight();
            return acc;
        }

        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
    }
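The description above says that one stateless function instance can serve many aggregates by creating one accumulator per aggregate. The following is a minimal sketch of that idea, not how Flink's runtime is implemented. SimpleAggregateFunction is a local stand-in that mirrors the four methods documented on this page so the sketch compiles without Flink; AvgAcc, Avg, and PerKeyAggregator are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the four documented methods (assumption: used
// here only so the sketch compiles without Flink on the classpath).
interface SimpleAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical accumulator and function, modeled on the Average example.
class AvgAcc {
    long count;
    long sum;
}

class Avg implements SimpleAggregateFunction<Integer, AvgAcc, Double> {
    public AvgAcc createAccumulator() { return new AvgAcc(); }
    public AvgAcc add(Integer value, AvgAcc acc) { acc.sum += value; acc.count++; return acc; }
    public Double getResult(AvgAcc acc) { return acc.sum / (double) acc.count; }
    public AvgAcc merge(AvgAcc a, AvgAcc b) { a.count += b.count; a.sum += b.sum; return a; }
}

// Hypothetical driver: one stateless function instance, one accumulator per key.
class PerKeyAggregator<K, IN, ACC, OUT> {
    private final SimpleAggregateFunction<IN, ACC, OUT> function;
    private final Map<K, ACC> accumulators = new HashMap<>();

    PerKeyAggregator(SimpleAggregateFunction<IN, ACC, OUT> function) {
        this.function = function;
    }

    void accept(K key, IN value) {
        ACC acc = accumulators.get(key);
        if (acc == null) {
            // A new aggregation starts for this key.
            acc = function.createAccumulator();
        }
        // Store the returned accumulator: add may return a different object.
        accumulators.put(key, function.add(value, acc));
    }

    OUT result(K key) {
        return function.getResult(accumulators.get(key));
    }
}
```

All per-key state lives in the map of accumulators; the function object itself holds no fields that change between calls.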
Method Summary
Modifier and Type, Method: Description
- ACC add(IN value, ACC accumulator): Adds the given input value to the given accumulator, returning the new accumulator value.
- ACC createAccumulator(): Creates a new accumulator, starting a new aggregate.
- OUT getResult(ACC accumulator): Gets the result of the aggregation from the accumulator.
- ACC merge(ACC a, ACC b): Merges two accumulators, returning an accumulator with the merged state.
Method Detail
createAccumulator
ACC createAccumulator()
Creates a new accumulator, starting a new aggregate.
The new accumulator is typically meaningless unless a value is added via add(Object, Object).
The accumulator is the state of a running aggregation. When a program has multiple aggregates in progress (such as per key and window), the state kept per key and window is the size of one accumulator.
- Returns:
- A new accumulator, corresponding to an empty aggregate.
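To illustrate why a fresh accumulator is "typically meaningless", the sketch below reuses the count/sum layout of the Average example and asks for a result before any value has been added. EmptyAvgAcc and EmptyAverageSketch are hypothetical names, and the methods are plain static helpers rather than an implementation of the interface.

```java
// Hypothetical accumulator mirroring the Average example on this page.
class EmptyAvgAcc {
    long count;
    long sum;
}

class EmptyAverageSketch {
    static EmptyAvgAcc createAccumulator() {
        return new EmptyAvgAcc();
    }

    static EmptyAvgAcc add(int value, EmptyAvgAcc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    static double getResult(EmptyAvgAcc acc) {
        // With count == 0 this is 0.0 / 0.0, which evaluates to NaN.
        return acc.sum / (double) acc.count;
    }
}
```

Calling getResult on the empty accumulator yields NaN for this particular accumulator layout; after a single add of 4 it yields 4.0. Other accumulator types may behave differently when empty.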
add
ACC add(IN value, ACC accumulator)
Adds the given input value to the given accumulator, returning the new accumulator value.
For efficiency, the input accumulator may be modified and returned.
- Parameters:
value - The value to add
accumulator - The accumulator to add the value to
- Returns:
- The accumulator with the updated state
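Because add returns the accumulator, callers should always continue with the returned value instead of assuming the argument was updated in place. The sketch below contrasts the two cases under that assumption; BoxedSum, Counter, and MutableSum are hypothetical names, and the classes only mirror the createAccumulator/add signatures rather than implementing the interface.

```java
// Immutable accumulator: a boxed Long cannot be modified in place,
// so add returns a new value and the caller must keep the result.
class BoxedSum {
    Long createAccumulator() {
        return 0L;
    }

    Long add(Integer value, Long accumulator) {
        return accumulator + value;
    }
}

// Mutable accumulator: add updates the given object and returns it.
class Counter {
    long sum;
}

class MutableSum {
    Counter createAccumulator() {
        return new Counter();
    }

    Counter add(Integer value, Counter accumulator) {
        accumulator.sum += value;
        return accumulator;
    }
}
```

In the boxed variant the object passed in still holds the old value after the call; in the mutable variant the returned object is the same instance that was passed in.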
getResult
OUT getResult(ACC accumulator)
Gets the result of the aggregation from the accumulator.
- Parameters:
accumulator - The accumulator of the aggregation
- Returns:
- The final aggregation result.
merge
ACC merge(ACC a, ACC b)
Merges two accumulators, returning an accumulator with the merged state.
This function may reuse either of the given accumulators as the target for the merge and return it. The given accumulators are assumed not to be used again after they have been passed to this function.
- Parameters:
a - An accumulator to merge
b - Another accumulator to merge
- Returns:
- The accumulator with the merged state
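The pre-aggregation idea behind merge can be sketched as follows: each partition of the input is aggregated into its own accumulator, and the partial accumulators are then merged before the result is read. PartialAvg, MergeSketch, and preAggregate are hypothetical names, and the static helpers only mirror the documented method shapes using the count/sum layout of the Average example.

```java
// Hypothetical accumulator mirroring the Average example on this page.
class PartialAvg {
    long count;
    long sum;
}

class MergeSketch {
    static PartialAvg add(int value, PartialAvg acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    // Reuses 'a' as the merge target; neither input is used afterwards
    // except through the returned accumulator.
    static PartialAvg merge(PartialAvg a, PartialAvg b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    static double getResult(PartialAvg acc) {
        return acc.sum / (double) acc.count;
    }

    // Pre-aggregates one partition of the input into its own accumulator.
    static PartialAvg preAggregate(int... values) {
        PartialAvg acc = new PartialAvg();
        for (int v : values) {
            acc = add(v, acc);
        }
        return acc;
    }
}
```

For this accumulator, merging the partial aggregates of (1, 2, 3) and (4, 5) gives the same average as aggregating all five values into a single accumulator.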