Streaming Query #

Currently, Table Store only supports Flink streaming queries.

Table Store unifies streaming and batch processing: you can read full or incremental data depending on the runtime execution mode:

-- Batch mode, read latest snapshot
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM MyTable;

-- Streaming mode, streaming reading: read the full snapshot first, then read incremental changes
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable;

-- Streaming mode, streaming reading, read latest incremental
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */;

Different log.scan modes result in different consumption behavior in streaming mode.

Scan Mode   Default   Description
FULL        Yes       FULL scan mode performs a hybrid read: a snapshot scan followed by the streaming incremental scan.
LATEST      No        LATEST scan mode only reads incremental data from the latest offset.

Streaming Query on Files #

You can choose to consume incremental changes directly from the lake store files in streaming mode. This mode has a lower cost than Kafka but a higher latency, depending on the checkpoint interval of the writing job.
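
The end-to-end latency is therefore bounded by the checkpoint interval of the writing job; a minimal sketch of tuning it on the writing side (the one-minute value is only an illustrative assumption):

-- Writing job: the checkpoint interval determines how often new snapshots
-- become visible to downstream streaming readers on the file store
SET 'execution.checkpointing.interval' = '1 min';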

By default, the downstream streaming consumption sees an unordered (ordered only within each key) stream of upsert data. If you want an ordered CDC data stream and want to remove the costly downstream changelog normalize operator, you can configure the table as follows (recommended, but this requires that your input contains all changelog records):

CREATE TABLE T (...)
WITH (
    'changelog-producer' = 'input'
)

You can understand the difference between 'changelog-producer' = 'none' (the default) and 'changelog-producer' = 'input' as follows:

When the changelog-producer is none, the storage only retains the upsert data and does not keep the full changelog containing UPDATE_BEFORE records, so the downstream consuming job needs a normalize node to generate the complete changelog.

Note: The normalize node needs to persist all data in its state, which is very costly.

When the changelog-producer is input, the storage trusts that the input data is a complete changelog, so downstream consumers can also read the complete changelog.

Note: You need to ensure that the input is a complete changelog, such as one coming from a database CDC source or generated by a Flink stateful computation.
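
For illustration, a hedged sketch of feeding such a table from a MySQL CDC source; it assumes the flink-cdc connector is on the classpath and a Table Store catalog is in use, and the column names, table names, and connection parameters are placeholders, not part of this documentation:

-- CDC source: the mysql-cdc connector emits a complete changelog
-- (including UPDATE_BEFORE), so 'changelog-producer' = 'input' is safe
CREATE TEMPORARY TABLE orders_cdc (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '...',
    'port' = '3306',
    'username' = '...',
    'password' = '...',
    'database-name' = '...',
    'table-name' = 'orders'
);

-- Table Store table that trusts the input changelog
CREATE TABLE orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'changelog-producer' = 'input'
);

INSERT INTO orders SELECT * FROM orders_cdc;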

Streaming Query on Kafka #

For a table configured with a log system such as Kafka, data is double written to the file storage and the Kafka topic in streaming mode. For queries, hybrid reads will be performed from snapshots and incremental data.

CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...'
)

The number of partitions of the Kafka topic needs to be the same as the number of buckets.
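
For instance, a hedged sketch in which the table uses 4 buckets and the matching topic is created with 4 partitions; the topic name and the CLI invocation are assumptions for illustration:

CREATE TABLE T (...)
WITH (
    'bucket' = '4',
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = 'T_log'
)

-- The topic must then have exactly 4 partitions, e.g. created with the Kafka CLI:
-- kafka-topics.sh --create --topic T_log --partitions 4 --bootstrap-server ...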

By default, data is only visible after a checkpoint, which means that streaming reads have transactional consistency.

Immediate data visibility is configured via log.consistency = eventual.
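
A minimal sketch of enabling it in the table definition (other options elided as above):

CREATE TABLE T (...)
WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '...',
    'kafka.topic' = '...',
    'log.consistency' = 'eventual'
)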

Note: Using SELECT in the sql-client does not show data immediately, because the collect sink generated by SELECT relies on the checkpoint mechanism. A more recommended way to test is to use a print sink.
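
A minimal sketch of such a test, assuming the queried table T has a single BIGINT column named id:

-- Print sink: rows are written to stdout / the TaskManager logs as they arrive,
-- without waiting for a checkpoint
CREATE TEMPORARY TABLE print_sink (
    id BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO print_sink SELECT id FROM T;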

Due to the tradeoff between data freshness and completeness, immediate data visibility can hardly be achieved under exactly-once semantics. Nevertheless, users can relax the constraint and use at-least-once mode to achieve it. Note that records may be sent to downstream jobs before the commit (since no barrier alignment is required), which may lead to duplicate data during job failover. As a result, users may need to manually de-duplicate data to achieve final consistency.
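
A hedged sketch of relaxing the writing job to at-least-once checkpointing, as described above:

-- At-least-once checkpointing: no barrier alignment, records may reach
-- downstream before the commit, so duplicates are possible during failover
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';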