Writing Tables #
You can use the INSERT
statement to insert new rows into a table
or to overwrite existing data in the table. The inserted rows can
be specified by value expressions or by the result of a query.
Syntax #
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }
- part_spec
  An optional parameter that specifies a comma-separated list of key and value pairs for partitions. Note that one can use a typed literal (e.g., date'2019-01-02') in the partition spec.
  Syntax: PARTITION ( partition_col_name = partition_col_val [ , … ] )
- column_list
  An optional parameter that specifies a comma-separated list of columns belonging to the table_identifier table.
  Syntax: (col_name1 [, col_name2, …])
  All specified columns must exist in the table and must not be duplicated. The list includes all columns except the static partition columns. The number of columns in the list must match the number of columns produced by the VALUES clause or query.
- value_expr
  Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted. A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows.
  Syntax: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]
  Currently, Flink doesn't support using NULL directly; a NULL value must be cast to the actual data type with `CAST(NULL AS data_type)`, as shown in the example below.
For more information, see the Flink INSERT statement syntax documentation.
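For example, assuming a partitioned table MyPartitionedTable with columns k and v and a partition column dt (all names and values here are illustrative), the partition spec, column list, and VALUES clause can be combined as follows:
-- MyPartitionedTable, its columns, and the partition value are illustrative only
INSERT INTO MyPartitionedTable PARTITION (dt = '2023-01-01') (k, v)
VALUES (1, 'apple'), (2, CAST(NULL AS STRING));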
Applying Records/Changes to Tables #
Use INSERT INTO
to apply records and changes to tables.
INSERT INTO MyTable SELECT ...
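As a concrete sketch, assuming a changelog source table MyCdcSource with the same schema as MyTable has been declared (a hypothetical name), streaming its records and changes into the table looks like:
-- MyCdcSource is a hypothetical changelog source with the same schema as MyTable
INSERT INTO MyTable SELECT * FROM MyCdcSource;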
Overwriting the Whole Table #
For unpartitioned tables, Table Store supports overwriting the whole table.
Use INSERT OVERWRITE
to overwrite the whole unpartitioned table.
INSERT OVERWRITE MyTable SELECT ...
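For instance, assuming a staging table MyStagingTable with a matching schema (a hypothetical name), the whole table can be replaced by its contents:
-- MyStagingTable is illustrative; any query with a matching schema works
INSERT OVERWRITE MyTable SELECT * FROM MyStagingTable;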
Overwriting a Partition #
For partitioned tables, Table Store supports overwriting a partition.
Use INSERT OVERWRITE
to overwrite a partition.
INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
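For example, assuming MyTable is partitioned by a dt column and fed from a hypothetical MySourceTable, a single partition can be rewritten like this:
-- dt, the partition value, and MySourceTable are illustrative only
INSERT OVERWRITE MyTable PARTITION (dt = '2023-01-01')
SELECT k, v FROM MySourceTable WHERE dt = '2023-01-01';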
Purging Tables #
You can use INSERT OVERWRITE
to purge a table by inserting an empty result set.
INSERT OVERWRITE MyTable SELECT * FROM MyTable WHERE false
Purging Partitions #
Currently, Table Store supports two ways to purge partitions.
- Like purging tables, you can use INSERT OVERWRITE to purge the data of a partition by inserting an empty result set into it.
- Method #1 does not support dropping multiple partitions. If you need to drop multiple partitions, you can submit a drop-partition job through flink run.
-- Syntax
INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false
-- The following SQL is an example:
-- table definition
CREATE TABLE MyTable (
    k0 INT,
    k1 INT,
    v STRING
) PARTITIONED BY (k0, k1);
-- you can use
INSERT OVERWRITE MyTable PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false
-- or
INSERT OVERWRITE MyTable PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false
Run the following command to submit a drop-partition job for the table.
<FLINK_HOME>/bin/flink run \
-c org.apache.flink.table.store.connector.action.FlinkActions \
/path/to/flink-table-store-dist-0.3.0.jar \
drop-partition \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
--partition <partition_spec>
[--partition <partition_spec> ...]
partition_spec:
key1=value1,key2=value2...
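For example, to drop the partitions (k0 = 0, k1 = 0) and (k0 = 1, k1 = 1) from the MyTable defined above, the submission might look like this (the warehouse path and database name are placeholders to substitute with your own):
<FLINK_HOME>/bin/flink run \
-c org.apache.flink.table.store.connector.action.FlinkActions \
/path/to/flink-table-store-dist-0.3.0.jar \
drop-partition \
--warehouse hdfs:///path/to/warehouse \
--database my_db \
--table MyTable \
--partition k0=0,k1=0 \
--partition k0=1,k1=1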
For more information about drop-partition, see the usage printed by the following command:
<FLINK_HOME>/bin/flink run \
-c org.apache.flink.table.store.connector.action.FlinkActions \
/path/to/flink-table-store-dist-0.3.0.jar \
drop-partition --help