- 应用开发
- DataStream API
- 状态与容错
- Broadcast State 模式
Broadcast State 模式
本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本。
你将在本节中了解到如何实际使用 broadcast state。想了解更多有状态流处理的概念,请参考
Stateful Stream Processing。
提供的 API
在这里我们使用一个例子来展现 broadcast state 提供的接口。假设存在一个序列,序列中的元素是具有不同颜色与形状的图形,我们希望在序列里相同颜色的图形中寻找满足一定顺序模式的图形对(比如在红色的图形里,有一个长方形跟着一个三角形)。
同时,我们希望寻找的模式也会随着时间而改变。
在这个例子中,我们定义两个流,一个流包含图形(Item)
,具有颜色
和形状
两个属性。另一个流包含特定的规则(Rule)
,代表希望寻找的模式。
在图形
流中,我们需要首先使用颜色
将流进行进行分区(keyBy),这能确保相同颜色的图形会流转到相同的物理机上。
对于规则
流,它应该被广播到所有的下游 task 中,下游 task 应当存储这些规则并根据它寻找满足规则的图形对。下面这段代码会完成:
i) 将规则
广播给所有下游 task;
ii) 使用 MapStateDescriptor
来描述并创建 broadcast state 在下游的存储结构
最终,为了使用规则
来筛选图形
序列,我们需要:
- 将两个流关联起来
- 完成我们的模式识别逻辑
为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream
),我们可以调用非广播流的方法 connect()
,并将 BroadcastStream
当做参数传入。
这个方法的返回参数是 BroadcastConnectedStream
,具有类型方法 process()
,传入一个特殊的 CoProcessFunction
来书写我们的模式识别逻辑。
具体传入 process()
的是哪个类型取决于非广播流的类型:
- 如果流是一个 keyed 流,那就是
KeyedBroadcastProcessFunction
类型;
- 如果流是一个 non-keyed 流,那就是
BroadcastProcessFunction
类型。
在我们的例子中,图形
流是一个 keyed stream,所以我们书写的代码如下:
注意:`connect()` 方法需要由非广播流来进行调用,`BroadcastStream` 作为参数传入。
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
在传入的 BroadcastProcessFunction
或 KeyedBroadcastProcessFunction
中,我们需要实现两个方法。processBroadcastElement()
方法负责处理广播流中的元素,processElement()
负责处理非广播流中的元素。
两个子类型定义如下:
需要注意的是 processBroadcastElement()
负责处理广播流的元素,而 processElement()
负责处理另一个流的元素。两个方法的第二个参数(Context)不同,均有以下方法:
- 得到广播流的存储状态:
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- 查询元素的时间戳:
ctx.timestamp()
- 查询目前的Watermark:
ctx.currentWatermark()
- 目前的处理时间(processing time):
ctx.currentProcessingTime()
- 产生旁路输出:
ctx.output(OutputTag<X> outputTag, X value)
在 getBroadcastState()
方法中传入的 stateDescriptor
应该与调用 .broadcast(ruleStateDescriptor)
的参数相同。
这两个方法的区别在于对 broadcast state 的访问权限不同。在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的。
这样做的原因是,Flink 中是不存在跨 task 通讯的。所以为了保证 broadcast state 在所有的并发实例中是一致的,我们在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的,
那么最终所有 task 得到的 broadcast state 是一致的。
注意:`processBroadcastElement()` 的实现必须在所有的并发实例中具有确定性的结果。
同时,KeyedBroadcastProcessFunction
在 Keyed Stream 上工作,所以它提供了一些 BroadcastProcessFunction
没有的功能:
processElement()
的参数 ReadOnlyContext
提供了方法能够访问 Flink 的定时器服务,可以注册事件定时器(event-time timer)或者处理时间的定时器(processing-time timer)。当定时器触发时,会调用 onTimer()
方法,
提供了 OnTimerContext
,它具有 ReadOnlyContext
的全部功能,并且提供:
- 查询当前触发的是一个事件还是处理时间的定时器
- 查询定时器关联的key
processBroadcastElement()
方法中的参数 Context
会提供方法 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)
。
这个方法使用一个 KeyedStateFunction
能够对 stateDescriptor
对应的 state 中所有 key 的存储状态进行某些操作。
注意:注册一个定时器只能在 `KeyedBroadcastProcessFunction` 的 `processElement()` 方法中进行。
在 `processBroadcastElement()` 方法中不能注册定时器,因为广播的元素中并没有关联的 key。
回到我们当前的例子中,KeyedBroadcastProcessFunction
应该实现如下:
重要注意事项
这里有一些 broadcast state 的重要注意事项,在使用它时需要时刻清楚:
-
没有跨 task 通讯:如上所述,这就是为什么只有在 (Keyed)-BroadcastProcessFunction
中处理广播流元素的方法里可以更改 broadcast state 的内容。
同时,用户需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。
-
broadcast state 在不同的 task 的事件顺序可能是不同的:虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同。
所以 broadcast state 的更新不能依赖于流中元素到达的顺序。
-
所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。
这个设计是为了防止在作业恢复后读文件造成的文件热点。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态/改变并发的时候数据没有重复且没有缺失。
在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new
- p_old
)会使用轮询调度算法读取之前 task 的 state。
-
不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。
Back to top