
Spark3 #

This documentation is a guide for using Table Store in Spark3.

Preparing Table Store Jar File #

Table Store currently supports Spark 3.3, 3.2 and 3.1. We recommend the latest Spark version for a better experience.

You are using an unreleased version of Table Store, so you need to build the bundled jar manually from the source code.

To build from source code, either download the source of a release or clone the git repository.

Build the bundled jar with the following command:

mvn clean install -DskipTests

For Spark 3.3, you can find the bundled jar in ./flink-table-store-spark/flink-table-store-spark-3.3/target/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar.
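If you only need the Spark 3.3 bundle, Maven's standard -pl/-am options should let you build just that module together with its upstream dependencies. The module path below is inferred from the jar location above; verify it against your checkout:

mvn clean install -DskipTests -pl flink-table-store-spark/flink-table-store-spark-3.3 -am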

Quick Start #

If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
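For example (the directory below is an assumption; point it at your cluster's actual Hadoop configuration directory):

export HADOOP_CONF_DIR=/etc/hadoop/conf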

Step 1: Specify Table Store Jar File

Append path to table store jar file to the --jars argument when starting spark-sql.

spark-sql ... --jars /path/to/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar

Alternatively, you can copy flink-table-store-spark-3.3-0.4-SNAPSHOT.jar under spark/jars in your Spark installation directory.

Step 2: Specify Table Store Catalog

When starting spark-sql, use the following command to register Table Store’s Spark catalog with the name tablestore. Table files of the warehouse are stored under /tmp/table_store.

spark-sql ... \
    --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
    --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/table_store
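
If you build the SparkSession yourself in a standalone Scala application instead of going through spark-sql, the same two settings can be passed through the session builder. This is a minimal sketch; the application name is arbitrary:

import org.apache.spark.sql.SparkSession

// Register the Table Store catalog with the same keys used on the
// spark-sql command line above.
val spark = SparkSession.builder()
  .appName("table-store-quickstart") // hypothetical app name
  .config("spark.sql.catalog.tablestore", "org.apache.flink.table.store.spark.SparkCatalog")
  .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/table_store")
  .getOrCreate()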

After the spark-sql command line has started, run the following SQL to create and switch to the database tablestore.default.

CREATE DATABASE tablestore.default;
USE tablestore.default;

Step 3: Create a Table and Write Some Records

CREATE TABLE my_table (
    k INT,
    v STRING
) TBLPROPERTIES (
    'primary-key' = 'k'
);

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');

Step 4: Query Table with SQL

SELECT * FROM my_table;
/*
1	Hi
2	Hello
*/

Step 5: Update the Records

Because k is the table’s primary key, inserting a row whose key already exists replaces the previous record rather than adding a duplicate.

INSERT INTO my_table VALUES (1, 'Hi Again'), (3, 'Test');

SELECT * FROM my_table;
/*
1	Hi Again
2	Hello
3	Test
*/

Step 6: Query Table with Scala API

If you don’t want to use the Table Store catalog, you can also run spark-shell and query the table with the Scala API.

spark-shell ... --jars /path/to/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar
val dataset = spark.read.format("tablestore").load("file:/tmp/table_store/default.db/my_table")
dataset.createOrReplaceTempView("my_table")
spark.sql("SELECT * FROM my_table").show()

Spark Type Conversion #

This section lists all supported type conversions between Spark and Flink. All Spark data types are available in the package org.apache.spark.sql.types.

| Spark Data Type | Flink Data Type | Atomic Type |
| --- | --- | --- |
| StructType | RowType | false |
| MapType | MapType | false |
| ArrayType | ArrayType | false |
| BooleanType | BooleanType | true |
| ByteType | TinyIntType | true |
| ShortType | SmallIntType | true |
| IntegerType | IntType | true |
| LongType | BigIntType | true |
| FloatType | FloatType | true |
| DoubleType | DoubleType | true |
| StringType | VarCharType, CharType | true |
| DateType | DateType | true |
| TimestampType | TimestampType, LocalZonedTimestampType | true |
| DecimalType(precision, scale) | DecimalType(precision, scale) | true |
| BinaryType | VarBinaryType, BinaryType | true |

  • Currently, field comments defined in Spark cannot be displayed through the Flink CLI.
  • Conversion between Spark’s UserDefinedType and Flink’s UserDefinedType is not supported.
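
As an illustration of the Spark side of the table above, here is how the my_table schema from the quick start would be expressed with the types in org.apache.spark.sql.types. This is just a sketch and is not required for any of the steps above:

import org.apache.spark.sql.types._

// k maps to Flink's IntType, v to Flink's VarCharType.
val schema = StructType(Seq(
  StructField("k", IntegerType, nullable = false),
  StructField("v", StringType)
))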