Spark3 #
This documentation is a guide for using Table Store in Spark3.
Preparing Table Store Jar File #
Download flink-table-store-spark-0.3.0.jar.
You can also manually build the bundled jar from the source code. To build from source, either download the source of a release or clone the git repository.
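For example, assuming the project is hosted in the Apache flink-table-store repository (the URL and release tag below are assumptions; use the release that matches your setup), cloning and checking out the source might look like this:
git clone https://github.com/apache/flink-table-store.git
cd flink-table-store
git checkout release-0.3.0  # hypothetical tag name for the 0.3.0 release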
Build the bundled jar with the following command:
mvn clean install -DskipTests
You can find the bundled jar in ./flink-table-store-spark/target/flink-table-store-spark-0.3.0.jar.
Quick Start #
If you are using HDFS, make sure that the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.
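For example (the paths below are placeholders for your own Hadoop installation or configuration directory):
export HADOOP_HOME=/path/to/hadoop
# or point directly at the directory containing the Hadoop configuration files
export HADOOP_CONF_DIR=/path/to/hadoop/etc/hadoop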
Step 1: Specify Table Store Jar File
Append the path to the Table Store jar file to the --jars argument when starting spark-sql.
spark-sql ... --jars /path/to/flink-table-store-spark-0.3.0.jar
Alternatively, you can copy flink-table-store-spark-0.3.0.jar under spark/jars in your Spark installation directory.
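For example (assuming SPARK_HOME points at your Spark installation directory):
cp /path/to/flink-table-store-spark-0.3.0.jar $SPARK_HOME/jars/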
Step 2: Specify Table Store Catalog
When starting spark-sql, use the following command to register Table Store’s Spark catalog with the name tablestore. Table files of the warehouse are stored under /tmp/table_store.
spark-sql ... \
--conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
--conf spark.sql.catalog.tablestore.warehouse=file:/tmp/table_store
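If you prefer not to pass these options on every invocation, the same settings can also be placed in conf/spark-defaults.conf under your Spark installation (a standard Spark configuration file; the snippet below is a sketch of that alternative):
spark.sql.catalog.tablestore            org.apache.flink.table.store.spark.SparkCatalog
spark.sql.catalog.tablestore.warehouse  file:/tmp/table_store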
After the spark-sql command line has started, run the following SQL to create and switch to the database tablestore.default.
CREATE DATABASE tablestore.default;
USE tablestore.default;
Step 3: Create a Table and Write Some Records
create table my_table (
k int,
v string
) tblproperties (
'primary-key' = 'k'
);
INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');
Step 4: Query Table with SQL
SELECT * FROM my_table;
/*
1 Hi
2 Hello
*/
Step 5: Update the Records
INSERT INTO my_table VALUES (1, 'Hi Again'), (3, 'Test');
SELECT * FROM my_table;
/*
1 Hi Again
2 Hello
3 Test
*/
Step 6: Query Table with Scala API
If you don’t want to use the Table Store catalog, you can also run spark-shell and query the table with the Scala API.
spark-shell ... --jars /path/to/flink-table-store-spark-0.3.0.jar
val dataset = spark.read.format("tablestore").load("file:/tmp/table_store/default.db/my_table")
dataset.createOrReplaceTempView("my_table")
spark.sql("SELECT * FROM my_table").show()
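You can also inspect the schema of the loaded dataset to see how the table’s columns surface as Spark types; based on the my_table definition from Step 3 and the type mapping in the next section, k is expected to appear as an integer and v as a string:
dataset.printSchema()
// k should be reported as IntegerType and v as StringType,
// matching the Spark/Flink type mapping listed below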
Spark Type Conversion #
This section lists all supported type conversions between Spark and Flink.
All Spark’s data types are available in the package org.apache.spark.sql.types.
| Spark Data Type | Flink Data Type | Atomic Type |
| --- | --- | --- |
| StructType | RowType | false |
| MapType | MapType | false |
| ArrayType | ArrayType | false |
| BooleanType | BooleanType | true |
| ByteType | TinyIntType | true |
| ShortType | SmallIntType | true |
| IntegerType | IntType | true |
| LongType | BigIntType | true |
| FloatType | FloatType | true |
| DoubleType | DoubleType | true |
| StringType | VarCharType, CharType | true |
| DateType | DateType | true |
| TimestampType | TimestampType, LocalZonedTimestamp | true |
| DecimalType(precision, scale) | DecimalType(precision, scale) | true |
| BinaryType | VarBinaryType, BinaryType | true |
- Currently, Spark’s field comment cannot be described under Flink CLI.
- Conversion between Spark’s UserDefinedType and Flink’s UserDefinedType is not supported.
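As an illustration of the mapping above, a table declared in spark-sql with several column types (the table and column names below are made up for this example) is backed by the corresponding Flink types, e.g. INT by IntType, STRING by VarCharType, TIMESTAMP by TimestampType and DECIMAL(10, 2) by DecimalType(10, 2):
CREATE TABLE typed_table (
    id INT,
    name STRING,
    ts TIMESTAMP,
    amount DECIMAL(10, 2)
) TBLPROPERTIES (
    'primary-key' = 'id'
);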