
Streaming Query #

Currently, streaming queries on Table Store are only supported with Flink.

Table Store is streaming-batch unified: you can read either the full data or incremental changes, depending on the runtime execution mode:

-- Batch mode, read latest snapshot
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM MyTable;

-- Streaming mode, streaming reading: read the latest snapshot first, then read incremental changes
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable;

-- Streaming mode, streaming reading: read only the latest incremental changes
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable /*+ OPTIONS ('scan.mode'='latest') */;

Different scan.mode values result in different consumption behavior under streaming mode.

Scan Mode | Default | Description
default | Yes | Determines the actual startup mode according to other table properties. If "scan.timestamp-millis" is set, the actual startup mode will be "from-timestamp"; otherwise it will be "full".
full | No | Produces a snapshot of the table on first startup, then continues to read the latest changes.
latest | No | Continuously reads the latest changes without producing a snapshot at the beginning.
from-timestamp | No | Continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.
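For example, to start consuming from a specific point in time, you can pass the scan mode and the startup timestamp (epoch milliseconds; the value below is only illustrative) as dynamic table options:

-- Streaming mode, read changes starting from the given timestamp (epoch milliseconds)
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable /*+ OPTIONS ('scan.mode'='from-timestamp', 'scan.timestamp-millis'='1651453200000') */;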

Streaming Query on Files #

You can choose to consume incremental changes directly from the lake store files under streaming mode. This mode has a lower cost compared to Kafka but has a higher latency, depending on the checkpoint interval of the writing job.
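For example, a shorter checkpoint interval on the writing job yields fresher data for file-based streaming reads. The sketch below is illustrative; MySource is a hypothetical source table:

-- Writing job: committed data becomes visible to streaming reads on files after each checkpoint
SET 'execution.checkpointing.interval' = '1 min';
INSERT INTO MyTable SELECT * FROM MySource;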

No Changelog Producer #

By default, downstream streaming consumption is an unordered (ordered only within each key) stream of upsert data. A changelog is produced by a normalize operator following the source, which is costly.

When the changelog-producer property is not set, the storage only retains the upsert data and does not have the complete changelog containing UPDATE_BEFORE records, so the downstream consuming job needs a normalize node to generate the complete changelog.

Note: The normalize node needs to persist all data into its state, which is very costly.
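As a sketch, a streaming query on such a table (the column names k and v are hypothetical) still observes a complete changelog, but only because Flink adds the normalize operator in front of the downstream computation:

-- Without a changelog producer, Flink inserts a stateful normalize operator
-- to reconstruct UPDATE_BEFORE records for this streaming aggregation
SET 'execution.runtime-mode' = 'streaming';
SELECT k, SUM(v) FROM MyTable GROUP BY k;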

Input Changelog Producer #

If your input can produce a complete changelog stream (for example, from a CDC source), you may consider using the input changelog producer to remove the costly normalize operator downstream.

To enable the input changelog producer, configure the table as follows:

CREATE TABLE T (...)
WITH (
    'changelog-producer' = 'input'
)

When the changelog-producer property is set to input, the storage trusts that the input data is a complete changelog, so downstream consumers can read the complete changelog directly.

Note: You need to ensure that the input is a complete changelog, such as from a Database CDC, or generated by a Flink stateful computation.
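For example, a table written directly from a CDC source can use the input changelog producer. The sketch below assumes the Flink MySQL CDC connector and illustrative table and connection settings:

-- CDC source: already emits a complete changelog including UPDATE_BEFORE records
CREATE TEMPORARY TABLE orders_cdc (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '...',
    'port' = '3306',
    'username' = '...',
    'password' = '...',
    'database-name' = '...',
    'table-name' = 'orders'
);

-- Table Store table that trusts the input as a complete changelog
CREATE TABLE orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'changelog-producer' = 'input'
);

INSERT INTO orders SELECT * FROM orders_cdc;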

Full Compaction Changelog Producer #

If your input cannot produce a complete changelog stream but you still want to get rid of the costly normalize operator downstream, you may consider using the full compaction changelog producer.

To enable the full compaction changelog producer, configure the table as follows:

CREATE TABLE T (...)
WITH (
    'changelog-producer' = 'full-compaction'
)

When the changelog-producer property is set to full-compaction, the storage compares the results of successive full compactions and produces the differences as the changelog. That is, the latency of the changelog is determined by the frequency of full compactions.

To ensure that full compactions are performed once in a while, you can configure the following table property:

CREATE TABLE T (...)
WITH (
    'changelog-producer' = 'full-compaction',
    'changelog-producer.compaction-interval' = '<time interval you want a full compaction to happen>'
)

Full compaction will be performed at least once per changelog-producer.compaction-interval. The default value is 30min.
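For example, to request a full compaction (and therefore a new batch of changelog) at least once per hour:

CREATE TABLE T (...)
WITH (
    'changelog-producer' = 'full-compaction',
    'changelog-producer.compaction-interval' = '1 h'
)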

Note: The full compaction changelog producer can produce a complete changelog for any type of source. However, it is not as efficient as the input changelog producer, and the latency to produce the changelog might be high.

Streaming Query on Kafka #

For a table configured with a log system such as Kafka, data will be double written to the file storage and the Kafka topic under streaming mode. For queries, hybrid reads will be performed, reading the snapshot first and then the incremental changes.

CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...'
)

The number of partitions of the Kafka topic must be the same as the number of buckets.
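For example, assuming a table configured with 4 buckets, the Kafka topic should also be created with 4 partitions:

CREATE TABLE T (...)
WITH (
    'bucket' = '4',
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...'
)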

By default, data is only visible after a checkpoint, which means that streaming reads have transactional consistency.

Immediate data visibility is configured via log.consistency = eventual.
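For example, extending the table definition above:

CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...',
    'log.consistency' = 'eventual'
)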

Note: Using SELECT in the sql-client does not show data immediately, because the collect sink generated by SELECT relies on the checkpoint mechanism. The more recommended way to test immediate visibility is to use a print sink.
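A minimal sketch of such a test, assuming the Flink print connector and a sink schema matching T:

-- Print sink: emits records as they arrive, without waiting for checkpoints
CREATE TEMPORARY TABLE print_sink (...)
WITH (
    'connector' = 'print'
);

INSERT INTO print_sink SELECT * FROM T;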

Due to the trade-off between data freshness and completeness, immediate data visibility can hardly be achieved under exactly-once semantics. Nevertheless, users can relax the constraint to at-least-once mode to achieve it. Note that records may be sent to downstream jobs before they are committed (since no barrier alignment is required), which may lead to duplicate data during job failover. As a result, users may need to manually de-duplicate data to achieve final consistency.