Flink CDC sources #
Flink CDC sources are a set of source connectors for Apache Flink® that ingest changes from different databases using change data capture (CDC). Some CDC sources integrate Debezium as the engine to capture data changes, so they can fully leverage Debezium's capabilities. See more about what Debezium is.
You can also read tutorials about how to use these sources.
Supported Connectors #
Connector | Database | Driver |
---|---|---|
mongodb-cdc | | MongoDB Driver: 4.11.2 |
mysql-cdc | | JDBC Driver: 8.0.28 |
oceanbase-cdc | | OceanBase Driver: 2.4.x |
oracle-cdc | | Oracle Driver: 19.3.0.0 |
postgres-cdc | | JDBC Driver: 42.5.1 |
sqlserver-cdc | | JDBC Driver: 9.4.1.jre8 |
tidb-cdc | | JDBC Driver: 8.0.27 |
db2-cdc | | Db2 Driver: 11.5.0.0 |
vitess-cdc | | MySQL JDBC Driver: 8.0.26 |
Supported Flink Versions #
The following table shows the version mapping between Flink® CDC Connectors and Flink®:
Flink® CDC Version | Flink® Version |
---|---|
1.0.0 | 1.11.* |
1.1.0 | 1.11.* |
1.2.0 | 1.12.* |
1.3.0 | 1.12.* |
1.4.0 | 1.13.* |
2.0.* | 1.13.* |
2.1.* | 1.13.* |
2.2.* | 1.13.*, 1.14.* |
2.3.* | 1.13.*, 1.14.*, 1.15.*, 1.16.* |
2.4.* | 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* |
3.0.* | 1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* |
3.1.* | 1.16.*, 1.17.*, 1.18.*, 1.19.* |
3.2.* | 1.17.*, 1.18.*, 1.19.*, 1.20.* |
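As a rough illustration of how the mapping above can be used, the following sketch encodes the 3.x rows of the table and checks whether a given Flink minor version is listed for a Flink CDC release line. The class and method names (FlinkCdcCompatibility, isSupported) are hypothetical helpers for this page and are not part of Flink CDC; only the version data comes from the table.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink CDC): encodes the 3.x rows of the table above. */
public class FlinkCdcCompatibility {

    // Flink CDC release line -> Flink minor versions listed for it in the table.
    private static final Map<String, List<String>> SUPPORTED = new HashMap<>();

    static {
        SUPPORTED.put("3.0", Arrays.asList("1.14", "1.15", "1.16", "1.17", "1.18"));
        SUPPORTED.put("3.1", Arrays.asList("1.16", "1.17", "1.18", "1.19"));
        SUPPORTED.put("3.2", Arrays.asList("1.17", "1.18", "1.19", "1.20"));
    }

    /** Returns true if the table lists flinkMinor (e.g. "1.18") for cdcLine (e.g. "3.1"). */
    public static boolean isSupported(String cdcLine, String flinkMinor) {
        return SUPPORTED
                .getOrDefault(cdcLine, Collections.<String>emptyList())
                .contains(flinkMinor);
    }

    public static void main(String[] args) {
        System.out.println(isSupported("3.1", "1.18")); // listed in the table
        System.out.println(isSupported("3.2", "1.16")); // not listed in the table
    }
}
```

Release lines that the sketch does not encode (for example the 1.x and 2.x rows) simply return false, so extend the map before relying on it for older versions.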
Features #
- Supports reading a database snapshot and then continuing to read binlogs with exactly-once processing, even when failures happen.
- CDC connectors for the DataStream API: users can consume changes from multiple databases and tables in a single job without deploying Debezium and Kafka.
- CDC connectors for the Table/SQL API: users can use SQL DDL to create a CDC source that monitors changes on a single table.
The following table shows the current features of each connector:
Connector | No-lock Read | Parallel Read | Exactly-once Read | Incremental Snapshot Read |
---|---|---|---|---|
mongodb-cdc | ✅ | ✅ | ✅ | ✅ |
mysql-cdc | ✅ | ✅ | ✅ | ✅ |
oracle-cdc | ✅ | ✅ | ✅ | ✅ |
postgres-cdc | ✅ | ✅ | ✅ | ✅ |
sqlserver-cdc | ✅ | ✅ | ✅ | ✅ |
oceanbase-cdc | ❌ | ❌ | ❌ | ❌ |
tidb-cdc | ✅ | ❌ | ✅ | ❌ |
db2-cdc | ✅ | ✅ | ✅ | ✅ |
vitess-cdc | ✅ | ❌ | ✅ | ❌ |
Usage for Table/SQL API #
Several steps are needed to set up a Flink cluster with the provided connector.
- Set up a Flink cluster with version 1.12+ and Java 8+ installed.
- Download the connector SQL jars from the Downloads page (or build them yourself).
- Put the downloaded jars under FLINK_HOME/lib/.
- Restart the Flink cluster.
The following example shows how to create a MySQL CDC source in the Flink SQL Client and execute queries on it.
-- create a MySQL CDC table source
CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'inventory',
  'table-name' = 'products'
);
-- read snapshot and binlog data from MySQL, apply a transformation, and show the result on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;
Usage for DataStream API #
Include the following Maven dependency (available through Maven Central):
<dependency>
  <groupId>org.apache.flink</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <!-- The dependency is available only for stable releases; SNAPSHOT dependencies must be built by yourself from the master or release branches. -->
  <version>3.0-SNAPSHOT</version>
</dependency>
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("yourHostname")
                .port(yourPort) // placeholder: replace with your MySQL port, e.g. 3306
                .databaseList("yourDatabaseName") // set captured database
                .tableList("yourDatabaseName.yourTableName") // set captured table
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing with a 3000 ms interval
        env.enableCheckpointing(3000);

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
Deserialization #
The following example shows a change event in JSON format.
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {...},
  "op": "u", // the operation type, "u" means this is an update event
  "ts_ms": 1589362330904, // the time at which the connector processed the event
  "transaction": null
}
Note: Please refer to the Debezium documentation for the meaning of each field.
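To make the "op" and "ts_ms" fields more concrete, here is a minimal sketch that maps the single-letter operation code to a readable name and converts the millisecond timestamp to an instant. The class ChangeEventFields and its methods are hypothetical helpers, not part of Flink CDC or Debezium. The sketch covers only the four common Debezium codes (c, u, d, r) and assumes the two values have already been extracted from the JSON string with a JSON library of your choice.

```java
import java.time.Instant;

/** Hypothetical helper (not part of Flink CDC or Debezium) for reading envelope fields. */
public class ChangeEventFields {

    /** Maps a Debezium "op" code to a readable name; anything else is reported as unknown. */
    public static String describeOp(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "c":
                return "create";
            case "u":
                return "update";
            case "d":
                return "delete";
            case "r":
                return "read (snapshot)";
            default:
                return "unknown";
        }
    }

    /** Converts the "ts_ms" value (milliseconds since the Unix epoch) to an Instant in UTC. */
    public static Instant toInstant(long tsMs) {
        return Instant.ofEpochMilli(tsMs);
    }

    public static void main(String[] args) {
        // Values taken from the example event above.
        System.out.println(describeOp("u"));           // update
        System.out.println(toInstant(1589362330904L)); // 2020-05-13T09:32:10.904Z
    }
}
```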
In some cases, users can use the JsonDebeziumDeserializationSchema(true) constructor to include the schema in the message. The Debezium JSON message may then look like this:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "default": "flink",
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "description"
          },
          {
            "type": "double",
            "optional": true,
            "field": "weight"
          }
        ],
        "optional": true,
        "name": "mysql_binlog_source.inventory_1pzxhca.products.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "default": "flink",
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "description"
          },
          {
            "type": "double",
            "optional": true,
            "field": "weight"
          }
        ],
        "optional": true,
        "name": "mysql_binlog_source.inventory_1pzxhca.products.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": {...},
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql_binlog_source.inventory_1pzxhca.products.Envelope"
  },
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {...},
    "op": "u", // the operation type, "u" means this is an update event
    "ts_ms": 1589362330904, // the time at which the connector processed the event
    "transaction": null
  }
}
It is usually recommended to exclude the schema, because the schema fields make the messages very verbose, which reduces parsing performance.
The JsonDebeziumDeserializationSchema can also accept a custom configuration for JsonConverter. For example, to obtain numeric output for decimal data, you can construct JsonDebeziumDeserializationSchema as follows:
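import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverterConfig;

// emit decimal values as JSON numbers instead of the default encoding
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
JsonDebeziumDeserializationSchema schema =
        new JsonDebeziumDeserializationSchema(true, customConverterConfigs);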
Building from source #
Prerequisites:
- git
- Maven
- At least Java 8
git clone https://github.com/apache/flink-cdc.git
cd flink-cdc
mvn clean install -DskipTests
The dependencies are now available in your local .m2 repository.