This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
The following example programs showcase different applications of Flink
from simple word counting to graph algorithms. The code samples illustrate the
use of Flink’s DataSet API.
The full source code of the following and more examples can be found in the flink-examples-batch
or flink-examples-streaming module of the Flink source repository.
In order to run a Flink example, we assume you have a running Flink instance available. The “Quickstart” and “Setup” tabs in the navigation describe various ways of starting Flink.
The easiest way is to run ./bin/start-cluster.sh, which by default starts a local cluster with one JobManager and one TaskManager.
Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.
To run the WordCount example, issue the following command:
./bin/flink run ./examples/batch/WordCount.jar
The other examples can be started in a similar way.
Note that many examples run without passing any arguments, using built-in data. To run WordCount with real data, you have to pass the path to the data:
./bin/flink run ./examples/batch/WordCount.jar --input <path> --output <path>
Note that non-local file systems require a scheme prefix, such as hdfs://.
Word Count
WordCount is the “Hello World” of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the text is split into individual words. Second, the words are grouped and counted.
The WordCount example implements the above-described algorithm with input parameters: --input <path> --output <path>. As test data, any text file will do.
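The two steps above can be sketched in plain Python (this is not the Flink DataSet program itself, just the algorithm it parallelizes; the tokenization rule is an assumption for illustration):

```python
import re
from collections import Counter

def word_count(text: str) -> Counter:
    """Count word frequencies: split text into words, then group and count."""
    # Step 1: split the text into individual lowercase words.
    words = re.findall(r"[a-z']+", text.lower())
    # Step 2: group identical words and count each group.
    return Counter(words)

counts = word_count("To be, or not to be: that is the question")
print(counts["to"], counts["be"], counts["question"])  # → 2 2 1
```

In the Flink program the same two steps become a flatMap-style tokenization followed by a grouped aggregation, executed in parallel over the input splits.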
Page Rank
The PageRank algorithm computes the “importance” of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.
The PageRank program implements the above example.
It requires the following parameters to run: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>.
Input files are plain text files and must be formatted as follows:
Pages are represented by a (long) ID, separated by new-line characters.
For example "1\n2\n12\n42\n63\n" gives five pages with IDs 1, 2, 12, 42, and 63.
Links are represented as pairs of page IDs which are separated by space characters. Links are separated by new-line characters:
For example "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).
For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
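The bulk-iteration scheme can be sketched in plain Python over the input format described above (the damping factor of 0.85 is an illustrative assumption, not necessarily the example program's constant):

```python
def page_rank(pages, links, num_iterations=10, damping=0.85):
    """Iteratively compute PageRank with a fixed number of bulk iterations."""
    n = len(pages)
    rank = {p: 1.0 / n for p in pages}           # start from a uniform rank
    out_degree = {p: 0 for p in pages}
    for src, _ in links:
        out_degree[src] += 1
    for _ in range(num_iterations):
        # Each page distributes its current rank over all its neighbors ...
        received = {p: 0.0 for p in pages}
        for src, dst in links:
            received[dst] += rank[src] / out_degree[src]
        # ... and its new rank is a taxed (damped) sum of what it received.
        rank = {p: (1 - damping) / n + damping * received[p] for p in pages}
    return rank

# Input as described above: pages "1\n2\n..." and links "1 2\n2 12\n..."
pages = [int(line) for line in "1\n2\n12\n42\n63\n".split()]
links = [tuple(map(int, line.split()))
         for line in "1 2\n2 12\n1 12\n42 63\n".splitlines()]
ranks = page_rank(pages, links)
# Page 12 (two incoming links) ends up ranked above page 2 (one incoming link).
```

The Flink program expresses the same loop as a bulk iteration, so the rank recomputation is a single dataflow that is re-executed for the configured number of iterations.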
Connected Components
The Connected Components algorithm identifies parts of a larger graph which are connected by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor, if it is smaller than its own component ID.
This implementation uses a delta iteration: Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
The ConnectedComponents program implements the above example. It requires the following parameters to run: --vertices <path> --edges <path> --output <path> --iterations <n>.
Input files are plain text files and must be formatted as follows:
Vertices are represented by IDs, separated by new-line characters.
For example "1\n2\n12\n42\n63\n" gives five vertices: (1), (2), (12), (42), and (63).
Edges are represented as pairs of vertex IDs, separated by space characters. Edges are separated by new-line characters:
For example "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
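The propagation scheme, including the delta-iteration idea of only revisiting vertices whose component ID changed, can be sketched in plain Python (a sequential sketch of the algorithm, not the Flink program itself):

```python
def connected_components(vertices, edges):
    """Assign each vertex the smallest vertex ID in its connected component."""
    component = {v: v for v in vertices}         # initial component ID = own ID
    # Undirected graph: propagate along both directions of every edge.
    neighbors = {v: [] for v in vertices}
    for a, b in edges:
        neighbors[a].append(b)
        neighbors[b].append(a)
    # Delta iteration: the workset holds only vertices whose ID just changed.
    workset = set(vertices)
    while workset:
        next_workset = set()
        for v in workset:
            for n in neighbors[v]:
                if component[v] < component[n]:  # neighbor accepts smaller ID
                    component[n] = component[v]
                    next_workset.add(n)
        workset = next_workset
    return component

vertices = [1, 2, 12, 42, 63]
edges = [(1, 2), (2, 12), (1, 12), (42, 63)]
print(connected_components(vertices, edges))
# → {1: 1, 2: 1, 12: 1, 42: 42, 63: 42}
```

As in the Flink delta iteration, later rounds touch only the shrinking workset of changed vertices, which is why convergence is cheap once most component IDs have stabilized.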
Relational Query
The Relational Query example assumes two tables, one with orders and the other with lineitems, as specified by the TPC-H decision support benchmark. TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.
The example implements the following SQL query.
The Flink program that implements the above query looks as follows.
The Relational Query program implements the above query. It requires the following parameters to run: --orders <path> --lineitem <path> --output <path>.
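The general shape of such a query — filter each table, join them on the order key, then aggregate per group — can be sketched in plain Python over tuples. The column names follow the TPC-H schema, but the mini-relations and the concrete filter predicate below are illustrative assumptions, not the example's exact query:

```python
from collections import defaultdict

# Hypothetical mini-relations with TPC-H style columns:
#   orders:   (o_orderkey, o_orderstatus, o_orderpriority)
#   lineitem: (l_orderkey, l_extendedprice)
orders = [(1, "F", "5-LOW"), (2, "O", "1-URGENT"), (3, "F", "5-LOW")]
lineitem = [(1, 100.0), (1, 50.0), (2, 75.0), (3, 20.0)]

# Filter the orders table (the predicate here is an illustrative assumption).
filtered = {o_key for o_key, o_status, o_prio in orders if o_status == "F"}

# Join lineitem with the filtered orders on the order key, and
# aggregate the extended price per order key (a grouped sum).
revenue = defaultdict(float)
for l_key, l_price in lineitem:
    if l_key in filtered:
        revenue[l_key] += l_price

print(dict(revenue))  # → {1: 150.0, 3: 20.0}
```

In the Flink program the same shape appears as filter transformations on each input, a join on the key field, and a grouped aggregation on the join result.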
The orders and lineitem files can be generated using the TPC-H benchmark suite’s data generator tool (DBGEN).
Take the following steps to generate arbitrarily large input files for the provided Flink programs:
Download and unpack DBGEN
Make a copy of makefile.suite called Makefile and adjust the CC, DATABASE, MACHINE, and WORKLOAD settings for your environment.
Build DBGEN using make
Generate the lineitem and orders relations using dbgen. A scale factor (-s) of 1 results in a generated data set of about 1 GB in size.