This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
TiDB CDC Connector #
The TiDB CDC connector allows for reading snapshot data and incremental data from a TiDB database. This document describes how to set up the TiDB CDC connector to run SQL queries against TiDB databases.
Dependencies #
To set up the TiDB CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.
Maven dependency #
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-tidb-cdc</artifactId>
    <version>3.3-SNAPSHOT</version>
</dependency>
SQL Client JAR #
Download link is available only for stable releases.
Download flink-sql-connector-tidb-cdc-3.1.0.jar and put it under <FLINK_HOME>/lib/.
Note: Refer to flink-sql-connector-tidb-cdc for other released versions, which are available in the Maven Central repository.
How to create a TiDB CDC table #
The TiDB CDC table can be defined as follows:
-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(3),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

-- read snapshot data and change events from the orders table
Flink SQL> SELECT * FROM orders;
Connector Options #
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specifies which connector to use; here it should be 'tidb-cdc'. |
database-name | required | (none) | String | Database name of the TiDB server to monitor. |
table-name | required | (none) | String | Table name of the TiDB database to monitor. |
scan.startup.mode | optional | initial | String | Optional startup mode for TiDB CDC consumer, valid enumerations are "initial" and "latest-offset". |
pd-addresses | required | (none) | String | TiKV cluster's PD address. |
host-mapping | optional | (none) | String | TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9. |
tikv.grpc.timeout_in_ms | optional | (none) | Long | TiKV gRPC timeout in ms. |
tikv.grpc.scan_timeout_in_ms | optional | (none) | Long | TiKV gRPC scan timeout in ms. |
tikv.batch_get_concurrency | optional | 20 | Integer | TiKV gRPC batch get concurrency. |
tikv.* | optional | (none) | String | Pass-through TiDB client's properties. |
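The host-mapping format described above can be illustrated with a small parser. This is a minimal sketch and not the connector's own parsing code: the class name HostMappingSketch and the method parse are hypothetical, and it assumes each entry is a plain IPv4 pair without ports.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a hypothetical helper, not part of the TiDB CDC connector. */
final class HostMappingSketch {

    /** Parses "intranet1:public1;intranet2:public2" into an insertion-ordered map. */
    static Map<String, String> parse(String hostMapping) {
        Map<String, String> mapping = new LinkedHashMap<>();
        if (hostMapping == null || hostMapping.trim().isEmpty()) {
            return mapping; // option not set: nothing to map
        }
        for (String pair : hostMapping.split(";")) {
            // Assumption: IPv4 addresses without ports, so exactly one ':' per entry.
            String[] parts = pair.trim().split(":");
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Invalid host-mapping entry: " + pair);
            }
            mapping.put(parts[0], parts[1]); // intranet IP -> public IP
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Uses the example value from the option description.
        Map<String, String> mapping = parse("192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9");
        System.out.println(mapping); // prints {192.168.0.2=8.8.8.8, 192.168.0.3=9.9.9.9}
    }
}
```

Validating the value this way before passing it to the 'host-mapping' option can surface a malformed entry early, rather than as a connection failure at runtime.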
Available Metadata #
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
Key | DataType | Description |
---|---|---|
table_name | STRING NOT NULL | Name of the table that contains the row. |
database_name | STRING NOT NULL | Name of the database that contains the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change was made in the database. If the record is read from the table snapshot rather than from the change log, the value is always 0. |
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);
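Because op_ts is always 0 for rows read from the snapshot, downstream code can use it to tell snapshot rows from change events. The following is a minimal sketch, assuming the column arrives as a java.time.Instant (Flink's default Java class for TIMESTAMP_LTZ) and that "0" means the Unix epoch; the class OpTsSketch and the method isSnapshotRow are hypothetical names.

```java
import java.time.Instant;

/** Illustrative only: a hypothetical helper, not part of the TiDB CDC connector. */
final class OpTsSketch {

    /**
     * Returns true when op_ts equals the Unix epoch.
     * Assumption: the documented "value is always 0" corresponds to epoch millisecond 0.
     */
    static boolean isSnapshotRow(Instant opTs) {
        return Instant.EPOCH.equals(opTs);
    }

    public static void main(String[] args) {
        System.out.println(isSnapshotRow(Instant.ofEpochMilli(0)));              // prints true
        System.out.println(isSnapshotRow(Instant.parse("2024-01-01T00:00:00Z"))); // prints false
    }
}
```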
Features #
Exactly-Once Processing #
The TiDB CDC connector is a Flink Source connector that reads a database snapshot first and then continues to read change events, with exactly-once processing even when failures happen.
Startup Reading Position #
The config option scan.startup.mode specifies the startup mode for the TiDB CDC consumer. The valid enumerations are:
- initial (default): Takes a snapshot of the structure and data of the captured tables; useful if you want to fetch a complete representation of the data from the captured tables.
- latest-offset: Takes a snapshot of the structure of the captured tables only; useful if only changes happening from now onwards should be fetched.
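The two startup modes above can be modeled as a small enum, which is handy when validating a job's configuration before submitting it. This is a sketch only: StartupModeSketch, fromOption, and readsSnapshotData are hypothetical names, not the connector's classes, and the exact-match comparison is an assumption about how strictly values should be checked.

```java
/** Illustrative only: a hypothetical model of the documented scan.startup.mode values. */
enum StartupModeSketch {
    INITIAL("initial", true),              // snapshot of structure and data, then changes
    LATEST_OFFSET("latest-offset", false); // structure only, then changes from now on

    private final String optionValue;
    private final boolean readsSnapshotData;

    StartupModeSketch(String optionValue, boolean readsSnapshotData) {
        this.optionValue = optionValue;
        this.readsSnapshotData = readsSnapshotData;
    }

    boolean readsSnapshotData() {
        return readsSnapshotData;
    }

    /** Maps an option string to a mode; null falls back to the documented default. */
    static StartupModeSketch fromOption(String value) {
        if (value == null) {
            return INITIAL;
        }
        for (StartupModeSketch mode : values()) {
            if (mode.optionValue.equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported scan.startup.mode: " + value);
    }
}
```

For example, StartupModeSketch.fromOption("latest-offset").readsSnapshotData() evaluates to false, matching the description that only the table structure is captured in that mode.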
Multi Thread Reading #
The TiDB CDC source supports parallel reading, because multiple tasks can receive change events.
DataStream Source #
The TiDB CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import org.apache.flink.cdc.connectors.tidb.TDBSourceOptions;
import org.apache.flink.cdc.connectors.tidb.TiDBSource;
import org.apache.flink.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import org.apache.flink.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;

import java.util.HashMap;

public class TiDBSourceExample {

    public static void main(String[] args) throws Exception {

        SourceFunction<String> tidbSource =
                TiDBSource.<String>builder()
                        .database("mydb") // set captured database
                        .tableName("products") // set captured table
                        .tiConf(
                                // PD address; 2379 is PD's default client port,
                                // matching 'pd-addresses' in the SQL examples
                                TDBSourceOptions.getTiConfiguration(
                                        "localhost:2379", new HashMap<>()))
                        // converts rows read during the snapshot phase
                        .snapshotEventDeserializer(
                                new TiKVSnapshotEventDeserializationSchema<String>() {
                                    @Override
                                    public void deserialize(
                                            Kvrpcpb.KvPair record, Collector<String> out)
                                            throws Exception {
                                        out.collect(record.toString());
                                    }

                                    @Override
                                    public TypeInformation<String> getProducedType() {
                                        return BasicTypeInfo.STRING_TYPE_INFO;
                                    }
                                })
                        // converts change events read after the snapshot phase
                        .changeEventDeserializer(
                                new TiKVChangeEventDeserializationSchema<String>() {
                                    @Override
                                    public void deserialize(
                                            Cdcpb.Event.Row record, Collector<String> out)
                                            throws Exception {
                                        out.collect(record.toString());
                                    }

                                    @Override
                                    public TypeInformation<String> getProducedType() {
                                        return BasicTypeInfo.STRING_TYPE_INFO;
                                    }
                                })
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        env.addSource(tidbSource).print().setParallelism(1);

        env.execute("Print TiDB Snapshot + Binlog");
    }
}