MySQL CDC Connector #

The MySQL CDC connector allows reading snapshot data and incremental data from MySQL databases. This document describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases.

Supported Databases #

Connector: mysql-cdc
Supported databases:
  • MySQL: 5.6, 5.7, 8.0.x
  • RDS MySQL: 5.6, 5.7, 8.0.x
  • PolarDB MySQL: 5.6, 5.7, 8.0.x
  • Aurora MySQL: 5.6, 5.7, 8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1
Driver: JDBC Driver 8.0.27

    Dependencies #

    In order to set up the MySQL CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

    Maven dependency #

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>3.2.0</version>
    </dependency>

    SQL Client JAR #

    Download link is available only for stable releases.

    Download flink-sql-connector-mysql-cdc-3.2.0.jar and put it under <FLINK_HOME>/lib/.

    Note: Refer to flink-sql-connector-mysql-cdc; more released versions will be available in the Maven Central repository.

    Since the MySQL Connector’s GPLv2 license is incompatible with the Flink CDC project, we can’t provide the MySQL connector in prebuilt connector JAR packages. You may need to configure the following dependency manually.

    Dependency Item Description
    mysql:mysql-connector-java:8.0.27 Used for connecting to the MySQL database.
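
    For a Maven project, this dependency can be declared alongside the connector, for example (the version shown is the one listed above; adjust it to the driver version you actually use):

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>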

    Setup MySQL server #

    You have to define a MySQL user with appropriate permissions on all databases that the Debezium MySQL connector monitors.

    1. Create the MySQL user:
    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
    
    2. Grant the required permissions to the user:
    mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';
    

    Note: The RELOAD permission is no longer required when scan.incremental.snapshot.enabled is enabled (it is enabled by default).

    3. Finalize the user’s permissions:
    mysql> FLUSH PRIVILEGES;
    

    See the permission explanation for more details about why these permissions are required.

    Notes #

    Set a different SERVER ID for each reader #

    Every MySQL database client that reads the binlog should have a unique ID, called a server ID. The MySQL server uses this ID to maintain the network connection and the binlog position. Therefore, if different jobs share the same server ID, they may read from the wrong binlog position. Thus, it is recommended to set a different server ID for each reader via SQL hints, e.g. assuming the source parallelism is 4, we can use SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ; to assign a unique server ID to each of the 4 source readers.

    Setting up MySQL session timeouts #

    When an initial consistent snapshot is made for large databases, your established connection could time out while the tables are being read. You can prevent this behavior by configuring interactive_timeout and wait_timeout in your MySQL configuration file.

    • interactive_timeout: The number of seconds the server waits for activity on an interactive connection before closing it. See the MySQL documentation.
    • wait_timeout: The number of seconds the server waits for activity on a noninteractive connection before closing it. See the MySQL documentation.
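
    For example, both timeouts can be raised in the MySQL configuration file; the values below are only illustrative and should be chosen according to how long your snapshot phase runs:

    [mysqld]
    # Seconds an idle interactive connection is kept open before being closed.
    interactive_timeout = 3600
    # Seconds an idle non-interactive connection is kept open before being closed.
    wait_timeout = 3600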

    How to create a MySQL CDC table #

    The MySQL CDC table can be defined as follows:

    -- checkpoint every 3000 milliseconds                       
    Flink SQL> SET 'execution.checkpointing.interval' = '3s';   
    
    -- register a MySQL table 'orders' in Flink SQL
    Flink SQL> CREATE TABLE orders (
         order_id INT,
         order_date TIMESTAMP(0),
         customer_name STRING,
         price DECIMAL(10, 5),
         product_id INT,
         order_status BOOLEAN,
         PRIMARY KEY(order_id) NOT ENFORCED
         ) WITH (
         'connector' = 'mysql-cdc',
         'hostname' = 'localhost',
         'port' = '3306',
         'username' = 'root',
         'password' = '123456',
         'database-name' = 'mydb',
         'table-name' = 'orders');
      
    -- read snapshot and binlogs from orders table
    Flink SQL> SELECT * FROM orders;
    

    Connector Options #

    Option Required Default Type Description
    connector required (none) String Specify what connector to use, here should be 'mysql-cdc'.
    hostname required (none) String IP address or hostname of the MySQL database server.
    username required (none) String Name of the MySQL database user to use when connecting to the MySQL database server.
    password required (none) String Password to use when connecting to the MySQL database server.
    database-name required (none) String Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple databases that match the regular expression.
    table-name required (none) String Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. Note: When the MySQL CDC connector matches table names with regular expressions, it concatenates the database-name and table-name provided by the user with the string `\\.` to form a full-path regular expression, and then uses that regular expression to match the fully qualified names of the tables in the MySQL database.
    port optional 3306 Integer Integer port number of the MySQL database server.
    server-id optional (none) String A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400', and the numeric ID range syntax is like '5400-5408'. The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated, though we recommend setting an explicit value.
    scan.incremental.snapshot.enabled optional true Boolean Incremental snapshot is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including: (1) the source can be parallel during snapshot reading, (2) the source can perform checkpoints at the chunk granularity during snapshot reading, (3) the source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. If you would like the source to run in parallel, each parallel reader should have a unique server ID, so the 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. Please see the Incremental Snapshot Reading section for more detailed information.
    scan.incremental.snapshot.chunk.size optional 8096 Integer The chunk size (number of rows) of a table snapshot; captured tables are split into multiple chunks when reading the snapshot of the table.
    scan.snapshot.fetch.size optional 1024 Integer The maximum fetch size per poll when reading the table snapshot.
    scan.startup.mode optional initial String Optional startup mode for MySQL CDC consumer, valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset", "timestamp" and "snapshot". Please see Startup Reading Position section for more detailed information.
    scan.startup.specific-offset.file optional (none) String Optional binlog file name used in case of "specific-offset" startup mode
    scan.startup.specific-offset.pos optional (none) Long Optional binlog file position used in case of "specific-offset" startup mode
    scan.startup.specific-offset.gtid-set optional (none) String Optional GTID set used in case of "specific-offset" startup mode
    scan.startup.timestamp-millis optional (none) Long Optional millisecond timestamp used in case of "timestamp" startup mode.
    scan.startup.specific-offset.skip-events optional (none) Long Optional number of events to skip after the specific starting offset
    scan.startup.specific-offset.skip-rows optional (none) Long Optional number of rows to skip after the specific starting offset
    server-time-zone optional (none) String The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. See more here. If not set, ZoneId.systemDefault() is used to determine the server time zone.
    debezium.min.row.count.to.stream.result optional 1000 Integer During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (which can be slower, but works for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot.
    connect.timeout optional 30s Duration The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. This value cannot be less than 250ms.
    connect.max-retries optional 3 Integer The maximum number of retries the connector should attempt when building a MySQL database server connection.
    connection.pool.size optional 20 Integer The connection pool size.
    jdbc.properties.* optional (none) String Option to pass custom JDBC URL properties. Users can pass custom properties like 'jdbc.properties.useSSL' = 'false'.
    heartbeat.interval optional 30s Duration The interval of sending heartbeat event for tracing the latest available binlog offsets.
    debezium.* optional (none) String Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server. For example: 'debezium.snapshot.mode' = 'never'. See more about the Debezium's MySQL Connector properties
    scan.incremental.close-idle-reader.enabled optional false Boolean Whether to close idle readers at the end of the snapshot phase.
    The Flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.
    If the Flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true, so it does not need to be configured explicitly.
    debezium.binary.handling.mode optional (none) String Specifies how binary data types are handled. It can be set to one of the following values: none: no processing is performed and the binary data is transmitted as a byte array; base64: the binary data is converted to a Base64-encoded string before transmission; hex: the binary data is converted to a hexadecimal string before transmission. The default value is none. Depending on your requirements and data types, you can choose the appropriate mode. If your database contains a large number of binary columns, base64 or hex mode is recommended to make the data easier to handle during transmission.

    Available Metadata #

    The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

    Key DataType Description
    table_name STRING NOT NULL Name of the table that contains the row.
    database_name STRING NOT NULL Name of the database that contains the row.
    op_ts TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database. If the record is read from the snapshot of the table instead of the binlog, the value is always 0.
    row_kind STRING NOT NULL It indicates the row kind of the changelog: '+I' means INSERT, '-U' means UPDATE_BEFORE, '+U' means UPDATE_AFTER and '-D' means DELETE. Note: The downstream SQL operator may fail to compare records due to this newly added column when processing row retractions if the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.

    The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

    CREATE TABLE products
    (
        db_name       STRING METADATA FROM 'database_name' VIRTUAL,
        table_name    STRING METADATA FROM 'table_name' VIRTUAL,
        operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        operation     STRING METADATA FROM 'row_kind' VIRTUAL,
        order_id      INT,
        order_date    TIMESTAMP(0),
        customer_name STRING,
        price         DECIMAL(10, 5),
        product_id    INT,
        order_status  BOOLEAN,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = 'localhost',
          'port' = '3306',
          'username' = 'root',
          'password' = '123456',
          'database-name' = 'mydb',
          'table-name' = 'orders'
          );
    

    The extended CREATE TABLE example demonstrates the usage of regex to match multi-tables:

    CREATE TABLE products
    (
        db_name       STRING METADATA FROM 'database_name' VIRTUAL,
        table_name    STRING METADATA FROM 'table_name' VIRTUAL,
        operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        operation     STRING METADATA FROM 'row_kind' VIRTUAL,
        order_id      INT,
        order_date    TIMESTAMP(0),
        customer_name STRING,
        price         DECIMAL(10, 5),
        product_id    INT,
        order_status  BOOLEAN,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = 'localhost',
          'port' = '3306',
          'username' = 'root',
          'password' = '123456',
          'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',
          'table-name' = '(t[5-8]|tt)'
          );
    
    example expression description
    prefix match ^(test).* This matches database or table names that start with the prefix test, e.g. test1, test2.
    suffix match .*[p$] This matches database or table names that end with the suffix p, e.g. cdcp, edcp.
    specific match txc This matches a database or table name exactly, e.g. txc.

    The connector uses database-name\\.table-name as the pattern to match tables; the example above uses the pattern (^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt), which matches e.g. txc.tt and test2.test5.

    Features #

    Incremental Snapshot Reading #

    Incremental snapshot reading is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, incremental snapshot reading has many advantages, including:

    • (1) MySQL CDC Source can be parallel during snapshot reading
    • (2) MySQL CDC Source can perform checkpoints in the chunk granularity during snapshot reading
    • (3) MySQL CDC Source doesn’t need to acquire global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading

    If you would like the source to run in parallel, each parallel reader should have a unique server ID, so the 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism.
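
    For example, a minimal sketch (connection properties are placeholders) that pairs a job parallelism of 4 with a 'server-id' range larger than the parallelism:

    Flink SQL> SET 'parallelism.default' = 4;

    Flink SQL> CREATE TABLE orders_parallel (
         order_id INT,
         order_status BOOLEAN,
         PRIMARY KEY (order_id) NOT ENFORCED
         ) WITH (
         'connector' = 'mysql-cdc',
         'hostname' = 'localhost',
         'port' = '3306',
         'username' = 'root',
         'password' = '123456',
         'database-name' = 'mydb',
         'table-name' = 'orders',
         'server-id' = '5401-5408');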

    During incremental snapshot reading, the MySQL CDC source first splits the snapshot into chunks (splits) by the user-specified chunk key of the table, and then assigns the chunks to multiple readers to read the snapshot data.

    Controlling Parallelism #

    Incremental snapshot reading provides the ability to read snapshot data in parallel. You can control the source parallelism by setting the job parallelism parallelism.default. For example, in SQL CLI:

    Flink SQL> SET 'parallelism.default' = 8;
    

    Checkpoint #

    Incremental snapshot reading provides the ability to perform checkpoints at the chunk level. It resolves the checkpoint timeout problem of the old snapshot reading mechanism in previous versions.

    Lock-free #

    The MySQL CDC source uses the incremental snapshot algorithm, which avoids acquiring the global read lock (FLUSH TABLES WITH READ LOCK) and thus doesn’t need the RELOAD permission.

    MySQL High Availability Support #

    The mysql-cdc connector supports highly available MySQL clusters by using GTID information. To obtain high availability, the MySQL cluster needs to have GTID mode enabled, so your MySQL config file should contain the following settings:

    gtid_mode = on
    enforce_gtid_consistency = on
    

    If the monitored MySQL server address points to a slave instance, you need to add the following settings to the MySQL config file of that instance. The setting log-slave-updates = 1 makes the slave instance also write the data synchronized from the master to its own binlog; this ensures that the mysql-cdc connector can consume the entire data from the slave instance.

    gtid_mode = on
    enforce_gtid_consistency = on
    log-slave-updates = 1
    

    If the monitored server fails in the MySQL cluster, you only need to change the monitored server address to another available server and restart the job from the latest checkpoint/savepoint; the job will restore from the checkpoint/savepoint and won’t miss any records.

    It’s recommended to configure a DNS (Domain Name Service) name or a VIP (Virtual IP Address) for your MySQL cluster and use the DNS or VIP address for the mysql-cdc connector; the DNS or VIP will automatically route network requests to the active MySQL server. This way, you no longer need to modify the address and restart your pipeline.

    MySQL Heartbeat Event Support #

    If a table is updated infrequently, the binlog file or GTID set corresponding to its last committed binlog position may already have been cleaned up on the server, and the CDC job may fail to restart in this case. Heartbeat events help keep the tracked binlog position up to date. Heartbeat events are enabled by default in the MySQL CDC source with an interval of 30 seconds. You can change the interval via the table option heartbeat.interval, or set the option to 0s to disable heartbeat events.
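
    For example, assuming dynamic table options are enabled in your Flink version, the interval can be lowered for a sparsely updated table via a SQL hint in the same style as the server-id example above (the value 10s is only illustrative):

    Flink SQL> SELECT * FROM orders /*+ OPTIONS('heartbeat.interval' = '10s') */;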

    How Incremental Snapshot Reading works #

    When the MySQL CDC source is started, it reads the snapshot of the table in parallel and then reads the binlog of the table with a single parallelism.

    In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the chunk key of the table and the size of the table rows. Snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its assigned chunks with the chunk reading algorithm and sends the read data downstream. The source manages the process status (finished or not) of the chunks, so the snapshot phase can support checkpoints at the chunk level. If a failure happens, the source can be restored and continue reading chunks from the last finished chunk.

    After all snapshot chunks are finished, the source continues to read the binlog in a single task. In order to guarantee the global data order of snapshot records and binlog records, the binlog reader starts to read data only after there is a complete checkpoint following the completion of the snapshot chunks, which makes sure all snapshot data has been consumed downstream. The binlog reader tracks the consumed binlog position in state, so the binlog phase can support checkpoints at the row level.

    Flink performs checkpoints for the source periodically. In case of failover, the job will restart and restore from the last successful checkpoint state, which guarantees exactly-once semantics.

    Snapshot Chunk Splitting #

    When performing incremental snapshot reading, the MySQL CDC source needs a criterion to split the table. The MySQL CDC source uses a splitting column to split the table into multiple splits (chunks). By default, the MySQL CDC source identifies the primary key of the table and uses the first column of the primary key as the splitting column. If there is no primary key in the table, the user must specify scan.incremental.snapshot.chunk.key-column, otherwise incremental snapshot reading will fail; alternatively, you can disable scan.incremental.snapshot.enabled to fall back to the old snapshot reading mechanism. Please note that using a column that is not part of the primary key as the chunk key can result in slower table query performance.

    For a numeric, auto-incremental splitting column, the MySQL CDC source splits chunks efficiently with a fixed step length. For example, if you had a table with a primary key column id of auto-incremental BIGINT type, the minimum value was 0, the maximum value was 100, and the table option scan.incremental.snapshot.chunk.size was 25, the table would be split into the following chunks:

     (-∞, 25),
     [25, 50),
     [50, 75),
     [75, 100),
     [100, +∞)
    

    For other primary key column types, the MySQL CDC source executes statements of the form SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' LIMIT 25) to get the low and high values of each chunk; the resulting set of chunks would look like:

    (-∞, 'uuid-001'),
    ['uuid-001', 'uuid-009'),
    ['uuid-009', 'uuid-abc'),
    ['uuid-abc', 'uuid-def'),
    ['uuid-def', +∞).
    
    Chunk Reading Algorithm #

    For the example table MyTable above, if the MySQL CDC source parallelism were set to 4, the MySQL CDC source would run 4 readers, each of which executes the offset signal algorithm to get a final consistent output of its snapshot chunks. The offset signal algorithm can be described as follows:

    • (1) Record current binlog position as LOW offset
    • (2) Read and buffer the snapshot chunk records by executing statement SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high
    • (3) Record current binlog position as HIGH offset
    • (4) Read the binlog records that belong to the snapshot chunk from LOW offset to HIGH offset
    • (5) Upsert the read binlog records into the buffered chunk records, and emit all records in the buffer as final output (all as INSERT records) of the snapshot chunk
    • (6) Continue to read and emit binlog records belonging to the chunk after the HIGH offset in the single binlog reader.

    The algorithm is inspired by the DBLog paper; please refer to it for more details.

    Note: If the actual values of the primary key are not uniformly distributed across its range, this may lead to unbalanced tasks during incremental snapshot reading.
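
    The upsert/merge in step (5) can be illustrated with a small, self-contained Java sketch. The class and record names below are purely illustrative and are not the connector’s internal API: snapshot rows buffered for one chunk are keyed by the chunk key, binlog events observed between the LOW and HIGH offsets are applied as upserts or deletes, and the merged result is what would be emitted as INSERT records.

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative simulation of step (5) of the offset signal algorithm.
    public class ChunkMergeSketch {

        // A simplified binlog event for one key of the chunk: an upsert carries a row, a delete does not.
        record BinlogEvent(int id, String row, boolean isDelete) {}

        // Merges binlog events observed between the LOW and HIGH offsets into the buffered snapshot rows.
        static Map<Integer, String> merge(Map<Integer, String> snapshotBuffer, List<BinlogEvent> eventsLowToHigh) {
            Map<Integer, String> merged = new LinkedHashMap<>(snapshotBuffer);
            for (BinlogEvent event : eventsLowToHigh) {
                if (event.isDelete()) {
                    merged.remove(event.id());           // row deleted after the snapshot query
                } else {
                    merged.put(event.id(), event.row()); // row inserted or updated after the snapshot query
                }
            }
            return merged; // emitted downstream as INSERT records for this chunk
        }

        public static void main(String[] args) {
            // Buffered snapshot rows of one chunk (hypothetical data).
            Map<Integer, String> snapshot = new LinkedHashMap<>(Map.of(
                    30, "order-30 v1",
                    40, "order-40 v1"));
            // Binlog events for the same chunk captured between the LOW and HIGH offsets.
            List<BinlogEvent> events = List.of(
                    new BinlogEvent(30, "order-30 v2", false), // update overrides the snapshot row
                    new BinlogEvent(40, null, true),           // delete removes the snapshot row
                    new BinlogEvent(45, "order-45 v1", false)); // insert adds a new row
            System.out.println(merge(snapshot, events)); // {30=order-30 v2, 45=order-45 v1}
        }
    }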

    Exactly-Once Processing #

    The MySQL CDC connector is a Flink source connector that first reads table snapshot chunks and then continues to read the binlog. In both the snapshot phase and the binlog phase, the MySQL CDC connector reads with exactly-once processing even if failures happen.

    Startup Reading Position #

    The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:

    • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest binlog.
    • earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.
    • latest-offset: Never performs a snapshot of the monitored database tables upon first startup; it just reads from the end of the binlog, which means it only captures changes made after the connector was started.
    • specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.
    • timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.
    • snapshot: Only the snapshot phase is performed and exits after the snapshot phase reading is completed.

    For example in DataStream API:

    MySqlSource.builder()
        .startupOptions(StartupOptions.earliest()) // Start from earliest offset
        .startupOptions(StartupOptions.latest()) // Start from latest offset
        .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L)) // Start from binlog file and offset
        .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // Start from GTID set
        .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp
        .startupOptions(StartupOptions.snapshot()) // Read snapshot only
        ...
        .build()
    

    and with SQL:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Start from earliest offset
        'scan.startup.mode' = 'latest-offset', -- Start from latest offset
        'scan.startup.mode' = 'specific-offset', -- Start from specific offset
        'scan.startup.mode' = 'timestamp', -- Start from timestamp
        'scan.startup.mode' = 'snapshot', -- Read snapshot only
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binlog filename under specific offset startup mode
        'scan.startup.specific-offset.pos' = '4', -- Binlog position under specific offset mode
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set under specific offset startup mode
        'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp under timestamp startup mode
        ...
    )
    

    Notes:

    1. MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix “Binlog offset on checkpoint {checkpoint-id}”. It could be useful if you want to restart the job from a specific checkpointed position.
    2. If the schema of the captured tables was changed previously, starting from the earliest offset, a specific offset, or a timestamp could fail, because the Debezium reader keeps the current latest table schema internally and earlier records with a mismatched schema cannot be correctly parsed.

    DataStream Source #

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
    
    public class MySqlSourceExample {
      public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // enable checkpoint
        env.enableCheckpointing(3000);
    
        env
          .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
          // set 4 parallel source tasks
          .setParallelism(4)
          .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    
        env.execute("Print MySQL Snapshot + Binlog");
      }
    }
    

    Scan Newly Added Tables #

    The Scan Newly Added Tables feature enables you to add new tables to monitor in an existing, running pipeline; the newly added tables will first read their snapshot data and then automatically read their changelog.

    Imagine this scenario: at the beginning, a Flink job monitors the tables [product, user, address], but after some days we would like the job to also monitor the tables [order, custom], which contain historical data, and we need the job to still reuse its existing state. This feature can resolve this case gracefully.

    The following steps show how to enable this feature for the above scenario. Consider an existing Flink job that uses a CDC source like:

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .scanNewlyAddedTableEnabled(true) // enable scan the newly added tables feature
            .databaseList("db") // set captured database
            .tableList("db.product, db.user, db.address") // set captured tables [product, user, address]
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
       // your business code
    

    If we would like to add the new tables [order, custom] to an existing Flink job, we just need to update the tableList() value of the job to include [order, custom] and restore the job from the previous savepoint.

    Step 1: Stop the existing Flink job with savepoint.

    $ ./bin/flink stop $Existing_Flink_JOB_ID
    
    Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
    Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
    

    Step 2: Update the table list option for the existing Flink job.

    1. Update the tableList() value.
    2. Build the jar of the updated job.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .scanNewlyAddedTableEnabled(true) 
            .databaseList("db") 
            .tableList("db.product, db.user, db.address, db.order, db.custom") // set captured tables [product, user, address ,order, custom]
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
       // your business code
    

    Step 3: Restore the updated Flink job from savepoint.

    $ ./bin/flink run \
          --detached \
          --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
          ./FlinkCDCExample.jar
    

    Note: Please refer to the doc Restore the job from previous savepoint for more details.

    Tables Without Primary Keys #

    Starting from version 2.4.0, MySQL CDC supports tables that do not have a primary key. To use a table without a primary key, you must configure the scan.incremental.snapshot.chunk.key-column option and specify one non-null column (see the sketch after the notes below).

    There are two things to pay attention to.

    1. If there is an index in the table, try to use a column that is covered by the index as scan.incremental.snapshot.chunk.key-column. This will speed up the SELECT statements.
    2. The processing semantics of a MySQL CDC table without primary keys are determined by the behavior of the column specified by scan.incremental.snapshot.chunk.key-column:
    • If no update operation is performed on the specified column, exactly-once semantics are ensured.
    • If update operations are performed on the specified column, only at-least-once semantics are ensured. However, you can specify primary keys downstream and perform idempotent writes to ensure data correctness.
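
    For example, a minimal sketch (hypothetical table and column names, placeholder connection properties) that selects a non-null order_id column as the chunk key for a table without a primary key:

    Flink SQL> CREATE TABLE order_events (
         order_id INT NOT NULL,
         event_type STRING,
         event_time TIMESTAMP(0)
         ) WITH (
         'connector' = 'mysql-cdc',
         'hostname' = 'localhost',
         'port' = '3306',
         'username' = 'root',
         'password' = '123456',
         'database-name' = 'mydb',
         'table-name' = 'order_events',
         'scan.incremental.snapshot.chunk.key-column' = 'order_id');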

    About converting binary type data to base64 encoded data #

    CREATE TABLE products (
        db_name STRING METADATA FROM 'database_name' VIRTUAL,
        table_name STRING METADATA  FROM 'table_name' VIRTUAL,
        operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        order_id INT,
        order_date TIMESTAMP(0),
        customer_name STRING,
        price DECIMAL(10, 5),
        product_id INT,
        order_status BOOLEAN,
        binary_data STRING,
        PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'test_db',
        'table-name' = 'test_tb',
        'debezium.binary.handling.mode' = 'base64'
    );
    

    The binary_data field in the database is of type VARBINARY(N). In some scenarios, we need to convert the binary data to a Base64-encoded string. This can be enabled by adding the option 'debezium.binary.handling.mode' = 'base64'; with this option we can map the binary column to STRING in Flink SQL and obtain Base64-encoded string data.

    Data Type Mapping #

    MySQL type Flink SQL type NOTE
    TINYINT TINYINT
    SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL SMALLINT
    INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL INT
    BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL BIGINT
    BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL DECIMAL(20, 0)
    FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL FLOAT
    REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL, DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED, DOUBLE PRECISION UNSIGNED ZEROFILL DOUBLE
    NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 DECIMAL(p, s)
    NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 STRING The precision for DECIMAL data type is up to 65 in MySQL, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss.
    BOOLEAN, TINYINT(1), BIT(1) BOOLEAN
    DATE DATE
    TIME [(p)] TIME [(p)]
    TIMESTAMP [(p)], DATETIME [(p)] TIMESTAMP [(p)]
    CHAR(n) CHAR(n)
    VARCHAR(n) VARCHAR(n)
    BIT(n) BINARY(⌈(n + 7) / 8⌉)
    BINARY(n) BINARY(n)
    VARBINARY(N) VARBINARY(N)
    TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT STRING
    TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB BYTES Currently, for the BLOB data type in MySQL, only blobs whose length isn't greater than 2,147,483,647 (2 ** 31 - 1) are supported.
    YEAR INT
    ENUM STRING
    JSON STRING The JSON data type will be converted into STRING with JSON format in Flink.
    SET ARRAY<STRING> As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings.
    GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION STRING The spatial data types in MySQL will be converted into STRING with a fixed JSON format. Please see the MySQL Spatial Data Types Mapping section for more detailed information.

    MySQL Spatial Data Types Mapping #

    The spatial data types in MySQL, except for GEOMETRYCOLLECTION, will be converted into a JSON string with a fixed format like:

    {"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
    

    The field srid identifies the SRS in which the geometry is defined; SRID 0 is the default for new geometry values if no SRID is specified. As only MySQL 8+ supports specifying an SRID when defining a spatial data type, the field srid will always be 0 for lower MySQL versions.

    The field type identifies the spatial data type, such as POINT/LINESTRING/POLYGON.

    The field coordinates represents the coordinates of the spatial data.

    GEOMETRYCOLLECTION will be converted into a JSON string with a fixed format like:

    {"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
    

    The field geometries is an array that contains all the spatial data.

    Examples of the mapping for different spatial data types are as follows:

    Spatial data in MySQL Json String converted in Flink
    POINT(1 1) {"coordinates":[1,1],"type":"Point","srid":0}
    LINESTRING(3 0, 3 3, 3 5) {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}
    POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0}
    MULTIPOINT((1 1),(2 2)) {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}
    MULTILINESTRING((1 1,2 2,3 3),(4 4,5 5)) {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}
    MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}
    GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}
