# How to Migrate from DataSet to DataStream
The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in Flink 2.0.
Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their data processing requirements.
Please note that the APIs in DataStream do not always exactly match those in DataSet.
The purpose of this document is to help users understand how to achieve the same data processing behavior with the DataStream API as with the DataSet API.
According to how much the development and execution efficiency changes during the migration, we divide the DataSet APIs into four categories:
- Category 1: APIs that have exactly the same functionality in DataStream and require almost no changes to migrate;
- Category 2: APIs whose behavior can be achieved with other APIs in DataStream that have different semantics; this may require some code changes but keeps the same execution efficiency;
- Category 3: APIs whose behavior can be achieved with other APIs in DataStream that have different semantics, but may incur additional execution efficiency cost;
- Category 4: APIs whose behavior is not supported by the DataStream API.
The subsequent sections first show how to set up the execution environment and the sources/sinks, then explain in detail how each category of DataSet APIs can be migrated to the DataStream API, highlighting the considerations and challenges associated with each category.
## Setting Up the Execution Environment
The first step of migrating an application from the DataSet API to the DataStream API is to replace `ExecutionEnvironment` with `StreamExecutionEnvironment`.
DataSet:

```java
// Create the execution environment
ExecutionEnvironment.getExecutionEnvironment();
// Create the local execution environment
ExecutionEnvironment.createLocalEnvironment();
// Create the collection environment
new CollectionEnvironment();
// Create the remote execution environment
ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
```

DataStream:

```java
// Create the execution environment
StreamExecutionEnvironment.getExecutionEnvironment();
// Create the local execution environment
StreamExecutionEnvironment.createLocalEnvironment();
// The collection environment is not supported
// Create the remote execution environment
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
```
Unlike DataSet, DataStream supports processing both bounded and unbounded data streams. If needed, users can explicitly set the execution mode to `RuntimeExecutionMode.BATCH`:
```java
StreamExecutionEnvironment executionEnvironment = // [...];
executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
```
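Putting this together, a minimal sketch of a migrated job skeleton could look like the following. The sample records, the placeholder filter transformation, and the job name are illustrative only and should be replaced with the real source, logic, and sink from the following sections:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MigratedJobSkeleton {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Process the bounded input with batch semantics, as the DataSet job did.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3)          // placeholder source
                .filter(value -> value > 1)    // placeholder transformation
                .print();                      // placeholder sink

        // The job is only submitted when execute() is called; unlike DataSet.print(),
        // DataStream.print() does not trigger execution by itself.
        env.execute("migrated-dataset-job");
    }
}
```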
## Setting Up the Streaming Sources and Sinks
### Sources
The DataStream API uses `DataStreamSource` to read records from external systems, whereas the DataSet API uses `DataSource`.
DataSet:

```java
// Read data from file
DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath);
// Read data from collection
DataSource<> source = ExecutionEnvironment.fromCollection(data);
// Read data from inputformat
DataSource<> source = ExecutionEnvironment.createInput(inputFormat);
```

DataStream:

```java
// Read data from file
DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath);
// Read data from collection
DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data);
// Read data from inputformat
DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat);
```
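Besides the `InputFormat`-based methods above, records can also be read through the unified `Source` interface with `fromSource`. The snippet below is a hedged sketch, assuming the file connector (`flink-connector-files`) is on the classpath and that a text file should be read line by line; the input path is a placeholder:

```java
// Build a bounded file source that reads the input as text lines.
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input"))
                .build();

// A bounded input needs no watermarks, so noWatermarks() is used here.
DataStreamSource<String> source =
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
```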
### Sinks
The DataStream API uses `DataStreamSink` to write records to external systems, whereas the DataSet API uses `DataSink`.
DataSet:

```java
// Write to outputformat
DataSink<> sink = dataSet.output(outputFormat);
// Write to csv file
DataSink<> sink = dataSet.writeAsCsv(filePath);
// Write to text file
DataSink<> sink = dataSet.writeAsText(filePath);
```

DataStream:

```java
// Write to sink
DataStreamSink<> sink = dataStream.sinkTo(sink);
// Write to csv file
DataStreamSink<> sink = dataStream.writeAsCsv(path);
// Write to text file
DataStreamSink<> sink = dataStream.writeAsText(path);
```
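As with sources, the unified `Sink` interface can be used through `sinkTo`. The snippet below is a hedged sketch assuming the `FileSink` from the file connector is used to write each record as a UTF-8 text line; the output path is a placeholder:

```java
DataStream<String> dataStream = // [...]

// Build a row-encoded file sink that writes one text line per record.
FileSink<String> fileSink =
        FileSink.forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

dataStream.sinkTo(fileSink);
```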
If you are looking for pre-defined DataStream connectors, please check the Connectors documentation.
## Migrating DataSet APIs
### Category 1
For Category 1, these DataSet APIs have exactly the same functionality in DataStream and require almost no changes to migrate.
**Map**

DataSet:

```java
dataSet.map(new MapFunction<>(){
    // implement user-defined map logic
});
```

DataStream:

```java
dataStream.map(new MapFunction<>(){
    // implement user-defined map logic
});
```

**FlatMap**

DataSet:

```java
dataSet.flatMap(new FlatMapFunction<>(){
    // implement user-defined flatmap logic
});
```

DataStream:

```java
dataStream.flatMap(new FlatMapFunction<>(){
    // implement user-defined flatmap logic
});
```

**Filter**

DataSet:

```java
dataSet.filter(new FilterFunction<>(){
    // implement user-defined filter logic
});
```

DataStream:

```java
dataStream.filter(new FilterFunction<>(){
    // implement user-defined filter logic
});
```

**Union**

DataSet:

```java
dataSet1.union(dataSet2);
```

DataStream:

```java
dataStream1.union(dataStream2);
```
**Rebalance**

DataSet:

```java
dataSet.rebalance();
```

DataStream:

```java
dataStream.rebalance();
```
**Project**

DataSet:

```java
DataSet<Tuple3<>> dataSet = // [...]
dataSet.project(2, 0);
```

DataStream:

```java
DataStream<Tuple3<>> dataStream = // [...]
dataStream.project(2, 0);
```

**Reduce on Grouped DataSet**

DataSet:

```java
DataSet<Tuple2<>> dataSet = // [...]
dataSet.groupBy(value -> value.f0)
    .reduce(new ReduceFunction<>(){
        // implement user-defined reduce logic
    });
```

DataStream:

```java
DataStream<Tuple2<>> dataStream = // [...]
dataStream.keyBy(value -> value.f0)
    .reduce(new ReduceFunction<>(){
        // implement user-defined reduce logic
    });
```

**Aggregate on Grouped DataSet**

DataSet:

```java
DataSet<Tuple2<>> dataSet = // [...]
// compute sum of the second field
dataSet.groupBy(value -> value.f0)
    .aggregate(SUM, 1);
// compute min of the second field
dataSet.groupBy(value -> value.f0)
    .aggregate(MIN, 1);
// compute max of the second field
dataSet.groupBy(value -> value.f0)
    .aggregate(MAX, 1);
```

DataStream:

```java
DataStream<Tuple2<>> dataStream = // [...]
// compute sum of the second field
dataStream.keyBy(value -> value.f0)
    .sum(1);
// compute min of the second field
dataStream.keyBy(value -> value.f0)
    .min(1);
// compute max of the second field
dataStream.keyBy(value -> value.f0)
    .max(1);
```
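As a concrete end-to-end illustration, the hedged sketch below migrates a grouped aggregation: the only change to the pipeline itself is replacing `groupBy` with `keyBy`, while the job runs on a bounded input with batch execution mode so that one final sum per key is emitted. The sample records and the job name are illustrative:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<Tuple2<String, Integer>> dataStream =
        env.fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("b", 2),
                Tuple2.of("a", 3));

// DataSet equivalent: dataSet.groupBy(value -> value.f0).aggregate(SUM, 1)
dataStream.keyBy(value -> value.f0)
        .sum(1)
        .print();

env.execute("grouped-aggregation");
```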
### Category 2
For Category 2, the behavior of these DataSet APIs can be achieved with other APIs in DataStream that have different semantics. Migrating them may require some code changes, but the execution efficiency stays the same.
DataSet has APIs that operate on the full DataSet. In DataStream, their behavior can be implemented with a global window that only triggers the computation of the windowed data at the end of the input.
The `EndOfStreamWindows` in the Appendix shows how such a window can be implemented. We will reuse it in the rest of this document.
**Distinct**

DataSet:

```java
DataSet<Integer> dataSet = // [...]
dataSet.distinct();
```

DataStream:

```java
DataStream<Integer> dataStream = // [...]
dataStream.keyBy(value -> value)
    .reduce((value1, value2) -> value1);
```

**Hash-Partition**

DataSet:

```java
DataSet<Tuple2<>> dataSet = // [...]
dataSet.partitionByHash(value -> value.f0);
```

DataStream:

```java
DataStream<Tuple2<>> dataStream = // [...]
// partition by the hashcode of key
dataStream.partitionCustom(
    (key, numSubpartition) -> key.hashCode() % numSubpartition,
    value -> value.f0);
```

**Reduce on Full DataSet**

DataSet:

```java
DataSet<String> dataSet = // [...]
dataSet.reduce(new ReduceFunction<>(){
    // implement user-defined reduce logic
});
```

DataStream:

```java
DataStream<String> dataStream = // [...]
dataStream.windowAll(EndOfStreamWindows.get())
    .reduce(new ReduceFunction<>(){
        // implement user-defined reduce logic
    });
```

**Aggregate on Full DataSet**

DataSet:

```java
DataSet<Tuple2<>> dataSet = // [...]
// compute sum of the second field
dataSet.aggregate(SUM, 1);
// compute min of the second field
dataSet.aggregate(MIN, 1);
// compute max of the second field
dataSet.aggregate(MAX, 1);
```

DataStream:

```java
DataStream<Tuple2<>> dataStream = // [...]
// compute sum of the second field
dataStream.windowAll(EndOfStreamWindows.get())
    .sum(1);
// compute min of the second field
dataStream.windowAll(EndOfStreamWindows.get())
    .min(1);
// compute max of the second field
dataStream.windowAll(EndOfStreamWindows.get())
    .max(1);
```
**GroupReduce on Full DataSet**

DataSet:

```java
DataSet<Integer> dataSet = // [...]
dataSet.reduceGroup(new GroupReduceFunction<>(){
    // implement user-defined group reduce logic
});
```

DataStream:

```java
DataStream<Integer> dataStream = // [...]
dataStream.windowAll(EndOfStreamWindows.get())
    .apply(new AllWindowFunction<>(){
        // implement user-defined group reduce logic
    });
```
**GroupReduce on Grouped DataSet**

DataSet:

```java
DataSet<Tuple2<>> dataSet = // [...]
dataSet.groupBy(value -> value.f0)
    .reduceGroup(new GroupReduceFunction<>(){
        // implement user-defined group reduce logic
    });
```

DataStream:

```java
DataStream<Tuple2<>> dataStream = // [...]
dataStream.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .apply(new WindowFunction<>(){
        // implement user-defined group reduce logic
    });
```
**First-n**

DataSet:

```java
DataSet<Integer> dataSet = // [...]
dataSet.first(n);
```

DataStream:

```java
dataStream.windowAll(EndOfStreamWindows.get())
    .apply(new AllWindowFunction<>(){
        // implement first-n logic
    });
```
**Join**

DataSet:

```java
DataSet<Tuple2<>> dataSet1 = // [...]
DataSet<Tuple2<>> dataSet2 = // [...]
dataSet1.join(dataSet2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .with(new JoinFunction<>(){
        // implement user-defined join logic
    });
```

DataStream:

```java
DataStream<Tuple2<>> dataStream1 = // [...]
DataStream<Tuple2<>> dataStream2 = // [...]
dataStream1.join(dataStream2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .apply(new JoinFunction<>(){
        // implement user-defined join logic
    });
```

**CoGroup**

DataSet:

```java
DataSet<Tuple2<>> dataSet1 = // [...]
DataSet<Tuple2<>> dataSet2 = // [...]
dataSet1.coGroup(dataSet2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .with(new CoGroupFunction<>(){
        // implement user-defined co group logic
    });
```

DataStream:

```java
DataStream<Tuple2<>> dataStream1 = // [...]
DataStream<Tuple2<>> dataStream2 = // [...]
dataStream1.coGroup(dataStream2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CoGroupFunction<>(){
        // implement user-defined co group logic
    });
```
**OuterJoin**

DataSet:

```java
DataSet<Tuple2<>> dataSet1 = // [...]
DataSet<Tuple2<>> dataSet2 = // [...]
// left outer join
dataSet1.leftOuterJoin(dataSet2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .with(new JoinFunction<>(){
        // implement user-defined left outer join logic
    });
// right outer join
dataSet1.rightOuterJoin(dataSet2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .with(new JoinFunction<>(){
        // implement user-defined right outer join logic
    });
```

DataStream:

```java
DataStream<Tuple2<>> dataStream1 = // [...]
DataStream<Tuple2<>> dataStream2 = // [...]
// left outer join
dataStream1.coGroup(dataStream2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .apply((leftIterable, rightIterable, collector) -> {
        if (!rightIterable.iterator().hasNext()) {
            // implement user-defined left outer join logic
        }
    });
// right outer join
dataStream1.coGroup(dataStream2)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .apply((leftIterable, rightIterable, collector) -> {
        if (!leftIterable.iterator().hasNext()) {
            // implement user-defined right outer join logic
        }
    });
```
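The snippets above rely on the input being bounded: `EndOfStreamWindows` assigns every record to a single event-time window spanning all of time, and its trigger fires only when the event-time timer at the end of that window goes off, which happens exactly once when the bounded input is exhausted and the final watermark is emitted. The following hedged, self-contained sketch of the "Reduce on Full DataSet" migration shows this wiring; the sample records, the summing reduce function, and the job name are illustrative:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4);

// Equivalent of dataSet.reduce(...): all records fall into one global window,
// which is evaluated once when the bounded input ends.
dataStream.windowAll(EndOfStreamWindows.get())
        .reduce((value1, value2) -> value1 + value2)
        .print();

env.execute("reduce-on-full-dataset");
```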
### Category 3
For Category 3, the behavior of these DataSet APIs can be achieved with other APIs in DataStream that have different semantics, but this may come at an additional execution efficiency cost.
Currently, the DataStream API does not directly support aggregations on non-keyed streams (i.e., aggregating the data within a subtask). To do so, we need to first assign the subtask ID to the records and then turn the stream into a keyed stream.
The `AddSubtaskIDMapFunction` in the Appendix shows how to do that, and we will reuse it in the rest of this document.
**MapPartition/SortPartition**

DataSet:

```java
DataSet<Integer> dataSet = // [...]
// MapPartition
dataSet.mapPartition(new MapPartitionFunction<>(){
    // implement user-defined map partition logic
});
// SortPartition
dataSet.sortPartition(0, Order.ASCENDING);
dataSet.sortPartition(0, Order.DESCENDING);
```

DataStream:

```java
DataStream<Integer> dataStream = // [...]
// assign subtask ID to all records
DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction());
dataStream1.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .apply(new WindowFunction<>(){
        // implement user-defined map partition or sort partition logic
    });
```
**Cross**

DataSet:

```java
DataSet<Integer> dataSet1 = // [...]
DataSet<Integer> dataSet2 = // [...]
// Cross
dataSet1.cross(dataSet2)
    .with(new CrossFunction<>(){
        // implement user-defined cross logic
    });
```

DataStream:

```java
// the parallelism of dataStream1 and dataStream2 should be the same
DataStream<Integer> dataStream1 = // [...]
DataStream<Integer> dataStream2 = // [...]
DataStream<Tuple2<String, Integer>> dataStream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction());
DataStream<Tuple2<String, Integer>> dataStream4 = dataStream2.map(new AddSubtaskIDMapFunction());
// join the two streams according to the subtask ID
dataStream3.join(dataStream4)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .apply(new JoinFunction<>(){
        // implement user-defined cross logic
    });
```
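Putting the two appendix helpers together, the hedged sketch below shows a complete MapPartition-style migration: every record is tagged with the ID of the subtask that holds it, the stream is keyed by that ID so that one key corresponds to one former partition, and all records of a key are handed to a single window function call. The sample records, the per-partition record count, and the job name are illustrative:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4);

dataStream
        // tag every record with the ID of the subtask it lives in
        .map(new AddSubtaskIDMapFunction<Integer>())
        // one key corresponds to one former partition
        .keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new WindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>() {
            @Override
            public void apply(
                    String subtaskId,
                    TimeWindow window,
                    Iterable<Tuple2<String, Integer>> records,
                    Collector<Integer> out) {
                // user-defined map partition logic, e.g. count the records of this partition
                int count = 0;
                for (Tuple2<String, Integer> ignored : records) {
                    count++;
                }
                out.collect(count);
            }
        })
        .print();

env.execute("map-partition-equivalent");
```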
### Category 4
The behavior of the following DataSet APIs is not supported by DataStream.
- RangePartition
- GroupCombine
## Appendix
### EndOfStreamWindows
The following code shows an example implementation of `EndOfStreamWindows`.
```java
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Collection;
import java.util.Collections;

/**
 * A {@link WindowAssigner} that assigns all records to a single event-time window covering all
 * of time. The window is only evaluated when the end-of-input watermark arrives.
 */
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public String toString() {
        return "EndOfStreamWindows()";
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    @Internal
    public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
    }
}
```
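This assigner places every record into one event-time window covering the whole time range, and its trigger fires only when the event-time timer registered at the end of that window goes off; on a bounded input this happens exactly once, when the input is exhausted and the final watermark is emitted. A minimal hedged usage sketch on a keyed stream (the grouping key and the summing reduce logic are illustrative):

```java
DataStream<Tuple2<String, Integer>> dataStream = // [...]

// Each key's records are buffered in one window and reduced once, at the end of the input.
dataStream.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));
```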
### AddSubtaskIDMapFunction
The following code shows an example implementation of `AddSubtaskIDMapFunction`.
```java
/** Tags every record with the index of the subtask that processes it. */
public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
    @Override
    public Tuple2<String, T> map(T value) {
        return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
    }
}
```