Spark3 #
This documentation is a guide for using Table Store in Spark3.
Preparing Table Store Jar File #
Table Store currently supports Spark 3.3, 3.2 and 3.1. We recommend the latest Spark version for a better experience.
You are using an unreleased version of Table Store, so you need to manually build the bundled jar from the source code. To build from source, either download the source of a release or clone the git repository.
Build the bundled jar with the following command:
mvn clean install -DskipTests
For Spark 3.3, you can find the bundled jar in ./flink-table-store-spark/flink-table-store-spark-3.3/target/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar.
Quick Start #
If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
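For example, you might export it in the shell before launching Spark (the path below is an assumption; point it at your cluster's actual Hadoop configuration directory):
# Assumed location of the Hadoop configuration; adjust for your cluster
export HADOOP_CONF_DIR=/etc/hadoop/conf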
Step 1: Specify Table Store Jar File
Append the path to the Table Store jar file to the --jars argument when starting spark-sql.
spark-sql ... --jars /path/to/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar
Alternatively, you can copy flink-table-store-spark-3.3-0.4-SNAPSHOT.jar into the jars directory of your Spark installation.
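For example, assuming SPARK_HOME points at your Spark installation (paths are illustrative):
# Copy the bundled jar into Spark's jars directory
cp /path/to/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar $SPARK_HOME/jars/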
Step 2: Specify Table Store Catalog
When starting spark-sql, use the following command to register Table Store’s Spark catalog with the name tablestore. Table files of the warehouse are stored under /tmp/table_store.
spark-sql ... \
--conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
--conf spark.sql.catalog.tablestore.warehouse=file:/tmp/table_store
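Putting Step 1 and Step 2 together, a complete invocation might look like the following (the jar path is an assumption; substitute your own):
spark-sql --jars /path/to/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar \
  --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
  --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/table_store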
After the spark-sql command line has started, run the following SQL to create and switch to database tablestore.default.
CREATE DATABASE tablestore.default;
USE tablestore.default;
Step 3: Create a Table and Write Some Records
CREATE TABLE my_table (
    k INT,
    v STRING
) TBLPROPERTIES (
    'primary-key' = 'k'
);
INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');
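To double-check the table definition, standard Spark SQL's DESCRIBE TABLE should work against the registered catalog (exact output depends on your Spark version):
DESCRIBE TABLE my_table;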
Step 4: Query Table with SQL
SELECT * FROM my_table;
/*
1 Hi
2 Hello
*/
Step 5: Update the Records
Because k is the primary key, inserting a record with an existing key overwrites the previous value, as the following statements show.
INSERT INTO my_table VALUES (1, 'Hi Again'), (3, 'Test');
SELECT * FROM my_table;
/*
1 Hi Again
2 Hello
3 Test
*/
Step 6: Query Table with Scala API
If you don’t want to use the Table Store catalog, you can also run spark-shell and query the table with the Scala API.
spark-shell ... --jars /path/to/flink-table-store-spark-3.3-0.4-SNAPSHOT.jar
val dataset = spark.read.format("tablestore").load("file:/tmp/table_store/default.db/my_table")
dataset.createOrReplaceTempView("my_table")
spark.sql("SELECT * FROM my_table").show()
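From here the usual Spark APIs apply; for example, you can keep querying the registered temporary view (a minimal follow-up sketch):
// Standard Spark SQL against the temporary view registered above
spark.sql("SELECT k, v FROM my_table WHERE k > 1").show()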
Spark Type Conversion #
This section lists all supported type conversions between Spark and Flink. All Spark data types are available in the package org.apache.spark.sql.types.
Spark Data Type | Flink Data Type | Atomic Type
---|---|---
StructType | RowType | false
MapType | MapType | false
ArrayType | ArrayType | false
BooleanType | BooleanType | true
ByteType | TinyIntType | true
ShortType | SmallIntType | true
IntegerType | IntType | true
LongType | BigIntType | true
FloatType | FloatType | true
DoubleType | DoubleType | true
StringType | VarCharType, CharType | true
DateType | DateType | true
TimestampType | TimestampType, LocalZonedTimestampType | true
DecimalType(precision, scale) | DecimalType(precision, scale) | true
BinaryType | VarBinaryType, BinaryType | true
- Currently, Spark’s field comment cannot be described under Flink CLI.
- Conversion between Spark’s UserDefinedType and Flink’s UserDefinedType is not supported.
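To see these conversions in practice, you can load a table in spark-shell and inspect its Spark-side schema. A minimal sketch, assuming the my_table example from the Quick Start above:
// printSchema shows the Spark types that Table Store's Flink types map to;
// the INT column k should surface as IntegerType and the STRING column v as StringType
val df = spark.read.format("tablestore").load("file:/tmp/table_store/default.db/my_table")
df.printSchema()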