User-defined functions (UDFs) are extension points to call frequently used logic or custom logic that cannot be expressed otherwise in queries.
User-defined functions can be implemented in a JVM language (such as Java or Scala) or Python.
An implementer can use arbitrary third party libraries within a UDF.
This page focuses on JVM-based languages. Please refer to the PyFlink documentation
for details on writing general
and vectorized UDFs in Python.
Currently, Flink distinguishes between the following kinds of functions:
Scalar functions map scalar values to a new scalar value.
Table functions map scalar values to new rows.
Aggregate functions map scalar values of multiple rows to a new scalar value.
Table aggregate functions map scalar values of multiple rows to new rows.
Async table functions are special functions for table sources that perform a lookup.
The following example shows how to create a simple scalar function and how to call the function in both Table API and SQL.
For SQL queries, a function must always be registered under a name. For Table API, a function can be registered or directly used inline.
For interactive sessions, it is also possible to parameterize functions before using or
registering them. In this case, function instances instead of function classes can be
used as temporary functions.
This requires that the parameters are serializable for shipping
function instances to the cluster.
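A minimal Java sketch of how such a function could be defined, registered, and called (the SubstringFunction class and the MyTable table with its myField column are illustrative names):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// a simple scalar function that can be parameterized via its constructor
public static class SubstringFunction extends ScalarFunction {

  private final boolean endInclusive;

  public SubstringFunction() {
    this(false);
  }

  public SubstringFunction(boolean endInclusive) {
    this.endInclusive = endInclusive;
  }

  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, endInclusive ? end + 1 : end);
  }
}

// in the main method of the job:
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// call the function "inline" in Table API without prior registration
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));

// register the function under a name ...
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

// ... or register a parameterized instance (the constructor argument must be serializable)
env.createTemporarySystemFunction("SubstringFunctionInclusive", new SubstringFunction(true));

// call the registered function in Table API
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

// call the registered function in SQL
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
```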
Independent of the kind of function, all user-defined functions follow some basic implementation principles.
Function Class
An implementation class must extend from one of the available base classes (e.g. org.apache.flink.table.functions.ScalarFunction).
The class must be declared public, not abstract, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed.
For storing a user-defined function in a persistent catalog, the class must have a default constructor and must be instantiable during runtime.
Evaluation Methods
The base class provides a set of methods that can be overridden such as open(), close(), or isDeterministic().
However, in addition to those declared methods, the main runtime logic that is applied to every incoming record must be implemented through specialized evaluation methods.
Depending on the function kind, evaluation methods such as eval(), accumulate(), or retract() are called by code-generated operators during runtime.
The methods must be declared public and take a well-defined set of arguments.
Regular JVM method calling semantics apply. Therefore, it is possible to:
implement overloaded methods such as eval(Integer) and eval(LocalDateTime),
use var-args such as eval(Integer...),
use object inheritance such as eval(Object) that takes both LocalDateTime and Integer,
and combinations of the above such as eval(Object...) that takes all kinds of arguments.
If you intend to implement functions in Scala, please add the scala.annotation.varargs annotation in
case of variable arguments. Furthermore, it is recommended to use boxed primitives (e.g. java.lang.Integer
instead of Int) to support NULL.
The following snippet shows an example of an overloaded function:
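As a sketch, an overloaded function combining fixed-arity and var-arg evaluation methods could look like this (the SumFunction name is illustrative):

```java
import org.apache.flink.table.functions.ScalarFunction;

// function with overloaded evaluation methods
public static class SumFunction extends ScalarFunction {

  public Integer eval(Integer a, Integer b) {
    return a + b;
  }

  public Integer eval(String a, String b) {
    return Integer.valueOf(a) + Integer.valueOf(b);
  }

  public Integer eval(Double... d) {
    double result = 0;
    for (double value : d) {
      result += value;
    }
    return (int) result;
  }
}
```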
Type Inference
The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both function parameters and return types must be mapped to a data type.
From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a user-defined function.
The logic for validating input arguments and deriving data types for both the parameters and the result of a function is summarized under the term type inference.
Flink’s user-defined functions implement an automatic type inference extraction that derives data types from the function’s class and its evaluation methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with @DataTypeHint and @FunctionHint. More examples on how to annotate functions are shown below.
If more advanced type inference logic is required, an implementer can explicitly override the getTypeInference() method in every user-defined function. However, the annotation approach is recommended because it keeps custom type inference logic close to the affected locations and falls back to the default behavior for the remaining implementation.
Automatic Type Inference
The automatic type inference inspects the function’s class and evaluation methods to derive data types for the arguments and result of a function. @DataTypeHint and @FunctionHint annotations support the automatic extraction.
In many scenarios, it is required to support the automatic extraction inline for parameters and return types of a function.
The following example shows how to use data type hints. More information can be found in the documentation of the annotation class.
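A possible sketch using @DataTypeHint on return types and nested types (the class name and method bodies are illustrative):

```java
import java.math.BigDecimal;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// function with hinted parameter and return data types
public static class OverloadedFunction extends ScalarFunction {

  // no hint required, types are extracted via reflection
  public Long eval(long a, long b) {
    return a + b;
  }

  // define the precision and scale of a decimal return type
  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
    return BigDecimal.valueOf(a + b);
  }

  // define a nested row data type for the result
  @DataTypeHint("ROW<s STRING, i INT>")
  public Row eval(int i) {
    return Row.of(String.valueOf(i), i);
  }
}
```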
@FunctionHint
In some scenarios, it is desirable that one evaluation method handles multiple different data types at the same time. Furthermore, in some scenarios, overloaded evaluation methods have a common result type that should be declared only once.
The @FunctionHint annotation can provide a mapping from argument data types to a result data type. It enables annotating entire function classes or evaluation methods for input, accumulator, and result data types. One or more annotations can be declared on top of a class or individually for each evaluation method for overloading function signatures. All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on top of a function class are inherited by all evaluation methods.
The following example shows how to use function hints. More information can be found in the documentation of the annotation class.
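A possible sketch that declares a common result type once on the class level while still overloading the evaluation methods (names are illustrative):

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// the output type is declared once for all evaluation methods
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedTableFunction extends TableFunction<Row> {

  public void eval(int a, int b) {
    collect(Row.of("Sum", a + b));
  }

  // overloading of arguments is still possible
  public void eval() {
    collect(Row.of("Empty args", -1));
  }
}
```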
Custom Type Inference
For most scenarios, @DataTypeHint and @FunctionHint should be sufficient to model user-defined functions. However, by overriding the automatic type inference defined in getTypeInference(), implementers can create arbitrary functions that behave like built-in system functions.
The following example implemented in Java illustrates the potential of a custom type inference logic. It uses a string literal argument to determine the result type of a function. The function takes two string arguments: the first argument represents the string to be parsed, the second argument represents the target type.
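A possible sketch of such a function (the LiteralFunction name is illustrative; the type inference builder API is assumed to be available in this version):

```java
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

public static class LiteralFunction extends ScalarFunction {

  // parses the first argument into the type named by the second argument
  public Object eval(String s, String type) {
    switch (type) {
      case "INT":
        return Integer.valueOf(s);
      case "DOUBLE":
        return Double.valueOf(s);
      case "STRING":
      default:
        return s;
    }
  }

  // the reflection-based extraction is replaced by manually defined type inference
  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
      // arguments are implicitly cast to these types if necessary
      .typedArguments(DataTypes.STRING(), DataTypes.STRING())
      // derive the result type from the literal value of the second argument
      .outputTypeStrategy(callContext -> {
        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
          throw callContext.newValidationError("Literal expected for second argument.");
        }
        final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
        switch (literal) {
          case "INT":
            return Optional.of(DataTypes.INT().notNull());
          case "DOUBLE":
            return Optional.of(DataTypes.DOUBLE().notNull());
          case "STRING":
          default:
            return Optional.of(DataTypes.STRING());
        }
      })
      .build();
  }
}
```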
Determinism
Every user-defined function class can declare whether it produces deterministic results or not by overriding
the isDeterministic() method. If the function is not purely functional (like random(), date(), or now()),
the method must return false. By default, isDeterministic() returns true.
Furthermore, the isDeterministic() method might also influence the runtime behavior. A runtime
implementation might be called at two different stages:
During planning (i.e. pre-flight phase): If a function is called with constant expressions,
or if constant expressions can be derived from the given statement, the function is pre-evaluated
for constant expression reduction and might not be executed on the cluster anymore, unless
isDeterministic() returns false to disable constant expression reduction in this case. For example,
the following calls to ABS are executed during planning: SELECT ABS(-1) FROM t and
SELECT ABS(field) FROM t WHERE field = -1; whereas SELECT ABS(field) FROM t is not.
During runtime (i.e. cluster execution): If a function is called with non-constant expressions
or isDeterministic() returns false.
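For illustration, a function that appends a random suffix (the RandomTagFunction name is illustrative) should declare itself as non-deterministic:

```java
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.table.functions.ScalarFunction;

public static class RandomTagFunction extends ScalarFunction {

  public String eval(String prefix) {
    return prefix + "-" + ThreadLocalRandom.current().nextInt(1000);
  }

  // prevents the planner from pre-evaluating calls with constant arguments
  @Override
  public boolean isDeterministic() {
    return false;
  }
}
```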
Runtime Integration
Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide open() and close() methods that can be overridden and provide similar functionality as the methods in RichFunction of DataStream API.
The open() method is called once before the evaluation method. The close() method is called after the last call to the evaluation method.
The open() method provides a FunctionContext that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.
The following information can be obtained by calling the corresponding methods of FunctionContext:
getMetricGroup(): Metric group for this parallel subtask.
getCachedFile(name): Local temporary file copy of a distributed cache file.
getJobParameter(name, defaultValue): Global job parameter value associated with given key.
getExternalResourceInfos(resourceName): Returns a set of external resource infos associated with the given key.
Note: Depending on the context in which the function is executed, not all of the methods above might be available. For example,
during constant expression reduction, adding a metric is a no-op operation.
The following example snippet shows how to use FunctionContext in a scalar function for accessing a global job parameter:
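A minimal sketch of a scalar function reading a global job parameter in open() (the HashCodeFunction name, the hashcode_factor parameter, and the way the parameter is set via the table config are illustrative assumptions):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public static class HashCodeFunction extends ScalarFunction {

  private int factor = 0;

  @Override
  public void open(FunctionContext context) throws Exception {
    // read the global "hashcode_factor" parameter; "12" is used if it is not set
    factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
  }

  public int eval(String s) {
    return s.hashCode() * factor;
  }
}

// in the main method of the job:
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// set the job parameter (assumed setter; the parameter may also come from the execution config)
env.getConfig().addJobParameter("hashcode_factor", "31");

// register and use the function
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");
```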
A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the data types section can be used as a parameter or return type of an evaluation method.
In order to define a scalar function, one has to extend the base class ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...).
The following example shows how to define your own hash code function and call it in a query. See the Implementation Guide for more details.
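A minimal sketch of such a hash code function (the HashFunction name and the MyTable/myField identifiers are illustrative):

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public static class HashFunction extends ScalarFunction {

  // accept any data type and return an INT
  public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return o.hashCode();
  }
}

// in the main method of the job:
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// call the function "inline" in Table API
env.from("MyTable").select(call(HashFunction.class, $("myField")));

// register the function and call it in SQL
env.createTemporarySystemFunction("HashFunction", HashFunction.class);
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
```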
If you intend to implement or call functions in Python, please refer to the Python Scalar Functions documentation for more details.
Similar to a user-defined scalar function, a user-defined table function (UDTF) takes zero, one, or multiple scalar values as input arguments. However, it can return an arbitrary number of rows (or structured types) as output instead of a single value. The returned record may consist of one or more fields. If an output record consists of only a single field, the structured record can be omitted, and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.
In order to define a table function, one has to extend the base class TableFunction in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...). Similar to other functions, input and output data types are automatically extracted using reflection. This includes the generic argument T of the class for determining an output data type. In contrast to scalar functions, the evaluation method itself must not have a return type, instead, table functions provide a collect(T) method that can be called within every evaluation method for emitting zero, one, or more records.
In the Table API, a table function is used with .joinLateral(...) or .leftOuterJoinLateral(...). The joinLateral operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The leftOuterJoinLateral operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table.
In SQL, use LATERAL TABLE(<TableFunction>) with JOIN or LEFT JOIN with an ON TRUE join condition.
The following example shows how to define your own split function and call it in a query. See the Implementation Guide for more details.
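A minimal sketch of such a split function (the SplitFunction name and the MyTable/myField identifiers are illustrative):

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {

  public void eval(String str) {
    for (String s : str.split(" ")) {
      // use collect(...) to emit one row per word
      collect(Row.of(s, s.length()));
    }
  }
}

// in the main method of the job:
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// call the function "inline" in Table API
env.from("MyTable")
   .joinLateral(call(SplitFunction.class, $("myField")))
   .select($("myField"), $("word"), $("length"));

// register the function and call it in SQL
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
env.sqlQuery(
  "SELECT myField, word, length " +
  "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
```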
If you intend to implement functions in Scala, do not implement a table function as a Scala object. Scala objects are singletons and will cause concurrency issues.
If you intend to implement or call functions in Python, please refer to the Python Table Functions documentation for more details.
A user-defined aggregate function (UDAGG) maps scalar values of multiple rows to a new scalar value.
The behavior of an aggregate function is centered around the concept of an accumulator. The accumulator
is an intermediate data structure that stores the aggregated values until a final aggregation result
is computed.
For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling
createAccumulator(). Subsequently, the accumulate(...) method of the function is called for each input
row to update the accumulator. Once all rows have been processed, the getValue(...) method of the
function is called to compute and return the final result.
The following example illustrates the aggregation process:
In the example, we assume a table that contains data about beverages. The table consists of three columns (id, name,
and price) and 5 rows. We would like to find the highest price of all beverages in the table, i.e., perform
a max() aggregation. We need to consider each of the 5 rows. The result is a single numeric value.
In order to define an aggregate function, one has to extend the base class AggregateFunction in
org.apache.flink.table.functions and implement one or more evaluation methods named accumulate(...).
An accumulate method must be declared publicly and not static. Accumulate methods can also be overloaded
by implementing multiple methods named accumulate.
By default, input, accumulator, and output data types are automatically extracted using reflection. This
includes the generic argument ACC of the class for determining an accumulator data type and the generic
argument T for determining a result data type. Input arguments are derived from one or more
accumulate(...) methods. See the Implementation Guide for more details.
If you intend to implement or call functions in Python, please refer to the Python Functions
documentation for more details.
The following example shows how to define your own aggregate function and call it in a query.
The accumulate(...) method of our WeightedAvg class takes three inputs. The first one is the accumulator
and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
define a class WeightedAvgAccumulator to be the accumulator. Accumulators are automatically managed
by Flink’s checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
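A possible sketch of the WeightedAvg function and its accumulator described above (the registration call and the MyTable query are illustrative):

```java
import org.apache.flink.table.functions.AggregateFunction;

// mutable accumulator of structured type for the aggregate function
public static class WeightedAvgAccumulator {
  public long sum = 0;
  public int count = 0;
}

// function that takes (value BIGINT, weight INT) and returns the weighted average as BIGINT
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {

  @Override
  public WeightedAvgAccumulator createAccumulator() {
    return new WeightedAvgAccumulator();
  }

  @Override
  public Long getValue(WeightedAvgAccumulator acc) {
    if (acc.count == 0) {
      return null;
    }
    return acc.sum / acc.count;
  }

  public void accumulate(WeightedAvgAccumulator acc, Long value, Integer weight) {
    acc.sum += value * weight;
    acc.count += weight;
  }

  // optional: required e.g. for aggregations on OVER windows
  public void retract(WeightedAvgAccumulator acc, Long value, Integer weight) {
    acc.sum -= value * weight;
    acc.count -= weight;
  }

  // optional: required e.g. for session windows and two-phase aggregation
  public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
    for (WeightedAvgAccumulator other : it) {
      acc.sum += other.sum;
      acc.count += other.count;
    }
  }
}

// in the main method of the job (illustrative table and column names):
// env.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
// env.sqlQuery("SELECT myField, WeightedAvg(`value`, weight) FROM MyTable GROUP BY myField");
```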
Mandatory and Optional Methods
The following methods are mandatory for each AggregateFunction:
createAccumulator()
accumulate(...)
getValue(...)
Additionally, there are a few methods that can be optionally implemented. While some of these methods
allow the system to execute queries more efficiently, others are mandatory for certain use cases. For instance,
the merge(...) method is mandatory if the aggregation function should be applied in the context of a
session group window (the accumulators of two session windows need to be joined when a row is observed
that “connects” them).
The following methods of AggregateFunction are required depending on the use case:
retract(...) is required for aggregations on OVER windows.
merge(...) is required for many bounded aggregations and session window and hop window aggregations. This method is also helpful for optimizations; for example, the two-phase aggregation optimization requires all AggregateFunctions to support the merge(...) method.
If the aggregate function can only be applied in an OVER window, this can be declared by returning the
requirement FunctionRequirement.OVER_WINDOW_ONLY in getRequirements().
If an accumulator needs to store large amounts of data, org.apache.flink.table.api.dataview.ListView
and org.apache.flink.table.api.dataview.MapView provide advanced features for leveraging Flink’s state
backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
about this advanced feature.
Since some of the methods are optional, or can be overloaded, the runtime invokes aggregate function
methods via generated code. This means the base class does not always provide a signature to be overridden
by the concrete implementation. Nevertheless, all mentioned methods must be declared publicly, not static,
and named exactly as the names mentioned above to be called.
Detailed documentation for all methods that are not declared in AggregateFunction and called by generated
code is given below.
accumulate(...)
retract(...)
merge(...)
If you intend to implement or call functions in Python, please refer to the Python Aggregate Functions documentation for more details.
A user-defined table aggregate function (UDTAGG) maps scalar values of multiple rows to zero, one,
or multiple rows (or structured types). The returned record may consist of one or more fields. If an
output record consists of only a single field, the structured record can be omitted, and a scalar value
can be emitted that will be implicitly wrapped into a row by the runtime.
Similar to an aggregate function, the behavior of a table aggregate is centered
around the concept of an accumulator. The accumulator is an intermediate data structure that stores
the aggregated values until a final aggregation result is computed.
For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling
createAccumulator(). Subsequently, the accumulate(...) method of the function is called for each
input row to update the accumulator. Once all rows have been processed, the emitValue(...) or emitUpdateWithRetract(...)
method of the function is called to compute and return the final result.
The following example illustrates the aggregation process:
In the example, we assume a table that contains data about beverages. The table consists of three columns (id, name,
and price) and 5 rows. We would like to find the 2 highest prices of all beverages in the table, i.e.,
perform a TOP2() table aggregation. We need to consider each of the 5 rows. The result is a table
with the top 2 values.
In order to define a table aggregate function, one has to extend the base class TableAggregateFunction in
org.apache.flink.table.functions and implement one or more evaluation methods named accumulate(...).
An accumulate method must be declared publicly and not static. Accumulate methods can also be overloaded
by implementing multiple methods named accumulate.
By default, input, accumulator, and output data types are automatically extracted using reflection. This
includes the generic argument ACC of the class for determining an accumulator data type and the generic
argument T for determining a result data type. Input arguments are derived from one or more
accumulate(...) methods. See the Implementation Guide for more details.
If you intend to implement or call functions in Python, please refer to the Python Functions
documentation for more details.
The following example shows how to define your own table aggregate function and call it in a query.
The accumulate(...) method of our Top2 class takes two inputs. The first one is the accumulator
and the second one is the user-defined input. In order to calculate a result, the accumulator needs to
store the 2 highest values of all the data that has been accumulated. Accumulators are automatically managed
by Flink’s checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
The result values are emitted together with a ranking index.
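A possible sketch of the Top2 function and its accumulator described above; in the Table API such a function is applied with flatAggregate(...):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// mutable accumulator that keeps the two highest values seen so far
public static class Top2Accumulator {
  public Integer first;
  public Integer second;
}

// function that takes (value INT) and emits (value, rank) pairs for the two highest values
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {

  @Override
  public Top2Accumulator createAccumulator() {
    Top2Accumulator acc = new Top2Accumulator();
    acc.first = Integer.MIN_VALUE;
    acc.second = Integer.MIN_VALUE;
    return acc;
  }

  public void accumulate(Top2Accumulator acc, Integer value) {
    if (value > acc.first) {
      acc.second = acc.first;
      acc.first = value;
    } else if (value > acc.second) {
      acc.second = value;
    }
  }

  // optional: required e.g. for session windows and bounded aggregations
  public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
    for (Top2Accumulator other : it) {
      accumulate(acc, other.first);
      accumulate(acc, other.second);
    }
  }

  public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
    // emit each value together with its ranking index
    if (acc.first != Integer.MIN_VALUE) {
      out.collect(Tuple2.of(acc.first, 1));
    }
    if (acc.second != Integer.MIN_VALUE) {
      out.collect(Tuple2.of(acc.second, 2));
    }
  }
}
```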
Mandatory and Optional Methods
The following methods are mandatory for each TableAggregateFunction:
createAccumulator()
accumulate(...)
emitValue(...) or emitUpdateWithRetract(...)
Additionally, there are a few methods that can be optionally implemented. While some of these methods
allow the system to execute queries more efficiently, others are mandatory for certain use cases. For instance,
the merge(...) method is mandatory if the aggregation function should be applied in the context of a
session group window (the accumulators of two session windows need to be joined when a row is observed
that “connects” them).
The following methods of TableAggregateFunction are required depending on the use case:
retract(...) is required for aggregations on OVER windows.
merge(...) is required for many bounded aggregations and unbounded session and hop window aggregations.
emitValue(...) is required for bounded and window aggregations.
The following methods of TableAggregateFunction are used to improve the performance of streaming jobs:
emitUpdateWithRetract(...) is used to emit values that have been updated under retract mode.
The emitValue(...) method always emits the full data according to the accumulator. In unbounded scenarios,
this may bring performance problems. Take a Top N function as an example. The emitValue(...) would emit
all N values each time. In order to improve the performance, one can implement emitUpdateWithRetract(...) which
outputs data incrementally in retract mode. In other words, once there is an update, the method can retract
old records before sending new, updated ones. The method will be used in preference to the emitValue(...)
method.
If the table aggregate function can only be applied in an OVER window, this can be declared by returning the
requirement FunctionRequirement.OVER_WINDOW_ONLY in getRequirements().
If an accumulator needs to store large amounts of data, org.apache.flink.table.api.dataview.ListView
and org.apache.flink.table.api.dataview.MapView provide advanced features for leveraging Flink’s state
backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
about this advanced feature.
Since some of the methods are optional or can be overloaded, the methods are called by generated code. The
base class does not always provide a signature to be overridden by the concrete implementation class. Nevertheless,
all mentioned methods must be declared publicly, not static, and named exactly as the names mentioned above
to be called.
Detailed documentation for all methods that are not declared in TableAggregateFunction and called by generated
code is given below.
accumulate(...)
retract(...)
merge(...)
emitValue(...)
emitUpdateWithRetract(...)
Retraction Example
The following example shows how to use the emitUpdateWithRetract(...) method to emit only incremental
updates. In order to do so, the accumulator keeps both the old and new top 2 values.
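A possible sketch of such an incremental Top 2 function (the Top2WithRetract name is illustrative); the accumulator remembers the previously emitted values so they can be retracted:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;

// accumulator that also remembers the previously emitted top 2 values
public static class Top2WithRetractAccumulator {
  public Integer first;
  public Integer second;
  public Integer oldFirst;
  public Integer oldSecond;
}

public static class Top2WithRetract
    extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetractAccumulator> {

  @Override
  public Top2WithRetractAccumulator createAccumulator() {
    Top2WithRetractAccumulator acc = new Top2WithRetractAccumulator();
    acc.first = Integer.MIN_VALUE;
    acc.second = Integer.MIN_VALUE;
    acc.oldFirst = Integer.MIN_VALUE;
    acc.oldSecond = Integer.MIN_VALUE;
    return acc;
  }

  public void accumulate(Top2WithRetractAccumulator acc, Integer value) {
    if (value > acc.first) {
      acc.second = acc.first;
      acc.first = value;
    } else if (value > acc.second) {
      acc.second = value;
    }
  }

  public void emitUpdateWithRetract(
      Top2WithRetractAccumulator acc,
      RetractableCollector<Tuple2<Integer, Integer>> out) {
    if (!acc.first.equals(acc.oldFirst)) {
      // retract the previously emitted value before sending the updated one
      if (acc.oldFirst != Integer.MIN_VALUE) {
        out.retract(Tuple2.of(acc.oldFirst, 1));
      }
      out.collect(Tuple2.of(acc.first, 1));
      acc.oldFirst = acc.first;
    }
    if (!acc.second.equals(acc.oldSecond)) {
      if (acc.oldSecond != Integer.MIN_VALUE) {
        out.retract(Tuple2.of(acc.oldSecond, 2));
      }
      out.collect(Tuple2.of(acc.second, 2));
      acc.oldSecond = acc.second;
    }
  }
}
```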
If the N of Top N is big, it might be inefficient to keep both the old and new values. One way to
solve this is to store only the input record in the accumulator in the accumulate(...) method and then perform
the calculation in emitUpdateWithRetract(...).