Starting with Flink 1.12, the DataSet API has been soft-deprecated.
We recommend that you use the Table API and SQL to run efficient batch pipelines in a fully unified API. The Table API is well integrated with common batch connectors and catalogs.
Alternatively, you can also use the DataStream API with BATCH execution mode. The linked section also outlines cases where it makes sense to use the DataSet API, but those cases will become rarer as development progresses, and the DataSet API will eventually be removed. Please also see FLIP-131 for background information on this decision.
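For reference, switching a DataStream program to batch execution is a single setting on the environment. A minimal sketch, assuming an otherwise ordinary DataStream job:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// process bounded inputs with batch semantics instead of streaming semantics
env.setRuntimeMode(RuntimeExecutionMode.BATCH);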
Zipping Elements in a DataSet #
In certain algorithms, one may need to assign unique identifiers to data set elements. This document shows how DataSetUtils can be used for that purpose.
Zip with a Dense Index #
zipWithIndex assigns consecutive labels to the elements: it receives a data set as input and returns a new data set of (unique id, initial value) 2-tuples.
This process requires two passes, first counting and then labeling the elements, and it cannot be pipelined because the counts must be synchronized across partitions.
The alternative zipWithUniqueId works in a pipelined fashion and is preferred when a unique labeling is sufficient.
For example, the following code:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");

// assign consecutive ids; resultPath is an output path defined elsewhere
DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
result.writeAsCsv(resultPath, "\n", ",");
env.execute();
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides the zipWithIndex extension on DataSet

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")

// assign consecutive ids; resultPath is an output path defined elsewhere
val result: DataSet[(Long, String)] = input.zipWithIndex
result.writeAsCsv(resultPath, "\n", ",")
env.execute()
from flink.plan.Environment import get_environment
env = get_environment()
env.set_parallelism(2)
input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
result = input.zip_with_index()
result.write_text(result_path)
env.execute()
may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
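The ids come out consecutive because of the counting pass described above: every partition learns how many elements precede it and starts labeling at that offset. A minimal plain-Java sketch of the idea (a hypothetical illustration, not Flink's actual implementation) that reproduces the ids above:

import java.util.Arrays;
import java.util.List;

public class DenseIndexSketch {
    public static void main(String[] args) {
        // two hypothetical partitions, as a job with parallelism 2 might see them
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("G", "H", "A", "B"),
                Arrays.asList("C", "D", "E", "F"));

        // pass 1: count the elements per partition (this is the synchronization barrier)
        long[] counts = partitions.stream().mapToLong(List::size).toArray();

        // pass 2: each partition labels its elements, starting at the sum of the preceding counts
        long offset = 0;
        for (int p = 0; p < partitions.size(); p++) {
            long id = offset;
            for (String value : partitions.get(p)) {
                System.out.println("(" + id++ + "," + value + ")");
            }
            offset += counts[p]; // the next partition continues where this one ended
        }
    }
}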
Zip with a Unique Identifier #
In many cases one does not need consecutive labels.
zipWithUniqueId works in a pipelined fashion, speeding up the label assignment process. It receives a data set as input and returns a new data set of (unique id, initial value) 2-tuples.
For example, the following code:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");

// assign unique, not necessarily consecutive, ids; resultPath is an output path defined elsewhere
DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
result.writeAsCsv(resultPath, "\n", ",");
env.execute();
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides the zipWithUniqueId extension on DataSet

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")

// assign unique, not necessarily consecutive, ids; resultPath is an output path defined elsewhere
val result: DataSet[(Long, String)] = input.zipWithUniqueId
result.writeAsCsv(resultPath, "\n", ",")
env.execute()
may yield the tuples: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)
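The gaps in the ids are the price of pipelining: with parallelism P, task i can hand out the ids i, i + P, i + 2P, ... without ever coordinating with the other tasks, so the ids are unique but only dense when every task emits the same number of elements. (Flink's implementation uses bit shifting with the parallelism rounded up to a power of two, which for parallelism 2 amounts to the same scheme.) A hypothetical plain-Java sketch, not Flink's code, that reproduces the ids above:

import java.util.Arrays;
import java.util.List;

public class UniqueIdSketch {
    public static void main(String[] args) {
        int parallelism = 2;
        // two hypothetical partitions consistent with the example output above
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("G", "H"),
                Arrays.asList("A", "B", "C", "D", "E", "F"));

        for (int task = 0; task < parallelism; task++) {
            long id = task; // task i starts at i ...
            for (String value : partitions.get(task)) {
                System.out.println("(" + id + "," + value + ")");
                id += parallelism; // ... and steps by P, so tasks never collide
            }
        }
    }
}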