概览

Hive 方言 #

从 1.11.0 开始,在使用 Hive 方言时,Flink 允许用户用 Hive 语法来编写 SQL 语句。 通过提供与 Hive 语法的兼容性,我们旨在改善与 Hive 的互操作性,并减少用户需要在 Flink 和 Hive 之间切换来执行不同语句的情况。

使用 Hive 方言 #

Flink 目前支持两种 SQL 方言: defaulthive。你需要先切换到 Hive 方言,然后才能使用 Hive 语法编写。下面介绍如何使用 SQL 客户端,启动了 HiveServer2 endpoint 的 SQL Gateway 和 Table API 设置方言。 还要注意,你可以为执行的每个语句动态切换方言。无需重新启动会话即可使用其他方言。

Note:

  • 为了使用 Hive 方言, 你必须首先添加和 Hive 相关的依赖. 请参考 Hive dependencies 如何添加这些依赖。
  • 从 Flink 1.15版本开始,如果需要在 Flink SQL Client 或者 SQL Gateway 使用 Hive 方言的话,请首先将 FLINK_HOME/opt 下面的 flink-table-planner_2.12-1.16.2.jar jar 包放到 FLINK_HOME/lib 下,并将 FLINK_HOME/lib 下的 flink-table-planner-loader-1.16.2.jar jar 包移出 FLINK_HOME/lib 目录。否则将抛出 ValidationException。具体原因请参考 FLINK-25128
  • 请确保当前的 Catalog 是 HiveCatalog. 否则, 将使用 Flink 的默认方言。 在启动了 HiveServer2 Endpoint 的 SQL Gateway 下,默认当前的 Catalog 就是 HiveCatalog。
  • 为了实现更好的语法和语义的兼容,强烈建议首先加载 HiveModule 并将其放在 Module 列表的首位,以便在函数解析时优先使用 Hive 内置函数。 请参考文档 here 来将 HiveModule 放在 Module 列表的首。 在启动了 HiveServer2 endpoint 的 SQL Gateway,HiveModule 已经被加载进来了。
  • Hive 方言只支持 db.table 这种两级的标识符,不支持带有 Catalog 名字的标识符。
  • 虽然所有 Hive 版本支持相同的语法,但是一些特定的功能是否可用仍取决于你使用的 Hive 版本。例如,更新数据库位置 只在 Hive-2.4.0 或更高版本支持。
  • Hive 方言主要是在批模式下使用的,某些 Hive 的语法(Sort/Cluster/Distributed BY, Transform, 等)还没有在流模式下支持。

SQL Client #

SQL 方言可以通过 table.sql-dialect 属性指定。你可以在 SQL 客户端启动后设置方言。

Flink SQL> SET table.sql-dialect = hive; -- 使用 Hive 方言
[INFO] Session property has been set.

Flink SQL> SET table.sql-dialect = default; -- 使用 Flink 默认 方言
[INFO] Session property has been set.

SQL Gateway Configured With HiveServer2 Endpoint #

在启动了 HiveServer2 endpoint 的 SQL Gateway中,会默认使用 Hive 方言,所以如果你想使用 Hive 方言的话,你不需要手动切换至 Hive 方言,直接就能使用。但是如果你想使用 Flink 的默认方言,你也手动进行切换。

# 假设已经通过 beeline 连接上了 SQL Gateway
jdbc:hive2> SET table.sql-dialect = default; -- 使用 Flink 默认 方言

jdbc:hive2> SET table.sql-dialect = hive; -- 使用 Hive 方言

Table API #

你可以使用 Table API 为 TableEnvironment 设置方言。

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// to use default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
from pyflink.table import *
settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(settings)

# to use hive dialect
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)

# to use default dialect
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)