Querying Tables
Just like all other tables, Table Store tables can be queried with a SELECT statement.

Scan Mode #

The scan.mode table property controls where and how Table Store sources produce records.

Each scan mode behaves as follows for batch sources and streaming sources:

  • default: The default scan mode, for both batch and streaming sources. The actual scan mode is determined by other table properties: if "scan.timestamp-millis" is set, the actual scan mode is "from-timestamp"; if "scan.snapshot-id" is set, the actual scan mode is "from-snapshot"; otherwise the actual scan mode is "latest-full".
  • latest-full
      Batch: Produces the latest snapshot of the table.
      Streaming: Produces the latest snapshot of the table upon first startup, then continues to read the following changes.
  • compacted-full
      Batch: Produces the snapshot after the latest compaction.
      Streaming: Produces the snapshot after the latest compaction of the table upon first startup, then continues to read the following changes.
  • latest
      Batch: Same as "latest-full".
      Streaming: Continuously reads the latest changes without producing a snapshot at the beginning.
  • from-timestamp
      Batch: Produces a snapshot earlier than or equal to the timestamp specified by "scan.timestamp-millis".
      Streaming: Continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.
  • from-snapshot
      Batch: Produces the snapshot specified by "scan.snapshot-id".
      Streaming: Continuously reads changes starting from the snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.
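
As a rough illustration of the scan modes above, the following sketch sets scan.mode per query in Flink SQL. It assumes the properties are passed through Flink's dynamic table options hint (OPTIONS) and that the SQL client's execution.runtime-mode setting selects batch or streaming; the table name MyTable, the snapshot ID, and the timestamp value are placeholders, not values taken from a real table.

```sql
-- Batch read of one specific snapshot (snapshot ID 1 is a placeholder).
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM MyTable /*+ OPTIONS('scan.mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;

-- Streaming read that skips the initial snapshot and only follows new changes.
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable /*+ OPTIONS('scan.mode' = 'latest') */;

-- Streaming read of changes starting from a point in time
-- (the epoch-millisecond value below is an arbitrary placeholder).
SELECT * FROM MyTable /*+ OPTIONS('scan.mode' = 'from-timestamp', 'scan.timestamp-millis' = '1666784655600') */;
```

The same properties could presumably be set in the table DDL instead; the hint form is used here only because it keeps each example in a single statement.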

Users can also adjust the changelog-producer table property to specify the pattern of produced changes. See changelog producer for details.

System Tables #

System tables contain metadata and information about each table, such as the snapshots created and the options in use. Users can access system tables with batch queries.

Currently, Flink, Spark and Trino support querying system tables.

In some cases, the table name must be enclosed in backticks to avoid SQL parsing conflicts, for example when the table is referenced by its three-part name (catalog, database, table):

SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;

Snapshots Table #

You can query the snapshot history of the table through the snapshots table.

SELECT * FROM MyTable$snapshots;

/*
+--------------+------------+-----------------+-------------------+--------------+-------------------------+
|  snapshot_id |  schema_id |     commit_user | commit_identifier |  commit_kind |             commit_time |
+--------------+------------+-----------------+-------------------+--------------+-------------------------+
|            2 |          0 | 7ca4cd28-98e... |                 2 |       APPEND | 2022-10-26 11:44:15.600 |
|            1 |          0 | 870062aa-3e9... |                 1 |       APPEND | 2022-10-26 11:44:15.148 |
+--------------+------------+-----------------+-------------------+--------------+-------------------------+
2 rows in set
*/

By querying the snapshots table, you can inspect the commit and expiration information of the table and time travel through its data.
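
A minimal sketch of such a time travel, assuming a batch query and Flink's dynamic table options hint: first look up a snapshot ID in the snapshots table, then read the table as of that snapshot. The snapshot ID 1 matches the sample output above but is otherwise a placeholder.

```sql
-- Step 1: list the available snapshots, newest first.
SELECT snapshot_id, commit_kind, commit_time
    FROM MyTable$snapshots
    ORDER BY snapshot_id DESC;

-- Step 2: read the table as it was at the chosen snapshot.
SELECT * FROM MyTable /*+ OPTIONS('scan.mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;
```

Only snapshots that still appear in the snapshots table can be read this way; expired snapshots are no longer listed.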

Schemas Table #

You can query the historical schemas of the table through the schemas table.

SELECT * FROM MyTable$schemas;

/*
+-----------+--------------------------------+----------------+--------------+---------+---------+
| schema_id |                         fields | partition_keys | primary_keys | options | comment |
+-----------+--------------------------------+----------------+--------------+---------+---------+
|         0 | [{"id":0,"name":"word","typ... |             [] |     ["word"] |      {} |         |
|         1 | [{"id":0,"name":"word","typ... |             [] |     ["word"] |      {} |         |
|         2 | [{"id":0,"name":"word","typ... |             [] |     ["word"] |      {} |         |
+-----------+--------------------------------+----------------+--------------+---------+---------+
3 rows in set
*/

You can join the snapshots table and schemas table to get the fields of given snapshots.

SELECT s.snapshot_id, t.schema_id, t.fields
    FROM MyTable$snapshots s JOIN MyTable$schemas t
    ON s.schema_id = t.schema_id WHERE s.snapshot_id = 100;
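
The snapshots table can also be aggregated on its own. The sketch below, which assumes a batch query and uses only columns shown in the sample outputs above, counts how many retained snapshots were written under each schema version; the alias snapshot_count is introduced here for illustration.

```sql
-- Count the retained snapshots per schema version.
SELECT schema_id, COUNT(*) AS snapshot_count
    FROM MyTable$snapshots
    GROUP BY schema_id;
```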

Options Table #

You can query the options that were specified for the table in the DDL through the options table. Options that are not shown take their default values. See [Configuration] for reference.

SELECT * FROM MyTable$options;

/*
+------------------------+--------------------+
|         key            |        value       |
+------------------------+--------------------+
| snapshot.time-retained |         5 h        |
+------------------------+--------------------+
1 rows in set
*/
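
To look up a single option rather than the whole list, the options table can be filtered by key. In this sketch the column names key and value are quoted with backticks as a precaution, in case they clash with reserved keywords in the SQL dialect in use; the option name is the one from the sample output above.

```sql
-- Fetch the explicitly configured value of one option.
-- An empty result means the option was not set in the DDL and uses its default.
SELECT `value`
    FROM MyTable$options
    WHERE `key` = 'snapshot.time-retained';
```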

Audit log Table #

If you need to audit the changelog of the table, you can use the audit_log system table. The audit_log table exposes a rowkind column alongside the incremental data of the table. You can filter on this column, or use it in other operations, to complete the audit.

There are four values for rowkind:

  • +I: Insertion operation.
  • -U: Update operation with the previous content of the updated row.
  • +U: Update operation with new content of the updated row.
  • -D: Deletion operation.

SELECT * FROM MyTable$audit_log;

/*
+------------------+-----------------+-----------------+
|     rowkind      |     column_0    |     column_1    |
+------------------+-----------------+-----------------+
|        +I        |      ...        |      ...        |
+------------------+-----------------+-----------------+
|        -U        |      ...        |      ...        |
+------------------+-----------------+-----------------+
|        +U        |      ...        |      ...        |
+------------------+-----------------+-----------------+
3 rows in set
*/
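
As a sketch of the filtering described above, the following batch queries select changes by their rowkind value. They assume rowkind is exposed as a string column holding the four values listed earlier; the alias change_count is introduced here for illustration.

```sql
-- Show only deletions.
SELECT * FROM MyTable$audit_log WHERE rowkind = '-D';

-- Show both sides of every update (old and new content).
SELECT * FROM MyTable$audit_log WHERE rowkind IN ('-U', '+U');

-- Summarize how many changes of each kind were recorded.
SELECT rowkind, COUNT(*) AS change_count
    FROM MyTable$audit_log
    GROUP BY rowkind;
```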