Scan Source: Unbounded | Sink: Streaming Append Mode
The Kafka connector allows for reading data from and writing data into Kafka topics.
Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11.
The universal Kafka connector attempts to track the latest version of the Kafka client.
The version of the client it uses may change between Flink releases.
Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
For most users the universal Kafka connector is the most appropriate.
However, for Kafka versions 0.11.x and 0.10.x, we recommend using the dedicated 0.11 and 0.10 connectors, respectively.
For details on Kafka compatibility, please refer to the official Kafka documentation.
Kafka Version | Maven dependency | SQL Client JAR |
---|---|---|
universal | flink-connector-kafka_2.11 | Download |
0.11.x | flink-connector-kafka-0.11_2.11 | Download |
0.10.x | flink-connector-kafka-0.10_2.11 | Download |
The Kafka connectors are not currently part of the binary distribution. See how to link with them for cluster execution here.
The example below shows how to create a Kafka table:
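A minimal sketch of such a DDL follows; the table name, columns, topic user_behavior, and broker address localhost:9092 are illustrative and should be replaced with your own values:

```sql
CREATE TABLE kafka_table (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',            -- use the universal Kafka connector
  'topic' = 'user_behavior',        -- illustrative topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',                 -- any supported format works here
  'scan.startup.mode' = 'earliest-offset'
)
```

The options in the WITH clause map directly to the connector options listed in the table below.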
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, for Kafka the options are: 'kafka', 'kafka-0.11', 'kafka-0.10'. |
topic | required | (none) | String | Topic name from which the table is read. |
properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
properties.group.id | required by source | (none) | String | The id of the consumer group for the Kafka source; optional for the Kafka sink. |
format | required | (none) | String | The format used to deserialize and serialize Kafka messages. The supported formats are 'csv', 'json', 'avro', 'debezium-json' and 'canal-json'. Please refer to the Formats page for more details and more format options. |
scan.startup.mode | optional | group-offsets | String | Startup mode for the Kafka consumer, valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See Start Reading Position below for more details. |
scan.startup.specific-offsets | optional | (none) | String | Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
scan.startup.timestamp-millis | optional | (none) | Long | Start from the specified epoch timestamp (milliseconds), used in case of 'timestamp' startup mode. |
sink.partitioner | optional | (none) | String | Output partitioning from Flink's partitions into Kafka's partitions. Valid values are 'fixed' (each Flink partition ends up in at most one Kafka partition), 'round-robin' (rows are distributed to Kafka partitions round-robin), or the fully qualified class name of a custom FlinkKafkaPartitioner subclass. |
The config option scan.startup.mode specifies the startup mode for the Kafka consumer. The valid enumerations are:

- group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
- earliest-offset: start from the earliest offset possible.
- latest-offset: start from the latest offset.
- timestamp: start from a user-supplied timestamp for each partition.
- specific-offsets: start from user-supplied specific offsets for each partition.

The default option value is group-offsets, which indicates consuming from the last committed offsets in ZK / Kafka brokers.

If timestamp is specified, another config option scan.startup.timestamp-millis is required to specify a startup timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.

If specific-offsets is specified, another config option scan.startup.specific-offsets is required to specify the startup offsets for each partition, e.g. an option value of partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300 for partition 1.
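For example, the following DDL (topic, broker address, and columns are illustrative) starts reading from the specific offsets given above:

```sql
CREATE TABLE kafka_source (
  user_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',        -- illustrative topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',
  -- resume partition 0 at offset 42 and partition 1 at offset 300
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
)
```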
Flink natively supports Kafka as a changelog source. If the messages in a Kafka topic are change events captured from other databases using CDC tools, you can use a CDC format to interpret the messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. Flink provides two CDC formats, debezium-json and canal-json, to interpret change events captured by Debezium and Canal. A changelog source is useful in many cases, such as synchronizing incremental data from databases to other systems, auditing logs, maintaining materialized views on databases, temporal joins against the changing history of a database table, and so on. See debezium-json and canal-json for more about how to use the CDC formats.
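As a sketch, a table backed by Debezium change events could be declared as follows; the topic, columns, and the database table they mirror are illustrative:

```sql
CREATE TABLE products_cdc (
  id INT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',      -- illustrative topic carrying Debezium JSON change events
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  -- interpret each message as an INSERT/UPDATE/DELETE change event
  'format' = 'debezium-json'
)
```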
The config option sink.partitioner specifies output partitioning from Flink's partitions into Kafka's partitions. By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or to control the routing of rows into partitions, a custom sink partitioner can be provided. The round-robin partitioner is useful to avoid an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
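For instance, a sink table that spreads rows across all Kafka partitions round-robin could be declared as follows (table and topic names are illustrative):

```sql
CREATE TABLE kafka_sink (
  user_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_out',    -- illustrative output topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- distribute rows over all Kafka partitions instead of the default fixed mapping
  'sink.partitioner' = 'round-robin'
)
```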
By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.
Kafka stores message keys and values as bytes, so Kafka has no schema or data types. Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.