Write Table #
```sql
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

column_list:
  (col_name1 [, column_name2, ...])
```
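For example, assuming a hypothetical partitioned table `MyTable` (with a partition column `dt`) and a source table `MySource`, an append and a partition overwrite might look like this:

```sql
-- Table and column names are made up for illustration
INSERT INTO MyCatalog.MyDb.MyTable
SELECT user_id, behavior, dt FROM MySource;

-- Overwrite a single partition of the same table
INSERT OVERWRITE MyCatalog.MyDb.MyTable PARTITION (dt = '2022-01-01')
SELECT user_id, behavior FROM MySource WHERE dt = '2022-01-01';
```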
IMPORTANT:

- Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode.
- `execution.checkpointing.unaligned=true` is not supported when writing to the Table Store in STREAMING mode.
- `execution.checkpointing.mode=at-least-once` is not supported when writing to the Table Store in STREAMING mode.
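A minimal way to satisfy these requirements in the SQL client is to enable checkpointing explicitly and leave the checkpointing mode and alignment at their exactly-once, aligned defaults. The interval below is just an example value:

```sql
-- Any positive interval works, as long as checkpointing is enabled
SET 'execution.checkpointing.interval' = '1 min';
```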
Parallelism #
It is recommended that the parallelism of the sink be less than or equal to the number of buckets, preferably equal. You can control the parallelism of the sink with the `sink.parallelism` option.
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| `sink.parallelism` | No | (none) | Integer | Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
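As a sketch, the option can be set in the table's WITH clause so that the sink parallelism matches the bucket count; the table name, schema, and values are made up for illustration:

```sql
CREATE TABLE MyTable (
    user_id BIGINT,
    behavior STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'bucket' = '4',            -- number of buckets
    'sink.parallelism' = '4'   -- match the sink parallelism to the bucket count
);
```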
Expiring Snapshot #
Table Store generates one or two snapshots per commit. To avoid accumulating too many snapshots, which creates a lot of small files and redundant storage, Table Store writers eliminate expired snapshots by default:
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| `snapshot.time-retained` | No | 1 h | Duration | The maximum time of completed snapshots to retain. |
| `snapshot.num-retained.min` | No | 10 | Integer | The minimum number of completed snapshots to retain. |
| `snapshot.num-retained.max` | No | Integer.MAX_VALUE | Integer | The maximum number of completed snapshots to retain. |
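As a sketch, these options can be set in the table's WITH clause at creation time; the table and the values are illustrative only, not recommendations:

```sql
CREATE TABLE MyTable (
    user_id BIGINT,
    behavior STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'snapshot.time-retained' = '1 d',      -- keep snapshots for at most one day
    'snapshot.num-retained.min' = '10',    -- but never fewer than 10 snapshots
    'snapshot.num-retained.max' = '100'    -- and never more than 100 snapshots
);
```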
Please note that a retention time that is too short or a retained number that is too small may result in:

- Batch queries cannot find a file. For example, the table is relatively large and a batch query takes 10 minutes to read, but the snapshot from 10 minutes ago has expired by then, at which point the batch query would read a deleted snapshot.
- Streaming read jobs on FileStore (without a Log System) fail to restart, because at the time of the job failover the snapshot they recorded has already expired.
Performance #
Table Store uses an LSM (log-structured merge-tree) data structure, which natively supports a large number of updates. Update performance and query performance are a tradeoff; the following parameter controls this tradeoff:
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| `num-sorted-run.compaction-trigger` | No | 5 | Integer | The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run). |
The compaction trigger determines the frequency of compaction: the smaller `num-sorted-run.compaction-trigger` is, the more often compaction occurs; the larger it is, the less often compaction occurs.

- The larger `num-sorted-run.compaction-trigger`, the lower the merge cost when updating data, which avoids many invalid merges. However, if this value is too large, more memory is needed when merging files, because each FileReader takes up a lot of memory.
- The smaller `num-sorted-run.compaction-trigger`, the better the query performance, because fewer files need to be merged when reading.
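As an illustrative sketch, the trigger can also be raised for a single write-heavy job without recreating the table by using Flink's dynamic table options on the sink, assuming dynamic table options are enabled in your setup; the table names and value are hypothetical:

```sql
-- Allow up to 10 sorted runs before compaction kicks in,
-- trading query performance for cheaper writes
INSERT INTO MyTable /*+ OPTIONS('num-sorted-run.compaction-trigger' = '10') */
SELECT user_id, behavior FROM MySource;
```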
Write Stalls #
The writer automatically maintains the structure of the LSM tree, which means that asynchronous threads are constantly performing compaction. However, if the write speed is faster than the compaction speed, write stalls may occur and writing will be paused.
If we don’t limit writing, we will have the following problems:
- Ever-increasing space amplification, which can lead to running out of disk space.
- Increasing read amplification, which greatly reduces read performance.
The following parameters determine when to stop writing:
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| `num-sorted-run.stop-trigger` | No | 10 | Integer | The number of sorted-runs that trigger the stopping of writes. |
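A sketch of raising this threshold so that writes are stalled later, at the cost of more read amplification in the meantime; the table and value are illustrative:

```sql
CREATE TABLE MyTable (
    user_id BIGINT,
    behavior STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'num-sorted-run.stop-trigger' = '20'   -- stall writes only after 20 sorted runs
);
```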
Memory #
There are three main places in the Table Store’s sink writer that take up memory:
- MemTable's write buffer, which is shared and preempted by all writers of a single task. This memory value can be adjusted by the `write-buffer-size` option.
- The memory consumed by compaction for reading files. It can be adjusted by the `num-sorted-run.compaction-trigger` option, which changes the maximum number of files to be merged.
- The memory consumed by writing a columnar (ORC) file, which is not adjustable.
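Putting the two adjustable knobs together, a sketch of a memory-constrained writer configuration might look like this; the table and values are illustrative, not recommendations:

```sql
CREATE TABLE MyTable (
    user_id BIGINT,
    behavior STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'write-buffer-size' = '128 mb',              -- smaller MemTable write buffer
    'num-sorted-run.compaction-trigger' = '3'    -- merge fewer files per compaction
);
```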