Start working on your Flink Java program in a few simple steps.
The only requirements are working Maven 3.0.4 (or higher) and Java 7.x (or higher) installations.
Use one of the following commands to create a project:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.3.2
$ curl https://flink.apache.org/q/quickstart.sh | bash
There will be a new directory in your working directory. If you've used the curl approach, the directory is called quickstart. Otherwise, it has the name of your artifactId:
$ tree quickstart/
quickstart/
├── pom.xml
└── src
└── main
├── java
│ └── org
│ └── myorg
│ └── quickstart
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.properties
The sample project is a Maven project containing four classes: StreamingJob and BatchJob are basic skeleton programs, SocketTextStreamWordCount is a working streaming example, and WordCount is a working batch example. Please note that the main method of each class allows you to start Flink in a development/testing mode.
We recommend importing this project into your IDE to develop and test it. If you use Eclipse, the m2e plugin allows you to import Maven projects. Some Eclipse bundles include that plugin by default, while others require you to install it manually. The IntelliJ IDE supports Maven projects out of the box.
A note to Mac OS X users: The default JVM heap size for Java is too small for Flink. You have to increase it manually. In Eclipse, choose Run Configurations -> Arguments and write -Xmx800m into the VM Arguments box.
If you want to build your project, go to your project directory and issue the mvn clean install -Pbuild-jar command. You will find a jar that runs on every Flink cluster with a compatible version at target/original-your-artifact-id-your-version.jar. There is also a fat jar at target/your-artifact-id-your-version.jar which additionally contains all dependencies that were added to the Maven project.
Write your application!
The quickstart project contains a WordCount implementation, the "Hello World" of Big Data processing systems. The goal of WordCount is to determine the frequencies of words in a text, e.g., how often the terms "the" or "house" occur in all Wikipedia texts.
Sample Input:
big data is big
Sample Output:
big 2
data 1
is 1
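Before looking at the Flink program, the computation itself can be sketched in plain Java, with no Flink involved. The sketch below (class name WordCountSketch is ours, not part of the quickstart) applies the same normalize-split-count logic to the sample input above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {

    // Count word frequencies the way the Flink example does:
    // lowercase the text, split on non-word characters, sum per word.
    public static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String token : text.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample input from above; prints "big 2", "data 1", "is 1"
        for (Map.Entry<String, Integer> e : countWords("big data is big").entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

In the Flink version that follows, the split-and-emit step becomes a FlatMap operator and the per-word summing becomes a grouped aggregation, so the same computation can run in parallel across a cluster.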
The following code shows the WordCount implementation from the quickstart, which processes some text lines with two operators (a FlatMap and a Reduce operation via aggregating a sum) and prints the resulting words and counts to std-out.
public class WordCount {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new LineSplitter())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);

        // execute and print result
        counts.print();
    }
}
The operations are defined by specialized classes, here the LineSplitter class.
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
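To see what LineSplitter emits for a single line without starting a Flink job, the same splitting logic can be exercised against a minimal stand-in Collector. Note that the Collector interface and Pair class below are simplified stand-ins we define for illustration, not Flink's actual types:

```java
import java.util.ArrayList;
import java.util.List;

public class LineSplitterDemo {

    // Simplified stand-in for Flink's Collector interface.
    interface Collector<T> {
        void collect(T record);
    }

    // Minimal pair type standing in for Flink's Tuple2<String, Integer>.
    static final class Pair {
        final String word;
        final int count;

        Pair(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " " + count;
        }
    }

    // Same normalize-and-split logic as the quickstart's LineSplitter.
    static void flatMap(String value, Collector<Pair> out) {
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.collect(new Pair(token, 1));
            }
        }
    }

    public static void main(String[] args) {
        List<Pair> emitted = new ArrayList<>();
        flatMap("To be, or not to be,--that is the question:--", emitted::add);
        // Each word is emitted once with a count of 1; the grouping
        // and summing happen later, in groupBy(0).sum(1).
        for (Pair p : emitted) {
            System.out.println(p);
        }
    }
}
```

This makes the division of labor visible: flatMap only tokenizes and emits (word, 1) pairs, while the aggregation into final counts is the job of the downstream grouped sum.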
Check GitHub for the full example code.
For a complete overview of our APIs, have a look at the DataStream API and DataSet API sections. If you have any trouble, ask on our Mailing List. We are happy to provide help.