Kafka Pipeline 连接器 #
Kafka Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入Kafka。 本文档介绍如何设置 Kafka Pipeline 连接器。
连接器的功能 #
- 自动建表
- 表结构变更同步
- 数据实时同步
如何创建 Pipeline #
从 MySQL 读取数据同步到 Kafka 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: PLAINTEXT://localhost:62510
pipeline:
name: MySQL to Kafka Pipeline
parallelism: 2
Pipeline 连接器配置项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'kafka' 。 |
name | optional | (none) | String | Sink 的名称。 |
partition.strategy | optional | (none) | String | 定义发送数据到 Kafka 分区的策略, 可以设置的选项有 `all-to-zero`(将所有数据发送到 0 号分区) 以及 `hash-by-key`(所有数据根据主键的哈希值分发),默认值为 `all-to-zero`。 |
key.format | optional | (none) | String | 用于序列化 Kafka 消息的键部分数据的格式。可以设置的选项有 `csv` 以及 `json`, 默认值为 `json`。 |
value.format | optional | (none) | String | 用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-json 和 canal-json, 默认值为 `debezium-json`,并且目前不支持用户自定义输出格式。 |
properties.bootstrap.servers | required | (none) | String | 用于建立与 Kafka 集群初始连接的主机/端口对列表。 |
topic | optional | (none) | String | 如果配置了这个参数,所有的消息都会发送到这一个主题。 |
sink.add-tableId-to-header-enabled | optional | (none) | Boolean | 如果配置了这个参数,所有的消息都会带上键为 `namespace`, 'schemaName', 'tableName',值为事件 TableId 里对应的 字符串的 header。 |
properties.* | optional | (none) | String | 将 Kafka 支持的参数传递给 pipeline,参考 Kafka consume options。 |
sink.custom-header | optional | (none) | String | Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。 |
使用说明 #
- 写入 Kafka 的 topic 默认会是上游表
namespace.schemaName.tableName
对应的字符串,可以通过 pipeline 的 route 功能进行修改。 - 如果配置了
topic
参数,所有的消息都会发送到这一个主题。 - 写入 Kafka 的 topic 如果不存在,则会默认创建。
输出格式 #
对于不同的内置 value.format
选项,输出的格式也是不同的:
debezium-json #
参考 Debezium docs, debezium-json 格式会包含 before
,after
,op
,source
几个元素, 但是 ts_ms
字段并不会包含在 source
元素中。
一个输出的示例是:
{
"before": null,
"after": {
"col1": "1",
"col2": "1"
},
"op": "c",
"source": {
"db": "default_namespace",
"table": "table1"
}
}
canal-json #
参考 Canal | Apache Flink, canal-json 格式会包含 old
,data
,type
,database
,table
,pkNames
几个元素, 但是 ts
并不会包含在其中。
一个输出的示例是:
{
"old": null,
"data": [
{
"col1": "1",
"col2": "1"
}
],
"type": "INSERT",
"database": "default_schema",
"table": "table1",
"pkNames": [
"col1"
]
}
数据类型映射 #
CDC type | JSON type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | TIMESTAMP_LTZ | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) |