Flink #

This documentation is a guide for using Table Store in Flink.

Preparing Table Store Jar File #

Table Store currently supports Flink 1.16, 1.15 and 1.14. We recommend the latest Flink version for a better experience.

Download the jar file for the corresponding Flink version.

Version     Jar
Flink 1.16  flink-table-store-dist-0.3.0.jar
Flink 1.15  flink-table-store-dist-0.3.0_1.15.jar
Flink 1.14  flink-table-store-dist-0.3.0_1.14.jar

You can also build the bundled jar manually from the source code.

To build from source code, either download the source of a release or clone the git repository.

Build the bundled jar with the following command.

Version     Command
Flink 1.16  mvn clean install -DskipTests
Flink 1.15  mvn clean install -Dmaven.test.skip=true -Pflink-1.15
Flink 1.14  mvn clean install -Dmaven.test.skip=true -Pflink-1.14

You can find the bundled jar in ./flink-table-store-dist/target/flink-table-store-dist-0.3.0.jar.

Quick Start #

Step 1: Download Flink

If you haven’t downloaded Flink, you can download Flink 1.16, then extract the archive with the following command.

tar -xzf flink-*.tgz

Step 2: Copy Table Store Bundled Jar

Copy the Table Store bundled jar to the lib directory of your Flink home.

cp flink-table-store-dist-*.jar <FLINK_HOME>/lib/

Step 3: Copy Hadoop Bundled Jar

Download the Pre-bundled Hadoop jar and copy it to the lib directory of your Flink home.

cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/

Step 4: Start a Flink Local Cluster

To run multiple Flink jobs at the same time, increase the number of task slots in <FLINK_HOME>/conf/flink-conf.yaml:

taskmanager.numberOfTaskSlots: 2

To start a local cluster, run the bash script that comes with Flink:

<FLINK_HOME>/bin/start-cluster.sh

You should be able to navigate to the web UI at localhost:8081 to view the Flink dashboard and see that the cluster is up and running.

You can now start the Flink SQL client to execute SQL scripts.

<FLINK_HOME>/bin/sql-client.sh embedded

Step 5: Create a Catalog and a Table

-- if you're trying out Table Store in a distributed environment,
-- warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH (
    'type'='table-store',
    'warehouse'='file:/tmp/table_store'
);
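
-- a hypothetical example of the shared-warehouse setup mentioned above;
-- the namenode host, port and path below are placeholders, not part of this guide
-- CREATE CATALOG my_hdfs_catalog WITH (
--     'type'='table-store',
--     'warehouse'='hdfs://namenode:8020/warehouse/table_store'
-- );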

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);
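
The PRIMARY KEY clause above uses Flink SQL's inline column-constraint syntax; the same key can also be declared as a table-level constraint, which scales to multi-column keys. For reference only (word_count already exists, so do not run both forms):

-- equivalent table-level primary key declaration
CREATE TABLE word_count (
    word STRING,
    cnt BIGINT,
    PRIMARY KEY (word) NOT ENFORCED
);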

Step 6: Write Data

-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'fields.word.length' = '1'
);

-- Table Store requires a checkpoint interval in streaming mode;
-- data is committed and becomes visible at each checkpoint
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to the dynamic table (this submits a long-running streaming job)
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

Step 7: OLAP Query

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode; the checkpoint interval is reset because it only applies to streaming
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the table
SELECT * FROM word_count;

You can execute the query multiple times and observe how the results change; the streaming insert job from Step 6 is still writing, so the counts keep changing.

Step 8: Streaming Query

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- track the changes of the table and calculate count interval statistics;
-- cnt / 10000 uses integer division to bucket each word by its count
SELECT `interval`, COUNT(*) AS interval_cnt FROM
    (SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;

Step 9: Exit

Cancel the streaming job in the web UI at localhost:8081, then execute the following SQL script to exit the Flink SQL client.

-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;

-- exit sql-client
EXIT;

Stop the Flink local cluster.

<FLINK_HOME>/bin/stop-cluster.sh

Supported Data Types #

See Flink Data Types.

All Flink data types are supported, with two exceptions (see the sketch below):

  • MULTISET is not supported.
  • MAP is not supported as a primary key.
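
For illustration, here is a hypothetical table that exercises several supported types, with the two restrictions noted in comments (typed_table and its columns are examples, not part of this guide):

-- ARRAY, MAP and ROW are supported as regular columns
CREATE TABLE typed_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    tags ARRAY<STRING>,
    attrs MAP<STRING, STRING>,
    point ROW<x DOUBLE, y DOUBLE>
);

-- not allowed: MULTISET columns, or a MAP column in the primary key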