
Hadoop formats #

Project Configuration #

对 Hadoop 的支持位于 flink-hadoop-compatibility Maven 模块中。

将以下依赖添加到 pom.xml 中使用 hadoop


如果你想在本地运行你的 Flink 应用(例如在 IDE 中),你需要按照如下所示将 hadoop-client 依赖也添加到 pom.xml


Using Hadoop InputFormats #

在 Flink 中使用 Hadoop InputFormats,必须首先使用 HadoopInputs 工具类的 readHadoopFilecreateHadoopInput 包装 Input Format。 前者用于从 FileInputFormat 派生的 Input Format,而后者必须用于通用的 Input Format。 生成的 InputFormat 可通过使用 ExecutionEnvironment#createInput 创建数据源。

生成的 DataStream 包含 2 元组,其中第一个字段是键,第二个字段是从 Hadoop InputFormat 接收的值。

下面的示例展示了如何使用 Hadoop 的 TextInputFormat

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat();

  DataStream<Tuple2<Text, Text>> input = env.createInput(HadoopInputs.readHadoopFile(
  textInputFormat, Text.class, Text.class, textPath));

  // Do something with the data.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val textInputFormat = new KeyValueTextInputFormat
val input: DataStream[(Text, Text)] =
    textInputFormat, classOf[Text], classOf[Text], textPath))

    // Do something with the data.

Back to top