- 应用开发
- Python API
- Table API用户指南
- 自定义函数
- 普通自定义函数(UDF)
普通自定义函数(UDF)
本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本。
用户自定义函数是重要的功能,因为它们极大地扩展了Python Table API程序的表达能力。
注意: 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本3.5、3.6或3.7,并安装PyFlink。
标量函数(ScalarFunction)
PyFlink支持在Python Table API程序中使用Python标量函数。 如果要定义Python标量函数,
可以继承pyflink.table.udf
中的基类ScalarFunction,并实现eval方法。
Python标量函数的行为由名为eval
的方法定义,eval方法支持可变长参数,例如eval(* args)
。
以下示例显示了如何定义自己的Python哈希函数、如何在TableEnvironment中注册它以及如何在作业中使用它。
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。
如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true,
配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
除此之外,还支持在Python Table API程序中使用Java / Scala标量函数。
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。
如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true,
配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
除了扩展基类ScalarFunction
之外,还支持多种方式来定义Python标量函数。
以下示例显示了多种定义Python标量函数的方式。该函数需要两个类型为bigint的参数作为输入参数,并返回它们的总和作为结果。
表值函数
与Python用户自定义标量函数类似,Python用户自定义表值函数以零个,一个或者多个列作为输入参数。但是,与标量函数不同的是,表值函数可以返回
任意数量的行作为输出而不是单个值。Python用户自定义表值函数的返回类型可以是Iterable,Iterator或generator类型。
以下示例说明了如何定义自己的Python自定义表值函数,将其注册到TableEnvironment中,并在作业中使用它。
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。
如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true,
配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
除此之外,还支持在Python Table API程序中使用Java / Scala表值函数。
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。
如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true,
配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
像Python标量函数一样,您可以使用上述五种方式来定义Python表值函数。
注意 唯一的区别是,Python表值函数的返回类型必须是iterable(可迭代子类), iterator(迭代器) or generator(生成器)。