Write Performance #

The performance of Table Store writers is related to the following factors.

Parallelism #

It is recommended that the parallelism of the sink be less than or equal to the number of buckets, preferably equal to it. You can control the parallelism of the sink with the sink.parallelism table property.

Option            Required  Default  Type     Description
sink.parallelism  No        (none)   Integer  Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator.
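
For example, sink.parallelism can be set together with the bucket count when creating a table. Below is a minimal sketch in Flink SQL; the table name and schema are made up for illustration:

    CREATE TABLE my_table (
        user_id BIGINT,
        behavior STRING,
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'bucket' = '4',
        -- illustrative choice: match the sink parallelism to the bucket count
        'sink.parallelism' = '4'
    );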

Compaction #

Number of Sorted Runs to Trigger Compaction #

Table Store uses the LSM tree (log-structured merge-tree), which supports a large number of updates. An LSM tree organizes files into several sorted runs. When querying records from an LSM tree, all sorted runs must be combined to produce a complete view of all records.

One can easily see that too many sorted runs will result in poor query performance. To keep the number of sorted runs in a reasonable range, Table Store writers will automatically perform compactions. The following table property determines the minimum number of sorted runs to trigger a compaction.

Option                             Required  Default  Type     Description
num-sorted-run.compaction-trigger  No        5        Integer  The number of sorted runs that triggers compaction. Includes level-0 files (one sorted run per file) and high-level runs (one sorted run per level).

Compaction will become less frequent when num-sorted-run.compaction-trigger becomes larger, thus improving writing performance. However, if this value becomes too large, more memory and CPU time will be needed when querying the table. This is a trade-off between writing and query performance.

Number of Sorted Runs to Pause Writing #

When the number of sorted runs is small, Table Store writers perform compaction asynchronously in separate threads, so records can be continuously written into the table. However, to avoid unbounded growth of sorted runs, writers have to pause writing when the number of sorted runs hits a threshold. The following table property determines the threshold.

Option                       Required  Default  Type     Description
num-sorted-run.stop-trigger  No        10       Integer  The number of sorted runs that triggers the stopping of writes.

Write stalls will become less frequent when num-sorted-run.stop-trigger becomes larger, thus improving writing performance. However, if this value becomes too large, more memory and CPU time will be needed when querying the table. This is a trade-off between writing and query performance.
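
Both thresholds can also be raised on an existing table with ALTER TABLE ... SET. The following sketch assumes a table named my_table; the values are only illustrative, not recommendations:

    ALTER TABLE my_table SET (
        -- illustrative values: trade query performance for write throughput
        'num-sorted-run.compaction-trigger' = '10',
        'num-sorted-run.stop-trigger' = '20'
    );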

Dedicated Compaction Job #

By default, Table Store writers will perform compaction as needed when writing records. This is sufficient for most use cases, but there are two downsides:

  • This may result in unstable write throughput, because throughput might temporarily drop while a compaction is in progress.
  • Compaction will mark some data files as “deleted” (not really deleted, see expiring snapshots for more info). If multiple writers mark the same file, a conflict will occur when committing the changes. Table Store will automatically resolve the conflict, but this may result in job restarts.

To avoid these downsides, users can also choose to skip compactions in writers, and run a dedicated job only for compaction. As compactions are performed only by the dedicated job, writers can continuously write records without pausing and no conflicts will ever occur.

To skip compactions in writers, set the following table property to true.

Option      Required  Default  Type     Description
write-only  No        false    Boolean  If set to true, compactions and snapshot expiration will be skipped. This option is used together with dedicated compaction jobs.
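
For example, assuming the writing job uses a table named my_table (the name is illustrative), the property can be set with a statement like this sketch:

    -- after this, the writing job skips compactions and snapshot expiration
    ALTER TABLE my_table SET ('write-only' = 'true');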

To run a dedicated job for compaction, follow these instructions.

Flink SQL currently does not support statements related to compactions, so we have to submit the compaction job through flink run.

Run the following command to submit a compaction job for the table.

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-dist-0.3.0.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name>

If you submit a batch job (set execution.runtime-mode: batch in Flink’s configuration), all current table files will be compacted. If you submit a streaming job (set execution.runtime-mode: streaming in Flink’s configuration), the job will continuously monitor new changes to the table and perform compactions as needed.

If you only want to submit the compaction job and don’t want to wait until the job is done, you should submit the job in Flink’s detached mode (the -d or --detached flag of flink run).

For more details on the usage of the compact action, run

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-dist-0.3.0.jar \
    compact --help

Memory #

There are three main places in the Table Store writer that take up memory:

  • Writer’s memory buffer, shared and preempted by all writers of a single task. This memory value can be adjusted by the write-buffer-size table property (see the sketch after this list).
  • Memory consumed when merging several sorted runs for compaction. Can be adjusted by the num-sorted-run.compaction-trigger option to change the number of sorted runs to be merged.
  • The memory consumed by writing columnar (ORC, Parquet, etc.) files, which is not adjustable.
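
As a sketch of the first item, the write buffer can be enlarged on an existing table. The table name and value are illustrative:

    -- enlarge the memory buffer shared by all writers of a single task
    ALTER TABLE my_table SET ('write-buffer-size' = '512 mb');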