ALTER Statements #

ALTER statements modify the definition of a table, view, or function that is already registered in the catalog, or the definition of a catalog itself.

Flink SQL currently supports the following ALTER statements:

  • ALTER TABLE
  • ALTER VIEW
  • ALTER DATABASE
  • ALTER FUNCTION
  • ALTER CATALOG

Run an ALTER statement #

ALTER statements can be executed with the executeSql() method of the TableEnvironment (execute_sql() in Python). For a successful ALTER operation the method returns 'OK'; otherwise it throws an exception. ALTER statements can also be executed in the SQL CLI.

The following examples show how to run ALTER statements in the TableEnvironment, first in Java, then in Scala and Python, followed by the same statements in the SQL CLI.

Java

TableEnvironment tableEnv = TableEnvironment.create(...);

// register a table named "Orders"
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

// a string array: ["Orders"]
String[] tables = tableEnv.listTables();
// or tableEnv.executeSql("SHOW TABLES").print();

// add a new column `order` at the first position
tableEnv.executeSql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST");

// add more columns, a primary key, and a watermark
tableEnv.executeSql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)");

// modify column type, comment, and position, as well as the watermark strategy
tableEnv.executeSql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)");

// drop the watermark
tableEnv.executeSql("ALTER TABLE Orders DROP WATERMARK");

// drop columns
tableEnv.executeSql("ALTER TABLE Orders DROP (amount, ts, category)");

// rename a column
tableEnv.executeSql("ALTER TABLE Orders RENAME `order` TO order_id");

// rename the table "Orders" to "NewOrders"
tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders");

// a string array: ["NewOrders"] (reuses the variable declared above)
tables = tableEnv.listTables();
// or tableEnv.executeSql("SHOW TABLES").print();

// register a catalog named "cat2"
tableEnv.executeSql("CREATE CATALOG cat2 WITH ('type'='generic_in_memory')");

// add the property `default-database`
tableEnv.executeSql("ALTER CATALOG cat2 SET ('default-database'='db')");

Scala

val tableEnv = TableEnvironment.create(...)

// register a table named "Orders"
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

// add a new column `order` at the first position
tableEnv.executeSql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST")

// add more columns, a primary key, and a watermark
tableEnv.executeSql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)")

// modify column type, comment, and position, as well as the watermark strategy
tableEnv.executeSql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)")

// drop the watermark
tableEnv.executeSql("ALTER TABLE Orders DROP WATERMARK")

// drop columns
tableEnv.executeSql("ALTER TABLE Orders DROP (amount, ts, category)")

// rename a column
tableEnv.executeSql("ALTER TABLE Orders RENAME `order` TO order_id")

// a string array: ["Orders"]
val tables = tableEnv.listTables()
// or tableEnv.executeSql("SHOW TABLES").print()

// rename the table "Orders" to "NewOrders"
tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders")

// a string array: ["NewOrders"] (a new name avoids redefining `tables`)
val newTables = tableEnv.listTables()
// or tableEnv.executeSql("SHOW TABLES").print()

// register a catalog named "cat2"
tableEnv.executeSql("CREATE CATALOG cat2 WITH ('type'='generic_in_memory')")

// add the property `default-database`
tableEnv.executeSql("ALTER CATALOG cat2 SET ('default-database'='db')")

Python

table_env = TableEnvironment.create(...)

# register a table named "Orders"
table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

# a string array: ["Orders"]
tables = table_env.list_tables()
# or table_env.execute_sql("SHOW TABLES").print()

# add a new column `order` at the first position
table_env.execute_sql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST")

# add more columns, a primary key, and a watermark
table_env.execute_sql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)")

# modify column type, comment, and position, as well as the watermark strategy
table_env.execute_sql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)")

# drop the watermark
table_env.execute_sql("ALTER TABLE Orders DROP WATERMARK")

# drop columns
table_env.execute_sql("ALTER TABLE Orders DROP (amount, ts, category)")

# rename a column
table_env.execute_sql("ALTER TABLE Orders RENAME `order` TO order_id")

# rename the table "Orders" to "NewOrders"
table_env.execute_sql("ALTER TABLE Orders RENAME TO NewOrders")

# a string array: ["NewOrders"]
tables = table_env.list_tables()
# or table_env.execute_sql("SHOW TABLES").print()

# register a catalog named "cat2"
table_env.execute_sql("CREATE CATALOG cat2 WITH ('type'='generic_in_memory')")

# add the property `default-database`
table_env.execute_sql("ALTER CATALOG cat2 SET ('default-database'='db')")

SQL CLI

Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Execute statement succeeded.

Flink SQL> ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST;
[INFO] Execute statement succeeded.

Flink SQL> DESCRIBE Orders;
+---------+--------+------+-----+--------+-----------+------------------+
|    name |   type | null | key | extras | watermark |          comment |
+---------+--------+------+-----+--------+-----------+------------------+
|   order |    INT | TRUE |     |        |           | order identifier |
|    user | BIGINT | TRUE |     |        |           |                  |
| product | STRING | TRUE |     |        |           |                  |
|  amount |    INT | TRUE |     |        |           |                  |
+---------+--------+------+-----+--------+-----------+------------------+
4 rows in set

Flink SQL> ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR);
[INFO] Execute statement succeeded. 

Flink SQL> DESCRIBE Orders;
+----------+------------------------+-------+------------+--------+--------------------------+------------------+
|     name |                   type |  null |        key | extras |                watermark |          comment |
+----------+------------------------+-------+------------+--------+--------------------------+------------------+
|    order |                    INT | FALSE | PRI(order) |        |                          | order identifier |
|     user |                 BIGINT |  TRUE |            |        |                          |                  |
|  product |                 STRING |  TRUE |            |        |                          |                  |
| category |                 STRING |  TRUE |            |        |                          |                  |
|   amount |                    INT |  TRUE |            |        |                          |                  |
|       ts | TIMESTAMP(3) *ROWTIME* |  TRUE |            |        | `ts` - INTERVAL '1' HOUR |                  |
+----------+------------------------+-------+------------+--------+--------------------------+------------------+
6 rows in set

Flink SQL> ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts);
[INFO] Execute statement succeeded. 

Flink SQL> DESCRIBE Orders;
+----------+------------------------+-------+------------+--------+-----------+---------------------+
|     name |                   type |  null |        key | extras | watermark |             comment |
+----------+------------------------+-------+------------+--------+-----------+---------------------+
|    order |                    INT | FALSE | PRI(order) |        |           |    order identifier |
| category |                 STRING |  TRUE |            |        |           | category identifier |
|     user |                 BIGINT |  TRUE |            |        |           |                     |
|  product |                 STRING |  TRUE |            |        |           |                     |
|   amount |                 DOUBLE | FALSE |            |        |           |                     |
|       ts | TIMESTAMP(3) *ROWTIME* |  TRUE |            |        |      `ts` |                     |
+----------+------------------------+-------+------------+--------+-----------+---------------------+
6 rows in set

Flink SQL> ALTER TABLE Orders DROP WATERMARK;
[INFO] Execute statement succeeded.

Flink SQL> DESCRIBE Orders;
+----------+--------------+-------+------------+--------+-----------+---------------------+
|     name |         type |  null |        key | extras | watermark |             comment |
+----------+--------------+-------+------------+--------+-----------+---------------------+
|    order |          INT | FALSE | PRI(order) |        |           |    order identifier |
| category |       STRING |  TRUE |            |        |           | category identifier |
|     user |       BIGINT |  TRUE |            |        |           |                     |
|  product |       STRING |  TRUE |            |        |           |                     |
|   amount |       DOUBLE | FALSE |            |        |           |                     |
|       ts | TIMESTAMP(3) |  TRUE |            |        |           |                     |
+----------+--------------+-------+------------+--------+-----------+---------------------+
6 rows in set

Flink SQL> ALTER TABLE Orders DROP (amount, ts, category);
[INFO] Execute statement succeeded.

Flink SQL> DESCRIBE Orders;
+---------+--------+-------+------------+--------+-----------+------------------+
|    name |   type |  null |        key | extras | watermark |          comment |
+---------+--------+-------+------------+--------+-----------+------------------+
|   order |    INT | FALSE | PRI(order) |        |           | order identifier |
|    user | BIGINT |  TRUE |            |        |           |                  |
| product | STRING |  TRUE |            |        |           |                  |
+---------+--------+-------+------------+--------+-----------+------------------+
3 rows in set

Flink SQL> ALTER TABLE Orders RENAME `order` to `order_id`;
[INFO] Execute statement succeeded.

Flink SQL> DESCRIBE Orders;
+----------+--------+-------+---------------+--------+-----------+------------------+
|     name |   type |  null |           key | extras | watermark |          comment |
+----------+--------+-------+---------------+--------+-----------+------------------+
| order_id |    INT | FALSE | PRI(order_id) |        |           | order identifier |
|     user | BIGINT |  TRUE |               |        |           |                  |
|  product | STRING |  TRUE |               |        |           |                  |
+----------+--------+-------+---------------+--------+-----------+------------------+
3 rows in set

Flink SQL> SHOW TABLES;
+------------+
| table name |
+------------+
|     Orders |
+------------+
1 row in set

Flink SQL> ALTER TABLE Orders RENAME TO NewOrders;
[INFO] Execute statement succeeded.

Flink SQL> SHOW TABLES;
+------------+
| table name |
+------------+
|  NewOrders |
+------------+
1 row in set

Flink SQL> CREATE CATALOG cat2 WITH ('type'='generic_in_memory');
[INFO] Execute statement succeeded.

Flink SQL> ALTER CATALOG cat2 SET ('default-database'='db_new');
[INFO] Execute statement succeeded.

Flink SQL> DESC CATALOG EXTENDED cat2;
+-------------------------+-------------------+
|               info name |        info value |
+-------------------------+-------------------+
|                    name |              cat2 |
|                    type | generic_in_memory |
|                 comment |                   |
| option:default-database |            db_new |
+-------------------------+-------------------+
4 rows in set
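
Because a failed ALTER statement surfaces as an exception rather than a return value, a program that runs several statements in sequence may want to stop at the first failure. The following Python sketch illustrates one way to do that. It relies only on the execute_sql() method described above; run_statements and FakeTableEnv are hypothetical names introduced here, and FakeTableEnv is a stand-in so the sketch runs without a Flink installation.

```python
from typing import Iterable, Optional, Tuple


def run_statements(table_env, statements: Iterable[str]) -> Optional[Tuple[str, Exception]]:
    """Execute statements in order; return (statement, error) for the first failure."""
    for stmt in statements:
        try:
            table_env.execute_sql(stmt)
        except Exception as exc:  # a failed ALTER is reported as an exception
            return stmt, exc
    return None


class FakeTableEnv:
    """Hypothetical stand-in for TableEnvironment (not part of Flink)."""

    def __init__(self):
        self.tables = {"Orders"}

    def execute_sql(self, stmt: str):
        # Only understands "ALTER TABLE <name> ..." well enough to
        # fail on unknown tables; it does not apply any change.
        table = stmt.split()[2]
        if table not in self.tables:
            raise RuntimeError(f"Table {table} does not exist")
        return "OK"


failure = run_statements(FakeTableEnv(), [
    "ALTER TABLE Orders DROP WATERMARK",
    "ALTER TABLE Missing RENAME TO Other",
    "ALTER TABLE Orders RENAME TO NewOrders",
])
print(failure[0])  # the statement that failed
```

With a real TableEnvironment, the same run_statements function could be called unchanged; the third statement above is never executed because the second one fails.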

ALTER TABLE #

The following ALTER TABLE syntax is currently supported:

ALTER TABLE [IF EXISTS] table_name {
    ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...] | <distribution> }
  | MODIFY { <schema_component> | (<schema_component> [, ...]) | <distribution> }
  | DROP {column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...] | DISTRIBUTION }
  | RENAME old_column_name TO new_column_name
  | RENAME TO new_table_name
  | SET (key1=val1, ...)
  | RESET (key1, ...)
}

<schema_component>:
  { <column_component> | <constraint_component> | <watermark_component> }

<column_component>:
  column_name <column_definition> [FIRST | AFTER column_name]

<constraint_component>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<watermark_component>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<column_definition>:
  { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]

<physical_column_definition>:
  column_type

<metadata_column_definition>:
  column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  AS computed_column_expression
  
<partition_component>:
  PARTITION (key1=val1, key2=val2, ...) [WITH (key1=val1, key2=val2, ...)]
  
<distribution>:
{
    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
  | DISTRIBUTION INTO n BUCKETS
} 

IF EXISTS

If the table does not exist, nothing happens.

ADD #

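Use the ADD clause to add columns, constraints, a watermark, partitions, and a distribution (see https://nightlies.apache.org/flink/flink-docs-release-2.0-preview1/zh/docs/dev/table/sql/create/#distributed) to an existing table.

To add a column at a specific position, use FIRST or AFTER col_name. If no position is specified, the column is appended at the end.

The following examples illustrate the ADD statement.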

-- add a new column
ALTER TABLE MyTable ADD category_id STRING COMMENT 'identifier of the category';

-- add columns, a primary key, and a watermark
ALTER TABLE MyTable ADD (
    log_ts STRING COMMENT 'log timestamp string' FIRST,
    ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
);

-- add a new partition
ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') WITH ('k1'='v1');

-- add two new partitions
ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') WITH ('k1'='v1') PARTITION (p1=1,p2='b') WITH ('k2'='v2');

-- add new distribution using a hash on uid into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS;

-- add new distribution on uid into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 4 BUCKETS;

-- add new distribution on uid
ALTER TABLE MyTable ADD DISTRIBUTION BY (uid);

-- add new distribution into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION INTO 4 BUCKETS;

Note: Adding a column as a primary key implicitly changes the column's nullability to false.
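
The positioning rule for new columns (FIRST, AFTER col_name, or appended at the end by default) can be modeled with a short Python sketch. This is a toy model of the resulting column order only, not Flink code; add_column is a hypothetical helper, and the column names are taken from the Orders example earlier on this page.

```python
from typing import List, Optional


def add_column(columns: List[str], name: str, first: bool = False,
               after: Optional[str] = None) -> List[str]:
    """Return the column order after adding `name` (toy model, not Flink code)."""
    if first and after is not None:
        raise ValueError("FIRST and AFTER are mutually exclusive")
    if name in columns:
        raise ValueError(f"column {name!r} already exists")
    result = list(columns)
    if first:
        result.insert(0, name)                        # FIRST
    elif after is not None:
        result.insert(result.index(after) + 1, name)  # AFTER col_name
    else:
        result.append(name)                           # default: append at the end
    return result


cols = ["user", "product", "amount"]
cols = add_column(cols, "order", first=True)          # ADD `order` ... FIRST
cols = add_column(cols, "ts")                         # ADD ts ...
cols = add_column(cols, "category", after="product")  # ADD category ... AFTER product
print(cols)
```

The final order is order, user, product, category, amount, ts, which matches the DESCRIBE output shown in the SQL CLI example after the second ADD statement.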

MODIFY #

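Use the MODIFY clause to change a column's position, type, comment, or nullability, or to change the primary key or watermark strategy of an existing table.

To move an existing column to a new position, use FIRST or AFTER col_name. If no position is specified, the column keeps its current position.

The following examples illustrate the MODIFY statement.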

-- modify a column type, comment and position
ALTER TABLE MyTable MODIFY measurement double COMMENT 'unit is bytes per second' AFTER `id`;

-- modify definition of column log_ts and ts, primary key, watermark. They must exist in table schema
ALTER TABLE MyTable MODIFY (
    log_ts STRING COMMENT 'log timestamp string' AFTER `id`,  -- reorder columns
    ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR ts AS ts -- modify watermark strategy
);

Note: Modifying a column to be a primary key implicitly changes the column's nullability to false.

DROP #

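Use the DROP clause to drop columns, the primary key, partitions, the watermark strategy, or the distribution of an existing table.

The following examples illustrate the DROP statement.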

-- drop a column
ALTER TABLE MyTable DROP measurement;

-- drop multiple columns
ALTER TABLE MyTable DROP (col1, col2, col3);

-- drop the primary key
ALTER TABLE MyTable DROP PRIMARY KEY;

-- drop a partition
ALTER TABLE MyTable DROP PARTITION (`id` = 1);

-- drop two partitions
ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);

-- drop the watermark
ALTER TABLE MyTable DROP WATERMARK;

-- drop distribution
ALTER TABLE MyTable DROP DISTRIBUTION;

RENAME #

Use the RENAME clause to rename a column or the table itself.

The following examples illustrate the RENAME statement.

-- rename column
ALTER TABLE MyTable RENAME request_body TO payload;

-- rename table
ALTER TABLE MyTable RENAME TO MyTable2;

SET #

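Sets one or more properties on the specified table. If a property is already set on the table, the new value overrides the old one.

The following example illustrates the SET statement.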

-- set 'rows-per-second'
ALTER TABLE DataGenSource SET ('rows-per-second' = '10');

RESET #

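Resets one or more properties of the specified table to their default values.

The following example illustrates the RESET statement.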

-- reset 'rows-per-second' to the default value
ALTER TABLE DataGenSource RESET ('rows-per-second');

ALTER VIEW #

ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name

Renames a given view to a new name within the same catalog and database.

ALTER VIEW [catalog_name.][db_name.]view_name AS new_query_expression

Changes the underlying query defining the given view to a new query.
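
As a rough illustration of the two forms above, the following Python sketch assembles both kinds of ALTER VIEW statements as plain strings. The helper names (rename_view_sql, redefine_view_sql) and the view name OrdersView are hypothetical and not part of Flink; the resulting strings would be passed to execute_sql() on a TableEnvironment.

```python
def rename_view_sql(view_name: str, new_view_name: str) -> str:
    """Build an ALTER VIEW ... RENAME TO statement (hypothetical helper)."""
    return f"ALTER VIEW {view_name} RENAME TO {new_view_name}"


def redefine_view_sql(view_name: str, new_query: str) -> str:
    """Build an ALTER VIEW ... AS statement (hypothetical helper)."""
    return f"ALTER VIEW {view_name} AS {new_query}"


# View and table names below are made up for illustration.
rename_stmt = rename_view_sql("OrdersView", "RecentOrdersView")
redefine_stmt = redefine_view_sql(
    "RecentOrdersView", "SELECT `user`, product FROM Orders"
)
print(rename_stmt)
print(redefine_stmt)
# With PyFlink available: table_env.execute_sql(rename_stmt)
```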

ALTER DATABASE #

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

Sets one or more properties on the specified database. If a property is already set on the database, the new value overrides the old one.
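
The following Python sketch shows one way to build such a statement from a dictionary of properties. It assumes that keys and values are written as single-quoted string literals, with embedded single quotes doubled, as in the other SET examples on this page. The helper names, the database name my_db, and the property keys are hypothetical.

```python
from typing import Dict


def quote(value: str) -> str:
    """Quote a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def alter_database_set_sql(db_name: str, properties: Dict[str, str]) -> str:
    """Build ALTER DATABASE ... SET (...) from a dict (hypothetical helper)."""
    if not properties:
        raise ValueError("at least one property is required")
    pairs = ", ".join(f"{quote(k)}={quote(v)}" for k, v in properties.items())
    return f"ALTER DATABASE {db_name} SET ({pairs})"


# Property keys and values are made up for illustration.
stmt = alter_database_set_sql("my_db", {"owner": "analytics", "note": "team's db"})
print(stmt)
# With PyFlink available: table_env.execute_sql(stmt)
```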

ALTER FUNCTION #

ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF EXISTS] [catalog_name.][db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

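Alters a catalog function that has catalog and database namespaces. A new identifier is required; a language tag is optional. If the function does not exist, altering it throws an exception.

If the language tag is JAVA or SCALA, the identifier is the fully qualified class name of the UDF implementation. For the implementation of Java/Scala UDFs, refer to User-defined Functions.

If the language tag is PYTHON, the identifier is the fully qualified name of the UDF object, e.g. pyflink.table.tests.test_udf.add. For the implementation of Python UDFs, refer to Python UDFs.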

TEMPORARY

Alters a temporary catalog function that has catalog and database namespaces and overrides the existing catalog function.

TEMPORARY SYSTEM

Alters a temporary system function that has no database namespace and overrides the built-in function.

IF EXISTS

If the function does not exist, nothing happens.

LANGUAGE JAVA|SCALA|PYTHON

The language tag tells the Flink runtime how to execute the function. Currently only JAVA, SCALA, and PYTHON are supported; the default language for a function is JAVA.
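
To make the optional parts of the syntax concrete, the following Python sketch composes ALTER FUNCTION statements from their components. It assumes the identifier is written as a single-quoted string literal; alter_function_sql, the function names, and the Java class name are hypothetical, while the Python identifier is the one quoted above.

```python
from typing import Optional

VALID_KINDS = {"", "TEMPORARY", "TEMPORARY SYSTEM"}
VALID_LANGUAGES = {"JAVA", "SCALA", "PYTHON"}


def alter_function_sql(function_name: str, identifier: str, kind: str = "",
                       if_exists: bool = False,
                       language: Optional[str] = None) -> str:
    """Build an ALTER FUNCTION statement (hypothetical helper)."""
    if kind not in VALID_KINDS:
        raise ValueError(f"unsupported function kind: {kind!r}")
    if language is not None and language not in VALID_LANGUAGES:
        raise ValueError(f"unsupported language tag: {language!r}")
    parts = ["ALTER"]
    if kind:
        parts.append(kind)           # TEMPORARY or TEMPORARY SYSTEM
    parts.append("FUNCTION")
    if if_exists:
        parts.append("IF EXISTS")    # do nothing if the function is missing
    parts.append(function_name)
    parts.append(f"AS '{identifier}'")
    if language is not None:
        parts.append(f"LANGUAGE {language}")  # omitted: defaults to JAVA
    return " ".join(parts)


py_stmt = alter_function_sql("my_db.add_one", "pyflink.table.tests.test_udf.add",
                             if_exists=True, language="PYTHON")
tmp_stmt = alter_function_sql("parse_ts", "com.example.udf.ParseTimestamp",
                              kind="TEMPORARY")
print(py_stmt)
print(tmp_stmt)
```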

ALTER CATALOG #

ALTER CATALOG catalog_name 
    SET (key1=val1, ...)
  | RESET (key1, ...)
  | COMMENT 'comment'

SET #

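Sets one or more properties on the specified catalog. If a property is already set, the new value overrides the old one.

The following example illustrates the SET statement.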

-- set 'default-database'
ALTER CATALOG cat2 SET ('default-database'='db');

RESET #

Resets one or more properties of the specified catalog.

The following example illustrates the RESET statement.

-- reset 'default-database'
ALTER CATALOG cat2 RESET ('default-database');

COMMENT #

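Sets the comment of the specified catalog. If a comment already exists, the new value overrides the old one.

The following example illustrates the COMMENT statement.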

ALTER CATALOG cat2 COMMENT 'comment for catalog ''cat2''';
