Class TableAggregateFunction<T,​ACC>

  • Type Parameters:
    T - final result type of the aggregation
    ACC - intermediate result type during the aggregation
    All Implemented Interfaces:
    Serializable, FunctionDefinition
    Direct Known Subclasses:
    PythonTableAggregateFunction

    @PublicEvolving
    public abstract class TableAggregateFunction<T,​ACC>
    extends ImperativeAggregateFunction<T,​ACC>
    Base class for a user-defined table aggregate function. A user-defined table aggregate function maps scalar values of multiple rows to zero, one, or multiple rows (or structured types). If an output record consists of only one field, the structured record can be omitted, and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.

    Similar to an AggregateFunction, the behavior of a TableAggregateFunction is centered around the concept of an accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

    For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling ImperativeAggregateFunction.createAccumulator(). Subsequently, the accumulate() method of the function is called for each input row to update the accumulator. Once all rows have been processed, the emitValue() or emitUpdateWithRetract() method of the function is called to compute and return the final result.

    The main behavior of an TableAggregateFunction can be defined by implementing a custom accumulate method. An accumulate method must be declared publicly, not static, and named accumulate. Accumulate methods can also be overloaded by implementing multiple methods named accumulate.

    By default, input, accumulator, and output data types are automatically extracted using reflection. This includes the generic argument ACC of the class for determining an accumulator data type and the generic argument T for determining an accumulator data type. Input arguments are derived from one or more accumulate() methods. If the reflective information is not sufficient, it can be supported and enriched with DataTypeHint and FunctionHint annotations.

    A TableAggregateFunction needs at least three methods:

    • createAccumulator
    • accumulate
    • emitValue or emitUpdateWithRetract

    There are a few other methods that are optional:

    • retract
    • merge

    All these methods must be declared publicly, not static, and named exactly as the names mentioned above to be called by generated code.

    For storing a user-defined function in a catalog, the class must have a default constructor and must be instantiable during runtime. Anonymous functions in Table API can only be persisted if the function is not stateful (i.e. containing only transient and static fields).

    
     Processes the input values and updates the provided accumulator instance. The method
     accumulate can be overloaded with different custom types and arguments. A table aggregate function
     requires at least one accumulate() method.
    
     param: accumulator           the accumulator which contains the current aggregated results
     param: [user defined inputs] the input value (usually obtained from new arrived data).
    
     public void accumulate(ACC accumulator, [user defined inputs])
     
    
     Retracts the input values from the accumulator instance. The current design assumes the
     inputs are the values that have been previously accumulated. The method retract can be
     overloaded with different custom types and arguments. This method must be implemented for
     bounded OVER aggregates over unbounded tables.
    
     param: accumulator           the accumulator which contains the current aggregated results
     param: [user defined inputs] the input value (usually obtained from a new arrived data).
    
     public void retract(ACC accumulator, [user defined inputs])
     
    
     Merges a group of accumulator instances into one accumulator instance. This method must be
     implemented for unbounded session and hop window grouping aggregates and
     bounded grouping aggregates.
    
     param: accumulator the accumulator which will keep the merged aggregate results. It should
                        be noted that the accumulator may contain the previous aggregated
                        results. Therefore user should not replace or clean this instance in the
                        custom merge method.
     param: iterable    an java.lang.Iterable pointed to a group of accumulators that will be
                        merged.
    
     public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
     
    
     Called every time when an aggregation result should be materialized. The returned value could
     be either an early and incomplete result (periodically emitted as data arrives) or the final
     result of the aggregation.
    
     param: accumulator           the accumulator which contains the current aggregated results
     param: out                   the collector used to output data.
    
     public void emitValue(ACC accumulator, org.apache.flink.util.Collector<T> out)
     
    
     Called every time when an aggregation result should be materialized. The returned value could
     be either an early and incomplete result (periodically emitted as data arrives) or the final
     result of the aggregation.
    
     Compared to emitValue(), emitUpdateWithRetract() is used to emit values that have been updated. This method
     outputs data incrementally in retraction mode (also known as "update before" and "update after"). Once
     there is an update, we have to retract old records before sending new updated ones. The emitUpdateWithRetract()
     method will be used in preference to the emitValue() method if both methods are defined in the table aggregate
     function, because the method is treated to be more efficient than emitValue as it can output
     values incrementally.
    
     param: accumulator           the accumulator which contains the current aggregated results
     param: out                   the retractable collector used to output data. Use the collect() method
                                  to output(add) records and use retract method to retract(delete)
                                  records.
    
     public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out)
     

    If an accumulator needs to store large amounts of data, ListView and MapView provide advanced features for leveraging Flink's state backends in unbounded data scenarios.

    See Also:
    Serialized Form
    • Constructor Detail

      • TableAggregateFunction

        public TableAggregateFunction()
    • Method Detail

      • getKind

        public final FunctionKind getKind()
        Description copied from interface: FunctionDefinition
        Returns the kind of function this definition describes.
      • getTypeInference

        public TypeInference getTypeInference​(DataTypeFactory typeFactory)
        Description copied from class: UserDefinedFunction
        Returns the logic for performing type inference of a call to this function definition.

        The type inference process is responsible for inferring unknown types of input arguments, validating input arguments, and producing result types. The type inference process happens independent of a function body. The output of the type inference is used to search for a corresponding runtime implementation.

        Instances of type inference can be created by using TypeInference.newBuilder().

        See BuiltInFunctionDefinitions for concrete usage examples.

        The type inference for user-defined functions is automatically extracted using reflection. It does this by analyzing implementation methods such as eval() or accumulate() and the generic parameters of a function class if present. If the reflective information is not sufficient, it can be supported and enriched with DataTypeHint and FunctionHint annotations.

        Note: Overriding this method is only recommended for advanced users. If a custom type inference is specified, it is the responsibility of the implementer to make sure that the output of the type inference process matches with the implementation method:

        The implementation method must comply with each DataType.getConversionClass() returned by the type inference. For example, if DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) is an expected argument type, the method must accept a call eval(java.sql.Timestamp).

        Regular Java calling semantics (including type widening and autoboxing) are applied when calling an implementation method which means that the signature can be eval(java.lang.Object).

        The runtime will take care of converting the data to the data format specified by the DataType.getConversionClass() coming from the type inference logic.

        Specified by:
        getTypeInference in interface FunctionDefinition
        Specified by:
        getTypeInference in class UserDefinedFunction