EXPLAIN Statements #
EXPLAIN statements are used to explain the execution logic of a query or an INSERT statement, and to show the optimized query plan of a query.
Run an EXPLAIN Statement #
EXPLAIN statements can be executed with the executeSql() method of the TableEnvironment (in Java and Scala). The executeSql() method returns the explain result for a successful EXPLAIN operation; otherwise it throws an exception.
The following examples show how to run an EXPLAIN statement in TableEnvironment.
In Python, EXPLAIN statements can be executed with the execute_sql() method of the TableEnvironment. The execute_sql() method returns the explain result for a successful EXPLAIN operation; otherwise it throws an exception.
The following examples show how to run an EXPLAIN statement in TableEnvironment.
EXPLAIN statements can also be executed in the SQL CLI.
The following examples show how to run an EXPLAIN statement in the SQL CLI.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// register tables named "MyTable1" and "MyTable2"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");

// explain a SELECT statement through TableEnvironment.explainSql()
String explanation =
        tEnv.explainSql(
                "SELECT `count`, COUNT(word) FROM ("
                        + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
                        + "UNION ALL "
                        + "SELECT `count`, word FROM MyTable2) tmp "
                        + "GROUP BY `count`");
System.out.println(explanation);

// explain a SELECT statement through TableEnvironment.executeSql()
TableResult tableResult =
        tEnv.executeSql(
                "EXPLAIN PLAN FOR "
                        + "SELECT `count`, COUNT(word) FROM ("
                        + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
                        + "UNION ALL "
                        + "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`");
tableResult.print();

// explain with all the available ExplainDetails through TableEnvironment.executeSql()
TableResult tableResult2 =
        tEnv.executeSql(
                "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN "
                        + "SELECT `count`, COUNT(word) FROM ("
                        + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
                        + "UNION ALL "
                        + "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`");
tableResult2.print();
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tEnv = StreamTableEnvironment.create(env)
// register tables named "MyTable1" and "MyTable2"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
// explain a SELECT statement through TableEnvironment.explainSql()
val explanation = tEnv.explainSql(
"""
|SELECT `count`, COUNT(word)
|FROM (
| SELECT `count`, word FROM MyTable1
| WHERE word LIKE 'F%'
| UNION ALL
| SELECT `count`, word FROM MyTable2 ) tmp
|GROUP BY `count`
|""".stripMargin)
println(explanation)
// explain a SELECT statement through TableEnvironment.executeSql()
val tableResult = tEnv.executeSql(
"""
|EXPLAIN PLAN FOR
|SELECT `count`, COUNT(word)
|FROM (
| SELECT `count`, word FROM MyTable1
| WHERE word LIKE 'F%'
| UNION ALL
| SELECT `count`, word FROM MyTable2 ) tmp
|GROUP BY `count`
|""".stripMargin)
tableResult.print()
// explain with all the available ExplainDetails through TableEnvironment.executeSql()
val tableResult2 = tEnv.executeSql(
"""
|EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN
|SELECT `count`, COUNT(word)
|FROM (
| SELECT `count`, word FROM MyTable1
| WHERE word LIKE 'F%'
| UNION ALL
| SELECT `count`, word FROM MyTable2 ) tmp
|GROUP BY `count`
|""".stripMargin)
tableResult2.print()
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# register tables named "MyTable1" and "MyTable2"
t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
# explain a SELECT statement through TableEnvironment.explain_sql()
explanation1 = t_env.explain_sql(
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
print(explanation1)
# explain a SELECT statement through TableEnvironment.execute_sql()
table_result = t_env.execute_sql(
    "EXPLAIN PLAN FOR "
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
table_result.print()
# explain with all the available ExplainDetails through TableEnvironment.execute_sql()
table_result2 = t_env.execute_sql(
    "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN "
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
table_result2.print()
Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.
Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.
Flink SQL> EXPLAIN PLAN FOR SELECT `count`, COUNT(word) FROM
> ( SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2 ) tmp GROUP BY `count`;
Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN SELECT `count`, COUNT(word) FROM
> ( SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2 ) tmp GROUP BY `count`;
The results of the above EXPLAIN statements are:
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalUnion(all=[true])
   :- LogicalProject(count=[$0], word=[$1])
   :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
   +- LogicalProject(count=[$0], word=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Optimized Execution Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, 'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalUnion(all=[true])
   :- LogicalProject(count=[$0], word=[$1])
   :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
   +- LogicalProject(count=[$0], word=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan With Advice ==
GroupAggregate(advice=[1], groupBy=[count], select=[count, COUNT(word) AS EXPR$1], changelogMode=[I,UA]): rowcount = 1.05E8, cumulative cost = {5.2E8 rows, 1.805E10 cpu, 4.0E9 io, 2.1E9 network, 0.0 memory}
+- Exchange(distribution=[hash[count]], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {4.15E8 rows, 1.7945E10 cpu, 4.0E9 io, 2.1E9 network, 0.0 memory}
   +- Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
      :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value).
== Optimized Execution Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, 'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 17,
    "type" : "Source: MyTable1[15]",
    "pact" : "Data Source",
    "contents" : "[15]:TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
    "parallelism" : 1
  }, {
    "id" : 18,
    "type" : "Calc[16]",
    "pact" : "Operator",
    "contents" : "[16]:Calc(select=[count, word], where=[LIKE(word, 'F%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 17,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 19,
    "type" : "Source: MyTable2[17]",
    "pact" : "Data Source",
    "contents" : "[17]:TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
    "parallelism" : 1
  }, {
    "id" : 22,
    "type" : "GroupAggregate[20]",
    "pact" : "Operator",
    "contents" : "[20]:GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 18,
      "ship_strategy" : "HASH",
      "side" : "second"
    }, {
      "id" : 19,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  } ]
}
ExplainDetails #
Use the specified ExplainDetail type(s) to print the plan of a statement.
ESTIMATED_COST
Specifying ESTIMATED_COST instructs the optimizer to attach the estimated cost to each physical node in the output.
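For example, the following standalone statement is a minimal sketch (reusing the MyTable1 table registered in the earlier examples) that requests cost estimates:
EXPLAIN ESTIMATED_COST
SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%';
Each physical node in the resulting plan then carries its estimated cumulative cost, abbreviated here: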
== Optimized Physical Plan ==
TableSourceScan(..., cumulative cost ={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
CHANGELOG_MODE
Specifying CHANGELOG_MODE instructs the optimizer to attach the changelog mode to each physical node in the output.
See Dynamic Tables for more information about changelog mode.
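For example, the following sketch (again reusing MyTable1) explains a grouped aggregation, which produces update messages; each physical node in the plan is annotated with its changelog mode, as in the abbreviated output below:
EXPLAIN CHANGELOG_MODE
SELECT `count`, COUNT(word) FROM MyTable1 GROUP BY `count`;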
== Optimized Physical Plan ==
GroupAggregate(..., changelogMode=[I,UA,D])
PLAN_ADVICE
PLAN_ADVICE is supported since Flink 1.17.
Specifying PLAN_ADVICE instructs the optimizer to analyze the optimized physical plan and provide potential data risk warnings or performance tuning advice.
In this case the output header changes from "Optimized Physical Plan" to "Optimized Physical Plan With Advice" as a hint.
The advice on the physical plan is categorized by type and scope.
Advice type | Description |
---|---|
WARNING | Points out potential data correctness risks |
ADVICE | Gives available SQL tuning advice |
Advice scope | Description |
---|---|
QUERY_LEVEL | Advice for the entire SQL query |
NODE_LEVEL | Advice for a single physical node |
PLAN_ADVICE provides advice on the following kinds of issues.
If the optimizer detects a group aggregation that could enable two-phase optimization but has not enabled it, it attaches the advice id to the GroupAggregate node as an index and appends the advice content at the end of the plan.
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '200';
SET 'table.optimizer.agg-phase-strategy' = 'ONE_PHASE';
CREATE TABLE MyTable (
a BIGINT,
b INT NOT NULL,
c VARCHAR,
d BIGINT
) WITH (
'connector' = 'values',
'bounded' = 'false');
EXPLAIN PLAN_ADVICE
SELECT
AVG(a) AS avg_a,
COUNT(*) AS cnt,
COUNT(b) AS cnt_b,
MIN(b) AS min_b,
MAX(c) FILTER (WHERE a > 1) AS max_c
FROM MyTable;
== Optimized Physical Plan With Advice ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(advice=[1], select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
   +- Exchange(distribution=[single])
      +- Calc(select=[a, b, c, IS TRUE(>(a, 1)) AS $f3])
         +- MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c])

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
If a potential NDU (non-deterministic update) problem is detected, the optimizer appends the advice content at the end of the plan.
CREATE TABLE MyTable (
a INT,
b BIGINT,
c STRING,
d INT,
`day` AS DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),
PRIMARY KEY (a, c) NOT ENFORCED
) WITH (
'connector' = 'values',
'changelog-mode' = 'I,UA,UB,D'
);
CREATE TABLE MySink (
a INT,
b BIGINT,
c STRING,
PRIMARY KEY (a) NOT ENFORCED
) WITH (
'connector' = 'values',
'sink-insert-only' = 'false'
);
EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, `day`
FROM MyTable
WHERE b > 100;
== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, day], upsertMaterialize=[true])
+- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day], where=[>(b, 100)])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])
advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
related rel plan:
Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], where=[>(b, 100)], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D])
If no issue is detected, the optimizer appends "No available advice" at the end of the plan as a hint.
CREATE TABLE MyTable (
a INT,
b BIGINT,
c STRING,
d INT
) WITH (
'connector' = 'values',
'changelog-mode' = 'I'
);
EXPLAIN PLAN_ADVICE
SELECT * FROM MyTable WHERE b > 100;
== Optimized Physical Plan With Advice ==
Calc(select=[a, b, c, d], where=[>(b, 100)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])
No available advice...
JSON_EXECUTION_PLAN
Specifying JSON_EXECUTION_PLAN generates the execution plan of the program in JSON format.
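For example, the following sketch (once more reusing MyTable1 from the examples above) prints the execution plan as JSON; the combined example earlier on this page shows the shape of the resulting "== Physical Execution Plan ==" section:
EXPLAIN JSON_EXECUTION_PLAN
SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%';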
Syntax #
EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>
statement_set:
STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;
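For example, a statement set can be explained as a whole. The sketch below assumes two hypothetical sink tables, MySink1 and MySink2, with columns compatible with MyTable1 have been created beforehand; all INSERT statements between BEGIN and END are then explained as a single job:
EXPLAIN STATEMENT SET
BEGIN
INSERT INTO MySink1 SELECT `count`, word FROM MyTable1;
INSERT INTO MySink2 SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%';
END;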