Vectorized Python user-defined functions are executed by transferring batches of elements between the JVM and the Python VM in Arrow columnar format.
They usually perform much better than non-vectorized Python user-defined functions because the serialization/deserialization overhead and the invocation overhead are greatly reduced. In addition, users can leverage popular Python libraries such as Pandas and NumPy to implement them.
These libraries are highly optimized and provide high-performance data structures and functions. Vectorized user-defined functions are defined in the same way as non-vectorized user-defined functions.
Users only need to add the extra parameter func_type="pandas" to the udf or udaf decorator to mark a function as a vectorized user-defined function.
NOTE: Python UDF execution requires Python 3.5, 3.6, 3.7 or 3.8 with PyFlink installed, on both the client side and the cluster side.
Vectorized Python scalar functions take pandas.Series as inputs and return a pandas.Series of the same length as output.
Internally, Flink splits the input elements into batches, converts each batch into pandas.Series and then calls the user-defined vectorized Python scalar function once per batch. Please refer to the config option python.fn-execution.arrow.batch.size for details on how to configure the batch size.
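The batching behaviour described above can be pictured with a small pure-pandas simulation. This is only a sketch of the idea, not Flink's implementation: the helper run_in_batches and the batch size of 3 are hypothetical, and in a real job the batches travel between the JVM and the Python VM in Arrow format.

```python
import pandas as pd


def add(i: pd.Series, j: pd.Series) -> pd.Series:
    # A vectorized scalar function: Series in, Series of the same length out.
    return i + j


def run_in_batches(func, columns, batch_size):
    """Hypothetical helper: call `func` once per batch of rows, then concatenate."""
    n_rows = len(columns[0])
    outputs = []
    for start in range(0, n_rows, batch_size):
        # Convert one batch of every input column into a pandas.Series.
        batch = [pd.Series(col[start:start + batch_size]) for col in columns]
        result = func(*batch)
        # The contract: the output has as many elements as the input batch.
        assert len(result) == len(batch[0])
        outputs.append(result)
    return pd.concat(outputs, ignore_index=True)


a = [1, 2, 3, 4, 5, 6, 7]
b = [10, 20, 30, 40, 50, 60, 70]
# Seven rows with a batch size of 3 give batches of 3, 3 and 1 rows.
total = run_in_batches(add, [a, b], batch_size=3)
print(total.tolist())
```

In an actual job the batch size is not chosen in user code; it comes from python.fn-execution.arrow.batch.size. Because the function only ever sees one batch at a time, it should not rely on state that spans the whole input.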
Vectorized Python scalar functions can be used anywhere non-vectorized Python scalar functions can be used.
The following example shows how to define your own vectorized Python scalar function which computes the sum of two columns, and use it in a query:
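A minimal sketch of such a function is given here, under a few assumptions: the table my_table has BIGINT columns a and b and is registered as MyTable, and the names add_udf and run_queries are illustrative. The PyFlink import is guarded so that the pandas logic can be tried locally without a Flink installation; run_queries is only defined, not called, since it needs a running table environment.

```python
import pandas as pd


def add(i: pd.Series, j: pd.Series) -> pd.Series:
    # Element-wise sum of two columns; the result has the same length as the inputs.
    return i + j


try:
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # func_type="pandas" marks the function as vectorized.
    add_udf = udf(add, result_type=DataTypes.BIGINT(), func_type="pandas")
except ImportError:
    add_udf = None  # PyFlink is not installed; only the pandas logic is usable.


def run_queries(table_env, my_table):
    """Assumes `my_table` has BIGINT columns `a` and `b` and is registered as MyTable."""
    # Use the vectorized Python scalar function in the Python Table API.
    by_table_api = my_table.select(add_udf(my_table.a, my_table.b))

    # Use the vectorized Python scalar function in SQL.
    table_env.create_temporary_function("add", add_udf)
    by_sql = table_env.sql_query("SELECT add(a, b) FROM MyTable")
    return by_table_api, by_sql


# Local check of the pandas logic alone.
print(add(pd.Series([1, 2, 3]), pd.Series([10, 20, 30])).tolist())
```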
Vectorized Python aggregate functions take one or more pandas.Series as inputs and return one scalar value as output.
Note: The return type does not support RowType and MapType for the time being.
Vectorized Python aggregate functions can be used in GroupBy Aggregation (Batch), GroupBy Window Aggregation (Batch and Stream) and Over Window Aggregation (Batch and Stream bounded over window). For more details on the usage of aggregations, refer to the relevant documentation.
Note: Pandas UDAF does not support partial aggregation. In addition, all the data of a group or window is loaded into memory at the same time during execution, so you must make sure that the data of a group or window fits into memory.
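The memory implication can be illustrated with plain pandas. The sketch below only mimics the calling pattern and is not how Flink executes the function; the data and the names mean_agg and rows are made up for illustration.

```python
import pandas as pd


def mean_agg(v: pd.Series) -> float:
    # A vectorized aggregate function: Series in, one scalar out.
    return v.mean()


rows = pd.DataFrame({
    "key": ["x", "y", "x", "y", "x"],
    "value": [1.0, 10.0, 2.0, 30.0, 6.0],
})

results = {}
for key, group in rows.groupby("key"):
    # Every row of the group is materialized as one Series before the single
    # call; nothing is accumulated incrementally across smaller chunks.
    results[key] = mean_agg(group["value"])

print(results)
```

Because the function is called once with the complete group, a very large group or window translates directly into a very large Series in the Python worker.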
Note: Pandas UDAF is only supported in the Blink planner.
The following example shows how to define your own vectorized Python aggregate function which computes the mean, and use it in GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation:
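One possible shape of such an example is sketched here. It assumes a table with the schema [a: STRING, b: BIGINT, rowtime: time attribute] registered as MyTable; the names mean_udaf and run_queries are illustrative. The PyFlink import is guarded so the pandas logic runs without Flink, and run_queries is only defined, not called, because it needs a running table environment.

```python
import pandas as pd


def mean(v: pd.Series) -> float:
    # Aggregate one column of a group or window into a single value.
    return v.mean()


try:
    from pyflink.table import DataTypes
    from pyflink.table.expressions import col, lit
    from pyflink.table.udf import udaf
    from pyflink.table.window import Tumble

    mean_udaf = udaf(mean, result_type=DataTypes.FLOAT(), func_type="pandas")
except ImportError:
    mean_udaf = None  # PyFlink is not installed; only the pandas logic is usable.


def run_queries(table_env, my_table):
    """Assumed schema: [a: STRING, b: BIGINT, rowtime: time attribute], registered as MyTable."""
    # GroupBy Aggregation
    grouped = my_table.group_by(col("a")).select(col("a"), mean_udaf(col("b")))

    # GroupBy Window Aggregation over one-hour tumbling windows
    window = Tumble.over(lit(1).hours).on(col("rowtime")).alias("w")
    windowed = (my_table.window(window)
                .group_by(col("w"))
                .select(col("w").start, col("w").end, mean_udaf(col("b"))))

    # Over Window Aggregation, expressed in SQL
    table_env.create_temporary_function("mean_udaf", mean_udaf)
    over = table_env.sql_query("""
        SELECT a,
               mean_udaf(b) OVER (PARTITION BY a ORDER BY rowtime
                                  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
        FROM MyTable""")
    return grouped, windowed, over


# Local check of the pandas logic alone.
print(mean(pd.Series([1, 2, 3])))
```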
There are many ways to define a vectorized Python aggregate function. The following examples show the different ways to define a vectorized Python aggregate function which takes two columns of bigint as inputs and returns the sum of their maximums as the result.
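The variants can be sketched as follows. The names (max_add, CallableMaxAdd, partial_max_add, MaxAdd, candidates, wrapped) are illustrative, and the extra argument k=0 of the partial variant is a hypothetical choice that keeps all variants equivalent. The PyFlink part is guarded so that the pandas logic can be checked without a Flink installation.

```python
import functools

import pandas as pd


# Option 1: a plain Python function
def max_add(i, j):
    return i.max() + j.max()


# Option 2: a lambda function
max_add_lambda = lambda i, j: i.max() + j.max()


# Option 3: a callable object
class CallableMaxAdd(object):
    def __call__(self, i, j):
        return i.max() + j.max()


# Option 4: a partial function with one argument bound in advance
def partial_max_add(i, j, k):
    return i.max() + j.max() + k


max_add_partial = functools.partial(partial_max_add, k=0)

candidates = [max_add, max_add_lambda, CallableMaxAdd(), max_add_partial]

try:
    from pyflink.table import DataTypes
    from pyflink.table.udf import AggregateFunction, udaf

    # Option 5: extend the base class AggregateFunction
    class MaxAdd(AggregateFunction):
        def create_accumulator(self):
            return []

        def accumulate(self, accumulator, *args):
            # Each argument is a pandas.Series holding one input column.
            accumulator.append(sum(arg.max() for arg in args))

        def get_value(self, accumulator):
            return accumulator[0]

    # Every variant is wrapped the same way to become a vectorized UDAF.
    wrapped = [udaf(f, result_type=DataTypes.BIGINT(), func_type="pandas")
               for f in candidates + [MaxAdd()]]
except ImportError:
    wrapped = []  # PyFlink is not installed; only the pandas logic is usable.

# Local check of the pandas logic: max(i) + max(j) = 5 + 7.
i = pd.Series([1, 5, 3])
j = pd.Series([7, 2, 4])
print([int(f(i, j)) for f in candidates])
```

Which form to pick is mostly a matter of taste: the plain function is the shortest, while the AggregateFunction subclass leaves room for setup logic.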