Quick Start

Quick Start #

This document provides a quick introduction to using Flink ML. It guides you through creating a simple Flink job that trains a machine learning model and uses it to serve predictions.

What Will You Be Building? #

K-means is a widely used clustering algorithm that Flink ML supports. This walkthrough guides you through creating a Flink job with Flink ML that initializes and trains a K-means model, then uses it to predict the cluster ID of a set of data points.

Prerequisites #

This walkthrough assumes that you have some familiarity with Python, but you should be able to follow along even if you come from a different programming language.

Help, I’m Stuck! #

If you get stuck, check out the community support resources. In particular, Apache Flink’s user mailing list is consistently ranked as one of the most active of any Apache project and is a great way to get help quickly.

How To Follow Along #

If you want to follow along, you will require a computer with:

  • Java 8
  • Maven 3
  • Python 3.7 or 3.8
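
Before building anything, it can help to confirm that the local environment roughly matches the list above. The following is a minimal sketch, not part of Flink ML: the helper names are hypothetical, the executable names java and mvn are assumed to be the usual ones for Java and Maven, and the script only checks that those tools are on the PATH, not which versions they are.

```python
import shutil
import sys


def is_supported_python(version_info):
    """Return True for the Python versions listed above (3.7 or 3.8)."""
    return tuple(version_info[:2]) in {(3, 7), (3, 8)}


def find_tools(names=("java", "mvn")):
    """Map each executable name to its location on PATH, or None if absent."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    print("Python version supported:", is_supported_python(sys.version_info))
    for name, path in find_tools().items():
        print(f"{name}: {path or 'not found on PATH'}")
```

If a tool is reported as missing, install it or adjust your PATH before continuing.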

Please walk through this guideline to build and install Flink ML’s Python SDK in your local environment.

Flink ML programs begin by setting up the StreamExecutionEnvironment that executes the Flink ML job. This concept will be familiar if you have used Flink before. For the example program in this document, a StreamExecutionEnvironment with no specific configuration is enough.

Because Flink ML uses Flink’s Table API, the following program also needs a StreamTableEnvironment.

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

Next, create the Table containing the data used for both training and prediction by the K-means algorithm below. Flink ML operators look up their input data by column name in the input Table and write prediction results to a designated column of the output Table.

# generate input data
input_data = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([0.0, 0.3]),),
        (Vectors.dense([0.3, 3.0]),),
        (Vectors.dense([9.0, 0.0]),),
        (Vectors.dense([9.0, 0.6]),),
        (Vectors.dense([9.6, 0.0]),),
    ],
        type_info=Types.ROW_NAMED(
            ['features'],
            [DenseVectorTypeInfo()])))

The Flink ML classes for the K-means algorithm are KMeans and KMeansModel. KMeans implements the training process based on the provided training data and produces a KMeansModel. The KMeansModel.transform() method encodes the transformation logic of the algorithm and is used for predictions.

Both KMeans and KMeansModel provide getter and setter methods for the K-means algorithm’s configuration parameters. This example program explicitly sets the following parameters; all other parameters keep their default values.

  • k, the number of clusters to create
  • seed, the random seed to initialize cluster centers

When the program invokes KMeans.fit() to generate a KMeansModel, the KMeansModel inherits the KMeans object’s configuration parameters. You can therefore set KMeansModel’s parameters directly on the KMeans object.

# create a kmeans object and initialize its parameters
kmeans = KMeans().set_k(2).set_seed(1)

# train the kmeans model
model = kmeans.fit(input_data)

# use the kmeans model for predictions
output = model.transform(input_data)[0]
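
To make the roles of fit() and transform() more concrete, here is a standalone sketch of the classic K-means iteration (Lloyd's algorithm) in plain Python, applied to the same six points. It is an illustration only and not Flink ML's implementation: the functions fit, transform, and nearest are hypothetical, and the initial centers are simply the first k points rather than a seeded random choice.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def nearest(point, centroids):
    """Index of the centroid closest to the given point."""
    return min(range(len(centroids)),
               key=lambda i: squared_distance(point, centroids[i]))


def fit(points, k, max_iter=20):
    """Training step: alternate assignment and centroid update until stable."""
    centroids = [tuple(p) for p in points[:k]]  # assumption: deterministic init
    for _ in range(max_iter):
        clusters = [[] for _ in range(k)]
        for p in points:
            clusters[nearest(p, centroids)].append(p)
        updated = [
            tuple(sum(axis) / len(members) for axis in zip(*members))
            if members else old  # keep the old center if a cluster is empty
            for members, old in zip(clusters, centroids)
        ]
        if updated == centroids:
            break
        centroids = updated
    return centroids


def transform(points, centroids):
    """Prediction step: the cluster ID is the index of the nearest centroid."""
    return [nearest(p, centroids) for p in points]


points = [(0.0, 0.0), (0.0, 0.3), (0.3, 3.0),
          (9.0, 0.0), (9.0, 0.6), (9.6, 0.0)]
centroids = fit(points, k=2)
cluster_ids = transform(points, centroids)
print(list(zip(points, cluster_ids)))
```

In this sketch the three points near the origin end up in one cluster and the three points near x = 9 in the other. Which cluster receives which ID depends on the initialization.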

As in all other Flink programs, the code in the sections above only configures the computation graph of a Flink job. The program evaluates the computation logic and collects outputs only after an execution method is invoked, such as execute_and_collect() in the snippet below. The outputs collected from the output table are Rows in which featuresCol contains the input feature vectors and predictionCol contains the prediction results, i.e., cluster IDs.

# extract and display the results
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
    features = result[field_names.index(kmeans.get_features_col())]
    cluster_id = result[field_names.index(kmeans.get_prediction_col())]
    print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))
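
The deferred execution described above can be illustrated by analogy with Python generators: chaining them only describes a pipeline, and no row is processed until something consumes the result. The sketch below is plain Python and does not use any Flink API; source, with_sum, and the executed list are hypothetical names chosen for illustration.

```python
executed = []  # records every row that has actually been processed


def source(rows):
    """Emit rows one by one, logging each row as it is produced."""
    for row in rows:
        executed.append(row)
        yield row


def with_sum(rows):
    """Toy stand-in for a transformation: attach the sum of the coordinates."""
    for row in rows:
        yield row, sum(row)


# Building the pipeline runs nothing, much like configuring a job graph.
pipeline = with_sum(source([(0.0, 0.3), (9.0, 0.6)]))
steps_before = len(executed)

# Consuming the pipeline triggers the actual work.
results = list(pipeline)
steps_after = len(executed)

print(steps_before, steps_after)
```

Here steps_before is 0 and steps_after is 2, because the rows are only processed once list() starts pulling from the pipeline.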

The complete code so far:

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.clustering.kmeans import KMeans
from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input data
input_data = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([0.0, 0.3]),),
        (Vectors.dense([0.3, 3.0]),),
        (Vectors.dense([9.0, 0.0]),),
        (Vectors.dense([9.0, 0.6]),),
        (Vectors.dense([9.6, 0.0]),),
    ],
        type_info=Types.ROW_NAMED(
            ['features'],
            [DenseVectorTypeInfo()])))

# create a kmeans object and initialize its parameters
kmeans = KMeans().set_k(2).set_seed(1)

# train the kmeans model
model = kmeans.fit(input_data)

# use the kmeans model for predictions
output = model.transform(input_data)[0]

# extract and display the results
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
    features = result[field_names.index(kmeans.get_features_col())]
    cluster_id = result[field_names.index(kmeans.get_prediction_col())]
    print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))

After creating a Python file (e.g. kmeans_example.py) and saving the code above into it, you can run the example on the command line:

python kmeans_example.py

The command above builds the example job and runs it in a local mini cluster. Sample output in your terminal looks like this:

Features: [9.6,0.0]     Cluster Id: 0
Features: [9.0,0.6]     Cluster Id: 0
Features: [0.0,0.3]     Cluster Id: 1
Features: [0.0,0.0]     Cluster Id: 1
Features: [0.3,3.0]     Cluster Id: 1
Features: [9.0,0.0]     Cluster Id: 0
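
The sample output lists the rows in a different order from the input collection, so comparing two runs line by line can be misleading. One option is to group the printed rows by cluster ID first. The following is a minimal sketch that parses text in the format printed by the example program; the helper group_by_cluster and the regular expression are illustrative and not part of Flink ML.

```python
import re
from collections import defaultdict

sample_output = """\
Features: [9.6,0.0]     Cluster Id: 0
Features: [9.0,0.6]     Cluster Id: 0
Features: [0.0,0.3]     Cluster Id: 1
Features: [0.0,0.0]     Cluster Id: 1
Features: [0.3,3.0]     Cluster Id: 1
Features: [9.0,0.0]     Cluster Id: 0
"""

# Matches one printed row: the vector inside brackets, then the cluster ID.
LINE = re.compile(r"Features: \[(.*?)\]\s+Cluster Id: (\d+)")


def group_by_cluster(text):
    """Return {cluster_id: sorted list of feature tuples} for the printed rows."""
    groups = defaultdict(list)
    for match in LINE.finditer(text):
        vector = tuple(float(x) for x in match.group(1).split(","))
        groups[int(match.group(2))].append(vector)
    return {cluster_id: sorted(vectors) for cluster_id, vectors in groups.items()}


clusters = group_by_cluster(sample_output)
for cluster_id, vectors in sorted(clusters.items()):
    print(cluster_id, vectors)
```

For the sample above, this places the three points near x = 9 under cluster 0 and the three points near the origin under cluster 1.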

Prerequisites #

Make sure Java 8 or a higher version is installed on your local machine. To check the installed Java version, type in your terminal:

$ java -version

Download Flink 1.17, then extract the archive:

$ tar -xzf flink-*.tgz

Run the following commands after having downloaded Flink:

cd ${path_to_flink}
cp opt/flink-python* lib/
export FLINK_HOME=`pwd`
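
A quick way to confirm that the commands above took effect is to check that FLINK_HOME is set and that a flink-python file now sits in its lib folder. The sketch below does only that; the helper name flink_python_jars is hypothetical, and the check relies solely on the flink-python* pattern used in the cp command.

```python
import glob
import os


def flink_python_jars(flink_home):
    """List the files matching lib/flink-python* under the Flink directory."""
    return sorted(glob.glob(os.path.join(flink_home, "lib", "flink-python*")))


if __name__ == "__main__":
    flink_home = os.environ.get("FLINK_HOME")
    if not flink_home:
        print("FLINK_HOME is not set in this shell")
    else:
        jars = flink_python_jars(flink_home)
        print(jars if jars else "no flink-python* file found in lib/")
```

Run it from the same shell in which FLINK_HOME was exported, since the variable is not visible to other sessions.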

You need to copy Flink ML’s library files to Flink’s folder for proper initialization.

If you have followed this guideline, you have already built Flink ML’s Java SDK. Now copy the generated library files to Flink’s folder with the following commands.

cd ${path_to_flink_ml}
cp ./flink-ml-dist/target/flink-ml-*-bin/flink-ml*/lib/*.jar $FLINK_HOME/lib/

Please start a Flink standalone cluster in your local environment with the following command.

$FLINK_HOME/bin/start-cluster.sh

You should be able to navigate to the web UI at localhost:8081 to view the Flink dashboard and see that the cluster is up and running.

After creating a Python file (e.g. kmeans_example.py) and saving the code above into it, you can submit the example job to the cluster as follows.

$FLINK_HOME/bin/flink run -py kmeans_example.py

A sample output in your terminal is as follows.

Features: [9.6,0.0]     Cluster Id: 0
Features: [9.0,0.6]     Cluster Id: 0
Features: [0.0,0.3]     Cluster Id: 1
Features: [0.0,0.0]     Cluster Id: 1
Features: [0.3,3.0]     Cluster Id: 1
Features: [9.0,0.0]     Cluster Id: 0

You have now successfully run the Flink ML job on a Flink cluster. More detailed instructions for submitting jobs to a Flink cluster can be found in Job Submission Examples.

Finally, you can stop the Flink standalone cluster with the following command.

$FLINK_HOME/bin/stop-cluster.sh