This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
Kafka Pipeline Connector #
The Kafka Pipeline connector can be used as the Data Sink of the pipeline, and write data to Kafka. This document describes how to set up the Kafka Pipeline connector.
What can the connector do? #
- Data synchronization
How to create Pipeline #
The pipeline for reading data from MySQL and sink to Kafka can be defined as follows:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: PLAINTEXT://localhost:62510
pipeline:
name: MySQL to Kafka Pipeline
parallelism: 2
Pipeline Connector Options #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | Specify what connector to use, here should be 'kafka' . |
name | optional | (none) | String | The name of the sink. |
partition.strategy | optional | (none) | String | Defines the strategy for sending record to kafka topic, available options are `all-to-zero`(sending all records to 0 partition) and `hash-by-key`(distributing all records by hash of primary keys), default option is `all-to-zero`. |
key.format | optional | (none) | String | Defines the format identifier for encoding key data, available options are `csv` and `json`, default option is `json`. |
value.format | optional | (none) | String | The format used to serialize the value part of Kafka messages. Available options are debezium-json and canal-json, default option is `debezium-json`, and do not support user-defined format now. |
properties.bootstrap.servers | required | (none) | String | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
topic | optional | (none) | String | If this parameter is configured, all events will be sent to this topic. |
sink.add-tableId-to-header-enabled | optional | (none) | Boolean | If this parameter is true, a header with key of 'namespace','schemaName','tableName' will be added for each Kafka record. Default value is false. |
properties.* | optional | (none) | String | Pass options of Kafka table to pipeline,See Kafka consume options. |
sink.custom-header | optional | (none) | String | custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'. |
Usage Notes #
-
The written topic of Kafka will be
namespace.schemaName.tableName
string of TableId,this can be changed using route function of pipeline. -
If the written topic of Kafka is not existed, we will create one automatically.
Output Format #
For different built-in value.format
options, the output format is different:
debezium-json #
Refer to Debezium docs, debezium-json format will contains before
,after
,op
,source
elements, but ts_ms
is not included in source
.
An output example is:
{
"before": null,
"after": {
"col1": "1",
"col2": "1"
},
"op": "c",
"source": {
"db": "default_namespace",
"table": "table1"
}
}
canal-json #
Refer to Canal | Apache Flink, canal-json format will contains old
,data
,type
,database
,table
,pkNames
elements, but ts
is not included.
An output example is:
{
"old": null,
"data": [
{
"col1": "1",
"col2": "1"
}
],
"type": "INSERT",
"database": "default_schema",
"table": "table1",
"pkNames": [
"col1"
]
}
Data Type Mapping #
CDC type | JSON type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | TIMESTAMP | |
TIMESTAMP_LTZ | TIMESTAMP_LTZ | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) |