Spark3 #

Table Store supports reading Table Store tables through Spark.

Version #

Table Store supports Spark 3+. Using Spark 3+ is highly recommended, as it brings many improvements.

Install #

Download flink-table-store-spark-0.2.1.jar.

Use --jars in spark-sql:

spark-sql ... --jars flink-table-store-spark-0.2.1.jar

Alternatively, you can copy flink-table-store-spark-0.2.1.jar under spark/jars in your Spark installation.

Note: If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
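For example, you can point HADOOP_CONF_DIR at your Hadoop configuration directory before launching spark-sql (the path below is only a placeholder):

export HADOOP_CONF_DIR=/path/to/hadoop/conf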

Catalog #

The following command registers the Table Store’s Spark catalog with the name tablestore:

spark-sql ... \
    --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
    --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse
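If you start Spark programmatically instead of through spark-sql, the same catalog can be registered on the session builder. A minimal Scala sketch, using the same configuration keys as the --conf flags above:

import org.apache.spark.sql.SparkSession

// Register the Table Store catalog under the name "tablestore".
val spark = SparkSession.builder()
  .config("spark.sql.catalog.tablestore", "org.apache.flink.table.store.spark.SparkCatalog")
  .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse")
  .getOrCreate()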

If you would like to use the Table Store Hive catalog from Spark, find the jar file matching your Hive version and append its path to the --jars argument.

Download the jar file for the corresponding Hive version:

Hive Version | Jar
Hive 2.3     | flink-table-store-hive-catalog-0.2.1_2.3.jar
Hive 2.2     | flink-table-store-hive-catalog-0.2.1_2.2.jar
Hive 2.1     | flink-table-store-hive-catalog-0.2.1_2.1.jar
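For example, with Hive 2.3 both jars are passed to --jars as a comma-separated list:

spark-sql ... --jars flink-table-store-spark-0.2.1.jar,flink-table-store-hive-catalog-0.2.1_2.3.jar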

Some extra configuration is needed if your Table Store catalog uses the Hive Metastore (no extra configuration is required for read-only access).

spark-sql ... \
    --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
    --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse \
    --conf spark.sql.catalog.tablestore.metastore=hive \
    --conf spark.sql.catalog.tablestore.uri=thrift://...

Query Table #

SELECT * FROM tablestore.default.myTable;

DataSet #

If you don’t want to use the Table Store catalog, you can load an existing Table Store table as a DataSet directly from its location:

val dataset = spark.read.format("tablestore").load("file:/tmp/warehouse/default.db/myTable")
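Once loaded, the DataSet can be used like any other Spark DataFrame, for example (a minimal sketch):

// Register the loaded table as a temporary view and query it with Spark SQL.
dataset.createOrReplaceTempView("myTable")
spark.sql("SELECT COUNT(*) FROM myTable").show()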

DDL Statements #

Create Table #

CREATE TABLE [IF NOT EXISTS] table_identifier 
[ ( col_name1[:] col_type1 [ COMMENT col_comment1 ], ... ) ]
[ USING tablestore ]    
[ COMMENT table_comment ]
[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
[ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]       

For example, the following creates an order table with a composite primary key of order_id, dt and hh, partitioned by dt and hh.

CREATE TABLE tablestore.default.OrderTable (
    order_id BIGINT NOT NULL COMMENT 'biz order id',
    buyer_id BIGINT NOT NULL COMMENT 'buyer id',
    coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',
    order_amount DOUBLE NOT NULL COMMENT 'order amount',
    dt STRING NOT NULL COMMENT 'yyyy-MM-dd',
    hh STRING NOT NULL COMMENT 'HH'
) COMMENT 'my table'
PARTITIONED BY (dt, hh)
TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')

Note:

  • The primary key feature is supported via table properties; a composite primary key is specified as a comma-delimited list of columns.
  • Partition fields must be predefined; complex partition clauses such as PARTITIONED BY ( col_name2[:] col_type2 [ COMMENT col_comment2 ], ... ) are not supported.
  • For Spark 3.0, CREATE TABLE USING tablestore is required; see the sketch after this list.
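For example, on Spark 3.0 the USING clause is spelled out explicitly (a minimal hypothetical table, shown only to illustrate the clause):

CREATE TABLE tablestore.default.MinimalTable (
    order_id BIGINT NOT NULL COMMENT 'biz order id',
    dt STRING NOT NULL COMMENT 'yyyy-MM-dd'
) USING tablestore
PARTITIONED BY (dt)
TBLPROPERTIES ('primary-key' = 'order_id,dt')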

Alter Table #

ALTER TABLE table_identifier
SET TBLPROPERTIES ( key1=val1 )
    | UNSET TBLPROPERTIES ( key2 )
    | ADD COLUMNS ( col_name col_type [ , ... ] )
    | { ALTER | CHANGE } COLUMN col_name { DROP NOT NULL | COMMENT 'new_comment' }

  • Change/add table properties

ALTER TABLE tablestore.default.OrderTable SET TBLPROPERTIES (
    'write-buffer-size'='256 MB'
)

  • Remove a table property

ALTER TABLE tablestore.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')

  • Add a new column

ALTER TABLE tablestore.default.OrderTable ADD COLUMNS (buy_count INT)

  • Change column nullability

ALTER TABLE tablestore.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL

  • Change column comment

ALTER TABLE tablestore.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'

Note:

  • Spark does not support changing a nullable column to a non-nullable column.

Drop Table #

DROP TABLE tablestore.default.OrderTable

Attention: Dropping a table will delete both the table’s metadata and its files on disk.

Create Namespace #

CREATE NAMESPACE [IF NOT EXISTS] tablestore.bar

Drop Namespace #

DROP NAMESPACE tablestore.bar

Attention: Dropping a namespace will delete the metadata and files of all tables under that namespace on disk.

Spark Type Conversion #

This section lists all supported type conversions between Spark and Flink. All Spark data types are available in the package org.apache.spark.sql.types.

Spark Data Type               | Flink Data Type                        | Atomic Type
StructType                    | RowType                                | false
MapType                       | MapType                                | false
ArrayType                     | ArrayType                              | false
BooleanType                   | BooleanType                            | true
ByteType                      | TinyIntType                            | true
ShortType                     | SmallIntType                           | true
IntegerType                   | IntType                                | true
LongType                      | BigIntType                             | true
FloatType                     | FloatType                              | true
DoubleType                    | DoubleType                             | true
StringType                    | VarCharType, CharType                  | true
DateType                      | DateType                               | true
TimestampType                 | TimestampType, LocalZonedTimestampType | true
DecimalType(precision, scale) | DecimalType(precision, scale)          | true
BinaryType                    | VarBinaryType, BinaryType              | true

Note:

  • Currently, field comments set from Spark cannot be displayed with DESCRIBE in the Flink CLI.
  • Conversion between Spark’s UserDefinedType and Flink’s UserDefinedType is not supported.
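To check how a particular table’s Flink schema is mapped to Spark types, you can print the schema of a loaded DataSet (a sketch reusing the path from the DataSet section above):

spark.read.format("tablestore").load("file:/tmp/warehouse/default.db/myTable").printSchema()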