This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
MySQL Connector #
MySQL connector allows reading snapshot data and incremental data from MySQL database and provides end-to-end full-database data synchronization capabilities. This document describes how to setup the MySQL connector.
Dependencies #
Since MySQL Connector’s GPLv2 license is incompatible with Flink CDC project, we can’t provide MySQL connector in prebuilt connector jar packages.
You may need to configure the following dependencies manually, and pass it with --jar
argument of Flink CDC CLI when submitting YAML pipeline jobs.
Dependency Item | Description |
---|---|
mysql:mysql-connector-java:8.0.27 | Used for connecting to MySQL database. |
Example #
An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
Connector Options #
Option | Required | Default | Type | Description |
---|---|---|---|---|
hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
username | required | (none) | String | Name of the MySQL database to use when connecting to the MySQL database server. |
password | required | (none) | String | Password to use when connecting to the MySQL database server. |
tables | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. It is important to note that the dot (.) is treated as a delimiter for database and table names. If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash. eg. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.* |
tables.exclude | optional | (none) | String | Table name of the MySQL database to exclude, parameter will have an exclusion effect after the tables parameter. The table-name also supports regular expressions to exclude multiple tables that satisfy the regular expressions. The usage is the same as the tables parameter |
schema-change.enabled | optional | true | Boolean | Whether to send schema change events, so that downstream sinks can respond to schema changes and achieve table structure synchronization. |
server-id | optional | (none) | String | A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value. |
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table. |
scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size for per poll when read table snapshot. |
scan.startup.mode | optional | initial | String | Optional startup mode for MySQL CDC consumer, valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset", "timestamp" and "snapshot". |
scan.startup.specific-offset.file | optional | (none) | String | Optional binlog file name used in case of "specific-offset" startup mode |
scan.startup.specific-offset.pos | optional | (none) | Long | Optional binlog file position used in case of "specific-offset" startup mode |
scan.startup.specific-offset.gtid-set | optional | (none) | String | Optional GTID set used in case of "specific-offset" startup mode |
scan.startup.timestamp-millis | optional | (none) | Long | Optional millisecond timestamp used in case of "timestamp" startup mode. |
scan.startup.specific-offset.skip-events | optional | (none) | Long | Optional number of events to skip after the specific starting offset |
scan.startup.specific-offset.skip-rows | optional | (none) | Long | Optional number of rows to skip after the specific starting offset |
connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. This value cannot be less than 250ms. |
connect.max-retries | optional | 3 | Integer | The max retry times that the connector should retry to build MySQL database server connection. |
connection.pool.size | optional | 20 | Integer | The connection pool size. |
jdbc.properties.* | optional | 20 | String | Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'. |
heartbeat.interval | optional | 30s | Duration | The interval of sending heartbeat event for tracing the latest available binlog offsets. |
debezium.* | optional | (none) | String | Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server.
For example: 'debezium.snapshot.mode' = 'never' .
See more about the Debezium's MySQL Connector properties |
scan.incremental.close-idle-reader.enabled | optional | false | Boolean | Whether to close idle readers at the end of the snapshot phase. The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true. If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true, so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true' |
scan.newly-added-table.enabled | optional | false | Boolean | Whether to enable scan the newly added tables feature or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint. |
scan.binlog.newly-added-table.enabled | optional | false | Boolean | In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. |
Startup Reading Position #
The config option scan.startup.mode
specifies the startup mode for MySQL CDC consumer. The valid enumerations are:
initial
(default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.earliest-offset
: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.latest-offset
: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.specific-offset
: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.timestamp
: Skip snapshot phase and start reading binlog events from a specific timestamp.snapshot
: Only the snapshot phase is performed and exits after the snapshot phase reading is completed.
For example in YAML definition:
source:
type: mysql
scan.startup.mode: earliest-offset # Start from earliest offset
scan.startup.mode: latest-offset # Start from latest offset
scan.startup.mode: specific-offset # Start from specific offset
scan.startup.mode: timestamp # Start from timestamp
scan.startup.mode: snapshot # Read snapshot only
scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
# ...
Data Type Mapping #
MySQL type | Flink CDC type | Note |
---|---|---|
TINYINT(n) | TINYINT | |
SMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILL |
SMALLINT | |
INT YEAR MEDIUMINT MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL |
INT | |
BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL |
BIGINT | |
BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL |
DECIMAL(20, 0) | |
FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL |
FLOAT | |
REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL FLOAT(p, s) REAL(p, s) DOUBLE(p, s) |
DOUBLE | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 |
DECIMAL(p, s) | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 |
STRING | The precision for DECIMAL data type is up to 65 in MySQL, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
BOOLEAN TINYINT(1) BIT(1) |
BOOLEAN | |
DATE | DATE | |
TIME [(p)] | TIME [(p)] | |
TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
DATETIME [(p)] | TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) | |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) | |
BINARY(n) | BINARY(n) | |
VARBINARY(N) | VARBINARY(N) | |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT |
STRING | |
TINYBLOB BLOB MEDIUMBLOB LONGBLOB |
BYTES | Currently, for BLOB data type in MySQL, only the blob whose length isn't greater than 2,147,483,647(2 ** 31 - 1) is supported. |
ENUM | STRING | |
JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
SET | - | Not supported yet. |
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION |
STRING | The spatial data types in MySQL will be converted into STRING with a fixed Json format. Please see MySQL Spatial Data Types Mapping section for more detailed information. |
MySQL Spatial Data Types Mapping #
The spatial data types except for GEOMETRYCOLLECTION
in MySQL will be converted into Json String with a fixed format like:
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
The field srid
identifies the SRS in which the geometry is defined, SRID 0 is the default for new geometry values if no SRID is specified.
As only MySQL 8+ support to specific SRID when define spatial data type, the field srid
will always be 0 in MySQL with a lower version.
The field type
identifies the spatial data type, such as POINT
/LINESTRING
/POLYGON
.
The field coordinates
represents the coordinates
of the spatial data.
For GEOMETRYCOLLECTION
, it will be converted into Json String with a fixed format like:
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
The field geometries
is an array contains all spatial data.
The example for different spatial data types mapping is as follows:
Spatial data in MySQL | Json String converted in Flink |
---|---|
POINT(1 1) | {"coordinates":[1,1],"type":"Point","srid":0} |
LINESTRING(3 0, 3 3, 3 5) | {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0} |
POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) | {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0} |
MULTIPOINT((1 1),(2 2)) | {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0} |
MultiLineString((1 1,2 2,3 3),(4 4,5 5)) | {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0} |
MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) | {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0} |
GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) | {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0} |