Kafka
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

Kafka Pipeline Connector #

The Kafka Pipeline connector can be used as the Data Sink of the pipeline, and write data to Kafka. This document describes how to set up the Kafka Pipeline connector.

What can the connector do? #

  • Data synchronization

How to create Pipeline #

The pipeline for reading data from MySQL and sink to Kafka can be defined as follows:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://localhost:62510

pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 2

Pipeline Connector Options #

Option Required Default Type Description
type required (none) String Specify what connector to use, here should be 'kafka'.
name optional (none) String The name of the sink.
value.format optional (none) String The format used to serialize the value part of Kafka messages. Available options are debezium-json and canal-json, default option is `debezium-json`, and do not support user-defined format now.
properties.bootstrap.servers required (none) String A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
topic optional (none) String If this parameter is configured, all events will be sent to this topic.
sink.add-tableId-to-header-enabled optional (none) Boolean If this parameter is true, a header with key of 'namespace','schemaName','tableName' will be added for each Kafka record. Default value is false.
properties.* optional (none) String Pass options of Kafka table to pipeline,See Kafka consume options.
sink.custom-header optional (none) String custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'.

Usage Notes #

  • The written topic of Kafka will be namespace.schemaName.tableName string of TableId,this can be changed using route function of pipeline.

  • If the written topic of Kafka is not existed, we will create one automatically.

Data Type Mapping #

CDC type JSON type NOTE
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIMESTAMP TIMESTAMP
TIMESTAMP_LTZ TIMESTAMP_LTZ
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)

Back to top