Over聚合

Over聚合 #

Batch Streaming

OVER 聚合通过排序后的范围数据为每行输入计算出聚合值。和 GROUP BY 聚合不同, OVER 聚合不会把结果通过分组减少到一行,它会为每行输入增加一个聚合值。

下面这个查询为每个订单计算前一个小时之内接收到的同一产品所有订单的总金额。

SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM Orders

下面总结了 OVER 窗口的语法。

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

你可以在一个 SELECT 子句中定义多个 OVER 窗口聚合。然而,对于流式查询,由于目前的限制,所有聚合的 OVER 窗口必须是相同的。

ORDER BY #

OVER 窗口需要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持 OVER 窗口定义在升序(asc)的 时间属性 上。其他的排序不支持。

PARTITION BY #

OVER 窗口可以定义在一个分区表上。PARTITION BY 子句代表着每行数据只在其所属的数据分区进行聚合。

范围(RANGE)定义 #

范围(RANGE)定义指定了聚合中包含了多少行数据。范围通过 BETWEEN 子句定义上下边界,其内的所有行都会聚合。Flink 只支持 CURRENT ROW 作为上边界。

有两种方法可以定义范围:ROWS 间隔 和 RANGE 间隔

RANGE 间隔 #

RANGE 间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。下面的 RANG 间隔定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

ROW 间隔 #

ROWS 间隔基于计数。它定义了聚合操作包含的精确行数。下面的 ROWS 间隔定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
WINDOW

WINDOW 子句可用于在 SELECT 子句之外定义 OVER 窗口。它让查询可读性更好,也允许多个聚合共用一个窗口定义。

SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

Back to top