向量化Python用户自定义函数,是在执行时,通过在JVM和Python VM之间以Arrow列存格式批量传输数据,来执行的函数。
向量化Python用户自定义函数的性能通常比非向量化Python用户自定义函数要高得多,因为向量化Python用户自定义函数可以大大减少序列化/反序列化的开销和调用开销。
此外,用户可以利用流行的Python库(例如Pandas,Numpy等)来实现向量化Python用户自定义函数的逻辑。这些Python库通常经过高度优化,并提供了高性能的数据结构和功能。
向量化用户自定义函数的定义,与非向量化用户自定义函数具有相似的方式,
用户只需要在调用udf
装饰器时添加一个额外的参数udf_type="pandas"
,将其标记为一个向量化用户自定义函数即可。
注意:要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6或3.7)。客户端和群集端都需要安装它。
向量化Python标量函数以pandas.Series
类型的参数作为输入,并返回与输入长度相同的pandas.Series
。
在内部实现中,Flink会将输入数据拆分为多个批次,并将每一批次的输入数据转换为Pandas.Series
类型,
然后为每一批输入数据调用用户自定义的向量化Python标量函数。请参阅配置选项
python.fn-execution.arrow.batch.size,
以获取有关如何配置批次大小的更多详细信息。
向量化Python标量函数可以在任何可以使用非向量化Python标量函数的地方使用。
以下示例显示了如何定义自己的向量化Python标量函数,该函数计算两列的总和,并在查询中使用它:
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
return i + j
table_env = BatchTableEnvironment.create(env)
# configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
# register the vectorized Python scalar function
table_env.register_function("add", add)
# use the vectorized Python scalar function in Python Table API
my_table.select("add(bigint, bigint)")
# 在SQL API中使用Python向量化标量函数
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
注意如果不使用RocksDB作为状态后端,则还可以通过 将python.fn-execution.memory.managed设置为true ,来配置python worker以使用taskmanager的托管内存, 则无需配置taskmanager.memory.task.off-heap.size 。