Configuration
This documentation is for an unreleased version of Apache Flink Table Store. We recommend you use the latest stable version.

CoreOptions

Core options for table store.

Key Default Type Description
bucket
1 Integer Bucket number for file store.
bucket-key
(none) String Specify the table store distribution policy. Data is assigned to each bucket according to the hash value of bucket-key.
To specify multiple fields, separate them with ','.
If not specified, the primary key will be used; if there is no primary key, the full row will be used.
changelog-producer
none Enum Whether to double write to a changelog file. The changelog file keeps the details of data changes and can be read directly during stream reads.
Possible values:
  • "none": No changelog file.
  • "input": Double write to a changelog file when flushing the memory table; the changelog comes from the input.
commit.force-compact
false Boolean Whether to force a compaction before commit.
compaction.early-max.file-num
50 Integer For file set [f_0,...,f_N], the maximum file number to trigger a compaction for append-only table, even if sum(size(f_i)) < targetFileSize. This value prevents too many small files from accumulating, which slows down performance.
compaction.max-size-amplification-percent
200 Integer The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table.
compaction.max-sorted-run-num
2147483647 Integer The maximum sorted run number to pick for compaction. This value avoids merging too many sorted runs at the same time during compaction, which may lead to OutOfMemoryError.
compaction.min.file-num
5 Integer For file set [f_0,...,f_N], the minimum file number which satisfies sum(size(f_i)) >= targetFileSize to trigger a compaction for append-only table. This value avoids compacting almost-full files, which is not cost-effective.
compaction.size-ratio
1 Integer Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include the next sorted run in the candidate set.
continuous.discovery-interval
1 s Duration The discovery interval of continuous reading.
file.format
"orc" String Specify the message format of data files.
log.changelog-mode
auto Enum Specify the log changelog mode for table.
Possible values:
  • "auto": "upsert" for a table with a primary key, "all" for a table without a primary key.
  • "all": The log system stores all changes including UPDATE_BEFORE.
  • "upsert": The log system does not store the UPDATE_BEFORE changes. The job consuming the log automatically adds a normalize node, which relies on state to generate the required UPDATE_BEFORE.
log.consistency
transactional Enum Specify the log consistency mode for table.
Possible values:
  • "transactional": Only the data after the checkpoint can be seen by readers. The latency depends on the checkpoint interval.
  • "eventual": Immediate data visibility. You may see some intermediate states, but eventually the right results will be produced. Only works for a table with a primary key.
log.format
"debezium-json" String Specify the message format of log system.
log.key.format
"json" String Specify the key message format of log system with primary key.
log.retention
(none) Duration How long the change log is kept. The default value comes from the log system cluster.
log.scan
full Enum Specify the startup mode for log consumer.
Possible values:
  • "full": Perform a snapshot on the table upon first startup, and continue to read the latest changes.
  • "latest": Start from the latest.
  • "from-timestamp": Start from user-supplied timestamp.
log.scan.timestamp-millis
(none) Long Optional timestamp used in case of "from-timestamp" scan mode.
manifest.format
"avro" String Specify the message format of manifest files.
manifest.merge-min-count
30 Integer To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.
manifest.target-file-size
8 mb MemorySize Suggested file size of a manifest file.
merge-engine
deduplicate Enum Specify the merge engine for table with primary key.
Possible values:
  • "deduplicate": De-duplicate and keep the last row.
  • "partial-update": Partial update non-null fields.
num-levels
(none) Integer Total number of levels. For example, with 3 levels, the levels are 0, 1, and 2.
num-sorted-run.compaction-trigger
5 Integer The sorted run number to trigger compaction. Includes level0 files (each file is one sorted run) and high-level runs (each level is one sorted run).
num-sorted-run.stop-trigger
(none) Integer The number of sorted runs at which writes are stopped. The default value is 'num-sorted-run.compaction-trigger' + 1.
page-size
64 kb MemorySize Memory page size.
partition.default-name
"__DEFAULT_PARTITION__" String The default partition name in case the dynamic partition column value is null/empty string.
sequence.field
(none) String The field that generates the sequence number for primary key table. The sequence number determines which data is the most recent.
snapshot.num-retained.max
2147483647 Integer The maximum number of completed snapshots to retain.
snapshot.num-retained.min
10 Integer The minimum number of completed snapshots to retain.
snapshot.time-retained
1 h Duration The maximum time to retain completed snapshots.
source.split.open-file-cost
4 mb MemorySize Open file cost of a source file. It is used to avoid reading too many files in a single source split, which can be very slow.
source.split.target-size
128 mb MemorySize Target size of a source split when scanning a bucket.
target-file-size
128 mb MemorySize Target size of a file.
write-buffer-size
256 mb MemorySize Amount of data to build up in memory before converting to a sorted on-disk file.
write-mode
change-log Enum Specify the write mode for table.
Possible values:
  • "append-only": The table can only accept append-only insert operations. Rows are inserted into table store without data deduplication and without enforcing any primary key constraints.
  • "change-log": The table can accept insert/delete/update operations.
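
As an illustration of the bucket and bucket-key options, the following is a minimal Python sketch of how a row could be assigned to a bucket. It follows the fallback order described for bucket-key (explicit fields, then the primary key, then the full row). The function names and the sample row are hypothetical, and CRC32 is only a stand-in hash for this sketch; the hash function table store actually uses is not specified on this page.

```python
import zlib


def bucket_key_fields(bucket_key, primary_key, all_fields):
    """Resolve which fields drive bucketing, using the documented fallback order."""
    if bucket_key:
        # Multiple bucket-key fields are separated by ','.
        return [f.strip() for f in bucket_key.split(",") if f.strip()]
    if primary_key:
        # No bucket-key: fall back to the primary key.
        return list(primary_key)
    # No primary key either: fall back to the full row.
    return list(all_fields)


def assign_bucket(row, fields, num_buckets):
    """Map a row (a dict) to a bucket index in [0, num_buckets).

    Assumption: CRC32 stands in for the real hash so the sketch is deterministic.
    """
    encoded = "\x00".join(repr(row[f]) for f in fields).encode("utf-8")
    return zlib.crc32(encoded) % num_buckets


# Hypothetical row and settings: 'bucket' = '4', 'bucket-key' = 'user_id,region'.
row = {"user_id": 42, "region": "eu", "amount": 9.5}
fields = bucket_key_fields("user_id,region", ["user_id"], list(row))
bucket = assign_bucket(row, fields, num_buckets=4)
print(fields, bucket)
```

Rows that agree on every bucket-key field always land in the same bucket, regardless of their other columns.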

CatalogOptions

Options for table store catalog.

Key Default Type Description
lock-acquire-timeout
8 min Duration The maximum time to wait for acquiring the lock.
lock-check-max-sleep
8 s Duration The maximum sleep time when retrying to check the lock.
lock.enabled
false Boolean Enable Catalog Lock.
metastore
"filesystem" String Metastore of the table store catalog. Supports filesystem and hive.
uri
(none) String URI of the metastore server.
warehouse
(none) String The warehouse root path of catalog.
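
Catalog options are typically passed in the WITH clause of a Flink SQL CREATE CATALOG statement. The Python sketch below renders such a statement from a dict of options. The helper names, the catalog name, and the warehouse path are hypothetical, and the 'type' = 'table-store' entry is an assumption that is not listed in the table above.

```python
def render_with_clause(options):
    """Render a dict of options as the body of a Flink SQL WITH (...) clause."""
    parts = []
    for key, value in options.items():
        # Escape single quotes by doubling them, as in SQL string literals.
        escaped = str(value).replace("'", "''")
        parts.append(f"'{key}' = '{escaped}'")
    return ",\n  ".join(parts)


def create_catalog_sql(name, options):
    """Build a CREATE CATALOG statement for the given catalog name and options."""
    return f"CREATE CATALOG {name} WITH (\n  {render_with_clause(options)}\n);"


catalog_sql = create_catalog_sql(
    "my_catalog",  # hypothetical catalog name
    {
        "type": "table-store",  # assumption: catalog type identifier
        "metastore": "filesystem",
        "warehouse": "file:/tmp/table_store",  # hypothetical warehouse path
    },
)
print(catalog_sql)
```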

FlinkConnectorOptions

Flink connector options for table store.

Key Default Type Description
log.system
(none) String The log system used to keep changes of the table.
scan.parallelism
(none) Integer Defines a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.
sink.parallelism
(none) Integer Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.

KafkaLogOptions

Kafka log options, available after setting log.system to kafka.

Key Default Type Description
kafka.bootstrap.servers
(none) String Required. Kafka server connection string.
kafka.topic
(none) String Topic of this Kafka table.
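
The relationship between log.system and the kafka.* options can be checked before a table is created. The following Python sketch is a hypothetical pre-flight check, not part of table store itself. It assumes only what the tables above state: kafka.bootstrap.servers is required, and the kafka.* options apply once log.system is set to kafka.

```python
def validate_log_options(options):
    """Return a list of problems found in a table's log-related options.

    Hypothetical helper for illustration; table store performs its own validation.
    """
    problems = []
    if options.get("log.system") == "kafka":
        # The option table marks kafka.bootstrap.servers as required.
        if not options.get("kafka.bootstrap.servers"):
            problems.append(
                "kafka.bootstrap.servers is required when log.system is kafka"
            )
    else:
        # kafka.* options only take effect once log.system is set to kafka.
        stray = sorted(k for k in options if k.startswith("kafka."))
        if stray:
            problems.append(
                "kafka.* options set without log.system = kafka: " + ", ".join(stray)
            )
    return problems


# Hypothetical table options; the broker address and topic name are placeholders.
table_options = {
    "log.system": "kafka",
    "kafka.bootstrap.servers": "broker:9092",
    "kafka.topic": "orders_changelog",
}
print(validate_log_options(table_options))
```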