Hive #

Table Store currently supports the following features related to Hive:

  • Create, drop and insert into table store tables in Flink SQL through table store Hive catalog. Tables created in this way can also be read directly from Hive.
  • Register existing table store tables as external tables in Hive SQL.

Version #

Table Store currently supports Hive 2.1, 2.2 and 2.3.

Execution Engine #

Table Store currently supports the MR and Tez execution engines for Hive.

Install #

Table Store Hive Catalog (For Flink) #

Download the jar file for the corresponding Hive version.

Version   Jar
Hive 2.3  flink-table-store-hive-catalog-0.2.1_2.3.jar
Hive 2.2  flink-table-store-hive-catalog-0.2.1_2.2.jar
Hive 2.1  flink-table-store-hive-catalog-0.2.1_2.1.jar

If you’re aiming for Hive 2.1 CDH 6.3, see Build From Source for more information.

To enable Table Store Hive Catalog support in Flink, you can pick one of the following two methods.

  • Copy the jar file into the lib directory of your Flink installation directory. Note that this must be done before starting your Flink cluster.
  • If you’re using Flink’s SQL client, append --jar /path/to/flink-table-store-hive-catalog-0.2.1.jar to the command that starts the SQL client.
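
The two methods above can be sketched as a pair of small shell helpers. This is only an illustration: the function names install_catalog_jar and sql_client_command are made up for this example, and the Flink directory and jar path are placeholders you would replace with your own.

```shell
#!/bin/sh
# Sketch: two ways to make the Table Store Hive catalog jar available to Flink.
# Function names and all paths are placeholders chosen for this example.

# Method 1: copy the jar into <flink_home>/lib.
# This has to happen before the Flink cluster is started.
install_catalog_jar() {
  # $1 = path to the downloaded jar, $2 = Flink installation directory
  if [ ! -f "$1" ]; then
    echo "jar not found: $1" >&2
    return 1
  fi
  if [ ! -d "$2/lib" ]; then
    echo "no lib directory under: $2" >&2
    return 1
  fi
  cp "$1" "$2/lib/"
}

# Method 2: print the SQL client command line that loads the jar
# for a single session through the --jar option.
sql_client_command() {
  # $1 = path to the downloaded jar, $2 = Flink installation directory
  printf '%s --jar %s\n' "$2/bin/sql-client.sh" "$1"
}
```

For example, install_catalog_jar /path/to/flink-table-store-hive-catalog-0.2.1_2.3.jar "$FLINK_HOME" would cover the first method, assuming FLINK_HOME points at your Flink installation.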

Table Store Hive Connector (For Hive) #

Download the jar file for the corresponding Hive version.

Version   Jar
Hive 2.3  flink-table-store-hive-connector-0.2.1_2.3.jar
Hive 2.2  flink-table-store-hive-connector-0.2.1_2.2.jar
Hive 2.1  flink-table-store-hive-connector-0.2.1_2.1.jar

If you’re aiming for Hive 2.1 CDH 6.3, see Build From Source for more information.

There are several ways to add this jar to Hive.

  • You can create an auxlib folder under the root directory of Hive, and copy flink-table-store-hive-connector-0.2.1.jar into auxlib.
  • You can also copy this jar to a path accessible by Hive, then use add jar /path/to/flink-table-store-hive-connector-0.2.1.jar to enable table store support in Hive. Note that this method is not recommended: if you’re using the MR execution engine and running a join statement, you may encounter the exception org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to find class.

Note: If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
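
As a rough sketch of the recommended auxlib approach and the HDFS note above, the helpers below create the auxlib folder, copy the jar, and check the Hadoop environment variables. The function names install_connector_jar and check_hadoop_env are hypothetical, and the Hive root directory and jar path are placeholders.

```shell
#!/bin/sh
# Sketch: make the connector jar visible to Hive through auxlib.
# Function names and all paths are placeholders chosen for this example.

# Create <hive_root>/auxlib if it does not exist and copy the jar into it.
install_connector_jar() {
  # $1 = path to the downloaded jar, $2 = root directory of Hive
  if [ ! -f "$1" ]; then
    echo "jar not found: $1" >&2
    return 1
  fi
  mkdir -p "$2/auxlib" && cp "$1" "$2/auxlib/"
}

# Succeed only if HADOOP_HOME or HADOOP_CONF_DIR is set to a non-empty value.
check_hadoop_env() {
  if [ -z "${HADOOP_HOME:-}" ] && [ -z "${HADOOP_CONF_DIR:-}" ]; then
    echo "set HADOOP_HOME or HADOOP_CONF_DIR before using HDFS" >&2
    return 1
  fi
}
```

A call such as install_connector_jar /path/to/flink-table-store-hive-connector-0.2.1_2.3.jar "$HIVE_HOME" assumes HIVE_HOME points at the root directory of your Hive installation.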

Using Table Store Hive Catalog #

By using table store Hive catalog, you can create, drop and insert into table store tables from Flink. These operations directly affect the corresponding Hive metastore. Tables created in this way can also be accessed directly from Hive.

Execute the following Flink SQL script in Flink SQL client to define a table store Hive catalog and create a table store table.

-- Flink SQL CLI
-- Define table store Hive catalog

CREATE CATALOG my_hive WITH (
  'type' = 'table-store',
  'metastore' = 'hive',
  'uri' = 'thrift://<hive-metastore-host-name>:<port>',
  'warehouse' = '/path/to/table/store/warehouse'
);

-- Use table store Hive catalog

USE CATALOG my_hive;

-- Create a table in table store Hive catalog (use "default" database by default)

CREATE TABLE test_table (
  a int,
  b string
);

-- Insert records into test table

INSERT INTO test_table VALUES (1, 'Table'), (2, 'Store');

-- Read records from test table

SELECT * FROM test_table;

/*
+---+-------+
| a |     b |
+---+-------+
| 1 | Table |
| 2 | Store |
+---+-------+
*/
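
If you define this catalog often, one option is to keep the catalog definition in an initialization file. The sketch below writes such a file; the helper name write_catalog_init and the file name are made up for this example, and the metastore URI and warehouse path are whatever you pass in.

```shell
#!/bin/sh
# Sketch: write the catalog definition from the script above to a SQL file.
# The helper name, file name, URI and warehouse path are placeholders.

write_catalog_init() {
  # $1 = output file, $2 = Hive metastore thrift URI, $3 = warehouse path
  cat > "$1" <<EOF
CREATE CATALOG my_hive WITH (
  'type' = 'table-store',
  'metastore' = 'hive',
  'uri' = '$2',
  'warehouse' = '$3'
);

USE CATALOG my_hive;
EOF
}
```

After running write_catalog_init init.sql 'thrift://<hive-metastore-host-name>:<port>' /path/to/table/store/warehouse, the file could be passed to the SQL client with sql-client.sh -i init.sql, assuming your Flink version's SQL client accepts the -i (--init) option.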

Run the following Hive SQL in Hive CLI to access the created table.

-- Assume that flink-table-store-hive-connector-0.2.1.jar is already in auxlib directory.
-- List tables in Hive
-- (you might need to switch to "default" database if you're not there by default)

SHOW TABLES;

/*
OK
test_table
*/

-- Read records from test_table

SELECT a, b FROM test_table ORDER BY a;

/*
OK
1	Table
2	Store
*/

Using External Table #

To access existing table store tables, you can also register them as external tables in Hive. Run the following Hive SQL in Hive CLI.

-- Assume that flink-table-store-hive-connector-0.2.1.jar is already in auxlib directory.
-- Let's use the test_table created in the above section.
-- To create an external table, you don't need to specify any column or table properties.
-- Pointing the location to the path of table is enough.

CREATE EXTERNAL TABLE external_test_table
STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
LOCATION '/path/to/table/store/warehouse/default.db/test_table';

-- Read records from external_test_table

SELECT a, b FROM external_test_table ORDER BY a;

/*
OK
1	Table
2	Store
*/

Hive Type Conversion #

This section lists all supported type conversions between Hive and Flink. All of Hive’s data types are available in the package org.apache.hadoop.hive.serde2.typeinfo.

Hive Data Type                      Flink Data Type                                                  Atomic Type
StructTypeInfo                      RowType                                                          false
MapTypeInfo                         MapType                                                          false
ListTypeInfo                        ArrayType                                                        false
PrimitiveTypeInfo("boolean")        BooleanType                                                      true
PrimitiveTypeInfo("tinyint")        TinyIntType                                                      true
PrimitiveTypeInfo("smallint")       SmallIntType                                                     true
PrimitiveTypeInfo("int")            IntType                                                          true
PrimitiveTypeInfo("bigint")         BigIntType                                                       true
PrimitiveTypeInfo("float")          FloatType                                                        true
PrimitiveTypeInfo("double")         DoubleType                                                       true
BaseCharTypeInfo("char(%d)")        CharType(length)                                                 true
PrimitiveTypeInfo("string")         VarCharType(VarCharType.MAX_LENGTH)                              true
BaseCharTypeInfo("varchar(%d)")     VarCharType(length), length is less than VarCharType.MAX_LENGTH  true
PrimitiveTypeInfo("date")           DateType                                                         true
PrimitiveTypeInfo("timestamp")      TimestampType                                                    true
DecimalTypeInfo("decimal(%d, %d)")  DecimalType(precision, scale)                                    true
PrimitiveTypeInfo("binary")         VarBinaryType, BinaryType                                        true