MySQL

MySQL Connector #

The MySQL connector allows reading snapshot data and incremental data from MySQL databases and provides end-to-end full-database data synchronization capabilities. This document describes how to set up the MySQL connector.

Dependencies #

Since the MySQL Connector's GPLv2 license is incompatible with the Flink CDC project, we can't provide the MySQL connector in the prebuilt connector jar packages. You may need to configure the following dependency manually and pass it with the --jar argument of the Flink CDC CLI when submitting YAML pipeline jobs.

| Dependency Item | Description |
|---|---|
| mysql:mysql-connector-java:8.0.27 | Used for connecting to MySQL database. |

Example #

An example of a pipeline that reads data from MySQL and writes to Doris can be defined as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
  name: MySQL to Doris Pipeline
  parallelism: 4

Connector Options #

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
| port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
| username | required | (none) | String | Name of the MySQL user to use when connecting to the MySQL database server. |
| password | required | (none) | String | Password to use when connecting to the MySQL database server. |
| tables | required | (none) | String | Table names of the MySQL database to monitor. The table name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. Note that the dot (.) is treated as a delimiter between database and table names; to use a dot (.) in a regular expression to match any character, it must be escaped with a backslash. E.g. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.* |
| tables.exclude | optional | (none) | String | Table names of the MySQL database to exclude; this filter is applied after the tables option. The table name also supports regular expressions to exclude multiple tables that satisfy the regular expressions. The usage is the same as the tables option. |
| schema-change.enabled | optional | true | Boolean | Whether to send schema change events, so that downstream sinks can respond to schema changes and synchronize table structures. |
| server-id | optional | (none) | String | A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400', and the numeric ID range syntax is like '5400-5408'. The range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated, though we recommend setting an explicit value. |
| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of a table snapshot; captured tables are split into multiple chunks when reading the snapshot of a table. |
| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading a table snapshot. |
| scan.startup.mode | optional | initial | String | Startup mode for the MySQL CDC consumer; valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset", "timestamp" and "snapshot". |
| scan.startup.specific-offset.file | optional | (none) | String | Optional binlog file name used in case of "specific-offset" startup mode. |
| scan.startup.specific-offset.pos | optional | (none) | Long | Optional binlog file position used in case of "specific-offset" startup mode. |
| scan.startup.specific-offset.gtid-set | optional | (none) | String | Optional GTID set used in case of "specific-offset" startup mode. |
| scan.startup.timestamp-millis | optional | (none) | Long | Optional millisecond timestamp used in case of "timestamp" startup mode. |
| scan.startup.specific-offset.skip-events | optional | (none) | Long | Optional number of events to skip after the specific starting offset. |
| scan.startup.specific-offset.skip-rows | optional | (none) | Long | Optional number of rows to skip after the specific starting offset. |
| connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. This value cannot be less than 250ms. |
| connect.max-retries | optional | 3 | Integer | The maximum number of times the connector should retry building a MySQL database server connection. |
| connection.pool.size | optional | 20 | Integer | The connection pool size. |
| jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. Users can pass custom properties like 'jdbc.properties.useSSL' = 'false'. |
| heartbeat.interval | optional | 30s | Duration | The interval of sending heartbeat events for tracing the latest available binlog offsets. |
| debezium.* | optional | (none) | String | Pass-through Debezium properties to the Debezium Embedded Engine, which is used to capture data changes from the MySQL server. For example: 'debezium.snapshot.mode' = 'never'. See more about Debezium's MySQL Connector properties. |
| scan.incremental.close-idle-reader.enabled | optional | false | Boolean | Whether to close idle readers at the end of the snapshot phase. The Flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true. If the Flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is already true, so it does not need to be configured explicitly. |
| scan.newly-added-table.enabled | optional | false | Boolean | Whether to enable the newly-added-table scan feature; false by default. This option is only useful when the job is started from a savepoint/checkpoint. |
| scan.binlog.newly-added-table.enabled | optional | false | Boolean | In the binlog reading stage, whether to scan the DDL and DML statements of newly added tables; false by default. The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled is: scan.newly-added-table.enabled performs both a re-snapshot and binlog reading for newly added tables when the job is restored, while scan.binlog.newly-added-table.enabled only performs binlog reading for newly added tables during the binlog reading phase. |
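
As a quick, hedged illustration of how the tables, tables.exclude and server-id options work together (the database and table names below are hypothetical), a source block could be configured like this:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: app_db.\.*               # capture every table in app_db ...
  tables.exclude: app_db.log_\.*   # ... except tables whose names start with "log_"
  server-id: 5401-5404             # a unique ID range with at least as many IDs as the source parallelism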

Startup Reading Position #

The config option scan.startup.mode specifies the startup mode for the MySQL CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, then continues to read the latest binlog.
  • earliest-offset: Skips the snapshot phase and starts reading binlog events from the earliest accessible binlog offset.
  • latest-offset: Never performs a snapshot on the monitored database tables upon first startup; the connector only reads from the end of the binlog, which means it only captures changes made after it was started.
  • specific-offset: Skips the snapshot phase and starts reading binlog events from a specific offset. The offset can be specified with a binlog filename and position, or with a GTID set if GTID is enabled on the server.
  • timestamp: Skips the snapshot phase and starts reading binlog events from a specific timestamp.
  • snapshot: Only performs the snapshot phase and exits after the snapshot reading is completed.

For example, in a YAML definition:

source:
  type: mysql
  scan.startup.mode: earliest-offset                    # Start from earliest offset
  scan.startup.mode: latest-offset                      # Start from latest offset
  scan.startup.mode: specific-offset                    # Start from specific offset
  scan.startup.mode: timestamp                          # Start from timestamp
  scan.startup.mode: snapshot                          # Read snapshot only
  scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
  scan.startup.specific-offset.pos: 4                   # Binlog position under specific offset mode
  scan.startup.specific-offset.gtid-set: 24DA167-...    # GTID set under specific offset startup mode
  scan.startup.timestamp-millis: 1667232000000          # Timestamp under timestamp startup mode
  # ...
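
Note that the block above lists all startup modes side by side only to show the related keys; a real source definition sets scan.startup.mode once. As a hedged sketch, a source that resumes from a specific GTID set (hostname, credentials, table pattern and the GTID value are placeholders) could look like:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*
  scan.startup.mode: specific-offset
  scan.startup.specific-offset.gtid-set: '00000000-0000-0000-0000-000000000000:1-19'  # placeholder: use your server's GTID set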

Data Type Mapping #

| MySQL type | Flink CDC type | Note |
|---|---|---|
| TINYINT(n) | TINYINT | |
| SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL | SMALLINT | |
| INT, YEAR, MEDIUMINT, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL | INT | |
| BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL | BIGINT | |
| BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL | DECIMAL(20, 0) | |
| FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL | FLOAT | |
| REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL, DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED, DOUBLE PRECISION UNSIGNED ZEROFILL | DOUBLE | |
| NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 | DECIMAL(p, s) | |
| NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 | STRING | The precision for DECIMAL data type is up to 65 in MySQL, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
| BOOLEAN, TINYINT(1), BIT(1) | BOOLEAN | |
| DATE | DATE | |
| TIME [(p)] | TIME [(p)] | |
| TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
| DATETIME [(p)] | TIMESTAMP [(p)] | |
| CHAR(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |
| BIT(n) | BINARY(⌈(n + 7) / 8⌉) | |
| BINARY(n) | BINARY(n) | |
| VARBINARY(N) | VARBINARY(N) | |
| TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING | |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only blobs whose length isn't greater than 2,147,483,647 (2^31 - 1) are supported. |
| ENUM | STRING | |
| JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
| SET | - | Not supported yet. |
| GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION | STRING | The spatial data types in MySQL will be converted into STRING with a fixed JSON format. Please see the MySQL Spatial Data Types Mapping section for more detailed information. |

MySQL Spatial Data Types Mapping #

All spatial data types in MySQL except GEOMETRYCOLLECTION will be converted into a JSON string with a fixed format like:

{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}

The field srid identifies the SRS in which the geometry is defined; SRID 0 is the default for new geometry values if no SRID is specified. Since only MySQL 8+ supports specifying an SRID when defining a spatial data type, the field srid will always be 0 in lower MySQL versions.

The field type identifies the spatial data type, such as POINT/LINESTRING/POLYGON.

The field coordinates represents the coordinates of the spatial data.

A GEOMETRYCOLLECTION will be converted into a JSON string with a fixed format like:

{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}

The field geometries is an array that contains all spatial data in the collection.

Examples of the mapping for different spatial data types are as follows:

| Spatial data in MySQL | JSON string converted in Flink |
|---|---|
| POINT(1 1) | {"coordinates":[1,1],"type":"Point","srid":0} |
| LINESTRING(3 0, 3 3, 3 5) | {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0} |
| POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) | {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0} |
| MULTIPOINT((1 1),(2 2)) | {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0} |
| MULTILINESTRING((1 1,2 2,3 3),(4 4,5 5)) | {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0} |
| MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) | {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0} |
| GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) | {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0} |
