User-defined Functions #
The PyFlink Table API lets users perform data transformations with Python user-defined functions (UDFs).
It currently supports two kinds of Python UDFs: general Python UDFs, which process data one row at a time, and vectorized Python UDFs, which process data one batch at a time.
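The difference between the two kinds can be sketched without PyFlink. The snippet below is only an illustration under stated assumptions: plain Python lists stand in for the batches a vectorized UDF would receive, and the names `add_one`, `add_one_batch`, and `batch_size` are hypothetical, not part of the PyFlink API.

```python
def add_one(value):
    """Row-at-a-time stand-in: receives a single value per call."""
    return value + 1


def add_one_batch(values):
    """Batch stand-in: receives a whole batch and returns one of equal length."""
    return [v + 1 for v in values]


rows = [1, 2, 3, 4, 5]
batch_size = 2  # illustrative value only; not a PyFlink setting

# General style: one function call per row
row_calls = 0
row_results = []
for row in rows:
    row_results.append(add_one(row))
    row_calls += 1

# Vectorized style: one function call per batch of rows
batch_calls = 0
batch_results = []
for start in range(0, len(rows), batch_size):
    batch_results.extend(add_one_batch(rows[start:start + batch_size]))
    batch_calls += 1
```

Both styles produce the same values; the batch style simply makes fewer calls into user code, which is where vectorized UDFs tend to save overhead.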
Bundling UDFs #
To run Python UDFs (as well as Pandas UDFs) in any non-local mode, it is strongly recommended
to bundle your Python UDF definitions using the config option python.files
if your Python UDFs live outside the file where the
main() function is defined.
Otherwise, you may run into
ModuleNotFoundError: No module named 'my_udf'
if you define Python UDFs in a file called my_udf.py.
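The failure mode can be reproduced locally without a cluster. The sketch below is a rough analogy, not a description of how Flink ships files: it assumes only that a worker process fails to import a module whose file is not on its import path. The temporary directory and the `try_import` helper are hypothetical names introduced for illustration.

```python
import importlib
import os
import sys
import tempfile


def try_import(name):
    """Return the imported module, or None if Python cannot find it."""
    try:
        return importlib.import_module(name)
    except ModuleNotFoundError:
        return None


# Write a stand-in my_udf.py into a directory that is NOT on sys.path,
# mimicking a UDF file that was never shipped with the job.
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, "my_udf.py"), "w") as f:
    f.write("def add(i, j):\n    return i + j\n")

missing = try_import("my_udf")  # lookup fails: the file is not reachable

# Making the file reachable (the local analogue of bundling it) fixes the import.
sys.path.insert(0, workdir)
importlib.invalidate_caches()
found = try_import("my_udf")
result = found.add(1, 2)
```

On a real cluster the same lookup happens inside the Python worker processes, which is why a file that imports fine on your machine can still be missing at run time.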
Loading resources in UDFs #
There are scenarios where you want to load some resources in a UDF once and then run the computation
(i.e., eval) over and over again without having to reload the resources.
For example, you may want to load a large deep learning model only once,
then run batch prediction against the model multiple times.
Overriding the open method of
UserDefinedFunction is exactly what you need.
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        # Load the model once, before any call to eval
        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
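The load-once, evaluate-many pattern can be exercised outside Flink. The following is a minimal sketch under stated assumptions: `PredictSketch` is a plain class rather than a ScalarFunction, the "model" is a hypothetical dict of two coefficients pickled to a temporary file, and the driver code calls open and eval by hand in the order described above.

```python
import os
import pickle
import tempfile


class PredictSketch:
    """Plain-Python stand-in that mirrors the open/eval split (not a PyFlink class)."""

    def __init__(self, model_path):
        self.model_path = model_path
        self.load_count = 0  # tracks how often the resource is read from disk

    def open(self, function_context):
        # Expensive one-time setup: read the pickled model from disk
        with open(self.model_path, "rb") as f:
            self.model = pickle.load(f)
        self.load_count += 1

    def eval(self, x):
        # Cheap per-call work that reuses the already loaded model
        return self.model["weight"] * x + self.model["bias"]


# Hypothetical model: two coefficients stored in a temporary pickle file
model_path = os.path.join(tempfile.mkdtemp(), "model.pkl")
with open(model_path, "wb") as f:
    pickle.dump({"weight": 2.0, "bias": 1.0}, f)

fn = PredictSketch(model_path)
fn.open(None)  # called once, before any eval
predictions = [fn.eval(x) for x in (0.0, 1.0, 2.0)]
```

However many times eval runs, `load_count` stays at 1, which is the property that makes open a good place for heavy initialization.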
Testing User-Defined Functions #
Suppose you have defined a Python user-defined function as follows:
add = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())
To unit test it, extract the original Python function using
._func and then test that function directly:
f = add._func
assert f(1, 2) == 3
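One possible alternative, sketched here as a suggestion rather than an established PyFlink practice, is to keep the logic in a named plain function and wrap it with udf separately, so that tests never touch the wrapper. The names `add_impl` and `AddTest` are hypothetical, and the udf line is shown only as a comment because it needs PyFlink installed.

```python
import unittest


def add_impl(i, j):
    """Plain Python logic, testable without a Flink runtime."""
    return i + j


# With PyFlink installed, the UDF would wrap the same function, e.g.:
# add = udf(add_impl, result_type=DataTypes.BIGINT())


class AddTest(unittest.TestCase):
    def test_adds_positive_numbers(self):
        self.assertEqual(add_impl(1, 2), 3)

    def test_adds_negative_numbers(self):
        self.assertEqual(add_impl(-1, 1), 0)


# Run the tests programmatically so the outcome can be inspected
suite = unittest.TestLoader().loadTestsFromTestCase(AddTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

This keeps the test suite independent of a private attribute, at the cost of one extra name per UDF.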