TiDB
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

TiDB CDC Connector #

The TiDB CDC connector reads snapshot data and incremental data from a TiDB database. This document describes how to set up the TiDB CDC connector to run SQL queries against TiDB databases.

Dependencies #

To set up the TiDB CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.

Maven dependency #

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-tidb-cdc</artifactId>
    <version>3.3-SNAPSHOT</version>
</dependency>

SQL Client JAR #

Download link is available only for stable releases.

Download flink-sql-connector-tidb-cdc-3.3-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.

Note: See flink-sql-connector-tidb-cdc in the Maven Central repository, where further released versions will be available.

How to create a TiDB CDC table #

A TiDB CDC table can be defined as follows:

-- checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

-- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
     order_id INT,
     order_date TIMESTAMP(3),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'tidb-cdc',
     'tikv.grpc.timeout_in_ms' = '20000', 
     'pd-addresses' = 'localhost:2379',
     'database-name' = 'mydb',
     'table-name' = 'orders'
);
  
-- read snapshot and binlogs from orders table
Flink SQL> SELECT * FROM orders;

Connector Options #

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | The connector to use; must be 'tidb-cdc'. |
| database-name | required | (none) | String | Database name of the TiDB server to monitor. |
| table-name | required | (none) | String | Table name of the TiDB database to monitor. |
| scan.startup.mode | optional | initial | String | Startup mode for the TiDB CDC consumer; valid enumerations are "initial" and "latest-offset". |
| pd-addresses | required | (none) | String | PD address of the TiKV cluster. |
| host-mapping | optional | (none) | String | Host mapping of the TiKV cluster, used to configure the mapping between intranet IPs and public IPs. When the TiKV cluster runs on an intranet, you can map a set of intranet IPs to public IPs so that an outside Flink cluster can access it. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9. |
| tikv.grpc.timeout_in_ms | optional | (none) | Long | TiKV gRPC timeout in ms. |
| tikv.grpc.scan_timeout_in_ms | optional | (none) | Long | TiKV gRPC scan timeout in ms. |
| tikv.batch_get_concurrency | optional | 20 | Integer | TiKV gRPC batch get concurrency. |
| tikv.* | optional | (none) | String | Pass-through properties for the TiDB client. |
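
The host-mapping format described above can be illustrated with a small parser. This is a minimal sketch and not the connector's own parsing code: the class name HostMappingSketch and its parse method are hypothetical, and the sketch assumes IPv4 addresses without ports, as in the example value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the TiDB CDC connector.
final class HostMappingSketch {

    /** Parses "{Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}" into an ordered map. */
    static Map<String, String> parse(String hostMapping) {
        Map<String, String> result = new LinkedHashMap<>();
        if (hostMapping == null || hostMapping.trim().isEmpty()) {
            return result; // option not set: no mapping
        }
        for (String pair : hostMapping.split(";")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            // Assumes IPv4 without ports, so each entry contains exactly one ':'.
            String[] parts = trimmed.split(":");
            if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
                throw new IllegalArgumentException("Invalid host-mapping entry: " + trimmed);
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = parse("192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9");
        mapping.forEach((intranet, pub) -> System.out.println(intranet + " -> " + pub));
    }
}
```

Each key is an intranet IP and each value is the public IP that an outside Flink cluster would connect to instead.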

Available Metadata #

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

| Key | DataType | Description |
|---|---|---|
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| database_name | STRING NOT NULL | Name of the database that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change was made in the database. If the record is read from the snapshot of the table instead of the binlog, the value is always 0. |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

Features #

Exactly-Once Processing #

The TiDB CDC connector is a Flink source connector that first reads a database snapshot and then continues to read change events, with exactly-once processing even when failures occur.

Startup Reading Position #

The config option scan.startup.mode specifies the startup mode for the TiDB CDC consumer. The valid enumerations are:

  • initial (default): Takes a snapshot of the structure and data of the captured tables; useful if you want to fetch a complete representation of the data from the captured tables.
  • latest-offset: Takes a snapshot of the structure of the captured tables only; useful if only changes happening from now onwards should be fetched.
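
As an illustration of setting this option, the following minimal sketch validates a startup mode and renders the WITH clause of a table definition that includes it. The class StartupModeSketch and its methods are hypothetical helpers written for this example; only the option keys and the two mode values come from this page, and the connection values repeat the earlier 'orders' example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the TiDB CDC connector.
final class StartupModeSketch {

    /** Builds the connector options, rejecting startup modes this page does not list. */
    static Map<String, String> tidbOptions(String startupMode) {
        if (!"initial".equals(startupMode) && !"latest-offset".equals(startupMode)) {
            throw new IllegalArgumentException("Unsupported scan.startup.mode: " + startupMode);
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "tidb-cdc");
        options.put("pd-addresses", "localhost:2379");
        options.put("database-name", "mydb");
        options.put("table-name", "orders");
        options.put("scan.startup.mode", startupMode);
        return options;
    }

    /** Renders the options as the WITH (...) clause of a Flink SQL CREATE TABLE statement. */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "    '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    public static void main(String[] args) {
        // Skip the data snapshot and read only changes from now onwards.
        System.out.println(withClause(tidbOptions("latest-offset")));
    }
}
```

Omitting scan.startup.mode from the WITH clause is equivalent to 'initial', since that is the default.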

Multi Thread Reading #

The TiDB CDC source supports parallel reading, because multiple tasks can receive change events.

DataStream Source #

The TiDB CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import org.apache.flink.cdc.connectors.tidb.TDBSourceOptions;
import org.apache.flink.cdc.connectors.tidb.TiDBSource;
import org.apache.flink.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import org.apache.flink.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;

import java.util.HashMap;

public class TiDBSourceExample {

    public static void main(String[] args) throws Exception {

        SourceFunction<String> tidbSource =
            TiDBSource.<String>builder()
                .database("mydb") // set captured database
                .tableName("products") // set captured table
                .tiConf(
                    TDBSourceOptions.getTiConfiguration(
                        "localhost:2379", new HashMap<>())) // PD address, as in the SQL examples
                .snapshotEventDeserializer(
                    new TiKVSnapshotEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Kvrpcpb.KvPair record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .changeEventDeserializer(
                    new TiKVChangeEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Cdcpb.Event.Row record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        env.addSource(tidbSource).print().setParallelism(1);

        env.execute("Print TiDB Snapshot + Binlog");
    }
}

Data Type Mapping #

| TiDB type | Flink SQL type | NOTE |
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT, TINYINT UNSIGNED | SMALLINT | |
| INT, MEDIUMINT, SMALLINT UNSIGNED | INT | |
| BIGINT, INT UNSIGNED | BIGINT | |
| BIGINT UNSIGNED | DECIMAL(20, 0) | |
| FLOAT | FLOAT | |
| REAL, DOUBLE | DOUBLE | |
| NUMERIC(p, s), DECIMAL(p, s) where p <= 38 | DECIMAL(p, s) | |
| NUMERIC(p, s), DECIMAL(p, s) where 38 < p <= 65 | STRING | The precision for DECIMAL data type is up to 65 in TiDB, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
| BOOLEAN, TINYINT(1), BIT(1) | BOOLEAN | |
| DATE | DATE | |
| TIME [(p)] | TIME [(p)] | |
| TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
| DATETIME [(p)] | TIMESTAMP [(p)] | |
| CHAR(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |
| BIT(n) | BINARY(⌈n/8⌉) | |
| BINARY(n) | BINARY(n) | |
| TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING | |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | Currently, for BLOB data type in TiDB, only the blob whose length isn't greater than 2,147,483,647 (2 ** 31 - 1) is supported. |
| YEAR | INT | |
| ENUM | STRING | |
| JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
| SET | ARRAY<STRING> | As the SET data type in TiDB is a string object that can have zero or more values, it should always be mapped to an array of string. |
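
Two rows of the table depend on a parameter: DECIMAL(p, s) maps to DECIMAL or STRING depending on p, and BIT(n) maps to BOOLEAN or BINARY(⌈n/8⌉) depending on n. The following minimal sketch encodes just those two rules. The class TypeMappingSketch is a hypothetical helper, not connector code, and the BIT range check of 1 to 64 is an assumption based on MySQL-compatible behavior rather than something stated on this page.

```java
// Hypothetical helper for illustration only; not part of the TiDB CDC connector.
final class TypeMappingSketch {

    /** Maps TiDB DECIMAL(p, s) or NUMERIC(p, s) to a Flink SQL type name. */
    static String mapDecimal(int precision, int scale) {
        if (precision < 1 || precision > 65) {
            throw new IllegalArgumentException("TiDB DECIMAL precision must be in [1, 65]");
        }
        // Flink limits DECIMAL precision to 38; wider columns go to STRING to avoid precision loss.
        return precision <= 38 ? "DECIMAL(" + precision + ", " + scale + ")" : "STRING";
    }

    /** Maps TiDB BIT(n) to a Flink SQL type name. */
    static String mapBit(int n) {
        if (n < 1 || n > 64) { // assumed MySQL-compatible range
            throw new IllegalArgumentException("BIT length must be in [1, 64]");
        }
        if (n == 1) {
            return "BOOLEAN";
        }
        int bytes = (n + 7) / 8; // integer form of ceil(n / 8)
        return "BINARY(" + bytes + ")";
    }

    public static void main(String[] args) {
        System.out.println(mapDecimal(10, 5));
        System.out.println(mapDecimal(40, 2));
        System.out.println(mapBit(1));
        System.out.println(mapBit(9));
    }
}
```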