This documentation is for an unreleased version of Apache Flink Table Store. We recommend you use the latest stable version.
Hive #
This documentation is a guide for using Table Store in Hive.
Version #
Table Store currently supports Hive 2.1, 2.1-cdh-6.3, 2.2, 2.3 and 3.1.
Execution Engine #
Table Store currently supports MR and Tez execution engine for Hive.
Installation #
You are using an unreleased version of Table Store so you need to manually build bundled jar from the source code.To build from source code, either download the source of a release or clone the git repository.
Build bundled jar with the following command.
mvn clean install -Dmaven.test.skip=true
You can find Hive connector jar in ./flink-table-store-hive/flink-table-store-hive-connector-<hive-version>/target/flink-table-store-hive-connector-<hive-version>-0.4-SNAPSHOT.jar
.
There are several ways to add this jar to Hive.
- You can create an
auxlib
folder under the root directory of Hive, and copyflink-table-store-hive-connector-0.4-SNAPSHOT.jar
intoauxlib
. - You can also copy this jar to a path accessible by Hive, then use
add jar /path/to/flink-table-store-hive-connector-0.4-SNAPSHOT.jar
to enable table store support in Hive. Note that this method is not recommended. If you’re using the MR execution engine and running a join statement, you may be faced with the exceptionorg.apache.hive.com.esotericsoftware.kryo.kryoexception: unable to find class
.
NOTE: If you are using HDFS, make sure that the environment variable HADOOP_HOME
or HADOOP_CONF_DIR
is set.
Quick Start with Table Store Hive Catalog #
By using table store Hive catalog, you can create, drop and insert into table store tables from Flink. These operations directly affect the corresponding Hive metastore. Tables created in this way can also be accessed directly from Hive.
Step 1: Prepare Table Store Hive Catalog Jar File for Flink
See creating a catalog with Hive metastore.
Step 2: Create Test Data with Flink SQL
Execute the following Flink SQL script in Flink SQL client to define a Table Store Hive catalog and create a table.
-- Flink SQL CLI
-- Define table store Hive catalog
CREATE CATALOG my_hive WITH (
'type' = 'table-store',
'metastore' = 'hive',
'uri' = 'thrift://<hive-metastore-host-name>:<port>',
'warehouse' = '/path/to/table/store/warehouse'
);
-- Use table store Hive catalog
USE CATALOG my_hive;
-- Create a table in table store Hive catalog (use "default" database by default)
CREATE TABLE test_table (
a int,
b string
);
-- Insert records into test table
INSERT INTO test_table VALUES (1, 'Table'), (2, 'Store');
-- Read records from test table
SELECT * FROM test_table;
/*
+---+-------+
| a | b |
+---+-------+
| 1 | Table |
| 2 | Store |
+---+-------+
*/
Step 3: Query the Table in Hive
Run the following Hive SQL in Hive CLI to access the created table.
-- Assume that flink-table-store-hive-connector-<hive-version>-0.4-SNAPSHOT.jar is already in auxlib directory.
-- List tables in Hive
-- (you might need to switch to "default" database if you're not there by default)
SHOW TABLES;
/*
OK
test_table
*/
-- Read records from test_table
SELECT a, b FROM test_table ORDER BY a;
/*
OK
1 Table
2 Store
*/
Quick Start with External Table #
To access existing table store table, you can also register them as external tables in Hive. Run the following Hive SQL in Hive CLI.
-- Assume that flink-table-store-hive-connector-0.4-SNAPSHOT.jar is already in auxlib directory.
-- Let's use the test_table created in the above section.
-- To create an external table, you don't need to specify any column or table properties.
-- Pointing the location to the path of table is enough.
CREATE EXTERNAL TABLE external_test_table
STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
LOCATION '/path/to/table/store/warehouse/default.db/test_table';
-- Read records from external_test_table
SELECT a, b FROM test_table ORDER BY a;
/*
OK
1 Table
2 Store
*/
Hive Type Conversion #
This section lists all supported type conversion between Hive and Flink.
All Hive’s data types are available in package org.apache.hadoop.hive.serde2.typeinfo
.
Hive Data Type | Flink Data Type | Atomic Type |
---|---|---|
StructTypeInfo |
RowType |
false |
MapTypeInfo |
MapType |
false |
ListTypeInfo |
ArrayType |
false |
PrimitiveTypeInfo("boolean") |
BooleanType |
true |
PrimitiveTypeInfo("tinyint") |
TinyIntType |
true |
PrimitiveTypeInfo("smallint") |
SmallIntType |
true |
PrimitiveTypeInfo("int") |
IntType |
true |
PrimitiveTypeInfo("bigint") |
BigIntType |
true |
PrimitiveTypeInfo("float") |
FloatType |
true |
PrimitiveTypeInfo("double") |
DoubleType |
true |
BaseCharTypeInfo("char(%d)") |
CharType(length) |
true |
PrimitiveTypeInfo("string") |
VarCharType(VarCharType.MAX_LENGTH) |
true |
BaseCharTypeInfo("varchar(%d)") |
VarCharType(length), length is less than VarCharType.MAX_LENGTH |
true |
PrimitiveTypeInfo("date") |
DateType |
true |
TimestampType |
TimestampType |
true |
DecimalTypeInfo("decimal(%d, %d)") |
DecimalType(precision, scale) |
true |
DecimalTypeInfo("binary") |
VarBinaryType , BinaryType |
true |