- 应用开发
- Python API
- Table API用户指南
- 连接器
连接器
本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本。
本篇描述了如何在 PyFlink 中使用连接器,并着重介绍了在 Python 程序中使用 Flink 连接器时需要注意的细节。
Note 想要了解常见的连接器信息和通用配置,请查阅相关的 Java/Scala 文档。
由于 Flink 是一个基于 Java/Scala 的项目,连接器(connector)和格式(format)的实现是作为 jar 包存在的,要在 PyFlink 作业中使用,首先需要将其指定为作业的 依赖。
如何使用连接器
在 PyFink Table API 中,DDL 是定义 source 和 sink 比较推荐的方式,这可以通过 TableEnvironment
中的 execute_sql()
方法来完成,然后就可以在作业中使用这张表了。
下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。
内置的 Sources 和 Sinks
有些 source 和 sink 被内置在 Flink 中,可以直接使用。这些内置的 source 包括将 Pandas DataFrame 作为数据源,或者将一个元素集合作为数据源。内置的 sink 包括将数据转换为 Pandas DataFrame 等。
和 Pandas 之间互转
PyFlink 表支持与 Pandas DataFrame 之间互相转换。
from_elements()
from_elements()
用于从一个元素集合中创建一张表。元素类型必须是可支持的原子类型或者复杂类型。
以上查询返回的表如下:
用户自定义的 source 和 sink
在某些情况下,你可能想要自定义 source 或 sink。目前,source 和 sink 必须使用 Java/Scala 实现,你可以定义一个 TableFactory
,然后通过 DDL 在 PyFlink 作业中来使用它们。更多详情,可查阅 Java/Scala 文档。