Write Table

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

column_list:
  (col_name1 [, col_name2, ...])
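
As an illustration of the grammar above, the following sketch shows three common forms of the statement. It assumes the MyDwdTable schema used later on this page (user_id, item_id, dt, partitioned by dt); MySourceTable is a hypothetical source table introduced only for this example.

```sql
-- Dynamic partitioning: the partition value comes from the dt column of the query
INSERT INTO MyDwdTable
SELECT user_id, item_id, dt FROM MySourceTable;

-- Static partition: dt is fixed in part_spec, so the query does not select it
INSERT INTO MyDwdTable PARTITION (dt = '20220402')
SELECT user_id, item_id FROM MySourceTable WHERE dt = '20220402';

-- Explicit column list: names the target columns in the order the query produces them
INSERT INTO MyDwdTable (user_id, item_id, dt)
SELECT user_id, item_id, dt FROM MySourceTable;
```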

Unify Streaming and Batch #

Flink Table Store supports reading and writing in both batch and streaming mode. Beyond that, different streaming and batch jobs can write to the same managed table simultaneously.

Suppose you have a warehouse pipeline:

[Figure: Unify Streaming and Batch]

The DWD layer has the following table:

-- a managed table ddl
CREATE TABLE MyDwdTable (
  user_id BIGINT,
  item_id BIGINT,
  dt STRING
) PARTITIONED BY (dt);

A real-time pipeline performs the data sync task, followed by downstream jobs that perform the remaining ETL steps.

-- Run a streaming job that continuously writes to the table
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO MyDwdTable SELECT user_id, item_id, dt FROM MyCdcTable WHERE some_filter;

-- The downstream aggregation task
INSERT INTO MyDwsTable
SELECT dt, item_id, COUNT(user_id) FROM MyDwdTable GROUP BY dt, item_id;

Backfill tasks are often required to correct historical data. You can start a new batch job that overwrites a historical partition of the table without affecting the running streaming pipeline or the downstream tasks.

-- Run a batch job to revise yesterday's partition
SET 'execution.runtime-mode' = 'batch';
INSERT OVERWRITE MyDwdTable PARTITION (dt = '20220402')
SELECT user_id, item_id FROM MyCdcTable WHERE dt = '20220402' AND new_filter;

This way you revise yesterday’s partition without suspending the streaming job.

Note: Writing to a single partition from multiple jobs at the same time is not supported. Doing so does not result in data errors, but it can lead to job failover.

Parallelism #

The parallelism of the sink should be less than or equal to the number of buckets, and preferably equal to it. You can control the parallelism of the sink with the sink.parallelism option.

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| sink.parallelism | No | (none) | Integer | Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
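
A minimal sketch of matching the sink parallelism to the bucket count is shown below. It assumes that options can be set in the WITH clause of the table DDL and that the bucket count is configured through an option named 'bucket'; neither is stated on this page, and MyBucketedTable is a hypothetical table.

```sql
-- Hypothetical managed table with 4 buckets
CREATE TABLE MyBucketedTable (
  user_id BIGINT,
  item_id BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '4',            -- assumed option name for the number of buckets
  'sink.parallelism' = '4'   -- equal to the bucket count, as recommended above
);
```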

Expiring Snapshot #

Table Store generates one or two snapshots per commit. Too many snapshots create a lot of small files and redundant storage, so by default Table Store writers expire old snapshots according to the following options:

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| snapshot.time-retained | No | 1 h | Duration | The maximum time of completed snapshots to retain. |
| snapshot.num-retained.min | No | 10 | Integer | The minimum number of completed snapshots to retain. |
| snapshot.num-retained.max | No | Integer.MAX_VALUE | Integer | The maximum number of completed snapshots to retain. |

Note that a retention time that is too short, or a retained snapshot count that is too small, may cause the following problems:

  • A batch query cannot find its files. For example, if the table is relatively large and a batch query takes 10 minutes to read it, but the snapshot from 10 minutes ago expires during that time, the query ends up reading a deleted snapshot.
  • Continuous reading jobs on the FileStore (without a log system) fail to restart, because the snapshot the job recorded has already expired at the time of the failover.
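
To make the options concrete, here is a sketch of a table whose snapshots are kept longer than the defaults so that long-running batch queries and restarting streaming readers are less likely to hit an expired snapshot. The table name and the chosen values are illustrative assumptions, and setting the options in the WITH clause of the DDL is also assumed.

```sql
-- Hypothetical table that retains snapshots longer than the defaults
CREATE TABLE MyLongRetentionTable (
  user_id BIGINT,
  item_id BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'snapshot.time-retained' = '2 h',     -- default is 1 h
  'snapshot.num-retained.min' = '20'    -- default is 10
);
```

Longer retention keeps more small files on storage, so the values should be sized against the slowest expected reader rather than raised arbitrarily.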

Performance #

Table Store uses an LSM data structure, which by design supports a large number of updates. Update performance and query performance are a tradeoff, and the following parameter controls it:

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| num-sorted-run.compaction-trigger | No | 5 | Integer | The number of sorted runs that triggers compaction. Includes level-0 files (one sorted run per file) and higher-level runs (one sorted run per level). |

The compaction trigger determines how often compaction runs. The smaller the number of sorted runs that triggers it, the more often compaction occurs; the larger the number, the less often it occurs.

  • The larger num-sorted-run.compaction-trigger is, the lower the merge cost when updating data, because many unnecessary merges are avoided. However, if this value is too large, more memory is needed when merging files because each FileReader takes up a lot of memory.

  • The smaller num-sorted-run.compaction-trigger is, the better the query performance, because fewer files have to be merged at read time.
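
The tradeoff above can be sketched with two hypothetical tables, one tuned for heavy updates and one tuned for queries. The table names and the values 8 and 3 are illustrative assumptions, not recommendations from this page, and setting the option in the WITH clause of the DDL is also assumed.

```sql
-- Update-heavy table: compact less often to reduce merge cost on write
CREATE TABLE MyWriteHeavyTable (
  user_id BIGINT,
  item_id BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'num-sorted-run.compaction-trigger' = '8'   -- above the default of 5
);

-- Query-heavy table: compact more often so reads merge fewer sorted runs
CREATE TABLE MyReadHeavyTable (
  user_id BIGINT,
  item_id BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'num-sorted-run.compaction-trigger' = '3'   -- below the default of 5
);
```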

Write Stalls #

The writer automatically maintains the structure of the LSM, which means that asynchronous threads are constantly running compaction. If the write speed is faster than the compaction speed, write stalls may occur and writing is paused.

If we don’t limit writing, we will have the following problems:

  • Increasing space amplification, which can lead to running out of disk space.
  • Increasing read amplification, which greatly reduces read performance.

The following parameters determine when to stop writing:

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| num-sorted-run.stop-trigger | No | 10 | Integer | The number of sorted-runs that trigger the stopping of writes. |
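
As a sketch, a table that should tolerate longer write bursts before stalling could raise the stop trigger while leaving the compaction trigger at its default. The table name and the value 20 are illustrative assumptions, and setting the options in the WITH clause of the DDL is also assumed.

```sql
-- Hypothetical table that allows more sorted runs to accumulate before writes stop
CREATE TABLE MyBurstyWriteTable (
  user_id BIGINT,
  item_id BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'num-sorted-run.compaction-trigger' = '5',  -- default: compaction starts at 5 sorted runs
  'num-sorted-run.stop-trigger' = '20'        -- writes stop at 20 sorted runs (default 10)
);
```

A higher stop trigger delays stalls at the cost of the space and read amplification described above, so it trades read performance and disk usage for write availability.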

Memory #

Three main areas of the Table Store sink writer take up memory:

  • The MemTable's write buffer. Each bucket of each partition has its own buffer, whose size can be adjusted with the write-buffer-size option (default 64 MB).
  • The memory consumed by compaction when reading files. It can be adjusted with the num-sorted-run.compaction-trigger option, which changes the maximum number of files to be merged.
  • The memory consumed by writing files, which is not adjustable.
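
The write buffer cost can be estimated with simple arithmetic, sketched below for a hypothetical table. The table name, the bucket count of 4, the 'bucket' option name, and the assumption of two partitions being written concurrently are all illustrative; setting options in the WITH clause of the DDL is also assumed.

```sql
-- Each bucket of each partition being written holds its own write buffer.
-- With 4 buckets and 2 partitions written at the same time:
--   default:  2 partitions * 4 buckets * 64 MB = 512 MB across all sink writers
--   below:    2 partitions * 4 buckets * 32 MB = 256 MB across all sink writers
CREATE TABLE MyLowMemoryTable (
  user_id BIGINT,
  item_id BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '4',                  -- assumed option name for the number of buckets
  'write-buffer-size' = '32 mb'    -- half of the 64 MB default
);
```

This estimate covers only the write buffers; memory used by compaction and by file writing comes on top of it.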