Kmeans

K-means #

K-means is a commonly-used clustering algorithm. It groups given data points into a predefined number of clusters.

Input Columns #

Param name Type Default Description
featuresCol Vector "features" Feature vector

Output Columns #

Param name Type Default Description
predictionCol Integer "prediction" Predicted cluster center

Parameters #

Below are the parameters required by KMeansModel.

Key Default Type Required Description
distanceMeasure EuclideanDistanceMeasure.NAME String no Distance measure. Supported values: EuclideanDistanceMeasure.NAME
featuresCol "features" String no Features column name.
predictionCol "prediction" String no Prediction column name.
k 2 Integer no The max number of clusters to create.

KMeans needs parameters above and also below.

Key Default Type Required Description
initMode "random" String no The initialization algorithm. Supported options: ‘random’.
seed null Long no The random seed.
maxIter 20 Integer no Maximum number of iterations.

Examples #

import org.apache.flink.ml.clustering.kmeans.KMeans;
import org.apache.flink.ml.clustering.kmeans.KMeansModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

/**
 * Simple program that trains a KMeans model and uses it for clustering.
 *
 * <p>The program builds a small in-memory table of 2-D points, fits a {@code KMeans} estimator
 * with {@code k = 2} on it, applies the resulting model to the same table and prints the cluster
 * id assigned to every point.
 */
public class KMeansExample {
    /**
     * Runs the example end to end.
     *
     * @param args command-line arguments; not used
     * @throws Exception if closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input data: three points near the origin and three points near x = 9.
        DataStream<DenseVector> inputStream =
                env.fromElements(
                        Vectors.dense(0.0, 0.0),
                        Vectors.dense(0.0, 0.3),
                        Vectors.dense(0.3, 0.0),
                        Vectors.dense(9.0, 0.0),
                        Vectors.dense(9.0, 0.6),
                        Vectors.dense(9.6, 0.0));
        Table inputTable = tEnv.fromDataStream(inputStream).as("features");

        // Creates a K-means object and initializes its parameters.
        KMeans kmeans = new KMeans().setK(2).setSeed(1L);

        // Trains the K-means Model.
        KMeansModel kmeansModel = kmeans.fit(inputTable);

        // Uses the K-means Model for predictions.
        Table outputTable = kmeansModel.transform(inputTable)[0];

        // Extracts and displays the results. The iterator is opened in a try-with-resources
        // statement so that it is always closed, even if reading a row fails.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                DenseVector features = (DenseVector) row.getField(kmeans.getFeaturesCol());
                int clusterId = (Integer) row.getField(kmeans.getPredictionCol());
                System.out.printf("Features: %s \tCluster ID: %s\n", features, clusterId);
            }
        }
    }
}

# Simple program that trains a KMeans model and uses it for clustering.
#
# Before executing this program, please make sure you have followed Flink ML's
# quick start guideline to set up Flink ML and Flink environment. The guideline
# can be found at
#
# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.clustering.kmeans import KMeans
from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input data: a single-column table named 'features' holding six
# 2-D dense vectors, three near the origin and three near x = 9
input_data = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([0.0, 0.3]),),
        (Vectors.dense([0.3, 0.0]),),
        (Vectors.dense([9.0, 0.0]),),
        (Vectors.dense([9.0, 0.6]),),
        (Vectors.dense([9.6, 0.0]),),
    ],
        type_info=Types.ROW_NAMED(
            ['features'],
            [DenseVectorTypeInfo()])))

# create a kmeans object and initialize its parameters
kmeans = KMeans().set_k(2).set_seed(1)

# train the kmeans model
model = kmeans.fit(input_data)

# use the kmeans model for predictions; transform() returns a sequence of
# tables and the first one is used here
output = model.transform(input_data)[0]

# extract and display the results; columns are looked up by name so the code
# does not depend on the column order of the output table
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
    features = result[field_names.index(kmeans.get_features_col())]
    cluster_id = result[field_names.index(kmeans.get_prediction_col())]
    print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))

Online K-means #

Online K-Means extends K-Means to support training a K-Means model continuously on an unbounded stream of training data.

Online K-Means makes updates with the “mini-batch” K-Means rule, generalized to incorporate forgetfulness (i.e. decay). After the centroids estimated on the current batch are acquired, Online K-Means computes the new centroids as the weighted average of the original and the estimated centroids. The weight of the estimated centroids is the number of points assigned to them. The weight of the original centroids is also the number of points assigned to them, additionally multiplied by the decay factor.

The decay factor scales the contribution of the clusters as estimated thus far. If the decay factor is 1, all batches are weighted equally. If the decay factor is 0, new centroids are determined entirely by recent data. Lower values correspond to more forgetting.

Input Columns #

Param name Type Default Description
featuresCol Vector "features" Feature vector

Output Columns #

Param name Type Default Description
predictionCol Integer "prediction" Predicted cluster center

Parameters #

Below are the parameters required by OnlineKMeansModel.

Key Default Type Required Description
distanceMeasure EuclideanDistanceMeasure.NAME String no Distance measure. Supported values: EuclideanDistanceMeasure.NAME
featuresCol "features" String no Features column name.
predictionCol "prediction" String no Prediction column name.
k 2 Integer no The max number of clusters to create.

OnlineKMeans needs parameters above and also below.

Key Default Type Required Description
batchStrategy COUNT_STRATEGY String no Strategy to create mini batch from online train data.
globalBatchSize 32 Integer no Global batch size of training algorithms.
decayFactor 0. Double no The forgetfulness of the previous centroids.
seed null Long no The random seed.

Examples #

import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
import org.apache.flink.ml.examples.util.PeriodicSourceFunction;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Simple program that trains an OnlineKMeans model and uses it for clustering.
 *
 * <p>Two periodic sources feed the job: one emits training batches that keep updating the model,
 * the other repeatedly emits the same two prediction points. The program prints, for every pair
 * of prediction results, whether the two points currently fall into the same cluster.
 */
public class OnlineKMeansExample {
    /**
     * Runs the example end to end.
     *
     * @param args command-line arguments; not used
     * @throws Exception if closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input training and prediction data. Both are infinite streams that periodically
        // send out the provided data to trigger model update and prediction.
        List<Row> trainData1 =
                Arrays.asList(
                        Row.of(Vectors.dense(0.0, 0.0)),
                        Row.of(Vectors.dense(0.0, 0.3)),
                        Row.of(Vectors.dense(0.3, 0.0)),
                        Row.of(Vectors.dense(9.0, 0.0)),
                        Row.of(Vectors.dense(9.0, 0.6)),
                        Row.of(Vectors.dense(9.6, 0.0)));

        List<Row> trainData2 =
                Arrays.asList(
                        Row.of(Vectors.dense(10.0, 100.0)),
                        Row.of(Vectors.dense(10.0, 100.3)),
                        Row.of(Vectors.dense(10.3, 100.0)),
                        Row.of(Vectors.dense(-10.0, -100.0)),
                        Row.of(Vectors.dense(-10.0, -100.6)),
                        Row.of(Vectors.dense(-10.6, -100.0)));

        List<Row> predictData =
                Arrays.asList(
                        Row.of(Vectors.dense(10.0, 10.0)), Row.of(Vectors.dense(-10.0, 10.0)));

        SourceFunction<Row> trainSource =
                new PeriodicSourceFunction(1000, Arrays.asList(trainData1, trainData2));
        DataStream<Row> trainStream =
                env.addSource(trainSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
        Table trainTable = tEnv.fromDataStream(trainStream).as("features");

        SourceFunction<Row> predictSource =
                new PeriodicSourceFunction(1000, Collections.singletonList(predictData));
        DataStream<Row> predictStream =
                env.addSource(predictSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
        Table predictTable = tEnv.fromDataStream(predictStream).as("features");

        // Creates an online K-means object and initializes its parameters and initial model data.
        // The global batch size of 6 equals the number of rows in each training list above.
        OnlineKMeans onlineKMeans =
                new OnlineKMeans()
                        .setFeaturesCol("features")
                        .setPredictionCol("prediction")
                        .setGlobalBatchSize(6)
                        .setInitialModelData(
                                KMeansModelData.generateRandomModelData(tEnv, 2, 2, 0.0, 0));

        // Trains the online K-means Model.
        OnlineKMeansModel onlineModel = onlineKMeans.fit(trainTable);

        // Uses the online K-means Model for predictions.
        Table outputTable = onlineModel.transform(predictTable)[0];

        // Extracts and displays the results. As training data stream continuously triggers the
        // update of the internal k-means model data, clustering results of the same predict dataset
        // would change over time.
        //
        // Results are consumed two at a time, matching the two rows in predictData. The iterator
        // is opened in a try-with-resources statement so that it is always closed when the loop
        // ends or a read fails.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row1 = it.next();
                if (!it.hasNext()) {
                    // The results ended after the first row of a pair; there is nothing to
                    // compare it with, so stop instead of failing on the second read.
                    break;
                }
                Row row2 = it.next();

                DenseVector features1 =
                        (DenseVector) row1.getField(onlineKMeans.getFeaturesCol());
                Integer clusterId1 = (Integer) row1.getField(onlineKMeans.getPredictionCol());
                DenseVector features2 =
                        (DenseVector) row2.getField(onlineKMeans.getFeaturesCol());
                Integer clusterId2 = (Integer) row2.getField(onlineKMeans.getPredictionCol());

                if (Objects.equals(clusterId1, clusterId2)) {
                    System.out.printf(
                            "%s and %s are now in the same cluster.\n", features1, features2);
                } else {
                    System.out.printf(
                            "%s and %s are now in different clusters.\n", features1, features2);
                }
            }
        }
    }
}