Kafka

Kafka Pipeline 连接器 #

Kafka Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入Kafka。 本文档介绍如何设置 Kafka Pipeline 连接器。

连接器的功能 #

  • 自动建表
  • 表结构变更同步
  • 数据实时同步

如何创建 Pipeline #

从 MySQL 读取数据同步到 Kafka 的 Pipeline 可以定义如下:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://localhost:62510

pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 2

Pipeline 连接器配置项 #

Option Required Default Type Description
type required (none) String 指定要使用的连接器, 这里需要设置成 'kafka'
name optional (none) String Sink 的名称。
partition.strategy optional (none) String 定义发送数据到 Kafka 分区的策略, 可以设置的选项有 `all-to-zero`(将所有数据发送到 0 号分区) 以及 `hash-by-key`(所有数据根据主键的哈希值分发),默认值为 `all-to-zero`。
key.format optional (none) String 用于序列化 Kafka 消息的键部分数据的格式。可以设置的选项有 `csv` 以及 `json`, 默认值为 `json`。
value.format optional (none) String 用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-jsoncanal-json, 默认值为 `debezium-json`,并且目前不支持用户自定义输出格式。
properties.bootstrap.servers required (none) String 用于建立与 Kafka 集群初始连接的主机/端口对列表。
topic optional (none) String 如果配置了这个参数,所有的消息都会发送到这一个主题。
sink.add-tableId-to-header-enabled optional (none) Boolean 如果配置了这个参数,所有的消息都会带上键为 `namespace`, 'schemaName', 'tableName',值为事件 TableId 里对应的 字符串的 header。
properties.* optional (none) String 将 Kafka 支持的参数传递给 pipeline,参考 Kafka consume options
sink.custom-header optional (none) String Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。

使用说明 #

  • 写入 Kafka 的 topic 默认会是上游表 namespace.schemaName.tableName 对应的字符串,可以通过 pipeline 的 route 功能进行修改。
  • 如果配置了 topic 参数,所有的消息都会发送到这一个主题。
  • 写入 Kafka 的 topic 如果不存在,则会默认创建。

输出格式 #

对于不同的内置 value.format 选项,输出的格式也是不同的:

debezium-json #

参考 Debezium docs, debezium-json 格式会包含 before,after,op,source 几个元素, 但是 ts_ms 字段并不会包含在 source 元素中。
一个输出的示例是:

{
  "before": null,
  "after": {
    "col1": "1",
    "col2": "1"
  },
  "op": "c",
  "source": {
    "db": "default_namespace",
    "table": "table1"
  }
}

canal-json #

参考 Canal | Apache Flink, canal-json 格式会包含 old,data,type,database,table,pkNames 几个元素, 但是 ts 并不会包含在其中。
一个输出的示例是:

{
    "old": null,
    "data": [
        {
            "col1": "1",
            "col2": "1"
        }
    ],
    "type": "INSERT",
    "database": "default_schema",
    "table": "table1",
    "pkNames": [
        "col1"
    ]
}

数据类型映射 #

CDC type JSON type NOTE
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIMESTAMP DATETIME
TIMESTAMP_LTZ TIMESTAMP_LTZ
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)

Back to top