Get a Flink example program up and running in a few simple steps.
Flink runs on Linux, Mac OS X, and Windows. To be able to run Flink, the only requirement is to have a working Java 7.x (or higher) installation. Windows users, please take a look at the Flink on Windows guide which describes how to run Flink on Windows for local setups.
You can check the correct installation of Java by issuing the following command:
java -version
If you have Java 8, the output will look something like this:
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
$ cd ~/Downloads # Go to download directory
$ tar xzf flink-*.tgz # Unpack the downloaded archive
$ cd flink-1.3.2
For MacOS X users, Flink can be installed through Homebrew.
$ brew install apache-flink
...
$ flink --version
Version: 1.2.0, Commit ID: 1c659cf
$ ./bin/start-local.sh # Start Flink
Check the JobManager’s web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.
You can also verify that the system is running by checking the log files in the logs
directory:
$ tail log/flink-*-jobmanager-*.log
INFO ... - Starting JobManager
INFO ... - Starting JobManager web frontend
INFO ... - Web frontend listening at 127.0.0.1:8081
INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager)
You can find the complete source code for this SocketWindowWordCount example in scala and java on GitHub.
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// the port to connect to
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text = env.socketTextStream("localhost", port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
Now, we are going to run this Flink application. It will read text from a socket and once every 5 seconds print the number of occurrences of each distinct word during the previous 5 seconds, i.e. a tumbling window of processing time, as long as words are floating in.
First of all, we use netcat to start local server via
$ nc -l 9000
Submit the Flink program:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Submitting job with JobID: 574a10c8debda3dccd0c78a3bde55e1b. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#297388688]
11/04/2016 14:04:50 Job execution switched to status RUNNING.
11/04/2016 14:04:50 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
11/04/2016 14:04:50 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
11/04/2016 14:04:50 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to SCHEDULED
11/04/2016 14:04:51 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING
11/04/2016 14:04:51 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING
11/04/2016 14:04:51 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected:
Words are counted in time windows of 5 seconds (processing time, tumbling
windows) and are printed to stdout
. Monitor the JobManager’s output file
and write some text in nc
(input is sent to Flink line by line after
hitting
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye
The .out
file will print the counts at the end of each time window as long
as words are floating in, e.g.:
$ tail -f log/flink-*-jobmanager-*.out
lorem : 1
bye : 1
ipsum : 4
To stop Flink when you’re done type:
$ ./bin/stop-local.sh
Check out some more examples to get a better feel for Flink’s programming APIs. When you are done with that, go ahead and read the streaming guide.