Type Parameters:
IN - The type of the values that are aggregated (input values).
ACC - The type of the accumulator (intermediate aggregate state).
OUT - The type of the aggregated result.

@PublicEvolving
public interface AggregateFunction<IN,ACC,OUT>
extends Function, Serializable
The AggregateFunction is a flexible aggregation function, characterized by the following features:
The AggregateFunction's intermediate aggregate (in-progress aggregation state) is called the accumulator. Values are added to the accumulator, and final aggregates are obtained by finalizing the accumulator state. This supports aggregation functions where the intermediate state needs to be different from the aggregated values and the final result type, such as an average (which typically keeps a count and a sum).
Merging intermediate aggregates (partial aggregates) means merging the accumulators.
The AggregateFunction itself is stateless. To allow a single AggregateFunction instance to maintain multiple aggregates (such as one aggregate per key), the AggregateFunction creates a new accumulator whenever a new aggregation is started.
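To make the statelessness concrete, here is a minimal sketch, assuming a trimmed stand-in interface named `AggFn` (hypothetical, not Flink's API) so that it runs without Flink on the classpath. A single function instance creates one accumulator per key, kept in a plain `HashMap`; the driver class `PerKeyAverages` is likewise hypothetical and is not how Flink's runtime manages keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's AggregateFunction, trimmed to the four methods
// in the method summary so this sketch compiles without Flink.
interface AggFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    ACC merge(ACC a, ACC b);
    OUT getResult(ACC accumulator);
}

// Accumulator for an average: count and sum of the values seen so far.
class AvgAcc {
    long count;
    long sum;
}

// Stateless function: it has no fields, so one instance can serve every key.
class IntAverage implements AggFn<Integer, AvgAcc, Double> {
    public AvgAcc createAccumulator() { return new AvgAcc(); }
    public AvgAcc add(Integer value, AvgAcc acc) { acc.sum += value; acc.count++; return acc; }
    public AvgAcc merge(AvgAcc a, AvgAcc b) { a.count += b.count; a.sum += b.sum; return a; }
    public Double getResult(AvgAcc acc) { return acc.sum / (double) acc.count; }
}

class PerKeyAverages {
    // Hypothetical driver: one accumulator per key, all created by the same
    // function instance. Not Flink's state backend.
    static Map<String, Double> averageByKey(String[] keys, int[] values) {
        IntAverage fn = new IntAverage();
        Map<String, AvgAcc> accumulators = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // A new aggregation starts (createAccumulator) the first time a key is seen.
            AvgAcc acc = accumulators.computeIfAbsent(keys[i], k -> fn.createAccumulator());
            accumulators.put(keys[i], fn.add(values[i], acc));
        }
        Map<String, Double> results = new HashMap<>();
        for (Map.Entry<String, AvgAcc> e : accumulators.entrySet()) {
            results.put(e.getKey(), fn.getResult(e.getValue()));
        }
        return results;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "b", "a"};
        int[] values = {1, 10, 2, 20, 6};
        // average for "a" is 3.0, for "b" is 15.0
        System.out.println(averageByKey(keys, values));
    }
}
```

The function object never changes; all per-key progress lives in the accumulators it hands out.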
Aggregation functions must be Serializable because they are sent between distributed processes during distributed execution.
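Because the function object carries no aggregation state, serializing it is straightforward. The sketch below assumes a hypothetical `AggFn` stand-in that extends `Serializable` (as `AggregateFunction` does through its declared interfaces) and round-trips a function through standard Java serialization to mimic shipping it to another process. It does not show the mechanism Flink actually uses; `LongSum` and `ShipFunction` are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Flink's AggregateFunction, which is Serializable.
interface AggFn<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    ACC merge(ACC a, ACC b);
    OUT getResult(ACC accumulator);
}

// A sum over longs; the accumulator is a one-element array so add() can mutate it.
class LongSum implements AggFn<Long, long[], Long> {
    private static final long serialVersionUID = 1L;
    public long[] createAccumulator() { return new long[1]; }
    public long[] add(Long value, long[] acc) { acc[0] += value; return acc; }
    public long[] merge(long[] a, long[] b) { a[0] += b[0]; return a; }
    public Long getResult(long[] acc) { return acc[0]; }
}

class ShipFunction {
    // Writes the function to bytes and reads it back, roughly what shipping it
    // to another process involves.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LongSum remote = roundTrip(new LongSum());
        long[] acc = remote.createAccumulator();
        remote.add(4L, acc);
        remote.add(5L, acc);
        System.out.println(remote.getResult(acc)); // prints 9
    }
}
```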
Example: average and weighted average

```java
import org.apache.flink.api.common.functions.AggregateFunction;

// each public class goes in its own source file

// the accumulator, which holds the state of the in-flight aggregate
public class AverageAccumulator {
    long count;
    long sum;
}

// implementation of an aggregation function for an 'average'
public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    // add() returns the (possibly modified) accumulator, as the interface requires
    public AverageAccumulator add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

// implementation of a weighted average
// this reuses the same accumulator type as the aggregate function for 'average':
// 'count' holds the total weight and 'sum' the weighted sum of the values
// (Datum is assumed to provide getValue() and getWeight())
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public AverageAccumulator add(Datum value, AverageAccumulator acc) {
        acc.count += value.getWeight();
        acc.sum += value.getValue() * value.getWeight();
        return acc;
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}
```
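The `Average` example above can be exercised end to end. The following sketch assumes a local `AggFn` interface in place of Flink's `AggregateFunction` (hypothetical, so it compiles without Flink) and a hypothetical driver `PartialAggregation`. It pre-aggregates several partitions into separate accumulators and then merges them, which is the partial-aggregate merging described earlier.

```java
// Hypothetical stand-in for Flink's AggregateFunction so this sketch runs without Flink.
interface AggFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    ACC merge(ACC a, ACC b);
    OUT getResult(ACC accumulator);
}

class AverageAccumulator {
    long count;
    long sum;
}

class Average implements AggFn<Integer, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }
    public AverageAccumulator add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
    public Double getResult(AverageAccumulator acc) { return acc.sum / (double) acc.count; }
}

class PartialAggregation {
    // Pre-aggregates each partition into its own accumulator, then merges the
    // partial accumulators into one final accumulator.
    static double averageOfPartitions(int[][] partitions) {
        Average fn = new Average();
        AverageAccumulator total = fn.createAccumulator();
        for (int[] partition : partitions) {
            AverageAccumulator partial = fn.createAccumulator();
            for (int v : partition) {
                partial = fn.add(v, partial);
            }
            // merge may reuse 'total' as the target; 'partial' is not used again.
            total = fn.merge(total, partial);
        }
        return fn.getResult(total);
    }

    public static void main(String[] args) {
        int[][] partitions = {{1, 2, 3}, {4, 5}, {6}};
        System.out.println(averageOfPartitions(partitions)); // prints 3.5
    }
}
```

Because `merge` returns its first argument, the running total is reused as the merge target and each partial accumulator is discarded after merging, consistent with the `merge` contract described below.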
Modifier and Type | Method | Description |
---|---|---|
ACC | add(IN value, ACC accumulator) | Adds the given input value to the given accumulator, returning the new accumulator value. |
ACC | createAccumulator() | Creates a new accumulator, starting a new aggregate. |
OUT | getResult(ACC accumulator) | Gets the result of the aggregation from the accumulator. |
ACC | merge(ACC a, ACC b) | Merges two accumulators, returning an accumulator with the merged state. |

ACC createAccumulator()
The new accumulator is typically meaningless until a value is added via add(IN, ACC).
The accumulator is the state of a running aggregation. When a program has multiple aggregates in progress (such as one per key and window), the state kept for each key and window is the size of one accumulator.
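For the `Average` example above, a fresh accumulator has `count` and `sum` both zero, so computing the result before any `add` divides 0 by 0.0 and yields `NaN`. This small sketch reuses the accumulator fields from that example with a standalone `getResult` helper in a hypothetical class `FreshAccumulator` to show the difference.

```java
// Same fields as the AverageAccumulator in the example above.
class AverageAccumulator {
    long count;
    long sum;
}

class FreshAccumulator {
    // Same computation as Average.getResult in the example above.
    static double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }

    public static void main(String[] args) {
        AverageAccumulator acc = new AverageAccumulator(); // what createAccumulator() returns
        System.out.println(getResult(acc)); // prints NaN: 0 / 0.0, nothing added yet
        acc.sum += 7; // equivalent to add(7, acc)
        acc.count++;
        System.out.println(getResult(acc)); // prints 7.0
    }
}
```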

ACC add(IN value, ACC accumulator)
For efficiency, the input accumulator may be modified and returned.
Parameters:
value - The value to add
accumulator - The accumulator to add the value to

OUT getResult(ACC accumulator)
Parameters:
accumulator - The accumulator of the aggregation

ACC merge(ACC a, ACC b)
This function may reuse either of the given accumulators as the target of the merge and return it. It assumes that the given accumulators are not used again after being passed to this function.
Parameters:
a - An accumulator to merge
b - Another accumulator to merge

Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.