Kafka Pipeline 连接器 #
Kafka Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入Kafka。 本文档介绍如何设置 Kafka Pipeline 连接器。
连接器的功能 #
- 自动建表
- 表结构变更同步
- 数据实时同步
如何创建 Pipeline #
从 MySQL 读取数据同步到 Kafka 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: PLAINTEXT://localhost:62510
pipeline:
name: MySQL to Kafka Pipeline
parallelism: 2
Pipeline 连接器配置项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'kafka' 。 |
name | optional | (none) | String | Sink 的名称。 |
value.format | optional | (none) | String | 用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-json 和 canal-json, 默认值为 `debezium-json`,并且目前不支持用户自定义输出格式。 |
properties.bootstrap.servers | required | (none) | String | 用于建立与 Kafka 集群初始连接的主机/端口对列表。 |
topic | optional | (none) | String | 如果配置了这个参数,所有的消息都会发送到这一个主题。 |
sink.add-tableId-to-header-enabled | optional | (none) | Boolean | 如果配置了这个参数,所有的消息都会带上键为 `namespace`, 'schemaName', 'tableName',值为事件 TableId 里对应的 字符串的 header。 |
properties.* | optional | (none) | String | 将 Kafka 支持的参数传递给 pipeline,参考 Kafka consume options。 |
sink.custom-header | optional | (none) | String | Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。 |
使用说明 #
- 写入 Kafka 的 topic 默认会是上游表
namespace.schemaName.tableName
对应的字符串,可以通过 pipeline 的 route 功能进行修改。 - 如果配置了
topic
参数,所有的消息都会发送到这一个主题。 - 写入 Kafka 的 topic 如果不存在,则会默认创建。
数据类型映射 #
CDC type | JSON type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | TIMESTAMP_LTZ | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) |