Scan Source: Unbounded; Sink: Streaming Upsert Mode
The Upsert Kafka connector allows reading data from and writing data to Kafka topics in an upsert fashion.
As a source, the upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
As a sink, the upsert-kafka connector can consume a changelog stream. It writes INSERT/UPDATE_AFTER data as normal Kafka message values, and writes DELETE data as Kafka messages with null values (indicating a tombstone for the key). Flink guarantees message ordering on the primary key by partitioning data on the values of the primary key columns, so update and delete messages for the same key fall into the same partition.
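To make the changelog interpretation above concrete, the following sketch declares a hypothetical table `user_status` and annotates, in comments, how a short sequence of Kafka records would be read. The table, topic, column names, and broker address are assumptions chosen for illustration only.

```sql
-- Hypothetical table; names and broker address are illustrative assumptions.
CREATE TABLE user_status (
  user_id BIGINT,
  status  STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Records arriving on the topic, in order, and how the source interprets them:
--   key {"user_id": 1}, value {"user_id": 1, "status": "active"}  -> INSERT (key not seen before)
--   key {"user_id": 1}, value {"user_id": 1, "status": "idle"}    -> UPDATE of the row for user_id = 1
--   key {"user_id": 1}, value null                                -> DELETE of the row for user_id = 1
SELECT user_id, status FROM user_status;
```

Written to in the other direction, the same table would emit a record with a null value for each DELETE in the incoming changelog.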
To use the Upsert Kafka connector, the following dependencies are required, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
Upsert Kafka version | Maven dependency | SQL Client JAR |
---|---|---|
universal | <dependency> | Download |
The example below shows how to create and use an Upsert Kafka table:
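A minimal sketch of such a setup might look as follows. It assumes a regular Kafka source table `pageviews` and an Upsert Kafka sink table `pageviews_per_region`; the table names, columns, topics, formats, and the `localhost:9092` broker address are illustrative assumptions, not required values.

```sql
-- Upsert Kafka sink: one row per region, keyed by the primary key.
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Regular (append-only) Kafka source with the raw page view events.
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP(3),
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- The aggregation produces an updating result; each update for a region
-- is written as a new Kafka message with that region as the key.
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
```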
Attention: Make sure to define the primary key in the DDL.
See the regular Kafka connector for a list of all available metadata fields.
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specifies which connector to use; for Upsert Kafka, use 'upsert-kafka'. |
topic | required | (none) | String | The Kafka topic name to read from and write to. |
properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
properties.* | optional | (none) | String | Sets and passes arbitrary Kafka configurations. Suffix names must match the configuration keys defined in the Kafka Configuration documentation. Flink removes the "properties." key prefix and passes the transformed keys and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. Some configurations cannot be set because Flink overrides them, e.g. 'key.deserializer' and 'value.deserializer'. |
key.format | required | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Attention: Compared to the regular Kafka connector, the key fields are specified by the PRIMARY KEY syntax. |
key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix is removed and the non-prefixed names are used within the key format. Please note that this option requires 'value.fields-include' to be set to 'EXCEPT_KEY'. |
value.format | required | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
value.fields-include | optional | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Defines a strategy for how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema are included in the value format, which means that key columns appear in the data type for both the key and value format. |
sink.parallelism | optional | (none) | Integer | Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator. |
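As a rough illustration of how several of these options combine, the sketch below declares a hypothetical table `orders_latest` whose key and value both carry an `order_id` field. The table, topic, column names, the `k_` prefix, and the broker address are assumptions for illustration.

```sql
CREATE TABLE orders_latest (
  k_order_id BIGINT,          -- key column, declared with the 'k_' prefix
  order_id   BIGINT,          -- value column; would clash with the key field without the prefix
  amount     DECIMAL(10, 2),
  PRIMARY KEY (k_order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders_latest',
  'properties.bootstrap.servers' = 'localhost:9092',
  -- Passed to the Kafka client as 'allow.auto.create.topics' after the prefix is removed.
  'properties.allow.auto.create.topics' = 'false',
  'key.format' = 'json',
  -- The key format sees the field as 'order_id', without the prefix.
  'key.fields-prefix' = 'k_',
  'value.format' = 'json',
  -- Required when a key prefix is defined: key columns are left out of the value.
  'value.fields-include' = 'EXCEPT_KEY',
  'sink.parallelism' = '2'
);
```

With these settings, the Kafka message key would be serialized from `k_order_id` under the name `order_id`, and the value would contain only `order_id` and `amount`.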
See the regular Kafka connector for more explanation around key and value formats. However, note that this connector requires both a key and a value format, where the key fields are derived from the PRIMARY KEY constraint.
The following example shows how to specify and configure key and value formats. The format options are prefixed with either 'key' or 'value' plus the format identifier.
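One possible configuration, assuming JSON for both key and value, is sketched here. The table `user_behavior`, its columns, the topic, and the broker address are illustrative assumptions; the `json.*` options are options of the JSON format, addressed through the `key.` and `value.` prefixes.

```sql
CREATE TABLE user_behavior (
  user_id  BIGINT,
  item_id  BIGINT,
  behavior STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- Key format and its options: 'key.' + format identifier + option name.
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',

  -- Value format and its options: 'value.' + format identifier + option name.
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',

  -- Keep the primary key column out of the message value.
  'value.fields-include' = 'EXCEPT_KEY'
);
```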
The Upsert Kafka connector always works in an upsert fashion and requires a primary key to be defined in the DDL. Under the assumption that records with the same key are ordered within the same partition, the primary key semantics on the changelog source mean that the materialized changelog is unique on the primary keys. The primary key definition also controls which fields end up in Kafka’s key.
By default, an Upsert Kafka sink ingests data into a Kafka topic with at-least-once guarantees if the query is executed with checkpointing enabled.
This means Flink may write duplicate records with the same key into the Kafka topic. However, because the connector works in upsert mode, the last record for a given key takes effect when the topic is read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes, just like the HBase sink.
Flink supports emitting per-partition watermarks for Upsert Kafka. Watermarks are generated inside the Kafka consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming shuffles. The output watermark of the source is determined by the minimum watermark among the partitions it reads. If some partitions in the topics are idle, the watermark generator will not advance. You can alleviate this problem by setting the 'table.exec.source.idle-timeout' option in the table configuration.
Please refer to Kafka watermark strategies for more details.
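As a sketch of how this might be set up from the SQL Client, the snippet below sets an idle timeout and declares a watermark on a hypothetical table `user_last_login`. The table, topic, columns, broker address, and the 30 second and 5 second values are assumptions for illustration; the quoted `SET` syntax is that of recent SQL Client versions, and older clients may expect an unquoted key=value form.

```sql
-- Treat a source partition as idle after 30 seconds without records,
-- so that it no longer holds back the minimum watermark.
SET 'table.exec.source.idle-timeout' = '30 s';

CREATE TABLE user_last_login (
  user_id    BIGINT,
  login_time TIMESTAMP(3),
  -- Watermarks are generated per Kafka partition and merged by taking the minimum.
  WATERMARK FOR login_time AS login_time - INTERVAL '5' SECOND,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_last_login',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```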
Upsert Kafka stores message keys and values as bytes, so Upsert Kafka has no schema or data types of its own. The messages are serialized and deserialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.