去重 #
Batch Streaming
去重是去掉重复的行,只保留第一或者最后一行。有时,上游的 ETL 生成的数据不是端到端精确一次的。在发生故障恢复后,可能会导致结果下游中出现重复记录。这些重复记录会影响下游的分析工作,比如:SUM
、COUNT
的结果会偏大,所以需要先去重再进行下一步的分析。
Flink 使用 ROW_NUMBER()
去除重复数据,就像 Top-N 查询一样。其实,去重就是 Top-N 在 N 为 1 时的特例,并且去重必须要求按照处理或者事件时间排序。
下面的例子展示了去重语句的语法:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
参数说明:
ROW_NUMBER()
:为每一行分配一个唯一且连续的数字,从 1 开始。PARTITION BY col1[, col2...]
:指定分区键,即需要去重的键。ORDER BY time_attr [asc|desc]
:指定排序列,必须是 时间属性。目前 Flink 支持 处理时间属性 和 事件时间属性。Order by ASC 保留第一行,DESC 保留最后一行。WHERE rownum = 1
:Flink 需要这个条件来识别去重语句。
注意:上述格式必须严格遵守,否则优化器无法识别。
下面的例子展示了在流表上如何指定去重 SQL 查询:
CREATE TABLE Orders (
order_id STRING,
user STRING,
product STRING,
num BIGINT,
proctime AS PROCTIME()
) WITH (...);
-- remove duplicate rows on order_id and keep the first occurrence row,
-- because there shouldn't be two orders with the same order_id.
SELECT order_id, user, product, num
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
FROM Orders)
WHERE row_num = 1