User-defined Functions #
The PyFlink Table API lets users transform data with Python user-defined functions (UDFs).
Currently, it supports two kinds of Python user-defined functions: general Python user-defined functions, which process data one row at a time, and vectorized Python user-defined functions, which process data one batch at a time.
Bundling UDFs #
To run Python UDFs (as well as Pandas UDFs) in any non-local mode, it is strongly recommended to bundle your Python UDF definitions using the config option python-files if your Python UDFs live outside the file where the main() function is defined.
Otherwise, you may run into ModuleNotFoundError: No module named 'my_udf' if you define Python UDFs in a file called my_udf.py.
Loading resources in UDFs #
There are scenarios where you want to load some resources in a UDF first and then run the computation (i.e., eval) over and over again without having to reload the resources.
For example, you may want to load a large deep learning model only once, then run batch prediction against the model multiple times.
Overriding the open method of UserDefinedFunction is exactly what you need.
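from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        # Load the model here so that eval can reuse it without reloading.
        import pickle
        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        # Reuses the model loaded in open.
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")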
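The load-once, evaluate-many lifecycle can also be tried out without a Flink runtime. The following is a minimal plain-Python sketch, not PyFlink code: PredictSketch is a hypothetical stand-in that deliberately does not subclass ScalarFunction, the "model" is just a pickled dictionary with made-up slope and intercept values, and load_count exists only to show that the resource is read a single time.

```python
import os
import pickle
import tempfile


class PredictSketch:
    """Hypothetical stand-in that mirrors the open()/eval() lifecycle."""

    def __init__(self, model_path):
        self.model_path = model_path
        self.model = None
        self.load_count = 0  # illustration only: counts how often we load

    def open(self, function_context):
        # Expensive setup goes here; it is meant to run once, before eval().
        with open(self.model_path, "rb") as f:
            self.model = pickle.load(f)
        self.load_count += 1

    def eval(self, x):
        # Cheap per-call work that reuses the already loaded model.
        return self.model["slope"] * x + self.model["intercept"]


# Write a tiny made-up "model" to a temporary file.
tmp_dir = tempfile.mkdtemp()
model_path = os.path.join(tmp_dir, "model.pkl")
with open(model_path, "wb") as f:
    pickle.dump({"slope": 2.0, "intercept": 1.0}, f)

fn = PredictSketch(model_path)
fn.open(None)  # a real runtime would pass a function context here
results = [fn.eval(x) for x in (0.0, 1.0, 2.0)]
```

In this sketch the file is read once in open, and the three eval calls all reuse the same in-memory object, which is the behavior the open override is meant to provide.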
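Testing User-Defined Functions #
Suppose you have defined the following Python user-defined function:
add = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())
To unit test it, you first need to extract the original Python function from the UDF object using ._func, and only then can you test it: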
f = add._func
assert f(1, 2) == 3
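Because the leading underscore in _func marks it as a private attribute by Python convention, one possible alternative is to keep the logic in a plain named function and wrap it as a UDF separately, so that tests never touch the UDF object. The sketch below assumes this layout; add_impl and test_add_impl are hypothetical names, and the udf call appears only in a comment so that the example runs without Flink installed.

```python
def add_impl(i, j):
    """Plain Python logic that a UDF could wrap; callable without Flink."""
    return i + j


# In the job module the function would then be wrapped, for example:
#   add = udf(add_impl, result_type=DataTypes.BIGINT())


def test_add_impl():
    # Exercises the logic directly, with no UDF object involved.
    assert add_impl(1, 2) == 3
    assert add_impl(-1, 1) == 0
```

A test runner such as pytest would collect test_add_impl automatically; the function can also be called directly.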