This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
Paimon
Paimon Pipeline 连接器 #
Paimon Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入Paimon。 本文档介绍如何设置 Paimon Pipeline 连接器。
连接器的功能 #
- 自动建表
- 表结构变更同步
- 数据实时同步
如何创建 Pipeline #
从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
pipeline:
name: MySQL to Paimon Pipeline
parallelism: 2
Pipeline 连接器配置项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'paimon' . |
name | optional | (none) | String | Sink 的名称. |
catalog.properties.metastore | optional | "filesystem" |
String | 用于构建 Paimon Catalog 的类型。可选填值 filesystem 或者 hive。 |
catalog.properties.warehouse | optional | (none) | String | Paimon 仓库存储数据的根目录。 |
catalog.properties.uri | optional | (none) | String | Hive metastore 的 uri,在 metastore 设置为 hive 的时候需要。 |
commit.user | optional | admin |
String | 提交数据文件时的用户名。 |
partition.key | optional | (none) | String | 设置每个分区表的分区字段,允许填写成多个分区表的多个分区字段。 不同的表使用 ';'分割, 而不同的字段则使用 ','分割。举个例子, 我们可以为两张表的不同分区字段作如下的设置 'testdb.table1:id1,id2;testdb.table2:name'。 |
catalog.properties.* | optional | (none) | String | 将 Paimon catalog 支持的参数传递给 pipeline,参考 Paimon catalog options。 |
table.properties.* | optional | (none) | String | 将 Paimon table 支持的参数传递给 pipeline,参考 Paimon table options。 |
使用说明 #
-
只支持主键表,因此源表必须有主键
-
暂不支持 exactly-once,连接器 通过 at-least-once 和主键表实现幂等写
数据类型映射 #
CDC type | Paimon type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | TIMESTAMP_LTZ | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) |