Spark3 #
Table Store supports reading Table Store tables through Spark.
Version #
Table Store supports Spark 3+. It is highly recommended to use the latest Spark 3 release, which includes many improvements.
Install #
Download flink-table-store-spark-0.2.1.jar.
Use --jars in spark-sql:
spark-sql ... --jars flink-table-store-spark-0.2.1.jar
Alternatively, you can copy flink-table-store-spark-0.2.1.jar under spark/jars in your Spark installation.
Note: If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
Catalog #
The following command registers the Table Store’s Spark catalog with the name tablestore:
spark-sql ... \
--conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
--conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse
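If you are working from spark-shell or an application instead of spark-sql, the same options can be set when building the SparkSession. The following is a minimal sketch (not part of the official setup steps); it assumes flink-table-store-spark-0.2.1.jar is already on the classpath and uses the same warehouse path as above.
import org.apache.spark.sql.SparkSession

// Sketch: register the Table Store catalog at session build time
// (assumes flink-table-store-spark-0.2.1.jar is on the classpath).
val spark = SparkSession.builder()
  .config("spark.sql.catalog.tablestore", "org.apache.flink.table.store.spark.SparkCatalog")
  .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse")
  .getOrCreate()

// Quick check that the catalog resolves: list the databases under the warehouse.
spark.sql("SHOW NAMESPACES IN tablestore").show()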
If you would like to use the Table Store Hive catalog from Spark, download the jar file matching your Hive version below and append its path to the --jars argument as well.
Version | Jar
---|---
Hive 2.3 | flink-table-store-hive-catalog-0.2.1_2.3.jar
Hive 2.2 | flink-table-store-hive-catalog-0.2.1_2.2.jar
Hive 2.1 | flink-table-store-hive-catalog-0.2.1_2.1.jar
Some extra configuration is needed if your Table Store catalog uses the Hive Metastore (no extra configuration is required for read-only access).
spark-sql ... \
--conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
--conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse \
--conf spark.sql.catalog.tablestore.metastore=hive \
--conf spark.sql.catalog.tablestore.uri=thrift://...
Query Table #
SELECT * FROM tablestore.default.myTable;
DataSet #
If you don’t want to use the Table Store catalog, you can load an existing Table Store table as a Dataset directly from its path:
val dataset = spark.read.format("tablestore").load("file:/tmp/warehouse/default.db/myTable")
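As a small usage sketch building on the line above (the view name is just an example), the loaded Dataset can be registered as a temporary view and queried with plain SQL, without registering the Table Store catalog:
// Sketch: query the loaded table through a temporary view.
dataset.createOrReplaceTempView("myTable")
spark.sql("SELECT * FROM myTable").show()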
DDL Statements #
Create Table #
CREATE TABLE [IF NOT EXISTS] table_identifier
[ ( col_name1[:] col_type1 [ COMMENT col_comment1 ], ... ) ]
[ USING tablestore ]
[ COMMENT table_comment ]
[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
[ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]
For example, create an order table partitioned by dt and hh, with a composite primary key of order_id, dt, and hh.
CREATE TABLE tablestore.default.OrderTable (
order_id BIGINT NOT NULL COMMENT 'biz order id',
buyer_id BIGINT NOT NULL COMMENT 'buyer id',
coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',
order_amount DOUBLE NOT NULL COMMENT 'order amount',
dt STRING NOT NULL COMMENT 'yyyy-MM-dd',
hh STRING NOT NULL COMMENT 'HH'
) COMMENT 'my table'
PARTITIONED BY (dt, hh)
TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')
Note:
- Primary key feature is supported via table properties, and a composite primary key is delimited with commas.
- Partition fields should be predefined; complex partitioning such as PARTITIONED BY ( col_name2[:] col_type2 [ COMMENT col_comment2 ], ... ) is not supported.
- For Spark 3.0, CREATE TABLE ... USING tablestore is required; see the sketch below.
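The following sketch illustrates the Spark 3.0 form; the table name, columns, and primary key are made up for this example, and spark refers to an active SparkSession (for example in spark-shell). The same statement can equally be run in spark-sql.
// Sketch: on Spark 3.0 the USING clause must be written explicitly.
spark.sql(
  """CREATE TABLE IF NOT EXISTS tablestore.default.word_count (
    |  word STRING NOT NULL COMMENT 'word',
    |  cnt BIGINT NOT NULL COMMENT 'count'
    |) USING tablestore
    |TBLPROPERTIES ('primary-key' = 'word')""".stripMargin)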
Alter Table #
ALTER TABLE table_identifier
SET TBLPROPERTIES ( key1=val1 )
| UNSET TBLPROPERTIES (key2)
| ADD COLUMNS ( col_name col_type [ , ... ] )
| { ALTER | CHANGE } COLUMN col_name { DROP NOT NULL | COMMENT 'new_comment'}
- Change/add table properties
ALTER TABLE tablestore.default.OrderTable SET TBLPROPERTIES (
'write-buffer-size'='256 MB'
)
- Remove a table property
ALTER TABLE tablestore.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')
- Add a new column
ALTER TABLE tablestore.default.OrderTable ADD COLUMNS (buy_count INT)
- Change column nullability
ALTER TABLE tablestore.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL
- Change column comment
ALTER TABLE tablestore.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'
Note:
- Spark does not support changing a nullable column to a non-nullable column.
Drop Table #
DROP TABLE tablestore.default.OrderTable
Attention: Dropping a table will delete both its metadata and its files on disk.
Create Namespace #
CREATE NAMESPACE [IF NOT EXISTS] tablestore.bar
Drop Namespace #
DROP NAMESPACE tablestore.bar
Attention: Dropping a namespace will delete the metadata and files of all tables under this namespace on disk.
Spark Type Conversion #
This section lists all supported type conversions between Spark and Flink. All Spark data types are available in the package org.apache.spark.sql.types.
Spark Data Type | Flink Data Type | Atomic Type
---|---|---
StructType | RowType | false
MapType | MapType | false
ArrayType | ArrayType | false
BooleanType | BooleanType | true
ByteType | TinyIntType | true
ShortType | SmallIntType | true
IntegerType | IntType | true
LongType | BigIntType | true
FloatType | FloatType | true
DoubleType | DoubleType | true
StringType | VarCharType, CharType | true
DateType | DateType | true
TimestampType | TimestampType, LocalZonedTimestamp | true
DecimalType(precision, scale) | DecimalType(precision, scale) | true
BinaryType | VarBinaryType, BinaryType | true
Note:
- Currently, Spark’s field comment cannot be described under Flink CLI.
- Conversion between Spark’s UserDefinedType and Flink’s UserDefinedType is not supported.
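As a final illustrative sketch (table and column names are invented here, and spark refers to an active SparkSession with the tablestore catalog registered as above), the following creates a table whose Spark column types exercise several of the mappings listed above:
// Sketch: each column comment notes the Flink type it converts to per the table above.
spark.sql(
  """CREATE TABLE IF NOT EXISTS tablestore.default.typed_table (
    |  id BIGINT NOT NULL COMMENT 'maps to Flink BigIntType',
    |  name STRING COMMENT 'maps to Flink VarCharType',
    |  price DECIMAL(10, 2) COMMENT 'maps to Flink DecimalType(10, 2)',
    |  created_at TIMESTAMP COMMENT 'maps to Flink TimestampType',
    |  payload BINARY COMMENT 'maps to Flink VarBinaryType/BinaryType',
    |  tags ARRAY<STRING> COMMENT 'maps to Flink ArrayType'
    |) USING tablestore
    |TBLPROPERTIES ('primary-key' = 'id')""".stripMargin)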