This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

Kafka Pipeline Connector #

The Kafka Pipeline connector can be used as the Data Sink of a pipeline to write data to Kafka. This document describes how to set up the Kafka Pipeline connector.

What can the connector do? #

  • Data synchronization

How to create Pipeline #

A pipeline that reads data from MySQL and writes it to Kafka can be defined as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://localhost:62510

pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 2

Pipeline Connector Options #

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| type | required | (none) | String | Specifies which connector to use; here it should be 'kafka'. |
| name | optional | (none) | String | The name of the sink. |
| partition.strategy | optional | all-to-zero | String | Defines the strategy for sending records to Kafka topic partitions. Available options are `all-to-zero` (send all records to partition 0) and `hash-by-key` (distribute records by the hash of their primary keys). |
| key.format | optional | json | String | Defines the format identifier for encoding key data. Available options are `csv` and `json`. |
| value.format | optional | debezium-json | String | The format used to serialize the value part of Kafka messages. Available options are `debezium-json` and `canal-json`. User-defined formats are not supported yet. |
| properties.bootstrap.servers | required | (none) | String | A list of host/port pairs used to establish the initial connection to the Kafka cluster. |
| topic | optional | (none) | String | If this parameter is configured, all events are sent to this topic. |
| sink.add-tableId-to-header-enabled | optional | false | Boolean | If this parameter is true, headers with the keys 'namespace', 'schemaName', and 'tableName' are added to each Kafka record. |
| properties.* | optional | (none) | String | Passes Kafka options through to the pipeline. See the Kafka configuration options. |
| sink.custom-header | optional | (none) | String | Custom headers for each Kafka record. Headers are separated by ',', and the key and value of each header are separated by ':'. For example, 'key1:value1,key2:value2'. |
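
As a hedged illustration of how these options combine, the sink block below sends every event to one topic, partitions by primary-key hash, and attaches headers. The topic name `cdc_events`, the broker address, and the header values are placeholders chosen for this sketch, and `properties.acks` is shown only as an example of a standard Kafka producer setting passed through the `properties.*` prefix.

```yaml
sink:
  type: kafka
  name: Kafka Sink
  # Placeholder broker address; replace with your own cluster.
  properties.bootstrap.servers: PLAINTEXT://localhost:9092
  # Hypothetical topic name; when set, all events go to this single topic.
  topic: cdc_events
  # Distribute records across partitions by the hash of their primary keys.
  partition.strategy: hash-by-key
  key.format: json
  value.format: canal-json
  # Add 'namespace', 'schemaName', and 'tableName' headers to each record.
  sink.add-tableId-to-header-enabled: true
  # Illustrative custom headers: pairs separated by ',', key and value by ':'.
  sink.custom-header: 'key1:value1,key2:value2'
  # Example pass-through option (assumption: forwarded to the Kafka producer).
  properties.acks: all
```

Options left out of the block fall back to the defaults described in the table above.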

Usage Notes #

  • By default, each table is written to a Kafka topic named after its TableId, in the form namespace.schemaName.tableName. This can be changed using the route function of the pipeline.

  • If the target Kafka topic does not exist, it is created automatically.
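
A minimal sketch of changing the target topic with a route rule, assuming the `route` block of a Flink CDC pipeline definition with its `source-table` and `sink-table` keys. The source table `adb.orders` and the routed name `ods_db.ods_orders` are hypothetical.

```yaml
route:
  # Hypothetical rule: events from adb.orders are written under the routed
  # table name instead of the topic derived from the original TableId.
  - source-table: adb.orders
    sink-table: ods_db.ods_orders
```

Under this assumption, the topic name follows the routed TableId rather than the source one. Verify the resulting topic against your Flink CDC version before relying on it.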

Output Format #

The output format depends on the built-in value.format option:

debezium-json #

As described in the Debezium docs, the debezium-json format contains the before, after, op, and source elements, but ts_ms is not included in source.
An output example is:

{
  "before": null,
  "after": {
    "col1": "1",
    "col2": "1"
  },
  "op": "c",
  "source": {
    "db": "default_namespace",
    "table": "table1"
  }
}

canal-json #

As described in the Canal format page of the Apache Flink docs, the canal-json format contains the old, data, type, database, table, and pkNames elements, but ts is not included.
An output example is:

{
    "old": null,
    "data": [
        {
            "col1": "1",
            "col2": "1"
        }
    ],
    "type": "INSERT",
    "database": "default_schema",
    "table": "table1",
    "pkNames": [
        "col1"
    ]
}

Data Type Mapping #

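| CDC type | JSON type | NOTE |
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL(p, s) | DECIMAL(p, s) | |
| BOOLEAN | BOOLEAN | |
| DATE | DATE | |
| TIMESTAMP | TIMESTAMP | |
| TIMESTAMP_LTZ | TIMESTAMP_LTZ | |
| CHAR(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |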
