SQL Hints #
SQL hints can be used with SQL statements to alter execution plans. This chapter explains how to use hints to force various approaches.
Generally a hint can be used to:
- Enforce planner: there’s no perfect planner, so hints give users better control over execution;
- Append metadata (or statistics): some statistics, like “table index for scan” and “skew info of some shuffle keys”, are somewhat dynamic for the query. It is convenient to configure them with hints, because the planning metadata from the planner is often inaccurate;
- Operator resource constraints: in many cases, execution operators get a default resource configuration, e.g. minimum parallelism, managed memory (for a resource-consuming UDF), or a special resource requirement (GPU or SSD disk). Hints make it possible to tune these resources per query (instead of per job).
Dynamic Table Options #
Dynamic table options allow you to specify or override table options dynamically. Unlike static table options defined with SQL DDL or the connect API, these options can be specified flexibly, per table, within each query.
This makes them well suited to ad-hoc queries in an interactive terminal. For example, in the SQL CLI,
you can ignore parse errors for a CSV source just by adding the dynamic option
/*+ OPTIONS('csv.ignore-parse-errors'='true') */.
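As a minimal sketch of the CSV case, the query below applies that option to a single scan. The table `csv_events`, its schema, and its path are hypothetical placeholders introduced only for illustration; the sketch assumes a table backed by the filesystem connector with the CSV format.

```sql
-- Hypothetical CSV-backed table; the name, schema, and path are placeholders.
CREATE TABLE csv_events (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/csv_events',
    'format' = 'csv'
);

-- Skip rows that fail to parse, for this query only.
-- The table definition above is left unchanged.
SELECT id, name
FROM csv_events /*+ OPTIONS('csv.ignore-parse-errors'='true') */;
```

Because the option is attached to the table reference in the query, other queries against `csv_events` keep the behavior defined in the DDL.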
To avoid breaking SQL compatibility, dynamic table options use the Oracle-style SQL hint syntax:
table_path /*+ OPTIONS(key=val [, key=val]*) */

key:
    stringLiteral
val:
    stringLiteral
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- override table options in join
select * from
    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
    join
    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
    on t1.id = t2.id;

-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
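The syntax also allows several key/value pairs in one hint, which the examples above do not show. The following sketch assumes `kafka_table1` is a Kafka-backed table as defined above; the timestamp is an arbitrary placeholder value, not one taken from this page.

```sql
-- Override two options at once: start reading from a given timestamp.
-- The millisecond value below is a placeholder chosen for illustration.
SELECT id, name
FROM kafka_table1 /*+ OPTIONS(
    'scan.startup.mode'='timestamp',
    'scan.startup.timestamp-millis'='1700000000000') */;
```

Both keys and values are string literals, so numeric values such as the timestamp are quoted as well.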