
Write Table #

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

column_list:
  (col_name1 [, col_name2, ...])
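
For example, with a hypothetical partitioned table my_db.my_table (partitioned by dt) and a hypothetical source_table, writes could look like this:

  -- Illustrative table and column names only.
  -- Write into a single static partition.
  INSERT INTO my_db.my_table PARTITION (dt = '2022-01-01')
  SELECT user_id, cnt FROM source_table;

  -- Replace the contents of that partition instead of appending to it.
  INSERT OVERWRITE my_db.my_table PARTITION (dt = '2022-01-01')
  SELECT user_id, cnt FROM source_table;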

IMPORTANT:

  • Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode.
  • execution.checkpointing.unaligned=true is not supported when writing to the Table Store in STREAMING mode.
  • execution.checkpointing.mode=at-least-once is not supported when writing to the Table Store in STREAMING mode.
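
For example, a streaming write could be configured in the SQL client as follows; the 1-minute checkpoint interval is only an illustrative value, and the last two settings simply restate the defaults that Table Store requires:

  -- Run in streaming mode with checkpointing enabled (interval is illustrative).
  SET 'execution.runtime-mode' = 'streaming';
  SET 'execution.checkpointing.interval' = '1 min';
  -- Keep the defaults required by Table Store streaming writes:
  -- exactly-once mode, aligned checkpoints.
  SET 'execution.checkpointing.mode' = 'exactly-once';
  SET 'execution.checkpointing.unaligned' = 'false';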

Parallelism #

It is recommended that the parallelism of the sink be less than or equal to the number of buckets, preferably equal to it. You can control the parallelism of the sink with the sink.parallelism option.

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| sink.parallelism | No | (none) | Integer | Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
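
For illustration, a table with 4 buckets might pin the sink parallelism to match, giving one writer per bucket. The table definition below is hypothetical, and the hint form assumes dynamic table options are enabled in your Flink version:

  -- Hypothetical table: 4 buckets, so a sink parallelism of 4 gives one writer per bucket.
  CREATE TABLE my_table (
    user_id BIGINT,
    cnt BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
  ) WITH (
    'bucket' = '4',
    'sink.parallelism' = '4'
  );

  -- Or override it for a single statement with a dynamic table option hint.
  INSERT INTO my_table /*+ OPTIONS('sink.parallelism' = '4') */
  SELECT user_id, cnt FROM source_table;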

Expiring Snapshots #

Table Store generates one or two snapshots per commit. To avoid the many small files and redundant storage that an unbounded number of snapshots would create, Table Store expires old snapshots by default:

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| snapshot.time-retained | No | 1 h | Duration | The maximum time of completed snapshots to retain. |
| snapshot.num-retained.min | No | 10 | Integer | The minimum number of completed snapshots to retain. |
| snapshot.num-retained.max | No | Integer.MAX_VALUE | Integer | The maximum number of completed snapshots to retain. |
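
As a sketch, retention could be lengthened per table like this; the values are only illustrative, and this assumes the catalog allows changing table options through ALTER TABLE ... SET:

  -- Keep snapshots longer so slow batch reads and restarted jobs can still find them
  -- (values are illustrative).
  ALTER TABLE my_table SET (
    'snapshot.time-retained' = '2 h',
    'snapshot.num-retained.min' = '20'
  );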

Please note that a retention time that is too short, or a retained number that is too small, may result in:

  • A batch query cannot find its files. For example, the table is relatively large and a batch query takes 10 minutes to read it, but the snapshot from 10 minutes ago has already expired; the batch query will then read a deleted snapshot.
  • A streaming read job on the FileStore (without the Log System) fails to restart, because the snapshot it recorded has expired by the time of the job failover.

Performance #

Table Store uses an LSM (log-structured merge tree) data structure, which by its nature supports a large number of updates. Update performance and query performance are a tradeoff; the following parameter controls this tradeoff:

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| num-sorted-run.compaction-trigger | No | 5 | Integer | The number of sorted runs that triggers compaction. Includes level0 files (one file is one sorted run) and high-level runs (one level is one sorted run). |

The compaction trigger determines the frequency of compaction: the smaller the trigger value, the more compaction occurs; the larger the trigger value, the less compaction occurs (an example follows the list below).

  • The larger num-sorted-run.compaction-trigger is, the lower the merge cost when updating data, since many unnecessary merges are avoided. However, if this value is too large, more memory will be needed when merging files, because each FileReader takes up a lot of memory.

  • The smaller num-sorted-run.compaction-trigger is, the better the query performance, since fewer files need to be merged at read time.
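
For example, a write-heavy table that is rarely queried might raise the trigger to favour write throughput; the table name and the value 10 are purely illustrative:

  -- Let more sorted runs accumulate before compaction kicks in (illustrative value).
  ALTER TABLE my_table SET ('num-sorted-run.compaction-trigger' = '10');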

Write Stalls #

The writer automatically maintains the structure of the LSM tree, which means that asynchronous threads are constantly performing compaction. However, if the write speed is faster than the compaction speed, write stalls may occur and writing will be paused.

If we don’t limit writing, we will have the following problems:

  • Increasing space amplification, which can lead to running out of disk space.
  • Increasing read amplification, which greatly reduces read performance.

The following parameters determine when to stop writing:

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| num-sorted-run.stop-trigger | No | 10 | Integer | The number of sorted runs that triggers the stopping of writes. |
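
As a sketch, a bursty workload that keeps hitting write stalls could raise this threshold per table, accepting some extra read amplification in return; the value 20 is only an illustration:

  -- Allow more sorted runs to pile up before writes are stalled (illustrative value).
  ALTER TABLE my_table SET ('num-sorted-run.stop-trigger' = '20');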

Memory #

There are three main places in the Table Store’s sink writer that take up memory:

  • The MemTable's write buffer, which is shared and preempted by all writers of a single task. This memory value can be adjusted by the write-buffer-size option (see the example after this list).
  • The memory consumed by compaction for reading files, which can be adjusted by the num-sorted-run.compaction-trigger option to change the maximum number of files to be merged.
  • The memory consumed by writing columnar (ORC) files, which is not adjustable.
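
For instance, the write buffer could be enlarged for a table whose writers need more room; the value below is purely illustrative and assumes table options can be changed via ALTER TABLE ... SET:

  -- Enlarge the shared write buffer used by each task's writers (illustrative value).
  ALTER TABLE my_table SET ('write-buffer-size' = '256 mb');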