Kafka

Kafka Pipeline 连接器 #

Kafka Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入Kafka。 本文档介绍如何设置 Kafka Pipeline 连接器。

连接器的功能 #

  • 自动建表
  • 表结构变更同步
  • 数据实时同步

如何创建 Pipeline #

从 MySQL 读取数据同步到 Kafka 的 Pipeline 可以定义如下:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://localhost:62510

pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 2

Pipeline 连接器配置项 #

Option Required Default Type Description
type required (none) String 指定要使用的连接器, 这里需要设置成 'kafka'
name optional (none) String Sink 的名称。
value.format optional (none) String 用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-jsoncanal-json, 默认值为 `debezium-json`,并且目前不支持用户自定义输出格式。
properties.bootstrap.servers required (none) String 用于建立与 Kafka 集群初始连接的主机/端口对列表。
topic optional (none) String 如果配置了这个参数,所有的消息都会发送到这一个主题。
sink.add-tableId-to-header-enabled optional (none) Boolean 如果配置了这个参数,所有的消息都会带上键为 `namespace`, 'schemaName', 'tableName',值为事件 TableId 里对应的 字符串的 header。
properties.* optional (none) String 将 Kafka 支持的参数传递给 pipeline,参考 Kafka consume options
sink.custom-header optional (none) String Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。

使用说明 #

  • 写入 Kafka 的 topic 默认会是上游表 namespace.schemaName.tableName 对应的字符串,可以通过 pipeline 的 route 功能进行修改。
  • 如果配置了 topic 参数,所有的消息都会发送到这一个主题。
  • 写入 Kafka 的 topic 如果不存在,则会默认创建。

数据类型映射 #

CDC type JSON type NOTE
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIMESTAMP DATETIME
TIMESTAMP_LTZ TIMESTAMP_LTZ
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)

Back to top