Query Table
This documentation is for an unreleased version of Apache Flink Table Store. We recommend you use the latest stable version.

Query Table #

The Table Store is streaming batch unified, you can read full and incremental data depending on the runtime execution mode:

-- Batch mode, read latest snapshot
SET 'execution.runtime-mode' = 'batch';

-- Streaming mode, read incremental snapshot, read the snapshot first, then read the incremental
SET 'execution.runtime-mode' = 'streaming';

-- Streaming mode, read latest incremental
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */;

Query Optimization #

It is highly recommended to take partition and primary key filters along with the query, which will speed up the data skipping of the query.

Supported filter functions are:

  • =
  • <>
  • <
  • <=
  • >
  • >=
  • in
  • starts with like

Streaming Real-time #

By default, data is only visible after the checkpoint, which means that the streaming reading has transactional consistency.

If you want the data to be immediately visible, you need to set the following options:

Table Option Default Description
`log.system` = `kafka`
No You need to enable log system because the FileStore's continuous consumption only provides checkpoint-based visibility.
`log.consistency` = `eventual`
No This means that writes are visible without using LogSystem's transaction mechanism.

Note: All tables need to have the primary key defined because only then can the data be de-duplicated by the normalizing node of the downstream job.

Streaming Low Cost #

By default, for the table with the primary key, the records in the table store only contain INSERT, UPDATE_AFTER, and DELETE. The downstream consuming job will generate a normalized node, and it stores all processed key-value to produce the UPDATE_BEFORE message, which will bring extra overhead.

If you want to remove downstream normalized node (It’s costly) or see the all changes of this table, you can configure:

  • ‘log.changelog-mode’ = ‘all’
  • ‘log.consistency’ = ‘transactional’ (default)

The inserted query written to the table store must contain all message types with UPDATE_BEFORE, otherwise the planner will throw an exception. It means that Planner expects the inserted query to produce a real changelog, otherwise the data would be wrong.