连接器

连接器 #

本篇描述了如何在 PyFlink 中使用连接器,并着重介绍了在 Python 程序中使用 Flink 连接器时需要注意的细节。

Note 想要了解常见的连接器信息和通用配置,请查阅相关的 Java/Scala 文档

下载连接器(connector)和格式(format)jar 包 #

由于 Flink 是一个基于 Java/Scala 的项目,连接器(connector)和格式(format)的实现是作为 jar 包存在的, 要在 PyFlink 作业中使用,首先需要将其指定为作业的 依赖

table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

如何使用连接器 #

在 PyFlink Table API 中,DDL 是定义 source 和 sink 比较推荐的方式,这可以通过 TableEnvironment 中的 execute_sql() 方法来完成,然后就可以在作业中使用这张表了。

source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
          'connector' =' = 'kafka',
          'topic' = 'source_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'properties.group.id' = 'test_3',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """

sink_ddl = """
        CREATE TABLE sink_table(
            a VARCHAR
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'sink_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'format' = 'json'
        )
        """

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

t_env.sql_query("SELECT a FROM source_table") \
    .execute_insert("sink_table").wait()

下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。

from pyflink.table import TableEnvironment, EnvironmentSettings

def log_processing():
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)
    # specify connector and format jars
    t_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
    
    source_ddl = """
            CREATE TABLE source_table(
                a VARCHAR,
                b INT
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'source_topic',
              'properties.bootstrap.servers' = 'kafka:9092',
              'properties.group.id' = 'test_3',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    sink_ddl = """
            CREATE TABLE sink_table(
                a VARCHAR
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'sink_topic',
              'properties.bootstrap.servers' = 'kafka:9092',
              'format' = 'json'
            )
            """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    t_env.sql_query("SELECT a FROM source_table") \
        .execute_insert("sink_table").wait()


if __name__ == '__main__':
    log_processing()

内置的 Sources 和 Sinks #

有些 source 和 sink 被内置在 Flink 中,可以直接使用。这些内置的 source 包括将 Pandas DataFrame 作为数据源, 或者将一个元素集合作为数据源。内置的 sink 包括将数据转换为 Pandas DataFrame 等。

和 Pandas 之间互转 #

PyFlink 表支持与 Pandas DataFrame 之间互相转换。

from pyflink.table.expressions import col

import pandas as pd
import numpy as np

# 创建一个 PyFlink 表
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)

# 将 PyFlink 表转换成 Pandas DataFrame
pdf = table.to_pandas()

from_elements() #

from_elements() 用于从一个元素集合中创建一张表。元素类型必须是可支持的原子类型或者复杂类型。

from pyflink.table import DataTypes

table_env.from_elements([(1, 'Hi'), (2, 'Hello')])

# 使用第二个参数指定自定义字段名
table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

# 使用第二个参数指定自定义表结构
table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                        DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                                       DataTypes.FIELD("b", DataTypes.STRING())]))

以上查询返回的表如下:

+----+-------+
| a  |   b   |
+====+=======+
| 1  |  Hi   |
+----+-------+
| 2  | Hello |
+----+-------+

用户自定义的 source 和 sink #

在某些情况下,你可能想要自定义 source 或 sink。目前,source 和 sink 必须使用 Java/Scala 实现,你可以定义一个 TableFactory , 然后通过 DDL 在 PyFlink 作业中来使用它们。更多详情,可查阅 Java/Scala 文档