User-defined Functions #
PyFlink Table API empowers users to do data transformations with Python user-defined functions.
Currently, it supports two kinds of Python user-defined functions: the general Python user-defined functions which process data one row at a time and vectorized Python user-defined functions which process data one batch at a time.
Bundling UDFs #
To run Python UDFs (as well as Pandas UDFs) in any non-local mode, it is strongly recommended
bundling your Python UDF definitions using the config option
if your Python UDFs live outside the file where the
main() function is defined.
Otherwise, you may run into
ModuleNotFoundError: No module named 'my_udf'
if you define Python UDFs in a file called
Loading resources in UDFs #
There are scenarios when you want to load some resources in UDFs first, then running computation
eval) over and over again, without having to re-load the resources.
For example, you may want to load a large deep learning model only once,
then run batch prediction against the model multiple times.
open method of
UserDefinedFunction is exactly what you need.
class Predict(ScalarFunction): def open(self, function_context): import pickle with open("resources.zip/resources/model.pkl", "rb") as f: self.model = pickle.load(f) def eval(self, x): return self.model.predict(x) predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
假如你定义了如下 Python 自定义函数：
add = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())
._func 从 UDF 对象中抽取原来的 Python 函数，然后才能测试：
f = add._func assert f(1, 2) == 3