Create Table #
Managed Table in Table Store Catalog #
If you need to manage tables uniformly within a catalog and database, consider using the Table Store Catalog to create managed tables. In this case, creating a table actually creates the underlying file structure, and dropping a table actually deletes the table data.
Catalog #
Table Store uses its own catalog to manage all databases and tables. To use it, users need to configure the catalog type table-store and a root directory warehouse.
CREATE CATALOG my_catalog WITH (
'type'='table-store',
'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file:///tmp/foo/bar'
);
USE CATALOG my_catalog;
The Table Store catalog supports the following SQL DDL commands:
CREATE TABLE ... PARTITIONED BY
DROP TABLE ...
ALTER TABLE ...
SHOW DATABASES
SHOW TABLES
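For example, after switching to the catalog created above, a typical session combines these commands as follows (MyTable here is just an illustrative name):
SHOW DATABASES;
-- creating a managed table also creates its file structure under the warehouse
CREATE TABLE MyTable (id BIGINT, dt STRING) PARTITIONED BY (dt);
SHOW TABLES;
-- dropping a managed table also deletes its data
DROP TABLE MyTable;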
Create Managed Table #
CREATE TABLE MyTable (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt);
This will create a directory under ${warehouse}/${database_name}.db/${table_name}.
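For example, with the catalog created above and Flink's default database, the files for MyTable would live under hdfs://nn:8020/warehouse/path/default.db/MyTable (assuming the database name default; substitute your own database name).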
Mapping Table in Generic Catalog #
If you do not want to create a Table Store Catalog and only want to read and write a single table, you can use a mapping table, which is a standard Flink connector table.
The SQL statement CREATE TABLE T (..) WITH ('connector'='table-store', 'path'='...') creates a Table Store table in the current catalog. The catalog must support generic Flink connector tables; the available catalogs are GenericInMemoryCatalog (the default) and HiveCatalog. A generic catalog only manages the mapping between tables and the underlying file structure in path; it does not actually create or delete tables.
- By default, the mapping table must be mapped to an actual underlying file structure in the FileSystem path. If the file structure in path does not exist, an exception will be thrown.
- If you want the file structure to be created automatically when reading or writing the table, configure auto-create to true: CREATE TABLE T (..) WITH ('connector'='table-store', 'path'='...', 'auto-create'='true').
For example:
CREATE TABLE MyTable (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'connector'='table-store',
'path'='hdfs://nn:8020/my_table_path',
'auto-create'='true'
);
-- This will create a directory structure under path.
INSERT INTO MyTable SELECT ...;
Table Syntax #
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
Note:
- To ensure the uniqueness of the primary key, the primary key must contain the partition fields.
- If your actual primary key does not contain the partition fields but the input is complete CDC data (including UPDATE_BEFORE records), you can still achieve unique semantics even though the declared primary key contains the partition fields.
Table Options #
Important options include the following:
Option | Required | Default | Type | Description
---|---|---|---|---
bucket | Yes | 1 | Integer | The bucket number for table store.
log.system | No | (none) | String | The log system used to keep changes of the table. Supports 'kafka'.
kafka.bootstrap.servers | No | (none) | String | Required Kafka server connection string for log store.
kafka.topic | No | (none) | String | Topic of this kafka table.
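For example, a hedged sketch of a table that also keeps its changes in a Kafka log system; the broker address, topic name, and table name are placeholders:
CREATE TABLE MyLoggedTable (
user_id BIGINT,
item_id BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'bucket' = '4',
'log.system' = 'kafka',
-- placeholder connection details; point these at your own Kafka cluster
'kafka.bootstrap.servers' = 'broker1:9092',
'kafka.topic' = 'my_logged_table'
);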
Distribution #
The data distribution of Table Store consists of three concepts: Partition, Bucket, and Primary Key.
CREATE TABLE MyTable (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket' = '4'
);
For example, the above MyTable distributes its data in the following order:
- Partition: isolates data based on the partition fields.
- Bucket: within a single partition, records are distributed into 4 buckets based on the hash value of the primary key.
- Primary key: within a single bucket, records are sorted by primary key to build the LSM structure.
Partition #
Table Store adopts the same partitioning concept as Apache Hive to separate data, so various operations can be managed with the partition as the unit.
Partition filtering is the most effective way to improve performance; your query statements should contain partition filter conditions whenever possible.
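For example, a query on the partitioned MyTable above can prune partitions through a filter on dt (the literal date is illustrative):
-- Only scans the dt = '2022-01-01' partition instead of the whole table
SELECT * FROM MyTable WHERE dt = '2022-01-01';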
Bucket #
Bucket is the concept of dividing data into more manageable parts for more efficient queries.
With N as the bucket number, records fall into buckets (0, 1, ..., N-1). For each record, the bucket it belongs to is computed from the hash value of one or more columns (denoted as the bucket key), modulo the bucket number:
bucket_id = hash_func(bucket_key) % num_of_buckets
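For example, with num_of_buckets = 4, a record whose bucket key hashes to 10 falls into bucket 10 % 4 = 2.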
Users can specify the bucket key as follows:
CREATE TABLE MyTable (
catalog_id BIGINT,
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING
) WITH (
'bucket-key' = 'catalog_id'
);
Note:
- If users do not specify the bucket key explicitly, the primary key (if present) or the whole row is used as bucket key.
- Bucket key cannot be changed once the table is created: ALTER TABLE SET ('bucket-key' = ...) or ALTER TABLE RESET ('bucket-key') will throw an exception.
The number of buckets is very important, as it determines the worst-case maximum processing parallelism. However, it should not be too big, otherwise the system will create a lot of small files.
In general, the desired file size is 128 MB, and the recommended amount of data to keep on disk in each sub-bucket is about 1 GB.
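As a rough sizing sketch (the data volume here is an assumed workload, not a measured one): if a single partition is expected to hold about 40 GB, setting 'bucket' = '40' keeps each bucket near the recommended 1 GB.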
Primary Key #
The primary key is unique and indexed.
Flink Table Store imposes an ordering of data, which means the system will sort the primary key within each bucket. All fields will be used to sort if no primary key is defined. Using this feature, you can achieve high performance by adding filter conditions on the primary key.
The choice of primary key is critical, especially when setting a composite primary key. A rule of thumb is to put the most frequently queried field at the front. For example:
CREATE TABLE MyTable (
catalog_id BIGINT,
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
......
);
For this table, assuming that the composite primary key consists of the catalog_id and user_id fields, there are two ways to set the primary key:
- PRIMARY KEY (user_id, catalog_id)
- PRIMARY KEY (catalog_id, user_id)
The two approaches do not behave in the same way when querying. Use approach one if you have a large number of filtered queries with only user_id, and use approach two if you have a large number of filtered queries with only catalog_id.
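For example, with approach two, a filter on the leading primary key field can skip data efficiently within each bucket (the literal value is illustrative):
-- benefits from PRIMARY KEY (catalog_id, user_id): catalog_id leads the sort order
SELECT * FROM MyTable WHERE catalog_id = 1001;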
Partial Update #
You can configure partial update from options:
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
number BIGINT,
detail STRING,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update'
);
INSERT INTO MyTable
SELECT product_id, price, number, CAST(NULL AS STRING) FROM Src1 UNION ALL
SELECT product_id, CAST(NULL AS DOUBLE), CAST(NULL AS BIGINT), detail FROM Src2;
The value fields are updated to the latest data one by one under the same primary key, but null values are not overwritten.
For example, the inputs:
- <1, 23.0, 10, NULL>
- <1, NULL, NULL, ‘This is a book’>
- <1, 25.2, NULL, NULL>
Output:
- <1, 25.2, 10, ‘This is a book’>
Note:
- Partial update is only supported for tables with a primary key.
- For streaming consumption, partial update is only supported together with the full-compaction changelog producer (see the sketch below).
- It is best not to have NULL values in the fields, because NULL values do not overwrite existing data.
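A minimal sketch of combining partial update with the full-compaction changelog producer, reusing the schema above (the changelog-producer option value is an assumption to verify against your version):
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
number BIGINT,
detail STRING,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
-- assumed option for enabling streaming consumption of partial updates
'changelog-producer' = 'full-compaction'
);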
Pre-aggregate #
You can configure pre-aggregate for each value field from options:
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function'='max',
'fields.sales.aggregate-function'='sum'
);
Each value field is aggregated with the latest data one by one under the same primary key according to the aggregate function.
For example, the inputs:
- <1, 23.0, 15>
- <1, 30.2, 20>
Output:
- <1, 30.2, 35>
Supported Aggregate Functions and Related Data Types #
Supported aggregate functions include sum, max/min, last_non_null_value, last_value, listagg, and bool_or/bool_and.
These functions support different data types:
- sum supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE.
- max/min support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.
- last_non_null_value/last_value support all data types.
- listagg supports STRING.
- bool_and/bool_or support BOOLEAN.
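For example, a hedged sketch mixing several of these functions in one table; the table name and the comments column are illustrative:
CREATE TABLE ProductAgg (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
comments STRING,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum',
-- listagg concatenates the STRING values arriving for the same key
'fields.comments.aggregate-function' = 'listagg'
);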
Note:
- Pre-aggregate is only supported for tables with a primary key.
- For streaming consumption, pre-aggregate is only supported together with the full-compaction changelog producer.
- Pre-aggregate currently only supports INSERT changes.
Append-only Table #
Append-only tables are a performance feature: they accept only INSERT_ONLY data and append it to storage instead of updating or de-duplicating existing data, and are hence suitable for use cases that do not require updates (such as log data synchronization).
Create Append-only Table #
By setting the core option 'write-mode' to 'append-only', users can create an append-only table as follows.
CREATE TABLE IF NOT EXISTS T1 (
f0 INT,
f1 DOUBLE,
f2 STRING
) WITH (
'write-mode' = 'append-only',
'bucket' = '1' -- specify the total number of buckets
)
Note:
- By definition, users cannot define primary keys on an append-only table.
- An append-only table is different from a change-log table that does not define primary keys: the latter accepts updates and deletes of whole rows even though no primary key is present (see the sketch below).
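For contrast, a sketch of a change-log table without a primary key; 'change-log' is assumed to be the default write-mode, so the option is spelled out only for clarity:
CREATE TABLE ChangelogTable (
f0 INT,
f1 DOUBLE,
f2 STRING
) WITH (
-- assumed default; row-level updates and deletes are still accepted
'write-mode' = 'change-log'
);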
Query Append-only Table #
Table Store supports reading append-only tables with the sequential order preserved within each bucket.
For example, with the following write:
INSERT INTO T1 VALUES
(1, 1.0, 'AAA'), (2, 2.0, 'BBB'),
(3, 3.0, 'CCC'), (1, 1.0, 'AAA')
The query SELECT * FROM T1 will return exactly the order (1, 1.0, 'AAA'), (2, 2.0, 'BBB'), (3, 3.0, 'CCC'), (1, 1.0, 'AAA'), because the total number of buckets is 1 and all records fall into the same bucket.
If we create another table with more than one bucket and specify the bucket key:
CREATE TABLE IF NOT EXISTS T2 (
f0 INT,
f1 DOUBLE,
f2 STRING
) WITH (
'write-mode' = 'append-only',
'bucket' = '2',
'bucket-key' = 'f0'
)
The following write will send (1, 1.0, 'AAA'), (2, 2.0, 'BBB'), (1, 1.0, 'AAA') to bucket-0 and (3, 3.0, 'CCC') to bucket-1:
INSERT INTO T2 VALUES
(1, 1.0, 'AAA'), (2, 2.0, 'BBB'),
(3, 3.0, 'CCC'), (1, 1.0, 'AAA')
The query SELECT * FROM T2 will return either (1, 1.0, 'AAA'), (2, 2.0, 'BBB'), (1, 1.0, 'AAA'), (3, 3.0, 'CCC') or (3, 3.0, 'CCC'), (1, 1.0, 'AAA'), (2, 2.0, 'BBB'), (1, 1.0, 'AAA'), depending on which bucket is read first.
Supported Flink Data Type #
Users can refer to Flink Data Types.
Note: MULTISET is not supported for any write-mode, and MAP is only supported as a non-primary-key field in a primary-keyed table.