External Log Systems

Aside from the underlying table files, the changelog of Table Store can also be stored in, and consumed from, an external log system such as Kafka. By setting the log.system table property, users choose which external log system to use.

If an external log system is used, all records written to table files are also written to the log system. Changes produced for streaming queries then come from the log system instead of from table files.
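For example, a streaming query against such a table might look like the following. This is a minimal sketch, assuming a table T has already been created with a log system configured as in the Kafka example below:

SET 'execution.runtime-mode' = 'streaming';

-- The streaming query reads changes from the external log system
-- rather than from table files.
SELECT * FROM T;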

Consistency Guarantees

By default, changes in the log system become visible to consumers only after a snapshot, just like table files. This behavior guarantees exactly-once semantics: each record is seen by consumers exactly once.

However, users can also set the table property 'log.consistency' = 'eventual' so that changelog written to the log system can be consumed immediately, without waiting for the next snapshot. This reduces changelog latency, but due to possible failures it can only guarantee at-least-once semantics (that is, consumers might see duplicate records).
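For example, eventual consistency can be enabled when the table is created. A minimal sketch, with placeholder connection values as in the Kafka example below:

CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...',
    -- Changes become consumable immediately, at the cost of
    -- at-least-once semantics.
    'log.consistency' = 'eventual'
);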

If 'log.consistency' = 'eventual' is set, the Table Store source in Flink automatically adds a "normalize" operator to deduplicate records and produce correct results. This operator persists the value of every key in state, so it is very costly and should be avoided where possible.

Supported Log Systems

Kafka

By setting 'log.system' = 'kafka', users can write changes to Kafka in addition to table files.

CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...'
);
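Once the table is created, each successful write is stored in table files and also appended to the configured Kafka topic. For instance (a sketch, with the column list and values elided as above):

-- This write lands in both the table files and the Kafka topic.
INSERT INTO T VALUES (...);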

Table properties for Kafka are listed as follows.

Key                      Default  Type    Description
kafka.bootstrap.servers  (none)   String  Required. Kafka server connection string.
kafka.topic              (none)   String  Topic of this Kafka table.