Hive #
Table Store currently supports the following Hive-related features:
- Create, drop and insert into table store tables in Flink SQL through the table store Hive catalog. Tables created in this way can also be read directly from Hive.
- Register existing table store tables as external tables in Hive SQL.
Version #
Table Store currently supports Hive 2.1, 2.2 and 2.3.
Execution Engine #
Table Store currently supports MR and Tez execution engine for Hive.
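You can switch between the two supported engines for the current Hive session with the standard `hive.execution.engine` setting:

```sql
-- Hive CLI: choose the execution engine for the current session
SET hive.execution.engine=mr;   -- MapReduce
-- or
SET hive.execution.engine=tez;  -- Tez
```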
Install #
Table Store Hive Catalog (For Flink) #
Download the jar file with the corresponding version.
| Version | Jar |
| --- | --- |
| Hive 2.3 | flink-table-store-hive-catalog-0.2.1_2.3.jar |
| Hive 2.2 | flink-table-store-hive-catalog-0.2.1_2.2.jar |
| Hive 2.1 | flink-table-store-hive-catalog-0.2.1_2.1.jar |
If you’re aiming for Hive 2.1 CDH 6.3, see Build From Source for more information.
To enable Table Store Hive Catalog support in Flink, you can pick one of the following two methods.
- Copy the jar file into the `lib` directory of your Flink installation. Note that this must be done before starting your Flink cluster.
- If you're using Flink's SQL client, append `--jar /path/to/flink-table-store-hive-catalog-0.2.1.jar` to the starting command of the SQL client.
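The two methods can be sketched as shell commands. This is a minimal sketch: `FLINK_HOME` and the jar locations are assumptions for illustration (a scratch directory and an empty stand-in jar are used here so the commands run anywhere); adjust them to your actual Flink installation and download path.

```shell
# Assumed layout; replace with your real Flink installation and jar path.
FLINK_HOME="$(mktemp -d)/flink"
mkdir -p "$FLINK_HOME/lib" "$FLINK_HOME/bin"
JAR="flink-table-store-hive-catalog-0.2.1_2.3.jar"
touch "/tmp/$JAR"   # stand-in for the downloaded jar

# Method 1: copy the jar into Flink's lib directory
# (must happen before the Flink cluster is started)
cp "/tmp/$JAR" "$FLINK_HOME/lib/"
ls "$FLINK_HOME/lib"

# Method 2: pass the jar when starting the SQL client (not run in this sketch)
# "$FLINK_HOME/bin/sql-client.sh" --jar "/tmp/$JAR"
```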
Table Store Hive Connector (For Hive) #
Download the jar file with the corresponding version.
| Version | Jar |
| --- | --- |
| Hive 2.3 | flink-table-store-hive-connector-0.2.1_2.3.jar |
| Hive 2.2 | flink-table-store-hive-connector-0.2.1_2.2.jar |
| Hive 2.1 | flink-table-store-hive-connector-0.2.1_2.1.jar |
If you’re aiming for Hive 2.1 CDH 6.3, see Build From Source for more information.
There are several ways to add this jar to Hive.
- You can create an `auxlib` folder under the root directory of Hive and copy `flink-table-store-hive-connector-0.2.1.jar` into `auxlib`.
- You can also copy this jar to a path accessible by Hive, then use `add jar /path/to/flink-table-store-hive-connector-0.2.1.jar` to enable table store support in Hive. Note that this method is not recommended: if you're using the MR execution engine and running a join statement, you may be faced with the exception `org.apache.hive.com.esotericsoftware.kryo.kryoexception: unable to find class`.
Note: If you are using HDFS, make sure that the environment variable `HADOOP_HOME` or `HADOOP_CONF_DIR` is set.
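The recommended `auxlib` approach can be sketched as follows. `HIVE_HOME`, the jar path, and the `HADOOP_HOME` location are assumptions for illustration (a scratch directory and an empty stand-in jar are used here so the commands run anywhere); substitute your real Hive root and download path.

```shell
# Assumed layout; replace with your real Hive root and jar path.
HIVE_HOME="$(mktemp -d)/hive"
mkdir -p "$HIVE_HOME"
JAR="flink-table-store-hive-connector-0.2.1_2.3.jar"
touch "/tmp/$JAR"   # stand-in for the downloaded jar

# Create auxlib under the Hive root and copy the connector jar into it
mkdir -p "$HIVE_HOME/auxlib"
cp "/tmp/$JAR" "$HIVE_HOME/auxlib/"

# If your warehouse is on HDFS, set one of these before starting Hive
# (the path below is an assumed install location)
export HADOOP_HOME=/opt/hadoop
# or: export HADOOP_CONF_DIR=/etc/hadoop/conf
```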
Using Table Store Hive Catalog #
By using the table store Hive catalog, you can create, drop and insert into table store tables from Flink. These operations directly affect the corresponding Hive metastore. Tables created in this way can also be accessed directly from Hive.
Execute the following Flink SQL script in the Flink SQL client to define a table store Hive catalog and create a table store table.
-- Flink SQL CLI
-- Define table store Hive catalog
CREATE CATALOG my_hive WITH (
'type' = 'table-store',
'metastore' = 'hive',
'uri' = 'thrift://<hive-metastore-host-name>:<port>',
'warehouse' = '/path/to/table/store/warehouse'
);
-- Use table store Hive catalog
USE CATALOG my_hive;
-- Create a table in table store Hive catalog (use "default" database by default)
CREATE TABLE test_table (
a int,
b string
);
-- Insert records into test table
INSERT INTO test_table VALUES (1, 'Table'), (2, 'Store');
-- Read records from test table
SELECT * FROM test_table;
/*
+---+-------+
| a | b |
+---+-------+
| 1 | Table |
| 2 | Store |
+---+-------+
*/
Run the following Hive SQL in Hive CLI to access the created table.
-- Assume that flink-table-store-hive-connector-0.2.1.jar is already in auxlib directory.
-- List tables in Hive
-- (you might need to switch to "default" database if you're not there by default)
SHOW TABLES;
/*
OK
test_table
*/
-- Read records from test_table
SELECT a, b FROM test_table ORDER BY a;
/*
OK
1 Table
2 Store
*/
Using External Table #
To access existing table store tables, you can also register them as external tables in Hive. Run the following Hive SQL in the Hive CLI.
-- Assume that flink-table-store-hive-connector-0.2.1.jar is already in auxlib directory.
-- Let's use the test_table created in the above section.
-- To create an external table, you don't need to specify any column or table properties.
-- Pointing the location to the path of table is enough.
CREATE EXTERNAL TABLE external_test_table
STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
LOCATION '/path/to/table/store/warehouse/default.db/test_table';
-- Read records from external_test_table
SELECT a, b FROM external_test_table ORDER BY a;
/*
OK
1 Table
2 Store
*/
Hive Type Conversion #
This section lists all supported type conversions between Hive and Flink. All Hive data types are available in the package `org.apache.hadoop.hive.serde2.typeinfo`.
| Hive Data Type | Flink Data Type | Atomic Type |
| --- | --- | --- |
| `StructTypeInfo` | `RowType` | false |
| `MapTypeInfo` | `MapType` | false |
| `ListTypeInfo` | `ArrayType` | false |
| `PrimitiveTypeInfo("boolean")` | `BooleanType` | true |
| `PrimitiveTypeInfo("tinyint")` | `TinyIntType` | true |
| `PrimitiveTypeInfo("smallint")` | `SmallIntType` | true |
| `PrimitiveTypeInfo("int")` | `IntType` | true |
| `PrimitiveTypeInfo("bigint")` | `BigIntType` | true |
| `PrimitiveTypeInfo("float")` | `FloatType` | true |
| `PrimitiveTypeInfo("double")` | `DoubleType` | true |
| `BaseCharTypeInfo("char(%d)")` | `CharType(length)` | true |
| `PrimitiveTypeInfo("string")` | `VarCharType(VarCharType.MAX_LENGTH)` | true |
| `BaseCharTypeInfo("varchar(%d)")` | `VarCharType(length)`, where length is less than `VarCharType.MAX_LENGTH` | true |
| `PrimitiveTypeInfo("date")` | `DateType` | true |
| `PrimitiveTypeInfo("timestamp")` | `TimestampType` | true |
| `DecimalTypeInfo("decimal(%d, %d)")` | `DecimalType(precision, scale)` | true |
| `PrimitiveTypeInfo("binary")` | `VarBinaryType`, `BinaryType` | true |