This documentation is for an unreleased version of Apache Flink Table Store. We recommend you use the latest stable version.
Configuration
Configuration #
CoreOptions #
Core options for table store.
Key | Default | Type | Description |
---|---|---|---|
auto-create |
false | Boolean | Whether to create underlying storage when reading and writing the table. |
bucket |
1 | Integer | Bucket number for file store. |
bucket-key |
(none) | String | Specify the table store distribution policy. Data is assigned to each bucket according to the hash value of bucket-key. If you specify multiple fields, delimiter is ','. If not specified, the primary key will be used; if there is no primary key, the full row will be used. |
changelog-producer |
none | Enum |
Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads. Possible values:
|
changelog-producer.compaction-interval |
30 min | Duration | When changelog-producer is set to FULL_COMPACTION, full compaction will be constantly triggered after this interval. |
commit.force-compact |
false | Boolean | Whether to force a compaction before commit. |
compaction.early-max.file-num |
50 | Integer | For file set [f_0,...,f_N], the maximum file number to trigger a compaction for append-only table, even if sum(size(f_i)) < targetFileSize. This value avoids pending too much small files, which slows down the performance. |
compaction.max-size-amplification-percent |
200 | Integer | The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table. |
compaction.max-sorted-run-num |
2147483647 | Integer | The maximum sorted run number to pick for compaction. This value avoids merging too much sorted runs at the same time during compaction, which may lead to OutOfMemoryError. |
compaction.min.file-num |
5 | Integer | For file set [f_0,...,f_N], the minimum file number which satisfies sum(size(f_i)) >= targetFileSize to trigger a compaction for append-only table. This value avoids almost-full-file to be compacted, which is not cost-effective. |
compaction.size-ratio |
1 | Integer | Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set. |
continuous.discovery-interval |
1 s | Duration | The discovery interval of continuous reading. |
file.format |
"orc" | String | Specify the message format of data files. |
local-sort.max-num-file-handles |
128 | Integer | The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading. |
log.changelog-mode |
auto | Enum |
Specify the log changelog mode for table. Possible values:
|
log.consistency |
transactional | Enum |
Specify the log consistency mode for table. Possible values:
|
log.format |
"debezium-json" | String | Specify the message format of log system. |
log.key.format |
"json" | String | Specify the key message format of log system with primary key. |
log.retention |
(none) | Duration | It means how long changes log will be kept. The default value is from the log system cluster. |
log.scan.remove-normalize |
false | Boolean | Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog. |
manifest.format |
"avro" | String | Specify the message format of manifest files. |
manifest.merge-min-count |
30 | Integer | To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge. |
manifest.target-file-size |
8 mb | MemorySize | Suggested file size of a manifest file. |
merge-engine |
deduplicate | Enum |
Specify the merge engine for table with primary key. Possible values:
|
num-levels |
(none) | Integer | Total level number, for example, there are 3 levels, including 0,1,2 levels. |
num-sorted-run.compaction-trigger |
5 | Integer | The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run). |
num-sorted-run.stop-trigger |
(none) | Integer | The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 1. |
page-size |
64 kb | MemorySize | Memory page size. |
partial-update.ignore-delete |
false | Boolean | Whether to ignore delete records in partial-update mode. |
partition.default-name |
"__DEFAULT_PARTITION__" | String | The default partition name in case the dynamic partition column value is null/empty string. |
scan.mode |
default | Enum |
Specify the scanning behavior of the source. Possible values:
|
scan.timestamp-millis |
(none) | Long | Optional timestamp used in case of "from-timestamp" scan mode. |
sequence.field |
(none) | String | The field that generates the sequence number for primary key table, the sequence number determines which data is the most recent. |
snapshot.num-retained.max |
2147483647 | Integer | The maximum number of completed snapshots to retain. |
snapshot.num-retained.min |
10 | Integer | The minimum number of completed snapshots to retain. |
snapshot.time-retained |
1 h | Duration | The maximum time of completed snapshots to retain. |
source.split.open-file-cost |
4 mb | MemorySize | Open file cost of a source file. It is used to avoid reading too many files with a source split, which can be very slow. |
source.split.target-size |
128 mb | MemorySize | Target size of a source split when scanning a bucket. |
target-file-size |
128 mb | MemorySize | Target size of a file. |
write-buffer-size |
256 mb | MemorySize | Amount of data to build up in memory before converting to a sorted on-disk file. |
write-buffer-spillable |
(none) | Boolean | Whether the write buffer can be spillable. Enabled by default when using object storage. |
write-mode |
change-log | Enum |
Specify the write mode for table. Possible values:
|
write.compaction-skip |
false | Boolean | Whether to skip compaction on write. |
CatalogOptions #
Options for table store catalog.
Key | Default | Type | Description |
---|---|---|---|
lock-acquire-timeout |
8 min | Duration | The maximum time to wait for acquiring the lock. |
lock-check-max-sleep |
8 s | Duration | The maximum sleep time when retrying to check the lock. |
lock.enabled |
false | Boolean | Enable Catalog Lock. |
metastore |
"filesystem" | String | Metastore of table store catalog, supports filesystem and hive. |
table.type |
MANAGED_TABLE | Enum |
Type of table. Possible values:
|
uri |
(none) | String | Uri of metastore server. |
warehouse |
(none) | String | The warehouse root path of catalog. |
FlinkConnectorOptions #
Flink connector options for table store.
Key | Default | Type | Description |
---|---|---|---|
log.system |
"none" | String | The log system used to keep changes of the table. Possible values:
|
scan.parallelism |
(none) | Integer | Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
sink.parallelism |
(none) | Integer | Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
KafkaLogOptions #
Kafka log options provided after configuring log.system
for kafka.
Key | Default | Type | Description |
---|---|---|---|
kafka.bootstrap.servers |
(none) | String | Required Kafka server connection string. |
kafka.topic |
(none) | String | Topic of this kafka table. |