Class AggregateFunction<T,ACC>

- java.lang.Object
  - org.apache.flink.table.functions.UserDefinedFunction
    - org.apache.flink.table.functions.ImperativeAggregateFunction<T,ACC>
      - org.apache.flink.table.functions.AggregateFunction<T,ACC>
- Type Parameters:
  T - final result type of the aggregation
  ACC - intermediate result type during the aggregation
- All Implemented Interfaces:
  Serializable, FunctionDefinition
- Direct Known Subclasses:
  BuiltInAggregateFunction, CountAggFunction, HiveGenericUDAF, LastDatedValueFunction, PythonAggregateFunction
@PublicEvolving public abstract class AggregateFunction<T,ACC> extends ImperativeAggregateFunction<T,ACC>
Base class for a user-defined aggregate function. A user-defined aggregate function maps scalar values of multiple rows to a new scalar value.

The behavior of an AggregateFunction is centered around the concept of an accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling ImperativeAggregateFunction.createAccumulator(). Subsequently, the accumulate() method of the function is called for each input row to update the accumulator. Once all rows have been processed, the getValue(Object) method of the function is called to compute and return the final result.
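To make this lifecycle concrete, here is a minimal conceptual sketch of how a single group of rows is aggregated. It is not the actual runtime code; the CountFunction class and the input values refer to the example further down this page and are used purely for illustration:

  // conceptual sketch only: the Flink runtime drives these calls internally
  CountFunction function = new CountFunction();

  // 1. an empty accumulator is created for the group
  CountFunction.MyAccumulator acc = function.createAccumulator();

  // 2. accumulate() is invoked once per input row
  for (String value : java.util.Arrays.asList("a", null, "b")) {
      function.accumulate(acc, value);
  }

  // 3. getValue() materializes the final result
  String result = function.getValue(acc);  // "Result: 2"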
The main behavior of an AggregateFunction can be defined by implementing a custom accumulate method. An accumulate method must be declared publicly, not static, and named accumulate. Accumulate methods can also be overloaded by implementing multiple methods named accumulate.

By default, input, accumulator, and output data types are automatically extracted using reflection. This includes the generic argument ACC of the class for determining an accumulator data type and the generic argument T for determining a result data type. Input arguments are derived from one or more accumulate() methods. If the reflective information is not sufficient, it can be supported and enriched with DataTypeHint and FunctionHint annotations.

An AggregateFunction needs at least three methods:
- createAccumulator
- accumulate
- getValue
There are a few other methods that are optional:
- retract
- merge
All these methods must be declared publicly, not static, and named exactly as the names mentioned above to be called by generated code.
For storing a user-defined function in a catalog, the class must have a default constructor and must be instantiable during runtime. Anonymous functions in Table API can only be persisted if the function is not stateful (i.e. containing only transient and static fields).
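As a hedged illustration of how such a function is typically registered and then called, assuming a TableEnvironment named env and a table MyTable with a STRING column s (names chosen for illustration only):

  // register the function defined in the examples below under a temporary name
  env.createTemporarySystemFunction("CountF", CountFunction.class);

  // call the registered function in SQL
  env.sqlQuery("SELECT CountF(s) FROM MyTable");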
public void accumulate(ACC accumulator, [user defined inputs])
Processes the input values and updates the provided accumulator instance. The method accumulate can be overloaded with different custom types and arguments. An aggregate function requires at least one accumulate() method.
  param: accumulator - the accumulator which contains the current aggregated results
  param: [user defined inputs] - the input value (usually obtained from newly arrived data)
public void retract(ACC accumulator, [user defined inputs])
Retracts the input values from the accumulator instance. The current design assumes the inputs are values that have previously been accumulated. The method retract can be overloaded with different custom types and arguments. This method must be implemented for bounded OVER aggregates over unbounded tables.
  param: accumulator - the accumulator which contains the current aggregated results
  param: [user defined inputs] - the input value (usually obtained from newly arrived data)
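As a hedged sketch of what a matching retract method could look like for the CountFunction example shown further down this page (the method body is an illustration, not part of the Flink API):

  // undoes one previous accumulate() call for the same value
  public void retract(MyAccumulator accumulator, String s) {
      if (s != null) {
          accumulator.count--;
      }
  }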
public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
Merges a group of accumulator instances into one accumulator instance. This method must be implemented for unbounded session window and hop window grouping aggregates and bounded grouping aggregates. Implementing this method is also helpful for optimizations; for example, the two-phase aggregation optimization requires all AggregateFunctions to support the merge method.
  param: accumulator - the accumulator which will keep the merged aggregate results. Note that the accumulator may already contain previously aggregated results, so the user should not replace or clean this instance in the custom merge method.
  param: iterable - a java.lang.Iterable pointing to a group of accumulators that will be merged
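A similarly hedged sketch of a corresponding merge method for the same hypothetical count accumulator:

  // folds partial counts from other accumulators into the given accumulator
  public void merge(MyAccumulator accumulator, Iterable<MyAccumulator> iterable) {
      for (MyAccumulator other : iterable) {
          accumulator.count += other.count;
      }
  }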
If this aggregate function can only be applied in an OVER window, this can be declared by returning the requirement FunctionRequirement.OVER_WINDOW_ONLY in FunctionDefinition.getRequirements().
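A minimal sketch of declaring that requirement by overriding getRequirements() (illustrative only):

  import java.util.EnumSet;
  import java.util.Set;
  import org.apache.flink.table.functions.FunctionRequirement;

  @Override
  public Set<FunctionRequirement> getRequirements() {
      // restrict this aggregate function to OVER window usage only
      return EnumSet.of(FunctionRequirement.OVER_WINDOW_ONLY);
  }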
If an accumulator needs to store large amounts of data, ListView and MapView provide advanced features for leveraging Flink's state backends in unbounded data scenarios (a hedged ListView sketch follows the examples below).

The following examples show how to specify an aggregate function:
  // a function that counts STRING arguments that are not null and emits the count as STRING
  // the accumulator is BIGINT
  public static class CountFunction extends AggregateFunction<String, CountFunction.MyAccumulator> {

    public static class MyAccumulator {
      public long count = 0L;
    }

    @Override
    public MyAccumulator createAccumulator() {
      return new MyAccumulator();
    }

    public void accumulate(MyAccumulator accumulator, String s) {
      if (s != null) {
        accumulator.count++;
      }
    }

    @Override
    public String getValue(MyAccumulator accumulator) {
      return "Result: " + accumulator.count;
    }
  }

  // a function that determines the maximum of either BIGINT or STRING arguments
  // the accumulator and the output are either BIGINT or STRING
  public static class MaxFunction extends AggregateFunction<Object, Row> {

    @Override
    public Row createAccumulator() {
      return new Row(1);
    }

    @FunctionHint(
      accumulator = @DataTypeHint("ROW<max BIGINT>"),
      output = @DataTypeHint("BIGINT")
    )
    public void accumulate(Row accumulator, Long l) {
      final Long max = (Long) accumulator.getField(0);
      if (max == null || l > max) {
        accumulator.setField(0, l);
      }
    }

    @FunctionHint(
      accumulator = @DataTypeHint("ROW<max STRING>"),
      output = @DataTypeHint("STRING")
    )
    public void accumulate(Row accumulator, String s) {
      final String max = (String) accumulator.getField(0);
      if (max == null || s.compareTo(max) > 0) {
        accumulator.setField(0, s);
      }
    }

    @Override
    public Object getValue(Row accumulator) {
      return accumulator.getField(0);
    }
  }
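Relating to the ListView and MapView note above, the following is a minimal sketch, using an accumulator class and function that are assumptions rather than part of the Flink Javadoc, of how collected values can be stored in a ListView so that large state is backed by Flink's state backends:

  import org.apache.flink.table.api.dataview.ListView;
  import org.apache.flink.table.functions.AggregateFunction;

  // hypothetical accumulator backed by ListView; the field must be public and non-final
  public static class CollectAccumulator {
    public ListView<String> values = new ListView<>();
  }

  public static class CollectFunction extends AggregateFunction<String, CollectAccumulator> {

    @Override
    public CollectAccumulator createAccumulator() {
      return new CollectAccumulator();
    }

    public void accumulate(CollectAccumulator accumulator, String s) {
      if (s != null) {
        try {
          // entries are kept in the configured state backend in unbounded scenarios
          accumulator.values.add(s);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }

    @Override
    public String getValue(CollectAccumulator accumulator) {
      try {
        return String.join(",", accumulator.values.get());
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }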
- See Also:
- Serialized Form
Constructor Summary
- AggregateFunction()
-
Method Summary
- FunctionKind getKind()
  Returns the kind of function this definition describes.
- TypeInference getTypeInference(DataTypeFactory typeFactory)
  Returns the logic for performing type inference of a call to this function definition.
- abstract T getValue(ACC accumulator)
  Called every time when an aggregation result should be materialized.
Methods inherited from class org.apache.flink.table.functions.ImperativeAggregateFunction
createAccumulator, getAccumulatorType, getResultType
-
Methods inherited from class org.apache.flink.table.functions.UserDefinedFunction
close, functionIdentifier, open, toString
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.apache.flink.table.functions.FunctionDefinition
getRequirements, isDeterministic, supportsConstantFolding
Method Detail
-
getValue
public abstract T getValue(ACC accumulator)
Called every time when an aggregation result should be materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.
- Parameters:
  accumulator - the accumulator which contains the current intermediate results
- Returns:
  the aggregation result
-
getKind
public final FunctionKind getKind()
Description copied from interface: FunctionDefinition
Returns the kind of function this definition describes.
-
getTypeInference
public TypeInference getTypeInference(DataTypeFactory typeFactory)
Description copied from class: UserDefinedFunction
Returns the logic for performing type inference of a call to this function definition.

The type inference process is responsible for inferring unknown types of input arguments, validating input arguments, and producing result types. The type inference process happens independently of a function body. The output of the type inference is used to search for a corresponding runtime implementation.

Instances of type inference can be created by using TypeInference.newBuilder(). See BuiltInFunctionDefinitions for concrete usage examples.

The type inference for user-defined functions is automatically extracted using reflection. It does this by analyzing implementation methods such as eval() or accumulate() and the generic parameters of a function class if present. If the reflective information is not sufficient, it can be supported and enriched with DataTypeHint and FunctionHint annotations.

Note: Overriding this method is only recommended for advanced users. If a custom type inference is specified, it is the responsibility of the implementer to make sure that the output of the type inference process matches with the implementation method:

- The implementation method must comply with each DataType.getConversionClass() returned by the type inference. For example, if DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) is an expected argument type, the method must accept a call eval(java.sql.Timestamp).
- Regular Java calling semantics (including type widening and autoboxing) are applied when calling an implementation method, which means that the signature can be eval(java.lang.Object).
- The runtime will take care of converting the data to the data format specified by the DataType.getConversionClass() coming from the type inference logic.

- Specified by:
  getTypeInference in interface FunctionDefinition
- Specified by:
  getTypeInference in class UserDefinedFunction
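For advanced users, a hedged sketch of what a manual override could look like for a count-style function; the builder methods and type strategies shown are assumptions that may differ between Flink versions:

  import org.apache.flink.table.api.DataTypes;
  import org.apache.flink.table.catalog.DataTypeFactory;
  import org.apache.flink.table.types.inference.TypeInference;
  import org.apache.flink.table.types.inference.TypeStrategies;

  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
      return TypeInference.newBuilder()
          // a single STRING argument for accumulate()
          .typedArguments(DataTypes.STRING())
          // the accumulator is a ROW wrapping a single BIGINT counter
          .accumulatorTypeStrategy(
              TypeStrategies.explicit(
                  DataTypes.ROW(DataTypes.FIELD("count", DataTypes.BIGINT()))))
          // the final result of getValue() is a STRING
          .outputTypeStrategy(TypeStrategies.explicit(DataTypes.STRING()))
          .build();
  }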