Hadoop MapReduce compatibility with Flink
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Starting with Flink 1.12, the DataSet API has been soft deprecated.

We recommend that you use the Table API and SQL to run efficient batch pipelines in a fully unified API. The Table API is well integrated with common batch connectors and catalogs.

Alternatively, you can use the DataStream API with BATCH execution mode. The linked section also outlines cases where it makes sense to use the DataSet API, but those cases will become rarer as development progresses, and the DataSet API will eventually be removed. Please also see FLIP-131 for background information on this decision.

Flink and MapReduce compatibility #

Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows reusing code that was implemented for Hadoop MapReduce.

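You can:

  • use Hadoop’s Writable data types in Flink programs,
  • use any Hadoop InputFormat as a DataSource,
  • use any Hadoop OutputFormat as a DataSink,
  • use a Hadoop Mapper as a FlatMapFunction, and
  • use a Hadoop Reducer as a GroupReduceFunction.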

This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the Connecting to other systems guide for reading from Hadoop-supported file systems.

Project Configuration #

Support for Hadoop is contained in the flink-hadoop-compatibility Maven module.

Add the following dependency to your pom.xml to use the Hadoop compatibility module:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.12</artifactId>
    <version>1.18-SNAPSHOT</version>
</dependency>

If you want to run your Flink application locally (e.g. from your IDE), you also need to add a hadoop-client dependency such as:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.2</version>
    <scope>provided</scope>
</dependency>

Using Hadoop Mappers and Reducers #

Hadoop Mappers are semantically equivalent to Flink’s FlatMapFunctions, and Hadoop Reducers are equivalent to Flink’s GroupReduceFunctions. Flink provides wrappers for implementations of Hadoop MapReduce’s Mapper and Reducer interfaces, so you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reducer interfaces of Hadoop’s mapred API (org.apache.hadoop.mapred) are supported.

The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>> as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>> as output, where KEYIN and KEYOUT are the key types and VALUEIN and VALUEOUT are the value types of the Hadoop key-value pairs processed by the Hadoop functions. For Reducers, Flink offers a GroupReduceFunction wrapper with a Combiner (HadoopReduceCombineFunction) and one without a Combiner (HadoopReduceFunction). Each wrapper accepts an optional JobConf object to configure the Hadoop Mapper or Reducer.
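To make the flatMap and group-reduce semantics concrete, here is a minimal plain-Java model. It deliberately uses no Flink or Hadoop classes, and the names WrapperSemanticsSketch, tokenize, count, and wordCount are hypothetical, chosen only for illustration. It is a sketch of the data flow, not of how the wrappers are implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Plain-Java model of Mapper-as-flatMap and Reducer-as-group-reduce (illustrative only). */
final class WrapperSemanticsSketch {

    /** Mapper-like step: one input record emits zero or more (key, value) pairs. */
    public static void tokenize(String line, BiConsumer<String, Long> out) {
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.accept(token, 1L);
            }
        }
    }

    /** Reducer-like step: called once per key with all values of that group. */
    public static long count(String key, Iterable<Long> values) {
        long sum = 0L;
        for (long value : values) {
            sum += value;
        }
        return sum;
    }

    /** Chains the two steps: flatMap, group by key, then reduce each group. */
    public static Map<String, Long> wordCount(List<String> lines) {
        Map<String, List<Long>> groups = new TreeMap<>();
        for (String line : lines) {
            tokenize(line, (k, v) -> groups.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
        }
        Map<String, Long> result = new TreeMap<>();
        groups.forEach((k, vs) -> result.put(k, count(k, vs)));
        return result;
    }

    public static void main(String[] args) {
        // prints {be=3, not=1, or=1, to=3}
        System.out.println(wordCount(List.of("to be or not to be", "To be")));
    }
}
```

In this model the grouping is done entirely by the caller (wordCount), which mirrors how the Reducer wrapper only sees the groups that the surrounding Flink program defines.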

Flink’s function wrappers are

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction,

and can be used as regular Flink FlatMapFunctions or GroupReduceFunctions.

The following example shows how to use Hadoop Mapper and Reducer functions.

// Obtain data to process somehow.
DataSet<Tuple2<LongWritable, Text>> text = [...];

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as FlatMapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

Please note: The Reducer wrapper works on groups as defined by Flink’s groupBy() operation. It does not consider any custom partitioners, sort comparators, or grouping comparators you might have set in the JobConf.
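The Tokenizer and Counter classes used in the example are not defined on this page. A minimal sketch of what they might look like, assuming they implement the Mapper and Reducer interfaces of Hadoop’s mapred API, is shown below. The enclosing class name HadoopWordCountFunctions and the tokenization rule (lower-casing and splitting on non-word characters) are assumptions made for illustration. The classes are kept public with implicit no-argument constructors, which is what reflective instantiation generally requires.

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical holder for the two Hadoop functions used in the WordCount example. */
final class HadoopWordCountFunctions {

    /** mapred Mapper: emits (word, 1) for every word in an input line. */
    public static final class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable offset, Text line,
                        OutputCollector<Text, LongWritable> out, Reporter reporter)
                throws IOException {
            for (String token : line.toString().toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    out.collect(new Text(token), new LongWritable(1L));
                }
            }
        }
    }

    /** mapred Reducer: sums the counts of one word. Summing is associative, so it can also serve as a Combiner. */
    public static final class Counter extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text word, Iterator<LongWritable> counts,
                           OutputCollector<Text, LongWritable> out, Reporter reporter)
                throws IOException {
            long sum = 0L;
            while (counts.hasNext()) {
                sum += counts.next().get();
            }
            out.collect(word, new LongWritable(sum));
        }
    }
}
```

With this layout, the example would refer to the classes as HadoopWordCountFunctions.Tokenizer and HadoopWordCountFunctions.Counter, or import them as nested classes. Compiling the sketch requires the Hadoop client libraries on the classpath.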

Complete Hadoop WordCount Example #

The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

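// Set up the Hadoop TextInputFormat. The input and output formats here use
// Hadoop's mapreduce API and are configured through a Job, while the Mapper
// and Reducer wrappers below use the mapred API.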
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as FlatMapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, LongWritable> hadoopOF =
  new HadoopOutputFormat<Text, LongWritable>(
    new TextOutputFormat<Text, LongWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);

// Execute Program
env.execute("Hadoop WordCount");
