配置

配置 #

Table 和 SQL API 的默认配置能够确保结果准确,同时也提供可接受的性能。

根据 Table 程序的需求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态是有限的(请参阅 流式概念).

概览 #

当实例化一个 TableEnvironment 时,可以使用 EnvironmentSettings 来传递用于当前会话的所期望的配置项 —— 传递一个 Configuration 对象到 EnvironmentSettings

此外,在每个 TableEnvironment 中,TableConfig 提供用于当前会话的配置项。

对于常见或者重要的配置项,TableConfig 提供带有详细注释的 getterssetters 方法。

对于更加高级的配置,用户可以直接访问底层的 key-value 配置项。以下章节列举了所有可用于调整 Flink Table 和 SQL API 程序的配置项。

注意 因为配置项会在执行操作的不同时间点被读取,所以推荐在实例化 TableEnvironment 后尽早地设置配置项。

// instantiate table environment
Configuration configuration = new Configuration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode().withConfiguration(configuration).build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// access flink configuration after table environment instantiation
TableConfig tableConfig = tEnv.getConfig();
// set low-level key-value options
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5 s");
tableConfig.set("table.exec.mini-batch.size", "5000");
// instantiate table environment
val configuration = new Configuration;
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
val settings = EnvironmentSettings.newInstance
  .inStreamingMode.withConfiguration(configuration).build
val tEnv: TableEnvironment = TableEnvironment.create(settings)

// access flink configuration after table environment instantiation
val tableConfig = tEnv.getConfig()
// set low-level key-value options
tableConfig.set("table.exec.mini-batch.enabled", "true")
tableConfig.set("table.exec.mini-batch.allow-latency", "5 s")
tableConfig.set("table.exec.mini-batch.size", "5000")
# instantiate table environment
configuration = Configuration()
configuration.set("table.exec.mini-batch.enabled", "true")
configuration.set("table.exec.mini-batch.allow-latency", "5 s")
configuration.set("table.exec.mini-batch.size", "5000")
settings = EnvironmentSettings.new_instance() \
...     .in_streaming_mode() \
...     .with_configuration(configuration) \
...     .build()

t_env = TableEnvironment.create(settings)

# access flink configuration after table environment instantiation
table_config = t_env.get_config()
# set low-level key-value options
table_config.set("table.exec.mini-batch.enabled", "true")
table_config.set("table.exec.mini-batch.allow-latency", "5 s")
table_config.set("table.exec.mini-batch.size", "5000")
Flink SQL> SET 'table.exec.mini-batch.enabled' = 'true';
Flink SQL> SET 'table.exec.mini-batch.allow-latency' = '5s';
Flink SQL> SET 'table.exec.mini-batch.size' = '5000';

执行配置 #

以下选项可用于优化查询执行的性能。

Key Default Type Description
table.exec.async-lookup.buffer-capacity

Batch Streaming
100 Integer The max number of async i/o operation that the async lookup join can trigger.
table.exec.async-lookup.output-mode

Batch Streaming
ORDERED

Enum

Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will be still used.

Possible values:
  • "ORDERED"
  • "ALLOW_UNORDERED"
table.exec.async-lookup.timeout

Batch Streaming
3 min Duration The async timeout for the asynchronous operation to complete.
table.exec.async-scalar.buffer-capacity

Streaming
10 Integer The max number of async i/o operation that the async lookup join can trigger.
table.exec.async-scalar.max-attempts

Streaming
3 Integer The max number of async retry attempts to make before task execution is failed.
table.exec.async-scalar.retry-delay

Streaming
100 ms Duration The delay to wait before trying again.
table.exec.async-scalar.retry-strategy

Streaming
FIXED_DELAY

Enum

Restart strategy which will be used, FIXED_DELAY by default.

Possible values:
  • "NO_RETRY"
  • "FIXED_DELAY"
table.exec.async-scalar.timeout

Streaming
3 min Duration The async timeout for the asynchronous operation to complete.
table.exec.async-state.enabled

Streaming
false Boolean Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.
table.exec.deduplicate.insert-update-after-sensitive-enabled

Streaming
true Boolean Set whether the job (especially the sinks) is sensitive to INSERT messages and UPDATE_AFTER messages. If false, Flink may, sometimes (e.g. deduplication for last row), send UPDATE_AFTER instead of INSERT for the first row. If true, Flink will guarantee to send INSERT for the first row, in that case there will be additional overhead. Default is true.
table.exec.deduplicate.mini-batch.compact-changes-enabled

Streaming
false Boolean Set whether to compact the changes sent downstream in row-time mini-batch. If true, Flink will compact changes and send only the latest change downstream. Note that if the downstream needs the details of versioned data, this optimization cannot be applied. If false, Flink will send all changes to downstream just like when the mini-batch is not enabled.
table.exec.disabled-operators

Batch
(none) String Mainly for testing. A comma-separated list of operator names, each name represents a kind of disabled operator. Operators that can be disabled include "NestedLoopJoin", "ShuffleHashJoin", "BroadcastHashJoin", "SortMergeJoin", "HashAgg", "SortAgg". By default no operator is disabled.
table.exec.interval-join.min-cleanup-interval

Streaming
0 ms Duration Specifies a minimum time interval for how long cleanup unmatched records in the interval join operator. Before Flink 1.18, the default value of this param was the half of interval duration. Note: Set this option greater than 0 will cause unmatched records in outer joins to be output later than watermark, leading to possible discarding of these records by downstream watermark-dependent operators, such as window operators. The default value is 0, which means it will clean up unmatched records immediately.
table.exec.legacy-cast-behaviour

Batch Streaming
DISABLED

Enum

Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.

Possible values:
  • "ENABLED": CAST will operate following the legacy behaviour.
  • "DISABLED": CAST will operate following the new correct behaviour.
table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold

Batch
0.5 Double The distinct value rate can be defined as the number of local aggregation results for the sampled data divided by the sampling threshold (see table.exec.local-hash-agg.adaptive.sampling-threshold). If the computed result is lower than the given configuration value, the remaining input records proceed to do local aggregation, otherwise the remaining input records are subjected to simple projection which calculation cost is less than local aggregation. The default value is 0.5.
table.exec.local-hash-agg.adaptive.enabled

Batch
true Boolean Whether to enable adaptive local hash aggregation. Adaptive local hash aggregation is an optimization of local hash aggregation, which can adaptively determine whether to continue to do local hash aggregation according to the distinct value rate of sampling data. If distinct value rate bigger than defined threshold (see parameter: table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold), we will stop aggregating and just send the input data to the downstream after a simple projection. Otherwise, we will continue to do aggregation. Adaptive local hash aggregation only works in batch mode. Default value of this parameter is true.
table.exec.local-hash-agg.adaptive.sampling-threshold

Batch
500000 Long If adaptive local hash aggregation is enabled, this value defines how many records will be used as sampled data to calculate distinct value rate (see parameter: table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold) for the local aggregate. The higher the sampling threshold, the more accurate the distinct value rate is. But as the sampling threshold increases, local aggregation is meaningless when the distinct values rate is low. The default value is 500000.
table.exec.mini-batch.allow-latency

Streaming
0 ms Duration The maximum latency can be used for MiniBatch to buffer input records. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records reached. NOTE: If table.exec.mini-batch.enabled is set true, its value must be greater than zero.
table.exec.mini-batch.enabled

Streaming
false Boolean Specifies whether to enable MiniBatch optimization. MiniBatch is an optimization to buffer input records to reduce state access. This is disabled by default. To enable this, users should set this config to true. NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size' must be set.
table.exec.mini-batch.size

Streaming
-1 Long The maximum number of input records can be buffered for MiniBatch. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records reached. NOTE: MiniBatch only works for non-windowed aggregations currently. If table.exec.mini-batch.enabled is set true, its value must be positive.
table.exec.operator-fusion-codegen.enabled

Batch Streaming
false Boolean If true, multiple physical operators will be compiled into a single operator by planner which can improve the performance.
table.exec.rank.topn-cache-size

Streaming
10000 Long Rank operators have a cache which caches partial state contents to reduce state access. Cache size is the number of records in each ranking task.
table.exec.resource.default-parallelism

Batch Streaming
-1 Integer Sets default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than parallelism of StreamExecutionEnvironment (actually, this config overrides the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no default parallelism is set, then it will fallback to use the parallelism of StreamExecutionEnvironment.
table.exec.simplify-operator-name-enabled

Batch Streaming
true Boolean When it is true, the optimizer will simplify the operator name with id and type of ExecNode and keep detail in description. Default value is true.
table.exec.sink.keyed-shuffle

Streaming
AUTO

Enum

In order to minimize the distributed disorder problem when writing data into table with primary keys that many users suffers. FLINK will auto add a keyed shuffle by default when the sink parallelism differs from upstream operator and sink parallelism is not 1. This works only when the upstream ensures the multi-records' order on the primary key, if not, the added shuffle can not solve the problem (In this situation, a more proper way is to consider the deduplicate operation for the source firstly or use an upsert source with primary key definition which truly reflect the records evolution).
By default, the keyed shuffle will be added when the sink's parallelism differs from upstream operator. You can set to no shuffle(NONE) or force shuffle(FORCE).

Possible values:
  • "NONE"
  • "AUTO"
  • "FORCE"
table.exec.sink.not-null-enforcer

Batch Streaming
ERROR

Enum

Determines how Flink enforces NOT NULL column constraints when inserting null values.

Possible values:
  • "ERROR": Throw a runtime exception when writing null values into NOT NULL column.
  • "DROP": Drop records silently if a null value would have to be inserted into a NOT NULL column.
table.exec.sink.rowtime-inserter

Streaming
ENABLED

Enum

Some sink implementations require a single rowtime attribute in the input that can be inserted into the underlying stream record. This option allows disabling the timestamp insertion and avoids errors around multiple time attributes being present in the query schema.

Possible values:
  • "ENABLED": Insert a rowtime attribute (if available) into the underlying stream record. This requires at most one time attribute in the input for the sink.
  • "DISABLED": Do not insert the rowtime attribute into the underlying stream record.
table.exec.sink.type-length-enforcer

Batch Streaming
IGNORE

Enum

Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded (only for CHAR(<length>)/BINARY(<length>)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR/BINARY/VARBINARY column type.

Possible values:
  • "IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.
  • "TRIM_PAD": Trim and pad string and binary values to match the length defined by the CHAR/VARCHAR/BINARY/VARBINARY length.
table.exec.sink.upsert-materialize

Streaming
AUTO

Enum

Because of the disorder of ChangeLog data caused by Shuffle in distributed system, the data received by Sink may not be the order of global upsert. So add upsert materialize operator before upsert sink. It receives the upstream changelog records and generate an upsert view for the downstream.
By default, the materialize operator will be added when a distributed disorder occurs on unique keys. You can also choose no materialization(NONE) or force materialization(FORCE).

Possible values:
  • "NONE"
  • "AUTO"
  • "FORCE"
table.exec.sort.async-merge-enabled

Batch
true Boolean Whether to asynchronously merge sorted spill files.
table.exec.sort.default-limit

Batch
-1 Integer Default limit when user don't set a limit after order by. -1 indicates that this configuration is ignored.
table.exec.sort.max-num-file-handles

Batch
128 Integer The maximal fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.
table.exec.source.cdc-events-duplicate

Streaming
false Boolean Indicates whether the CDC (Change Data Capture) sources in the job will produce duplicate change events that requires the framework to deduplicate and get consistent result. CDC source refers to the source that produces full change events, including INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE, for example Kafka source with Debezium format. The value of this configuration is false by default.

However, it's a common case that there are duplicate change events. Because usually the CDC tools (e.g. Debezium) work in at-least-once delivery when failover happens. Thus, in the abnormal situations Debezium may deliver duplicate change events to Kafka and Flink will get the duplicate events. This may cause Flink query to get wrong results or unexpected exceptions.

Therefore, it is recommended to turn on this configuration if your CDC tool is at-least-once delivery. Enabling this configuration requires to define PRIMARY KEY on the CDC sources. The primary key will be used to deduplicate change events and generate normalized changelog stream at the cost of an additional stateful operator.
table.exec.source.idle-timeout

Streaming
0 ms Duration When a source do not receive any elements for the timeout time, it will be marked as temporarily idle. This allows downstream tasks to advance their watermarks without the need to wait for watermarks from this source while it is idle. Default value is 0, which means detecting source idleness is not enabled.
table.exec.spill-compression.block-size

Batch
64 kb MemorySize The memory size used to do compress when spilling data. The larger the memory, the higher the compression ratio, but more memory resource will be consumed by the job.
table.exec.spill-compression.enabled

Batch
true Boolean Whether to compress spilled data. Currently we only support compress spilled data for sort and hash-agg and hash-join operators.
table.exec.state.ttl

Streaming
0 ms Duration Specifies a minimum time interval for how long idle state (i.e. state which was not updated), will be retained. State will never be cleared until it was idle for less than the minimum time, and will be cleared at some time after it was idle. Default is never clean-up the state. NOTE: Cleaning up state requires additional overhead for bookkeeping. Default value is 0, which means that it will never clean up state.
table.exec.uid.format

Streaming
"<id>_<transformation>" String Defines the format pattern for generating the UID of an ExecNode streaming transformation. The pattern can be defined globally or per-ExecNode in the compiled plan. Supported arguments are: <id> (from static counter), <type> (e.g. 'stream-exec-sink'), <version>, and <transformation> (e.g. 'constraint-validator' for a sink). In Flink 1.15.x the pattern was wrongly defined as '<id>_<type>_<version>_<transformation>' which would prevent migrations in the future.
table.exec.uid.generation

Streaming
PLAN_ONLY

Enum

In order to remap state to operators during a restore, it is required that the pipeline's streaming transformations get a UID assigned.
The planner can generate and assign explicit UIDs. If no UIDs have been set by the planner, the UIDs will be auto-generated by lower layers that can take the complete topology into account for uniqueness of the IDs. See the DataStream API for more information.
This configuration option is for experts only and the default should be sufficient for most use cases. By default, only pipelines created from a persisted compiled plan will get UIDs assigned explicitly. Thus, these pipelines can be arbitrarily moved around within the same topology without affecting the stable UIDs.

Possible values:
  • "PLAN_ONLY": Sets UIDs on streaming transformations if and only if the pipeline definition comes from a compiled plan. Pipelines that have been constructed in the API without a compilation step will not set an explicit UID as it might not be stable across multiple translations.
  • "ALWAYS": Always sets UIDs on streaming transformations. This strategy is for experts only! Pipelines that have been constructed in the API without a compilation step might not be able to be restored properly. The UID generation depends on previously declared pipelines (potentially across jobs if the same JVM is used). Thus, a stable environment must be ensured. Pipeline definitions that come from a compiled plan are safe to use.
  • "DISABLED": No explicit UIDs will be set.
table.exec.window-agg.buffer-size-limit

Batch
100000 Integer Sets the window elements buffer size limit used in group window agg operator.

优化器配置 #

以下配置可以用于调整查询优化器的行为以获得更好的执行计划。

Key Default Type Description
table.optimizer.agg-phase-strategy

Batch Streaming
AUTO

Enum

Strategy for aggregate phase. Only AUTO, TWO_PHASE or ONE_PHASE can be set. AUTO: No special enforcer for aggregate stage. Whether to choose two stage aggregate or one stage aggregate depends on cost. TWO_PHASE: Enforce to use two stage aggregate which has localAggregate and globalAggregate. Note that if aggregate call does not support optimize into two phase, we will still use one stage aggregate. ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggregate.

Possible values:
  • "AUTO"
  • "ONE_PHASE"
  • "TWO_PHASE"
table.optimizer.bushy-join-reorder-threshold

Batch Streaming
12 Integer The maximum number of joined nodes allowed in the bushy join reorder algorithm, otherwise the left-deep join reorder algorithm will be used. The search space of bushy join reorder algorithm will increase with the increase of this threshold value, so this threshold is not recommended to be set too large. The default value is 12.
table.optimizer.distinct-agg.split.bucket-num

Streaming
1024 Integer Configure the number of buckets when splitting distinct aggregation. The number is used in the first level aggregation to calculate a bucket key 'hash_code(distinct_key) % BUCKET_NUM' which is used as an additional group key after splitting.
table.optimizer.distinct-agg.split.enabled

Streaming
false Boolean Tells the optimizer whether to split distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two level. The first aggregation is shuffled by an additional key which is calculated using the hashcode of distinct_key and number of buckets. This optimization is very useful when there is data skew in distinct aggregation and gives the ability to scale-up the job. Default is false.
table.optimizer.dynamic-filtering.enabled

Batch Streaming
true Boolean When it is true, the optimizer will try to push dynamic filtering into scan table source, the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime.
table.optimizer.incremental-agg-enabled

Streaming
true Boolean When both local aggregation and distinct aggregation splitting are enabled, a distinct aggregation will be optimized into four aggregations, i.e., local-agg1, global-agg1, local-agg2, and global-agg2. We can combine global-agg1 and local-agg2 into a single operator (we call it incremental agg because it receives incremental accumulators and outputs incremental results). In this way, we can reduce some state overhead and resources. Default is enabled.
table.optimizer.join-reorder-enabled

Batch Streaming
false Boolean Enables join reorder in optimizer. Default is disabled.
table.optimizer.join.broadcast-threshold

Batch
1048576 Long Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 to disable broadcasting.
table.optimizer.multiple-input-enabled

Batch
true Boolean When it is true, the optimizer will merge the operators with pipelined shuffling into a multiple input operator to reduce shuffling and improve performance. Default value is true.
table.optimizer.non-deterministic-update.strategy

Streaming
IGNORE

Enum

When it is `TRY_RESOLVE`, the optimizer tries to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. Changelog may contain kinds of message types: Insert (I), Delete (D), Update_Before (UB), Update_After (UA). There's no NDU problem in an insert only changelog pipeline. For updates, there are three main NDU problems:
1. Non-deterministic functions, include scalar, table, aggregate functions, both builtin and custom ones.
2. LookupJoin on an evolving source
3. Cdc-source carries metadata fields which are system columns, not belongs to the entity data itself.

For the first step, the optimizer automatically enables the materialization for No.2(LookupJoin) if needed, and gives the detailed error message for No.1(Non-deterministic functions) and No.3(Cdc-source with metadata) which is relatively easier to solve by changing the SQL.
Default value is `IGNORE`, the optimizer does no changes.

Possible values:
  • "TRY_RESOLVE"
  • "IGNORE"
table.optimizer.reuse-optimize-block-with-digest-enabled

Batch Streaming
false Boolean When true, the optimizer will try to find out duplicated sub-plans by digest to build optimize blocks (a.k.a. common sub-graphs). Each optimize block will be optimized independently.
table.optimizer.reuse-source-enabled

Batch Streaming
true Boolean When it is true, the optimizer will try to find out duplicated table sources and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.
table.optimizer.reuse-sub-plan-enabled

Batch Streaming
true Boolean When it is true, the optimizer will try to find out duplicated sub-plans and reuse them.
table.optimizer.runtime-filter.enabled

Batch
false Boolean A flag to enable or disable the runtime filter. When it is true, the optimizer will try to inject a runtime filter for eligible join.
table.optimizer.runtime-filter.max-build-data-size

Batch
150 mb MemorySize Max data volume threshold of the runtime filter build side. Estimated data volume needs to be under this value to try to inject runtime filter.
table.optimizer.runtime-filter.min-filter-ratio

Batch
0.5 Double Min filter ratio threshold of the runtime filter. Estimated filter ratio needs to be over this value to try to inject runtime filter.
table.optimizer.runtime-filter.min-probe-data-size

Batch
10 gb MemorySize Min data volume threshold of the runtime filter probe side. Estimated data volume needs to be over this value to try to inject runtime filter.This value should be larger than table.optimizer.runtime-filter.max-build-data-size.
table.optimizer.source.report-statistics-enabled

Batch Streaming
true Boolean When it is true, the optimizer will collect and use the statistics from source connectors if the source extends from SupportsStatisticReport and the statistics from catalog is UNKNOWN.Default value is true.
table.optimizer.sql2rel.project-merge.enabled

Batch Streaming
false Boolean If set to true, it will merge projects when converting SqlNode to RelNode.
Note: it is not recommended to turn on unless you are aware of possible side effects, such as causing the output of certain non-deterministic expressions to not meet expectations(see FLINK-20887).
table.optimizer.union-all-as-breakpoint-enabled

Batch Streaming
true Boolean When true, the optimizer will breakup the graph at union-all node when it's a breakpoint. When false, the optimizer will skip the union-all node even it's a breakpoint, and will try find the breakpoint in its inputs.

Planner 配置 #

以下配置可以用于调整 planner 的行为。

Key Default Type Description
table.builtin-catalog-name

Batch Streaming
"default_catalog" String The name of the initial catalog to be created when instantiating a TableEnvironment.
table.builtin-database-name

Batch Streaming
"default_database" String The name of the default database in the initial catalog to be created when instantiating TableEnvironment.
table.catalog-modification.listeners

Batch Streaming
(none) List<String> A (semicolon-separated) list of factories that creates listener for catalog modification which will be notified in catalog manager after it performs database and table ddl operations successfully.
table.column-expansion-strategy

Batch Streaming

List<Enum>

Configures the default expansion behavior of 'SELECT *'. By default, all top-level columns of the table's schema are selected and nested fields are retained.

Possible values:
  • "EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS": Excludes virtual metadata columns that reference a metadata key via an alias. For example, a column declared as 'c METADATA VIRTUAL FROM k' is not selected by default if the strategy is applied.
  • "EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS": Excludes virtual metadata columns that directly reference a metadata key. For example, a column declared as 'k METADATA VIRTUAL' is not selected by default if the strategy is applied.
table.display.max-column-width

Batch Streaming
30 Integer When printing the query results to the client console, this parameter determines the number of characters shown on screen before truncating. This only applies to columns with variable-length types (e.g. CHAR, VARCHAR, STRING) in the streaming mode. Fixed-length types are printed in the batch mode using a deterministic column width.
table.dml-sync

Batch Streaming
false Boolean Specifies if the DML job (i.e. the insert operation) is executed asynchronously or synchronously. By default, the execution is async, so you can submit multiple DML jobs at the same time. If set this option to true, the insert operation will wait for the job to finish.
table.dynamic-table-options.enabled

Batch Streaming
true Boolean Enable or disable the OPTIONS hint used to specify table options dynamically, if disabled, an exception would be thrown if any OPTIONS hint is specified
table.generated-code.max-length

Batch Streaming
4000 Integer Specifies a threshold where generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. Default value is 4000 instead of 64KB as by default JIT refuses to work on methods with more than 8K byte code.
table.local-time-zone

Batch Streaming
"default" String The local time zone defines current session time zone id. It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The input of option is either a full name such as "America/Los_Angeles", or a custom timezone id such as "GMT-08:00".
table.plan.compile.catalog-objects

Batch Streaming
ALL

Enum

Strategy how to persist catalog objects such as tables, functions, or data types into a plan during compilation.

It influences the need for catalog metadata to be present during a restore operation and affects the plan size.

This configuration option does not affect anonymous/inline or temporary objects. Anonymous/inline objects will be persisted entirely (including schema and options) if possible or fail the compilation otherwise. Temporary objects will be persisted only by their identifier and the object needs to be present in the session context during a restore.

Possible values:
  • "ALL": All metadata about catalog tables, functions, or data types will be persisted into the plan during compilation. For catalog tables, this includes the table's identifier, schema, and options. For catalog functions, this includes the function's identifier and class. For catalog data types, this includes the identifier and entire type structure. With this strategy, the catalog's metadata doesn't have to be available anymore during a restore operation.
  • "SCHEMA": In addition to an identifier, schema information about catalog tables, functions, or data types will be persisted into the plan during compilation. A schema allows for detecting incompatible changes in the catalog during a plan restore operation. However, all other metadata will still be retrieved from the catalog.
  • "IDENTIFIER": Only the identifier of catalog tables, functions, or data types will be persisted into the plan during compilation. All metadata will be retrieved from the catalog during a restore operation. With this strategy, plans become less verbose.
table.plan.force-recompile

Streaming
false Boolean When false COMPILE PLAN statement will fail if the output plan file is already existing, unless the clause IF NOT EXISTS is used. When true COMPILE PLAN will overwrite the existing output plan file. We strongly suggest to enable this flag only for debugging purpose.
table.plan.restore.catalog-objects

Batch Streaming
ALL

Enum

Strategy how to restore catalog objects such as tables, functions, or data types using a given plan and performing catalog lookups if necessary. It influences the need for catalog metadata to bepresent and enables partial enrichment of plan information.

Possible values:
  • "ALL": Reads all metadata about catalog tables, functions, or data types that has been persisted in the plan. The strategy performs a catalog lookup by identifier to fill in missing information or enrich mutable options. If the original object is not available in the catalog anymore, pipelines can still be restored if all information necessary is contained in the plan.
  • "ALL_ENFORCED": Requires that all metadata about catalog tables, functions, or data types has been persisted in the plan. The strategy will neither perform a catalog lookup by identifier nor enrich mutable options with catalog information. A restore will fail if not all information necessary is contained in the plan.
  • "IDENTIFIER": Uses only the identifier of catalog tables, functions, or data types and always performs a catalog lookup. A restore will fail if the original object is not available in the catalog anymore. Additional metadata that might be contained in the plan will be ignored.
table.resources.download-dir

Batch Streaming
System.getProperty("java.io.tmpdir") String Local directory that is used by planner for storing downloaded resources.
table.rtas-ctas.atomicity-enabled

Batch Streaming
false Boolean Specifies if the CREATE TABLE/REPLACE TABLE/CREATE OR REPLACE AS SELECT statement is executed atomically. By default, the statement is non-atomic. The target table is created/replaced on the client side, and it will not be rolled back even though the job fails or is canceled. If set this option to true and the underlying DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.
table.sql-dialect

Batch Streaming
"default" String The SQL dialect defines how to parse a SQL query. A different SQL dialect may support different SQL grammar. Currently supported dialects are: default and hive

Materialized Table 配置 #

以下配置可以用于调整 Materialized Table 的行为。

Key Default Type Description
materialized-table.refresh-mode.freshness-threshold

Batch Streaming
30 min Duration Specifies a time threshold for determining the materialized table refresh mode. If the materialized table defined FRESHNESS is below this threshold, it run in continuous mode. Otherwise, it switches to full refresh mode.
partition.fields.#.date-formatter

Batch Streaming
(none) String Specifies the time partition formatter for the partitioned materialized table, where '#' denotes a string-based partition field name. This serves as a hint to the framework regarding which partition to refresh in full refresh mode.

SQL Client 配置 #

以下配置可以用于调整 sql client 的行为。

Key Default Type Description
sql-client.display.color-schema

Batch Streaming
"DEFAULT" String SQL highlight color schema to be used at SQL client. Possible values: 'default', 'dark', 'light', 'chester', 'vs2010', 'solarized', 'obsidian', 'geshi'
sql-client.display.print-time-cost

Batch
true Boolean Determine whether to display the time consumption of the query. By default, no query time cost will be displayed.
sql-client.display.show-line-numbers

Batch Streaming
false Boolean Determines whether there should be shown line numbers in multiline SQL or not.
sql-client.execution.max-table-result.rows

Batch Streaming
1000000 Integer The number of rows to cache when in the table mode. If the number of rows exceeds the specified value, it retries the row in the FIFO style.
sql-client.execution.result-mode

Batch Streaming
TABLE

Enum

Determines how the query result should be displayed.

Possible values:
  • "TABLE": Materializes results in memory and visualizes them in a regular, paginated table representation.
  • "CHANGELOG": Visualizes the result stream that is produced by a continuous query.
  • "TABLEAU": Display results in the screen directly in a tableau format.
sql-client.verbose

Batch Streaming
false Boolean Determine whether to output the verbose output to the console. If set the option true, it will print the exception stack. Otherwise, it only output the cause.