Hive #

This documentation is a guide for using Table Store in Hive.

Version #

Table Store currently supports Hive 2.1, 2.2, 2.3 and 3.1.

Execution Engine #

Table Store currently supports the MR and Tez execution engines for Hive.

Installation #

Download the jar file that matches your Hive version.

Version Jar
Hive 3.1 flink-table-store-hive-connector-0.3.0_3.1.jar
Hive 2.3 flink-table-store-hive-connector-0.3.0_2.3.jar
Hive 2.2 flink-table-store-hive-connector-0.3.0_2.2.jar
Hive 2.1 flink-table-store-hive-connector-0.3.0_2.1.jar

You can also build the bundled jar manually from the source code.

To build from source code, either download the source of a release or clone the git repository.

Build the bundled jar with the command that matches your Hive version.

Version Command
Hive 3.1 mvn clean install -Dmaven.test.skip=true -Phive-3.1
Hive 2.3 mvn clean install -Dmaven.test.skip=true
Hive 2.2 mvn clean install -Dmaven.test.skip=true -Phive-2.2
Hive 2.1 mvn clean install -Dmaven.test.skip=true -Phive-2.1
Hive 2.1 CDH 6.3 mvn clean install -Dmaven.test.skip=true -Phive-2.1-cdh-6.3
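
The table above maps each Hive version to a Maven profile. As a minimal sketch, assuming a POSIX shell, that mapping can be scripted so the build command is assembled from the version alone. The helper names hive_profile_flag and build_command are hypothetical and not part of Table Store; the script only prints the command and does not run Maven.

```shell
# Hypothetical helper (not part of Table Store): print the Maven profile
# flag for a given Hive version, following the build table above.
hive_profile_flag() {
  case "$1" in
    3.1)         echo "-Phive-3.1" ;;
    2.3)         echo "" ;;  # Hive 2.3 needs no profile flag
    2.2)         echo "-Phive-2.2" ;;
    2.1)         echo "-Phive-2.1" ;;
    2.1-cdh-6.3) echo "-Phive-2.1-cdh-6.3" ;;
    *)           echo "unsupported Hive version: $1" >&2; return 1 ;;
  esac
}

# Print the full build command for a version (does not invoke Maven).
build_command() {
  flag=$(hive_profile_flag "$1") || return 1
  # ${flag:+ $flag} appends the flag only when it is non-empty.
  echo "mvn clean install -Dmaven.test.skip=true${flag:+ $flag}"
}

build_command 3.1
# prints: mvn clean install -Dmaven.test.skip=true -Phive-3.1
```

Run the printed command from the root of the source tree.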

After the build finishes, you can find the Hive connector jar at ./flink-table-store-hive/flink-table-store-hive-connector/target/flink-table-store-hive-connector-0.3.0.jar.

There are several ways to add this jar to Hive.

  • You can create an auxlib folder under the root directory of Hive, and copy flink-table-store-hive-connector-0.3.0.jar into auxlib.
  • You can also copy this jar to a path accessible by Hive, then run add jar /path/to/flink-table-store-hive-connector-0.3.0.jar to enable table store support in Hive. Note that this method is not recommended: if you are using the MR execution engine and run a join statement, you may encounter the exception org.apache.hive.com.esotericsoftware.kryo.KryoException: Unable to find class.
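
The first (auxlib) option can be sketched as a small shell helper. This is only an illustration under stated assumptions: install_connector_jar is a hypothetical name, and both paths in the example call are placeholders you need to replace with your own Hive root directory and jar location.

```shell
# Hypothetical helper: copy the connector jar into <hive_root>/auxlib,
# creating the auxlib directory if it does not exist yet.
install_connector_jar() {
  hive_root=$1   # root directory of the Hive installation
  jar_path=$2    # path to the connector jar
  if [ ! -f "$jar_path" ]; then
    echo "jar not found: $jar_path" >&2
    return 1
  fi
  mkdir -p "$hive_root/auxlib" && cp "$jar_path" "$hive_root/auxlib/"
}

# Example call; both paths are placeholders:
# install_connector_jar /path/to/hive /path/to/flink-table-store-hive-connector-0.3.0.jar
```

Hive processes that are already running will likely need a restart before they see the new jar.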

NOTE: If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
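
A quick way to verify the note above before starting Hive is a guard like the following. It is a sketch only: check_hadoop_env is a hypothetical helper name, and it checks merely that one of the two variables is non-empty, not that the path it points to is a valid Hadoop installation.

```shell
# Hypothetical helper: succeed if HADOOP_HOME or HADOOP_CONF_DIR is set
# to a non-empty value, otherwise print a hint and fail.
check_hadoop_env() {
  if [ -n "${HADOOP_HOME:-}" ] || [ -n "${HADOOP_CONF_DIR:-}" ]; then
    echo "Hadoop environment found"
  else
    echo "Set HADOOP_HOME or HADOOP_CONF_DIR before starting Hive" >&2
    return 1
  fi
}

# Example call:
# check_hadoop_env && echo "ready to start Hive"
```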

Quick Start with Table Store Hive Catalog #

By using the table store Hive catalog, you can create, drop, and insert into table store tables from Flink. These operations directly affect the corresponding Hive metastore. Tables created in this way can also be accessed directly from Hive.

Step 1: Prepare Table Store Hive Catalog Jar File for Flink

See creating a catalog with Hive metastore.

Step 2: Create Test Data with Flink SQL

Execute the following Flink SQL script in the Flink SQL client to define a Table Store Hive catalog and create a table.

-- Flink SQL CLI
-- Define table store Hive catalog

CREATE CATALOG my_hive WITH (
  'type' = 'table-store',
  'metastore' = 'hive',
  'uri' = 'thrift://<hive-metastore-host-name>:<port>',
  'warehouse' = '/path/to/table/store/warehouse'
);

-- Use table store Hive catalog

USE CATALOG my_hive;

-- Create a table in table store Hive catalog (use "default" database by default)

CREATE TABLE test_table (
  a int,
  b string
);

-- Insert records into test table

INSERT INTO test_table VALUES (1, 'Table'), (2, 'Store');

-- Read records from test table

SELECT * FROM test_table;

/*
+---+-------+
| a |     b |
+---+-------+
| 1 | Table |
| 2 | Store |
+---+-------+
*/

Step 3: Query the Table in Hive

Run the following Hive SQL in the Hive CLI to access the created table.

-- Assume that flink-table-store-hive-connector-0.3.0.jar is already in auxlib directory.
-- List tables in Hive
-- (you might need to switch to "default" database if you're not there by default)

SHOW TABLES;

/*
OK
test_table
*/

-- Read records from test_table

SELECT a, b FROM test_table ORDER BY a;

/*
OK
1	Table
2	Store
*/

Quick Start with External Table #

To access existing table store tables, you can also register them as external tables in Hive. Run the following Hive SQL in the Hive CLI.

-- Assume that flink-table-store-hive-connector-0.3.0.jar is already in auxlib directory.
-- Let's use the test_table created in the above section.
-- To create an external table, you don't need to specify any column or table properties.
-- Pointing the location to the path of table is enough.

CREATE EXTERNAL TABLE external_test_table
STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
LOCATION '/path/to/table/store/warehouse/default.db/test_table';

-- Read records from external_test_table

SELECT a, b FROM external_test_table ORDER BY a;

/*
OK
1	Table
2	Store
*/
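
When several tables need to be registered, the DDL can be generated instead of typed by hand. The sketch below assumes the directory layout seen in the example path, <warehouse>/<database>.db/<table>; that layout is inferred from this page rather than stated as a rule, and external_table_ddl is a hypothetical helper name.

```shell
# Hypothetical helper: print the Hive DDL that registers an existing
# table store table as an external table.
# Assumes the table lives at <warehouse>/<database>.db/<table>.
external_table_ddl() {
  warehouse=$1    # catalog warehouse path
  database=$2     # table store database, e.g. default
  table=$3        # table store table, e.g. test_table
  hive_table=$4   # name of the external table to create in Hive
  cat <<EOF
CREATE EXTERNAL TABLE ${hive_table}
STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
LOCATION '${warehouse}/${database}.db/${table}';
EOF
}

external_table_ddl /path/to/table/store/warehouse default test_table external_test_table
```

With these arguments the helper prints the same statement as the example above. The output can be saved to a file and run with the Hive CLI's -f option.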

Hive Type Conversion #

This section lists all supported type conversions between Hive and Flink. All of Hive's data types are available in the package org.apache.hadoop.hive.serde2.typeinfo.

Hive Data Type Flink Data Type Atomic Type
StructTypeInfo RowType false
MapTypeInfo MapType false
ListTypeInfo ArrayType false
PrimitiveTypeInfo("boolean") BooleanType true
PrimitiveTypeInfo("tinyint") TinyIntType true
PrimitiveTypeInfo("smallint") SmallIntType true
PrimitiveTypeInfo("int") IntType true
PrimitiveTypeInfo("bigint") BigIntType true
PrimitiveTypeInfo("float") FloatType true
PrimitiveTypeInfo("double") DoubleType true
BaseCharTypeInfo("char(%d)") CharType(length) true
PrimitiveTypeInfo("string") VarCharType(VarCharType.MAX_LENGTH) true
BaseCharTypeInfo("varchar(%d)") VarCharType(length), length is less than VarCharType.MAX_LENGTH true
PrimitiveTypeInfo("date") DateType true
PrimitiveTypeInfo("timestamp") TimestampType true
DecimalTypeInfo("decimal(%d, %d)") DecimalType(precision, scale) true
PrimitiveTypeInfo("binary") VarBinaryType, BinaryType true