Kafka Pipeline Connector #
The Kafka Pipeline connector can be used as the Data Sink of the pipeline, and write data to Kafka. This document describes how to set up the Kafka Pipeline connector.
What can the connector do? #
- Data synchronization
How to create Pipeline #
The pipeline for reading data from MySQL and sink to Kafka can be defined as follows:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: PLAINTEXT://localhost:62510
pipeline:
name: MySQL to Kafka Pipeline
parallelism: 2
Pipeline Connector Options #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | Specify what connector to use, here should be 'kafka' . |
name | optional | (none) | String | The name of the sink. |
value.format | optional | (none) | String | The format used to serialize the value part of Kafka messages. Available options are debezium-json and canal-json, default option is `debezium-json`, and do not support user-defined format now. |
properties.bootstrap.servers | required | (none) | String | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
topic | optional | (none) | String | If this parameter is configured, all events will be sent to this topic. |
sink.add-tableId-to-header-enabled | optional | (none) | Boolean | If this parameter is true, a header with key of 'namespace','schemaName','tableName' will be added for each Kafka record. Default value is false. |
properties.* | optional | (none) | String | Pass options of Kafka table to pipeline,See Kafka consume options. |
sink.custom-header | optional | (none) | String | custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'. |
Usage Notes #
-
The written topic of Kafka will be
namespace.schemaName.tableName
string of TableId,this can be changed using route function of pipeline. -
If the written topic of Kafka is not existed, we will create one automatically.
Data Type Mapping #
CDC type | JSON type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | TIMESTAMP | |
TIMESTAMP_LTZ | TIMESTAMP_LTZ | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) |