This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
MySQL
MySQL Connector #
MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。
依赖配置 #
由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。
您可能需要手动配置以下依赖,并在提交 YAML 作业时使用 Flink CDC CLI 的 --jar
参数将其传入:
依赖名称 | 说明 |
---|---|
mysql:mysql-connector-java:8.0.27 | 用于连接到 MySQL 数据库。 |
示例 #
从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
连接器配置项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
hostname | required | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 |
port | optional | 3306 | Integer | MySQL 数据库服务器的整数端口号。 |
username | required | (none) | String | 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。 |
password | required | (none) | String | 连接 MySQL 数据库服务器时使用的密码。 |
tables | required | (none) | String | 需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。 需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。 例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.* |
tables.exclude | optional | (none) | String | 需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。 用法和tables参数相同 |
schema-change.enabled | optional | true | Boolean | 是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。 |
server-id | optional | (none) | String | 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 |
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 |
scan.snapshot.fetch.size | optional | 1024 | Integer | 读取表快照时每次读取数据的最大条数。 |
scan.startup.mode | optional | initial | String | MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset","timestamp" 和 ""snapshot"。 |
scan.startup.specific-offset.file | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 |
scan.startup.specific-offset.pos | optional | (none) | Long | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 |
scan.startup.specific-offset.gtid-set | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 |
scan.startup.timestamp-millis | optional | (none) | Long | 在 "timestamp" 启动模式下,启动位点的毫秒时间戳。 |
scan.startup.specific-offset.skip-events | optional | (none) | Long | 在指定的启动位点后需要跳过的事件数量。 |
scan.startup.specific-offset.skip-rows | optional | (none) | Long | 在指定的启动位点后需要跳过的数据行数量。 |
connect.timeout | optional | 30s | Duration | 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。 |
connect.max-retries | optional | 3 | Integer | 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 |
connection.pool.size | optional | 20 | Integer | 连接池大小。 |
jdbc.properties.* | optional | 20 | String | 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. |
heartbeat.interval | optional | 30s | Duration | 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 |
debezium.* | optional | (none) | String | 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。
例如: 'debezium.snapshot.mode' = 'never' .
查看更多关于 Debezium 的 MySQL 连接器属性 |
scan.incremental.close-idle-reader.enabled | optional | false | Boolean | 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。 |
scan.newly-added-table.enabled | optional | false | Boolean | 是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。 |
scan.binlog.newly-added-table.enabled | optional | false | Boolean | 在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。 scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于: scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取; scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。 |
启动模式 #
配置选项scan.startup.mode
指定 MySQL CDC 使用者的启动模式。有效枚举包括:
initial
(默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。earliest-offset
:跳过快照阶段,从可读取的最早 binlog 位点开始读取latest-offset
:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。specific-offset
:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。timestamp
:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。snapshot
: 只进行快照阶段,跳过增量阶段,快照阶段读取结束后退出。
例如,可以在 YAML 配置文件中这样指定启动模式:
source:
type: mysql
scan.startup.mode: earliest-offset # Start from earliest offset
scan.startup.mode: latest-offset # Start from latest offset
scan.startup.mode: specific-offset # Start from specific offset
scan.startup.mode: timestamp # Start from timestamp
scan.startup.mode: snapshot # Read snapshot only
scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
# ...
数据类型映射 #
MySQL type | CDC type | NOTE |
---|---|---|
TINYINT(n) | TINYINT | |
SMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILL |
SMALLINT | |
INT YEAR MEDIUMINT MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL |
INT | |
BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL |
BIGINT | |
BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL |
DECIMAL(20, 0) | |
FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL |
FLOAT | |
REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL FLOAT(p, s) REAL(p, s) DOUBLE(p, s) |
DOUBLE | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 |
DECIMAL(p, s) | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 |
STRING | 在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。 |
BOOLEAN TINYINT(1) BIT(1) |
BOOLEAN | |
DATE | DATE | |
TIME [(p)] | TIME [(p)] | |
TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
DATETIME [(p)] | TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) | |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) | |
BINARY(n) | BINARY(n) | |
VARBINARY(N) | VARBINARY(N) | |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT |
STRING | |
TINYBLOB BLOB MEDIUMBLOB LONGBLOB |
BYTES | 目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。 |
ENUM | STRING | |
JSON | STRING | JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。 |
SET | - | 暂不支持 |
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION |
STRING | MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。 |
空间数据类型映射 #
MySQL中除GEOMETRYCOLLECTION
之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
字段srid
标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。
由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid
将始终为 0。
字段type
标识空间数据类型,例如POINT
/LINESTRING
/POLYGON
。
字段coordinates
表示空间数据的坐标
。
对于GEOMETRYCOLLECTION
,它将转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
Geometrics
字段是一个包含所有空间数据的数组。
不同空间数据类型映射的示例如下:
Spatial data in MySQL | Json String converted in Flink |
---|---|
POINT(1 1) | {"coordinates":[1,1],"type":"Point","srid":0} |
LINESTRING(3 0, 3 3, 3 5) | {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0} |
POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) | {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0} |
MULTIPOINT((1 1),(2 2)) | {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0} |
MultiLineString((1 1,2 2,3 3),(4 4,5 5)) | {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0} |
MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) | {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0} |
GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) | {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0} |