In most cases, a user-defined function must be registered before it can be used in an query. It is not necessary to register functions for the Scala Table API.
Functions are registered at the TableEnvironment by calling a registerFunction() method. When a user-defined function is registered, it is inserted into the function catalog of the TableEnvironment such that the Table API or SQL parser can recognize and properly translate it.
Please find detailed examples of how to register and how to call each type of user-defined function
(ScalarFunction, TableFunction, and AggregateFunction) in the following sub-sessions.
If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar values to a new scalar value.
In order to define a scalar function, one has to extend the base class ScalarFunction in org.apache.flink.table.functions and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named eval. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named eval. Evaluation methods can also support variable arguments, such as eval(String... strs).
The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
By default the result type of an evaluation method is determined by Flink’s type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases TypeInformation of the result type can be manually defined by overriding ScalarFunction#getResultType().
The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding ScalarFunction#getResultType() we define that the returned long value should be interpreted as a Types.TIMESTAMP by the code generation.
In order to define a scalar function, one has to extend the base class ScalarFunction in org.apache.flink.table.functions and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named eval. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named eval. Evaluation methods can also support variable arguments, such as @varargs def eval(str: String*).
The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
By default the result type of an evaluation method is determined by Flink’s type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases TypeInformation of the result type can be manually defined by overriding ScalarFunction#getResultType().
The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding ScalarFunction#getResultType() we define that the returned long value should be interpreted as a Types.TIMESTAMP by the code generation.
In order to define a Python scalar function, one can extend the base class ScalarFunction in pyflink.table.udf and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method which is named eval.
The following example shows how to define your own Python hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
There are many ways to define a Python scalar function besides extending the base class ScalarFunction.
Please refer to the Python Scalar Function documentation for more details.
Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns.
In order to define a table function one has to extend the base class TableFunction in org.apache.flink.table.functions and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared public and named eval. The TableFunction can be overloaded by implementing multiple methods named eval. The parameter types of the evaluation methods determine all valid parameters of the table function. Evaluation methods can also support variable arguments, such as eval(String... strs). The type of the returned table is determined by the generic type of TableFunction. Evaluation methods emit output rows using the protected collect(T) method.
In the Table API, a table function is used with .joinLateral or .leftOuterJoinLateral. The joinLateral operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The leftOuterJoinLateral operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table. In SQL use LATERAL TABLE(<TableFunction>) with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).
The following example shows how to define table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered:
IMPORTANT: Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using AS.
By default the result type of a TableFunction is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding TableFunction#getResultType() which returns its TypeInformation.
The following example shows an example of a TableFunction that returns a Row type which requires explicit type information. We define that the returned table type should be RowTypeInfo(String, Integer) by overriding TableFunction#getResultType().
User-Defined Aggregate Functions (UDAGGs) aggregate a table (one or more rows with one or more attributes) to a scalar value.
The above figure shows an example of an aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, id, name and price and 5 rows. Imagine you need to find the highest price of all beverages in the table, i.e., perform a max() aggregation. You would need to check each of the 5 rows and the result would be a single numeric value.
User-defined aggregation functions are implemented by extending the AggregateFunction class. An AggregateFunction works as follows. First, it needs an accumulator, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the createAccumulator() method of the AggregateFunction. Subsequently, the accumulate() method of the function is called for each input row to update the accumulator. Once all rows have been processed, the getValue() method of the function is called to compute and return the final result.
The following methods are mandatory for each AggregateFunction:
createAccumulator()
accumulate()
getValue()
Flink’s type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So similar to ScalarFunction and TableFunction, AggregateFunction provides methods to specify the TypeInformation of the result type (through
AggregateFunction#getResultType()) and the type of the accumulator (through AggregateFunction#getAccumulatorType()).
Besides the above methods, there are a few contracted methods that can be
optionally implemented. While some of these methods allow the system more efficient query execution, others are mandatory for certain use cases. For instance, the merge() method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that “connects” them).
The following methods of AggregateFunction are required depending on the use case:
retract() is required for aggregations on bounded OVER windows.
merge() is required for many batch aggregations and session window aggregations.
resetAccumulator() is required for many batch aggregations.
All methods of AggregateFunction must be declared as public, not static and named exactly as the names mentioned above. The methods createAccumulator, getValue, getResultType, and getAccumulatorType are defined in the AggregateFunction abstract class, while others are contracted methods. In order to define a aggregate function, one has to extend the base class org.apache.flink.table.functions.AggregateFunction and implement one (or more) accumulate methods. The method accumulate can be overloaded with different parameter types and supports variable arguments.
Detailed documentation for all methods of AggregateFunction is given below.
The following example shows how to
define an AggregateFunction that calculates the weighted average on a given column,
register the function in the TableEnvironment, and
use the function in a query.
To calculate an weighted average value, the accumulator needs to store the weighted sum and count of all the data that has been accumulated. In our example we define a class WeightedAvgAccum to be the accumulator. Accumulators are automatically backup-ed by Flink’s checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics.
The accumulate() method of our WeightedAvgAggregateFunction has three inputs. The first one is the WeightedAvgAccum accumulator, the other two are user-defined inputs: input value ivalue and weight of the input iweight. Although the retract(), merge(), and resetAccumulator() methods are not mandatory for most aggregation types, we provide them below as examples. Please note that we used Java primitive types and defined getResultType() and getAccumulatorType() methods in the Scala example because Flink type extraction does not work very well for Scala types.
User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or more rows with one or more attributes) to a result table with multi rows and columns.
The above figure shows an example of a table aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, id, name and price and 5 rows. Imagine you need to find the top 2 highest prices of all beverages in the table, i.e., perform a top2() table aggregation. You would need to check each of the 5 rows and the result would be a table with the top 2 values.
User-defined table aggregation functions are implemented by extending the TableAggregateFunction class. A TableAggregateFunction works as follows. First, it needs an accumulator, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the createAccumulator() method of the TableAggregateFunction. Subsequently, the accumulate() method of the function is called for each input row to update the accumulator. Once all rows have been processed, the emitValue() method of the function is called to compute and return the final results.
The following methods are mandatory for each TableAggregateFunction:
createAccumulator()
accumulate()
Flink’s type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So similar to ScalarFunction and TableFunction, TableAggregateFunction provides methods to specify the TypeInformation of the result type (through
TableAggregateFunction#getResultType()) and the type of the accumulator (through TableAggregateFunction#getAccumulatorType()).
Besides the above methods, there are a few contracted methods that can be
optionally implemented. While some of these methods allow the system more efficient query execution, others are mandatory for certain use cases. For instance, the merge() method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that “connects” them).
The following methods of TableAggregateFunction are required depending on the use case:
retract() is required for aggregations on bounded OVER windows.
merge() is required for many batch aggregations and session window aggregations.
resetAccumulator() is required for many batch aggregations.
emitValue() is required for batch and window aggregations.
The following methods of TableAggregateFunction are used to improve the performance of streaming jobs:
emitUpdateWithRetract() is used to emit values that have been updated under retract mode.
For emitValue method, it emits full data according to the accumulator. Take TopN as an example, emitValue emit all top n values each time. This may bring performance problems for streaming jobs. To improve the performance, a user can also implement emitUpdateWithRetract method to improve the performance. The method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new updated ones. The method will be used in preference to the emitValue method if they are all defined in the table aggregate function, because emitUpdateWithRetract is treated to be more efficient than emitValue as it can output values incrementally.
All methods of TableAggregateFunction must be declared as public, not static and named exactly as the names mentioned above. The methods createAccumulator, getResultType, and getAccumulatorType are defined in the parent abstract class of TableAggregateFunction, while others are contracted methods. In order to define a table aggregate function, one has to extend the base class org.apache.flink.table.functions.TableAggregateFunction and implement one (or more) accumulate methods. The method accumulate can be overloaded with different parameter types and supports variable arguments.
Detailed documentation for all methods of TableAggregateFunction is given below.
The following example shows how to
define a TableAggregateFunction that calculates the top 2 values on a given column,
register the function in the TableEnvironment, and
use the function in a Table API query(TableAggregateFunction is only supported by Table API).
To calculate the top 2 values, the accumulator needs to store the biggest 2 values of all the data that has been accumulated. In our example we define a class Top2Accum to be the accumulator. Accumulators are automatically backup-ed by Flink’s checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics.
The accumulate() method of our Top2TableAggregateFunction has two inputs. The first one is the Top2Accum accumulator, the other one is the user-defined input: input value v. Although the merge() method is not mandatory for most table aggregation types, we provide it below as examples. Please note that we used Java primitive types and defined getResultType() and getAccumulatorType() methods in the Scala example because Flink type extraction does not work very well for Scala types.
The following example shows how to use emitUpdateWithRetract method to emit only updates. To emit only updates, in our example, the accumulator keeps both old and new top 2 values. Note: if the N of topN is big, it may inefficient to keep both old and new values. One way to solve this case is to store the input record into the accumulator in accumulate method and then perform calculation in emitUpdateWithRetract.
The Table API and SQL code generation internally tries to work with primitive values as much as possible. A user-defined function can introduce much overhead through object creation, casting, and (un)boxing. Therefore, it is highly recommended to declare parameters and result types as primitive types instead of their boxed classes. Types.DATE and Types.TIME can also be represented as int. Types.TIMESTAMP can be represented as long.
We recommended that user-defined functions should be written by Java instead of Scala as Scala types pose a challenge for Flink’s type extractor.
Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide open() and close() methods that can be overridden and provide similar functionality as the methods in RichFunction of DataSet or DataStream API.
The open() method is called once before the evaluation method. The close() method after the last call to the evaluation method.
The open() method provides a FunctionContext that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.
The following information can be obtained by calling the corresponding methods of FunctionContext:
Method
Description
getMetricGroup()
Metric group for this parallel subtask.
getCachedFile(name)
Local temporary file copy of a distributed cache file.
getJobParameter(name, defaultValue)
Global job parameter value associated with given key.
The following example snippet shows how to use FunctionContext in a scalar function for accessing a global job parameter: