Hive Dialect #

When the Hive dialect is used, Flink lets users write SQL statements in Hive syntax. By providing compatibility with Hive syntax, we aim to improve interoperability with Hive and reduce the cases in which users need to switch between Flink and Hive to execute different statements.

Use Hive Dialect #

Flink currently supports two SQL dialects: default and hive. You need to switch to the Hive dialect before you can write in Hive syntax. The following describes how to set the dialect using the SQL Client, a SQL Gateway configured with the HiveServer2 Endpoint, and the Table API. Note that you can switch the dialect dynamically for each statement you execute; there’s no need to restart a session to use a different dialect.

Note:

  • To use Hive dialect, you have to add dependencies related to Hive. Please refer to Hive dependencies for how to add the dependencies.
  • Please make sure the current catalog is a HiveCatalog. Otherwise, Flink will fall back to its default dialect. When using SQL Gateway configured with HiveServer2 Endpoint, the current catalog will be a HiveCatalog by default.
  • For better syntax and semantic compatibility, it’s highly recommended to load HiveModule and place it first in the module list, so that Hive built-in functions are picked up during function resolution. Please refer to the modules documentation for how to change the resolution order. When using SQL Gateway configured with HiveServer2 Endpoint, the Hive module is loaded automatically.
  • Hive dialect only supports 2-part identifiers, so you can’t specify a catalog in an identifier.
  • While all Hive versions support the same syntax, whether a specific feature is available still depends on the Hive version you use. For example, updating database location is only supported in Hive-2.4.0 or later.
  • The Hive dialect is mainly used in batch mode. Some Hive syntax (SORT BY, CLUSTER BY, DISTRIBUTE BY, TRANSFORM, etc.) is not yet supported in streaming mode.
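
The catalog and module prerequisites in the notes above can be sketched as a short Python helper. This is a minimal sketch under stated assumptions, not the only way to prepare a session: the catalog name myhive, the configuration directory /opt/hive-conf, the Hive version 3.1.3, and the helper names hive_dialect_setup_statements and run_statements are all illustrative. The helper only builds and submits SQL statements, so t_env can be any object with an execute_sql method, such as a PyFlink TableEnvironment.

```python
from typing import List


def hive_dialect_setup_statements(
    catalog_name: str, hive_conf_dir: str, hive_version: str
) -> List[str]:
    """Build Flink SQL statements that prepare a session for the Hive dialect.

    All argument values are supplied by the caller; nothing here is a default
    taken from Flink itself.
    """
    return [
        # Register a HiveCatalog and make it the current catalog.
        f"CREATE CATALOG {catalog_name} WITH ("
        f"'type' = 'hive', 'hive-conf-dir' = '{hive_conf_dir}')",
        f"USE CATALOG {catalog_name}",
        # Load HiveModule and place it ahead of Flink's core module so that
        # Hive built-in functions are resolved first.
        f"LOAD MODULE hive WITH ('hive-version' = '{hive_version}')",
        "USE MODULES hive, core",
    ]


def run_statements(t_env, statements: List[str]) -> None:
    """Submit each statement in order through t_env.execute_sql."""
    for statement in statements:
        t_env.execute_sql(statement)
```

With a real TableEnvironment, one would call run_statements(t_env, hive_dialect_setup_statements("myhive", "/opt/hive-conf", "3.1.3")) while still in the default dialect, and then switch to the Hive dialect as described in the sections below. This assumes the Hive dependencies are already on the classpath.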

SQL Client #

The SQL dialect can be specified via the table.sql-dialect property, so you can set the dialect after the SQL Client has launched.

Flink SQL> SET table.sql-dialect = hive; -- to use Hive dialect
[INFO] Session property has been set.

Flink SQL> SET table.sql-dialect = default; -- to use Flink default dialect
[INFO] Session property has been set.

SQL Gateway Configured With HiveServer2 Endpoint #

When using the SQL Gateway configured with HiveServer2 Endpoint, the Hive dialect is the default, so you don’t need to do anything to use it. You can still change the dialect to Flink’s default dialect.

# assuming you have connected to the SQL Gateway with beeline
jdbc:hive2> SET table.sql-dialect = default; -- to use Flink default dialect

jdbc:hive2> SET table.sql-dialect = hive; -- to use Hive dialect

Table API #

You can set the dialect for your TableEnvironment with the Table API.

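// Java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();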
TableEnvironment tableEnv = TableEnvironment.create(settings);

// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// to use default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

# Python (PyFlink)
from pyflink.table import EnvironmentSettings, SqlDialect, TableEnvironment
settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(settings)

# to use Hive dialect
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)

# to use Flink default dialect
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
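
Because the dialect can be changed between statements without restarting the session, a small context manager can scope a dialect to a group of statements and restore the previous one afterwards. The sketch below is an illustration, not part of the PyFlink API: the name sql_dialect is hypothetical, and it relies only on the get_config method of the table environment and the get_sql_dialect and set_sql_dialect methods of the returned config.

```python
from contextlib import contextmanager


@contextmanager
def sql_dialect(t_env, dialect):
    """Temporarily switch t_env to the given dialect, then restore the old one."""
    config = t_env.get_config()
    previous = config.get_sql_dialect()
    config.set_sql_dialect(dialect)
    try:
        yield t_env
    finally:
        # Restore the earlier dialect even if a statement raised an error.
        config.set_sql_dialect(previous)


# Usage with PyFlink (requires pyflink and the Hive dependencies):
#   from pyflink.table import SqlDialect
#   with sql_dialect(t_env, SqlDialect.HIVE):
#       t_env.execute_sql("...")  # a statement written in Hive syntax
#   # Outside the block, t_env is back on its previous dialect.
```

Restoring the dialect in a finally clause keeps later statements from silently running under the wrong dialect after a failure.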