Flink
This documentation is for an unreleased version of Apache Flink Table Store. We recommend you use the latest stable version.


This page is a guide to using Table Store with Flink.

Preparing Table Store Jar File

Table Store currently supports Flink 1.16, 1.15 and 1.14. We recommend using the latest Flink version for the best experience.

Because you are using an unreleased version of Table Store, you need to build the bundled jar from the source code manually.

To build from source code, either download the source of a release or clone the git repository.

Build the bundled jar with the command that matches your Flink version:

  • Flink 1.16: mvn clean install -DskipTests
  • Flink 1.15: mvn clean install -Dmaven.test.skip=true -Pflink-1.15
  • Flink 1.14: mvn clean install -Dmaven.test.skip=true -Pflink-1.14

You can find the bundled jar in ./flink-table-store-dist/target/flink-table-store-dist-0.4-SNAPSHOT.jar.
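If you build Table Store from a script, the version-to-command mapping above can be wrapped in a small shell function. This is a minimal sketch: the function name `table_store_build_cmd` is hypothetical, and it only reproduces the commands listed above. It prints the command instead of running it, so you can inspect it first.

```shell
# Print the Maven command that builds the Table Store bundled jar for a given
# Flink minor version. table_store_build_cmd is a hypothetical helper that
# mirrors the version list above; it does not run Maven itself.
table_store_build_cmd() {
  case "$1" in
    1.16) echo "mvn clean install -DskipTests" ;;
    1.15|1.14) echo "mvn clean install -Dmaven.test.skip=true -Pflink-$1" ;;
    *) echo "unsupported Flink version: $1" >&2; return 1 ;;
  esac
}

# Example, from the root of the Table Store source tree:
#   eval "$(table_store_build_cmd 1.15)"
#   ls ./flink-table-store-dist/target/flink-table-store-dist-*.jar
```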

Quick Start

Step 1: Download Flink

If you haven't downloaded Flink yet, download Flink 1.16 and extract the archive with the following command:

tar -xzf flink-*.tgz
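As a sketch of this step from the command line, the helper below builds a download URL for a Flink binary release. It assumes the Apache archive layout `https://archive.apache.org/dist/flink/flink-<version>/flink-<version>-bin-scala_<scala>.tgz` with Scala 2.12 as the default; the function name `flink_download_url` and the example patch version 1.16.0 are illustrative choices, not part of the original instructions.

```shell
# Build the download URL of a Flink binary release on the Apache archive.
# Assumption: archive layout flink-<version>/flink-<version>-bin-scala_<scala>.tgz,
# with Scala 2.12 used when no second argument is given.
flink_download_url() {
  local version="$1" scala="${2:-2.12}"
  echo "https://archive.apache.org/dist/flink/flink-${version}/flink-${version}-bin-scala_${scala}.tgz"
}

# Example (requires network access):
#   wget "$(flink_download_url 1.16.0)"
#   tar -xzf flink-1.16.0-bin-scala_2.12.tgz
```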

Step 2: Copy Table Store Bundled Jar

Copy the Table Store bundled jar to the lib directory of your Flink home:

cp flink-table-store-dist-*.jar <FLINK_HOME>/lib/

Step 3: Copy Hadoop Bundled Jar

Download the pre-bundled Hadoop jar and copy it to the lib directory of your Flink home:

cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/
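Steps 2 and 3 both copy jars into `<FLINK_HOME>/lib`. A minimal sketch that does both and fails early when a glob matches nothing (in that case the shell passes the pattern through literally, so no file by that name exists) might look like this. `install_jars` is a hypothetical helper, and `FLINK_HOME` is assumed to be set in your environment.

```shell
# Copy each given jar into <flink_home>/lib, failing loudly if the Flink home
# has no lib directory or if a jar does not exist.
# install_jars is a hypothetical helper, not part of Flink or Table Store.
install_jars() {
  local flink_home="$1"
  shift
  local jar
  if [ ! -d "$flink_home/lib" ]; then
    echo "not a Flink home (no lib directory): $flink_home" >&2
    return 1
  fi
  for jar in "$@"; do
    if [ ! -f "$jar" ]; then
      echo "jar not found: $jar" >&2
      return 1
    fi
    cp "$jar" "$flink_home/lib/"
  done
}

# Example:
#   install_jars "$FLINK_HOME" flink-table-store-dist-*.jar flink-shaded-hadoop-2-uber-*.jar
```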

Step 4: Start a Flink Local Cluster

To run multiple Flink jobs at the same time (this quick start runs a streaming write job and a streaming query concurrently), increase the number of task slots in <FLINK_HOME>/conf/flink-conf.yaml:

taskmanager.numberOfTaskSlots: 2
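If you prefer to script this configuration change, the sketch below rewrites the `taskmanager.numberOfTaskSlots` entry, or appends it when missing, so running it twice is harmless. The helper name `set_task_slots` is hypothetical; it uses a temporary file rather than `sed -i` because the in-place flag behaves differently in GNU and BSD sed.

```shell
# Set taskmanager.numberOfTaskSlots in a flink-conf.yaml file: replace an
# existing entry, or append one if none exists.
# set_task_slots is a hypothetical helper. It assumes the key starts at the
# beginning of a line and that the file ends with a newline.
set_task_slots() {
  local conf="$1" slots="$2"
  if grep -q '^taskmanager\.numberOfTaskSlots:' "$conf"; then
    # Write to a temp file and move it back instead of using sed -i.
    sed "s/^taskmanager\.numberOfTaskSlots:.*/taskmanager.numberOfTaskSlots: ${slots}/" "$conf" > "$conf.tmp" \
      && mv "$conf.tmp" "$conf"
  else
    echo "taskmanager.numberOfTaskSlots: ${slots}" >> "$conf"
  fi
}

# Example:
#   set_task_slots "$FLINK_HOME/conf/flink-conf.yaml" 2
```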

To start a local cluster, run the bash script that comes with Flink:

<FLINK_HOME>/bin/start-cluster.sh

You should be able to navigate to the web UI at localhost:8081 to view the Flink dashboard and see that the cluster is up and running.
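The same information shown in the dashboard is available from Flink's REST API, for example at `http://localhost:8081/overview`, whose JSON response includes a `slots-available` field. A minimal sketch for checking from a terminal that the two slots configured above are free: `available_slots` is a hypothetical helper that uses `sed` so it needs no JSON tool such as `jq`, and it assumes the field appears once in the response.

```shell
# Read the JSON returned by Flink's REST endpoint /overview on stdin and print
# the value of "slots-available". A sed-based sketch, not a full JSON parser.
available_slots() {
  sed -n 's/.*"slots-available"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# Example, against the local cluster:
#   curl -s http://localhost:8081/overview | available_slots
```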

You can now start the Flink SQL client to execute SQL scripts:

<FLINK_HOME>/bin/sql-client.sh

Step 5: Create a Catalog and a Table

-- if you're trying out Table Store in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH (
    'type'='table-store',
    'warehouse'='file:/tmp/table_store'
);

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);
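In Flink 1.14 to 1.16, a catalog created with CREATE CATALOG exists only in the current SQL client session, so you would rerun these statements after restarting the client. One way to avoid retyping them is an initialization file passed with the SQL client's `-i` option. The sketch below writes such a file; `write_catalog_init` is a hypothetical helper, and its warehouse argument is where you would pass a shared path such as an HDFS or OSS URI.

```shell
# Write a SQL client initialization file that registers the Table Store
# catalog from Step 5 and switches to it.
# write_catalog_init is a hypothetical helper; the heredoc is unquoted so that
# ${warehouse} is substituted into the DDL.
write_catalog_init() {
  local init_file="$1" warehouse="$2"
  cat > "$init_file" <<EOF
CREATE CATALOG my_catalog WITH (
    'type'='table-store',
    'warehouse'='${warehouse}'
);

USE CATALOG my_catalog;
EOF
}

# Example:
#   write_catalog_init /tmp/table-store-init.sql file:/tmp/table_store
#   "$FLINK_HOME"/bin/sql-client.sh -i /tmp/table-store-init.sql
```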

Step 6: Write Data

-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'fields.word.length' = '1'
);

-- Table Store requires a checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to the dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

Step 7: OLAP Query

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the table
SELECT * FROM word_count;

You can execute the query multiple times and observe the results change as the streaming job from Step 6 keeps writing data.
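The OLAP query can also be run non-interactively with the SQL client's `-f` option, which executes the statements in a file. This sketch writes the Step 7 statements to a script. `write_olap_script` is a hypothetical helper, and the script assumes `my_catalog` is registered and in use in the same session, for example through an initialization file passed with `-i` that contains the Step 5 CREATE CATALOG and USE CATALOG statements.

```shell
# Write the Step 7 batch (OLAP) query to a SQL script file.
# write_olap_script is a hypothetical helper; the quoted heredoc keeps the SQL
# exactly as written.
write_olap_script() {
  local script="$1"
  cat > "$script" <<'EOF'
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM word_count;
EOF
}

# Example (/path/to/catalog-init.sql is a placeholder for a file holding the
# Step 5 catalog statements):
#   write_olap_script /tmp/olap.sql
#   "$FLINK_HOME"/bin/sql-client.sh -i /path/to/catalog-init.sql -f /tmp/olap.sql
```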

Step 8: Streaming Query

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- track changes to the table and count how many words fall into each interval of 10000 counts
SELECT `interval`, COUNT(*) AS interval_cnt FROM
    (SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;

Step 9: Exit

Cancel the streaming jobs in the web UI at localhost:8081, then execute the following SQL script to exit the Flink SQL client.

-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;

-- exit sql-client
EXIT;
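Cancelling the streaming jobs can also be done from the command line with the Flink CLI, using `flink list -r` to list running jobs and `flink cancel <jobId>` to cancel one. The sketch below extracts job IDs from that listing. `running_job_ids` is a hypothetical helper that relies on Flink job IDs being 32-character hexadecimal strings rather than on the exact layout of the CLI output.

```shell
# Print the job IDs found in the output of `flink list -r`, read on stdin.
# Flink job IDs are 32 hexadecimal characters, so they are matched with a
# regular expression instead of by column position.
running_job_ids() {
  grep -oE '[0-9a-f]{32}'
}

# Example: cancel every running job on the local cluster.
#   "$FLINK_HOME"/bin/flink list -r | running_job_ids | while read -r id; do
#     "$FLINK_HOME"/bin/flink cancel "$id"
#   done
```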

Stop the Flink local cluster:

<FLINK_HOME>/bin/stop-cluster.sh

Supported Flink Data Types

For the full list of types, see Flink Data Types.

All Flink data types are supported, with the following exceptions:

  • MULTISET is not supported.
  • MAP is not supported as a primary key type.