
Writing Tables #

You can use the INSERT statement to insert new rows into a table or to overwrite the existing data in the table. The inserted rows can be specified by value expressions or by the result of a query.

Syntax #

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }
  • part_spec

    An optional parameter that specifies a comma-separated list of key and value pairs for partitions. Note that one can use a typed literal (e.g., date'2019-01-02') in the partition spec.

    Syntax: PARTITION ( partition_col_name = partition_col_val [ , … ] )

  • column_list

    An optional parameter that specifies a comma-separated list of columns belonging to the table_identifier table.

    Syntax: (col_name1 [, col_name2, …])

    All specified columns must exist in the table and must not be duplicated. The list covers all columns except the static partition columns.

    The size of the column list must match the number of fields produced by the VALUES clause or the query.
    
  • value_expr

    Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted. A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows.

    Syntax: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]

    Currently, Flink doesn't support using NULL directly, so a NULL value should be cast to the
    actual data type with `CAST(NULL AS data_type)`, as in the example below.
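
To illustrate the pieces above, here is a minimal sketch that combines a static partition spec, a column list and a cast NULL (the table, its partition column dt and the values are made up for this example):

-- assuming MyTable has columns (k INT, v STRING) and is partitioned by dt
INSERT INTO MyTable PARTITION (dt = '2023-01-01') (k, v)
VALUES (1, 'apple'), (2, CAST(NULL AS STRING));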
    

For more information, please check the syntax document:

Flink INSERT Statement

Spark INSERT Statement

Streaming reads ignore the commits generated by INSERT OVERWRITE by default. If you want to read the commits of OVERWRITE, you can configure streaming-read-overwrite.
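
A minimal sketch of reading OVERWRITE commits in a streaming query, assuming streaming-read-overwrite is a boolean table option; it can be set in the table's WITH clause or, as here, through Flink's dynamic table options hint:

SELECT * FROM MyTable /*+ OPTIONS('streaming-read-overwrite' = 'true') */;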

Applying Records/Changes to Tables #

Use INSERT INTO to apply records and changes to tables.

INSERT INTO MyTable SELECT ...

Table Store supports shuffling data by bucket in the sink phase. To mitigate data skew, Table Store also supports shuffling data by partition fields: you can add the option sink.partition-shuffle to the table.
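
A minimal sketch, assuming sink.partition-shuffle is a boolean flag; it can be set in the table's WITH clause when the table is created or, as here, passed as a dynamic table option on the write:

INSERT INTO MyTable /*+ OPTIONS('sink.partition-shuffle' = 'true') */ SELECT ...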


Overwriting the Whole Table #

For unpartitioned tables, Table Store supports overwriting the whole table.

Use INSERT OVERWRITE to overwrite the whole unpartitioned table.

INSERT OVERWRITE MyTable SELECT ...

Overwriting a Partition #

For partitioned tables, Table Store supports overwriting a partition.

Use INSERT OVERWRITE to overwrite a partition.

INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ...

Purging tables #

You can use INSERT OVERWRITE to purge a table by inserting an empty result set.

INSERT OVERWRITE MyTable SELECT * FROM MyTable WHERE false

Purging Partitions #

Currently, Table Store supports two ways to purge partitions.

  1. Like purging tables, you can use INSERT OVERWRITE to purge the data of partitions by inserting an empty result set into them.

  2. Method #1 does not support dropping multiple partitions. If you need to drop multiple partitions, you can submit the drop-partition job through flink run.

-- Syntax
INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false

-- The following SQL is an example:
-- table definition
CREATE TABLE MyTable (
    k0 INT,
    k1 INT,
    v STRING
) PARTITIONED BY (k0, k1);

-- you can use
INSERT OVERWRITE MyTable PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false

-- or
INSERT OVERWRITE MyTable PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false

Run the following command to submit a drop-partition job for the table.

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    drop-partition \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --partition <partition_spec> \
    [--partition <partition_spec> ...]

partition_spec:
key1=value1,key2=value2...
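
For example, the following invocation (the warehouse path and database name are placeholders) drops two partitions of the MyTable defined above in a single job:

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    drop-partition \
    --warehouse hdfs:///path/to/warehouse \
    --database my_db \
    --table MyTable \
    --partition k0=0,k1=0 \
    --partition k0=1,k1=1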

For more information about drop-partition, see

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    drop-partition --help

Deleting from table #

Currently, Table Store supports deleting records by submitting a ‘delete’ job through flink run.

Run the following command to submit a ‘delete’ job for the table.

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    delete \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --where <filter_spec>
    
filter_spec is equivalent to the WHERE clause in a SQL DELETE statement. Examples:
    age >= 18 AND age <= 60
    animal <> 'cat'
    id > (SELECT count(*) FROM employee)
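
For example, a sketch of a complete invocation using the first filter above (the warehouse path, database and table names are placeholders):

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    delete \
    --warehouse hdfs:///path/to/warehouse \
    --database my_db \
    --table MyTable \
    --where "age >= 18 AND age <= 60"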

For more information about ‘delete’, see

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    delete --help

Merging into table #

Table Store supports “MERGE INTO” by submitting the ‘merge-into’ job through flink run.

Important notes on table properties:

  1. Only primary key tables support this feature (see the sketch below).
  2. The action won’t produce UPDATE_BEFORE, so it’s not recommended to set ‘changelog-producer’ = ‘input’.
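
For reference, a minimal sketch of a target table that satisfies requirement 1 (the column names match the table T used in the examples below; the primary key declaration is the essential part):

CREATE TABLE T (
    id INT,
    price INT,
    mark STRING,
    PRIMARY KEY (id) NOT ENFORCED
);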

The design references the following syntax:

MERGE INTO target-table
  USING source-table | source-expr AS source-alias
  ON merge-condition
  WHEN MATCHED [AND matched-condition]
    THEN UPDATE SET xxx
  WHEN MATCHED [AND matched-condition]
    THEN DELETE
  WHEN NOT MATCHED [AND not-matched-condition]
    THEN INSERT VALUES (xxx)
  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
    THEN UPDATE SET xxx
  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
    THEN DELETE

The merge-into action uses “upsert” semantics instead of “update”, which means that if the row exists, it is updated; otherwise it is inserted. For example, for a non-primary-key table you can update every column, but for a primary key table, if you want to update primary keys, you have to insert a new row whose primary keys differ from those of the existing rows. In this scenario, “upsert” is useful.
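
For example (the values are illustrative): if the target table contains the row (id = 1, price = 100, mark = 'normal'), id is the primary key, and a matched-upsert action sets "price = T.price + 20", the action writes the full row (id = 1, price = 120, mark = 'normal'); Table Store then keeps this latest version of the row for id = 1, which is the "upsert" behavior described above.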

Run the following command to submit a ‘merge-into’ job for the table.

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    merge-into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <target-table> \
    [--target-as <target-table-alias>] \
    --source-table <source-table> \
    [--source-as <source-table-alias>] \
    --on <merge-condition> \
    --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \
    --matched-upsert-condition <matched-condition> \
    --matched-upsert-set <upsert-changes> \
    --matched-delete-condition <matched-condition> \
    --not-matched-insert-condition <not-matched-condition> \
    --not-matched-insert-values <insert-values> \
    --not-matched-by-source-upsert-condition <not-matched-by-source-condition> \
    --not-matched-by-source-upsert-set <not-matched-upsert-changes> \
    --not-matched-by-source-delete-condition <not-matched-by-source-condition>
    
Alternatively, you can use '--source-sql <sql> [, --source-sql <sql> ...]' to create a new table as the source table at runtime.
    
-- Examples:
-- Find all orders mentioned in the source table, then mark as important if the price is above 100 
-- or delete if the price is under 10.
./flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    merge-into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source-table S \
    --on "T.id = S.order_id" \
    --merge-actions \
    matched-upsert,matched-delete \
    --matched-upsert-condition "T.price > 100" \
    --matched-upsert-set "mark = 'important'" \
    --matched-delete-condition "T.price < 10" 
    
-- For matched order rows, increase the price, and if there is no match, insert the order from the 
-- source table:
./flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    merge-into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source-table S \
    --on "T.id = S.order_id" \
    --merge-actions \
    matched-upsert,not-matched-insert \
    --matched-upsert-set "price = T.price + 20" \
    --not-matched-insert-values * 

-- For not-matched-by-source order rows (which are in the target table and do not match any row in the
-- source table based on the merge-condition), decrease the price, or delete them if the mark is 'trivial':
./flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    merge-into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source-table S \
    --on "T.id = S.order_id" \
    --merge-actions \
    not-matched-by-source-upsert,not-matched-by-source-delete \
    --not-matched-by-source-upsert-condition "T.mark <> 'trivial'" \
    --not-matched-by-source-upsert-set "price = T.price - 20" \
    --not-matched-by-source-delete-condition "T.mark = 'trivial'"
    
-- A source-sql example:
-- Create a temporary view S in a new catalog and use it as the source table
./flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    merge-into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source-sql "CREATE CATALOG test WITH (...)" \
    --source-sql "USE CATALOG test" \
    --source-sql "USE DATABASE default" \
    --source-sql "CREATE TEMPORARY VIEW S AS SELECT order_id, price, 'important' FROM important_order" \
    --source-as test.default.S \
    --on "T.id = S.order_id" \
    --merge-actions not-matched-insert \
    --not-matched-insert-values *

Explanation of the term ‘matched’:

  1. matched: changed rows are from the target table, and each can match a source table row based on the merge-condition and the optional matched-condition (source ∩ target).
  2. not-matched: changed rows are from the source table, and none of them can match any target table row based on the merge-condition and the optional not-matched-condition (source - target).
  3. not-matched-by-source: changed rows are from the target table, and none of them can match any source table row based on the merge-condition and the optional not-matched-by-source-condition (target - source).

Parameters format:
All conditions, set changes and values should use Flink SQL syntax. Please quote them with " to escape special characters.

  1. matched-upsert-changes:
    col = <source-table>.col | expression [, …] (Means setting target.col to the given value. Do not add the target table prefix before ‘col’.)
    In particular, you can use ‘*’ to set columns with all source columns (this requires the target table’s schema to be equal to the source’s).
  2. not-matched-upsert-changes is similar to matched-upsert-changes, but you cannot reference the source table’s columns or use ‘*’.
  3. insert-values:
    col1, col2, …, col_end (values for all columns must be specified; for each column, you can reference <source-table>.col or use an expression). See the example after the notes below.
    In particular, you can use ‘*’ to insert with all source columns (this requires the target table’s schema to be equal to the source’s).
  4. not-matched-condition cannot use the target table’s columns to construct its condition expression.
  5. not-matched-by-source-condition cannot use the source table’s columns to construct its condition expression.

Notes:

  1. source-alias cannot duplicate an existing table name. If you use --source-sql, source-alias must be specified and must equal the table name in the “CREATE” statement.
  2. If the source table is not in the same place as the target table, the source-table-name or the source-alias should be qualified (database.table, or catalog.database.table if in a different catalog).
  3. At least one merge action must be specified.
  4. If both matched-upsert and matched-delete actions are present, their conditions must both be present too (the same applies to not-matched-by-source-upsert and not-matched-by-source-delete). Otherwise, all conditions are optional.
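
As referenced in item 3 of the parameters format above, a sketch of explicit insert values instead of ‘*’ (the values are illustrative and assume the target table has columns (id, price, mark) as in the earlier examples):

./flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    merge-into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source-table S \
    --on "T.id = S.order_id" \
    --merge-actions not-matched-insert \
    --not-matched-insert-values "S.order_id,S.price + 10,'new_order'"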

For more information about ‘merge-into’, see

<FLINK_HOME>/bin/flink run \
    -c org.apache.flink.table.store.connector.action.FlinkActions \
    /path/to/flink-table-store-flink-**-0.4-SNAPSHOT.jar \
    merge-into --help