This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
EXPLAIN 语句
EXPLAIN 语句 #
EXPLAIN 语句用于解释 query 或 INSERT 语句的执行逻辑,也用于优化 query 语句的查询计划。
执行 EXPLAIN 语句 #
可以使用 TableEnvironment
的 executeSql()
方法执行 EXPLAIN 语句。如果 EXPLAIN 操作执行成功,executeSql()
方法会返回解释结果,否则会抛出异常。
以下示例展示了如何在 TableEnvironment
中执行一条 EXPLAIN 语句。
可以使用 TableEnvironment
的 executeSql()
方法执行 EXPLAIN 语句。如果 EXPLAIN 操作执行成功,executeSql()
方法会返回解释结果,否则会抛出异常。
以下示例展示了如何在 TableEnvironment
中执行一条 EXPLAIN 语句。
可以使用 TableEnvironment
的 execute_sql()
方法执行 EXPLAIN 语句。如果 EXPLAIN 操作执行成功,execute_sql()
方法会返回解释结果,否则会抛出异常。
以下示例展示了如何在 TableEnvironment
中执行一条 EXPLAIN 语句。
EXPLAIN 语句可以在 SQL CLI 中执行。
以下示例展示了如何在 SQL CLI 中执行一条 EXPLAIN 语句。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 注册名为 “Orders” 的表
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");
// 调用 TableEnvironment.explainSql() 来解释 SELECT 语句
String explanation = tEnv.explainSql(
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2");
System.out.println(explanation);
// 调用 TableEnvironment.executeSql() 来解释 SELECT 语句
TableResult tableResult = tEnv.executeSql(
"EXPLAIN PLAN FOR " +
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2");
tableResult.print();
TableResult tableResult2 = tEnv.executeSql(
"EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " +
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2");
tableResult2.print();
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tEnv = StreamTableEnvironment.create(env)
// 注册名为 “Orders” 的表
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
// 调用 TableEnvironment.explainSql() 来解释 SELECT 语句
val explanation = tEnv.explainSql(
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2")
println(explanation)
// 调用 TableEnvironment.executeSql() 来解释 SELECT 语句
val tableResult = tEnv.executeSql(
"EXPLAIN PLAN FOR " +
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2")
tableResult.print()
val tableResult2 = tEnv.executeSql(
"EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " +
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2")
tableResult2.print()
settings = EnvironmentSettings.new_instance()...
table_env = StreamTableEnvironment.create(env, settings)
t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
# 调用 TableEnvironment.explain_sql() 来解释 SELECT 语句
explanation1 = t_env.explain_sql(
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
"UNION ALL "
"SELECT `count`, word FROM MyTable2")
print(explanation1)
# 调用 TableEnvironment.execute_sql() 来解释 SELECT 语句
table_result = t_env.execute_sql(
"EXPLAIN PLAN FOR "
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
"UNION ALL "
"SELECT `count`, word FROM MyTable2")
table_result.print()
table_result2 = t_env.execute_sql(
"EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN "
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
"UNION ALL "
"SELECT `count`, word FROM MyTable2")
table_result2.print()
Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.
Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.
Flink SQL> EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2;
Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN SELECT `count`, word FROM MyTable1
> WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2;
EXPLAIN
的结果如下:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan ==
Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 37,
"type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
"pact" : "Data Source",
"contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
"parallelism" : 1
}, {
"id" : 38,
"type" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])",
"pact" : "Operator",
"contents" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 37,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 39,
"type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
"pact" : "Data Source",
"contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
"parallelism" : 1
} ]
ExplainDetails #
使用指定的 explainDetail 类型来打印语句的计划。
ESTIMATED_COST:生成优化器(optimizer)估算的物理节点相关的成本信息,
例如:TableSourceScan(..., cumulative cost ={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
CHANGELOG_MODE:为每个物理 RelNode 生成 changelog mode。
例如:GroupAggregate(..., changelogMode=[I,UA,D])
JSON_EXECUTION_PLAN:生成 json 格式的程序执行计划。
语法 #
EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>
statement_set:
EXECUTE STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;