语法说明

物化表语法 #

Flink SQL 目前支持以下物化表操作:

CREATE MATERIALIZED TABLE #

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
[ ([ <table_constraint> ]) ]
 
[COMMENT table_comment]
 
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
 
[WITH (key1=val1, key2=val2, ...)]
 
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
 
[REFRESH_MODE = { CONTINUOUS | FULL }]
 
AS <select_statement>
 
<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

PRIMARY KEY #

PRIMARY KEY 定义了一组可选的列,用于唯一标识表中的每一行。主键列必须非空。

PARTITIONED BY #

PARTITIONED BY 定义了一组可选的列,用于对物化表进行分区。如果物化表使用了 filesystem connector ,每个分区将创建一个目录。

示例:

-- 创建一个物化表并将分区字段设置为 `ds`。
CREATE MATERIALIZED TABLE my_materialized_table
    PARTITIONED BY (ds)
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT 
        ds
    FROM
        ...

注意

  • 分区字段必须是物化表查询语句中的字段。

WITH Options #

WITH Options 可以定义创建物化表所需的属性,包括连接器参数和分区字段的时间格式参数

-- 创建一个物化表,指定分区字段为 'ds' 和对应的时间格式为 'yyyy-MM-dd'
CREATE MATERIALIZED TABLE my_materialized_table
    PARTITIONED BY (ds)
    WITH (
        'format' = 'json',
        'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
    )
    ...

如上例所示,我们为 ds 分区列指定了 date-formatter 选项。每次调度时,调度时间将转换为相应的 ds 分区值。例如,在 2024-01-01 00:00:00 的调度时间下,只有分区 ds = '2024-01-01' 会被刷新。

注意

FRESHNESS #

FRESHNESS 用于指定物化表的数据新鲜度。

数据新鲜度与刷新模式关系

数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过配置确定物化表的刷新模式,然后确定数据刷新频率以满足实际数据新鲜度要求。

FRESHNESS 参数详解

FRESHNESS 参数的取值范围为 INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }。'<num>' 必须为正整数,并且在全量模式下,'<num>' 应该是相应时间间隔单位的公约数。

示例: (假定 materialized-table.refresh-mode.freshness-threshold 为 30 分钟)

-- 对应的刷新管道是一个 checkpoint 间隔为 1 秒的流处理作业
FRESHNESS = INTERVAL '1' SECOND

-- 对应的刷新管道是一个 checkpoint 间隔为 1 分钟的流处理作业
FRESHNESS = INTERVAL '1' MINUTE

-- 对应的刷新管道是一个调度周期为 1 小时的调度工作流
FRESHNESS = INTERVAL '1' HOUR

-- 对应的刷新管道是一个调度周期为 1 天的调度工作流
FRESHNESS = INTERVAL '1' DAY

不合法的 FRESHNESS 示例:

-- 间隔为负数
FRESHNESS = INTERVAL '-1' SECOND

-- 间隔为0
FRESHNESS = INTERVAL '0' SECOND

-- 间隔为月或者年
FRESHNESS = INTERVAL '1' MONTH
FRESHNESS = INTERVAL '1' YEAR

-- 全量模式下,间隔不是对应时间间隔单位的公约数
FRESHNESS = INTERVAL '60' SECOND
FRESHNESS = INTERVAL '5' HOUR

注意

  • 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。
  • 在连续模式下,数据新鲜度和 checkpoint 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 checkpoint 性能,建议开启 Changelog
  • 在全量模式下,数据新鲜度会转换为 cron 表达式,因此目前仅支持在预定义时间间隔单位内的新鲜度间隔,这种设计确保了与 cron 表达式语义的一致性。具体支持以下新鲜度间隔:
    • 秒:30、15、10、5、2 和 1 秒间隔。
    • 分钟:30、15、10、5、2 和 1 分钟间隔。
    • 小时:8、4、2 和 1 小时间隔。
    • 天:1 天。

REFRESH_MODE #

REFRESH_MODE 用于显式指定物化表的刷新模式。指定的刷新模式比框架自动推导的模式具有更高的优先级,以满足特定场景的需求。

示例: (假定 materialized-table.refresh-mode.freshness-threshold 为 30 分钟)

-- 创建的物化表的刷新模式为连续模式,作业的 checkpoint 间隔为 1 小时。
CREATE MATERIALIZED TABLE my_materialized_table
    REFRESH_MODE = CONTINUOUS
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT
       ...    

-- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。
CREATE MATERIALIZED TABLE my_materialized_table
    REFRESH_MODE = FULL
    FRESHNESS = INTERVAL '10' MINUTE
    AS SELECT
       ...    

AS <select_statement> #

该子句用于定义填充物化表数据的查询。上游表可以是物化表、表或视图。select 语句支持所有 Flink SQL 查询

示例:

CREATE MATERIALIZED TABLE my_materialized_table
    FRESHNESS = INTERVAL '10' SECOND
    AS SELECT * FROM kafka_catalog.db1.kafka_table;

示例 #

(假设 materialized-table.refresh-mode.freshness-threshold 为 30 分钟)

创建一个数据新鲜度为 10 秒的物化表,推导出的刷新模式为连续模式:

CREATE MATERIALIZED TABLE my_materialized_table_continuous
    PARTITIONED BY (ds)
    WITH (
        'format' = 'json',
        'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
    )
    FRESHNESS = INTERVAL '10' SECOND
    REFRESH_MODE = 'CONTINUOUS'
    AS 
    SELECT 
        k.ds,
        k.user_id,
        COUNT(*) AS event_count,
        SUM(k.amount) AS total_amount,
        MAX(u.age) AS max_age
    FROM 
        kafka_catalog.db1.kafka_table k
    JOIN 
        user_catalog.db1.user_table u
    ON 
        k.user_id = u.user_id
    WHERE 
        k.event_type = 'purchase'
    GROUP BY 
        k.ds, k.user_id

创建一个数据新鲜度为 1 小时的物化表,推导出的刷新模式为全量模式:

CREATE MATERIALIZED TABLE my_materialized_table_full
    PARTITIONED BY (ds)
    WITH (
        'format' = 'json',
        'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
    )
    FRESHNESS = INTERVAL '10' MINUTE
    REFRESH_MODE = 'FULL'
    AS 
    SELECT 
        p.ds,
        p.product_id,
        p.product_name,
        AVG(s.sale_price) AS avg_sale_price,
        SUM(s.quantity) AS total_quantity
    FROM 
        paimon_catalog.db1.product_table p
    LEFT JOIN 
        paimon_catalog.db1.sales_table s
    ON 
        p.product_id = s.product_id
    WHERE 
        p.category = 'electronics'
    GROUP BY 
        p.ds, p.product_id, p.product_name

限制 #

  • 不支持显式指定列
  • 不支持修改查询语句
  • 不支持在 select 查询中使用临时表、临时视图或临时函数

ALTER MATERIALIZED TABLE #

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]

ALTER MATERIALIZED TABLE 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新。

SUSPEND #

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND

SUSPEND 用于暂停物化表的后台刷新管道。

示例:

-- 暂停前指定 SAVEPOINT 路径
SET 'execution.checkpointing.savepoint-dir' = 'hdfs://savepoint_path';

-- 暂停指定的物化表
ALTER MATERIALIZED TABLE my_materialized_table SUSPEND;

注意

  • 暂停连续模式的表时,默认会使用 STOP WITH SAVEPOINT 暂停作业,你需要使用参数设置 SAVEPOINT 保存路径。

RESUME #

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=val1, key2=val2, ...)]

RESUME 用于恢复物化表的刷新管道。在恢复时,可以通过 WITH 子句动态指定物化表的参数,该参数仅对当前恢复的刷新管道生效,并不会持久化到物化表中。

示例:

-- 恢复指定的物化表
ALTER MATERIALIZED TABLE my_materialized_table RESUME;

-- 恢复指定的物化表并指定 sink 的并行度
ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10');

REFRESH #

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH [PARTITION partition_spec]

REFRESH 用于主动触发物化表的刷新。

示例:

-- 刷新全表数据
ALTER MATERIALIZED TABLE my_materialized_table REFRESH;

-- 刷新指定分区数据
ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28');

注意

  • REFRESH 操作会启动批作业来刷新表的数据。

DROP MATERIALIZED TABLE #

DROP MATERIALIZED TABLE [IF EXISTS] [catalog_name.][database_name.]table_name

删除物化表时,首先删除后台刷新管道,然后从 Catalog 中移除该物化表的元数据。

示例:

-- 删除指定的物化表
DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;