Overview
This documentation is for an unreleased version of Apache Flink Machine Learning Library. We recommend you use the latest stable version.

Overview #

This document provides a brief introduction to the basic concepts in Flink ML.

Table API #

Flink ML’s API is based on Flink’s Table API. The Table API is a language-integrated query API for Java, Scala, and Python that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.

The Table API supports a wide range of data types. The Data Types page of the Flink documentation lists the supported types. In addition to these types, Flink ML also supports a Vector type.

The Table API integrates seamlessly with Flink’s DataStream API. You can easily switch between these APIs and the libraries built on them. Please refer to Flink’s documentation for how to convert between Table and DataStream, as well as for other uses of the Flink Table API.

Stage #

A Stage is a node in a Pipeline or Graph. It is the fundamental component in Flink ML. This interface is only a concept and does not have any actual functionality. Its subinterfaces include the following.

  • Estimator: An Estimator is a Stage that is responsible for the training process in machine learning algorithms. It implements a fit() method that takes a list of tables and produces a Model.

  • AlgoOperator: An AlgoOperator is a Stage that is used to encode generic multi-input multi-output computation logic. It implements a transform() method, which applies certain computation logic on the given input tables and returns a list of result tables.

  • Transformer: A Transformer is an AlgoOperator with the semantic difference that it encodes transformation logic, such that a record in the output typically corresponds to one record in the input. In contrast, an AlgoOperator is a better fit for expressing aggregation logic, where a record in the output could be computed from an arbitrary number of records in the input.

  • Model: A Model is a Transformer with extra APIs to set and get model data. It is typically generated by fitting an Estimator on a list of tables. It provides getModelData() and setModelData(), which allow users to explicitly read model data tables from, or write them to, the Model. Each table could be an unbounded stream of model data changes.

A typical usage of Stage is to create an Estimator instance first, trigger its training process by invoking its fit() method, and then perform predictions with the resulting Model instance. This usage is shown in the code below.

// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.

Table trainData = ...;
Table predictData = ...;

SumEstimator estimator = new SumEstimator();
SumModel model = estimator.fit(trainData);
Table predictResult = model.transform(predictData)[0];
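
To make the fit()/transform() contract concrete without a running Flink job, here is a minimal plain-Java sketch. Everything in it is a hypothetical stand-in rather than Flink ML's API: ToyModel and ToyEstimator mimic Model and Estimator, a List<Integer> plays the role of a Table, and the "sum" semantics (training sums the records, prediction adds that sum to each record) are an assumption about what a class like SumEstimator might do.

```java
import java.util.ArrayList;
import java.util.List;

final class StageContractSketch {
    // Trains on one "table" and predicts on another, mirroring the example above.
    static List<Integer> trainAndPredict(List<Integer> trainData, List<Integer> predictData) {
        ToySumModel model = new ToySumEstimator().fit(trainData);
        return model.transform(predictData);
    }

    public static void main(String[] args) {
        // Training sum is 1 + 2 + 3 = 6, so this prints [16, 26].
        System.out.println(trainAndPredict(List.of(1, 2, 3), List.of(10, 20)));
    }
}

// Hypothetical stand-in for Model: maps an input "table" to an output "table".
interface ToyModel {
    List<Integer> transform(List<Integer> input);
}

// Hypothetical stand-in for Estimator: training produces a model.
interface ToyEstimator<M extends ToyModel> {
    M fit(List<Integer> trainData);
}

// Assumed semantics: adds a learned delta to every record (one output per input).
final class ToySumModel implements ToyModel {
    private final int delta;

    ToySumModel(int delta) {
        this.delta = delta;
    }

    // Toy counterpart of reading the model data.
    int getModelData() {
        return delta;
    }

    @Override
    public List<Integer> transform(List<Integer> input) {
        List<Integer> output = new ArrayList<>();
        for (int value : input) {
            output.add(value + delta);
        }
        return output;
    }
}

// Assumed semantics: "training" sums the training records to obtain the delta.
final class ToySumEstimator implements ToyEstimator<ToySumModel> {
    @Override
    public ToySumModel fit(List<Integer> trainData) {
        int sum = 0;
        for (int value : trainData) {
            sum += value;
        }
        return new ToySumModel(sum);
    }
}
```

The point of the sketch is the division of labor: all learning happens in fit(), and the returned model is a self-contained object that can be applied to any number of inputs afterwards.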

Builders #

To organize Flink ML stages into more complex structures and achieve advanced functionality, such as chaining data processing and machine learning algorithms together, Flink ML provides APIs that help manage the relationships and structure of stages in Flink jobs. The entry points of these APIs are Pipeline and Graph.

Pipeline #

A Pipeline acts as an Estimator. It consists of an ordered list of stages, each of which could be an Estimator, Model, Transformer or AlgoOperator. Its fit() method goes through all stages of this pipeline in order and does the following on each stage until the last Estimator (inclusive).

  • If a stage is an Estimator, it invokes the stage’s fit() method with the input tables to generate a Model. If there is an Estimator after this stage, it then transforms the input tables using the generated Model to get result tables, and passes the result tables to the next stage as inputs.
  • If a stage is an AlgoOperator and there is an Estimator after this stage, it transforms the input tables using this stage to get result tables, and passes the result tables to the next stage as inputs.

After all the Estimators are trained to fit their input tables, a new PipelineModel will be created with the same stages in this pipeline, except that all the Estimators in the PipelineModel are replaced with the models generated in the above process.

A PipelineModel acts as a Model. It consists of an ordered list of stages, each of which could be a Model, Transformer or AlgoOperator. Its transform() method applies all stages in this PipelineModel on the input tables in order. The output of one stage is used as the input of the next stage (if any). The output of the last stage is returned as the result of this method.
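
The fit() and transform() procedures described above can be sketched in plain Java. This is an illustrative sketch, not Flink ML's implementation: Stage, Operator and Trainer are hypothetical stand-ins for Stage, AlgoOperator and Estimator, a List<Integer> plays the role of a Table, and the helpers addConstant and sumTrainer are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineSketch {
    // Hypothetical stand-ins; a List<Integer> plays the role of a Table.
    interface Stage {}

    interface Operator extends Stage {
        List<Integer> transform(List<Integer> input);
    }

    interface Trainer extends Stage {
        Operator fit(List<Integer> input);
    }

    // An operator that adds a fixed value to every record.
    static Operator addConstant(int delta) {
        return input -> {
            List<Integer> output = new ArrayList<>();
            for (int value : input) {
                output.add(value + delta);
            }
            return output;
        };
    }

    // A trainer whose fitted operator adds the sum of the training records.
    static Trainer sumTrainer() {
        return input -> {
            int sum = 0;
            for (int value : input) {
                sum += value;
            }
            return addConstant(sum);
        };
    }

    // Fits trainers in order; data is only pushed forward up to the last trainer.
    static List<Operator> fit(List<Stage> stages, List<Integer> input) {
        int lastTrainer = -1;
        for (int i = 0; i < stages.size(); i++) {
            if (stages.get(i) instanceof Trainer) {
                lastTrainer = i;
            }
        }
        List<Operator> fitted = new ArrayList<>();
        List<Integer> current = input;
        for (int i = 0; i < stages.size(); i++) {
            Stage stage = stages.get(i);
            Operator operator = stage instanceof Trainer
                    ? ((Trainer) stage).fit(current)
                    : (Operator) stage;
            fitted.add(operator);
            if (i < lastTrainer) {
                current = operator.transform(current);
            }
        }
        return fitted;
    }

    // Applies every fitted stage in order, feeding each output to the next stage.
    static List<Integer> transform(List<Operator> fitted, List<Integer> input) {
        List<Integer> current = input;
        for (Operator operator : fitted) {
            current = operator.transform(current);
        }
        return current;
    }

    // Fits [add 10, sum trainer, add 30] on {1, 2, 3} and applies the result to {0}.
    static List<Integer> demo() {
        List<Stage> stages = List.of(addConstant(10), sumTrainer(), addConstant(30));
        return transform(fit(stages, List.of(1, 2, 3)), List.of(0));
    }

    public static void main(String[] args) {
        // The trainer sees {11, 12, 13} and learns 36, so this prints [76].
        System.out.println(demo());
    }
}
```

Note that stages after the last trainer are never run during fitting; they are simply carried over into the fitted list, which is why the loop stops transforming once it reaches that index.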

A Pipeline can be created by passing a list of Stages to Pipeline’s constructor. For example,

// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.

SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
SumEstimator estimatorA = new SumEstimator();
SumModel modelB = new SumModel().setModelData(tEnv.fromValues(30));

List<Stage<?>> stages = Arrays.asList(modelA, estimatorA, modelB);
Estimator<?, ?> estimator = new Pipeline(stages);

The code above creates a Pipeline like the following.

graph LR
  empty0[ ] --> modelA --> estimatorA --> modelB --> empty1[ ]
  style empty0 fill:#FFFFFF, stroke:#FFFFFF;
  style empty1 fill:#FFFFFF, stroke:#FFFFFF;

Graph #

A Graph acts as an Estimator. It consists of a DAG of stages, each of which could be an Estimator, Model, Transformer or AlgoOperator. When Graph::fit is called, the stages are executed in topologically sorted order. If a stage is an Estimator, its Estimator::fit method is called on the input tables (from the input edges) to fit a Model. The Model is then used to transform the input tables and produce output tables for the output edges. If a stage is an AlgoOperator, its AlgoOperator::transform method is called on the input tables and produces output tables for the output edges. The GraphModel fitted from a Graph consists of the fitted Models and the AlgoOperators, corresponding to the Graph’s stages.

A GraphModel acts as a Model. It consists of a DAG of stages, each of which could be a Model, Transformer or AlgoOperator. When GraphModel::transform is called, the stages are executed in topologically sorted order. When a stage is executed, its AlgoOperator::transform method is called on the input tables (from the input edges) and produces output tables for the output edges.
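
The topologically sorted execution mentioned above can be illustrated with a small plain-Java sketch. It is a simplified illustration, not Flink ML's scheduler: the Node class and the use of integer table ids are hypothetical, and the strategy shown (repeatedly run any stage whose input tables are all available) is just one way to obtain a valid topological order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class GraphOrderSketch {
    // Hypothetical node: a stage name plus the ids of the tables it reads and writes.
    static final class Node {
        final String name;
        final List<Integer> inputIds;
        final List<Integer> outputIds;

        Node(String name, List<Integer> inputIds, List<Integer> outputIds) {
            this.name = name;
            this.inputIds = inputIds;
            this.outputIds = outputIds;
        }
    }

    // Returns stage names in an order where each stage runs after its producers.
    static List<String> executionOrder(List<Node> nodes, List<Integer> graphInputIds) {
        Set<Integer> available = new HashSet<>(graphInputIds);
        List<Node> pending = new ArrayList<>(nodes);
        List<String> order = new ArrayList<>();
        while (!pending.isEmpty()) {
            boolean progressed = false;
            Iterator<Node> iterator = pending.iterator();
            while (iterator.hasNext()) {
                Node node = iterator.next();
                if (available.containsAll(node.inputIds)) {
                    // All inputs exist: "run" the stage and publish its outputs.
                    order.add(node.name);
                    available.addAll(node.outputIds);
                    iterator.remove();
                    progressed = true;
                }
            }
            if (!progressed) {
                throw new IllegalStateException("Graph has a cycle or an unsatisfied input");
            }
        }
        return order;
    }

    // Three chained stages, deliberately listed out of order.
    // Table ids: 0 = input, 1 = modelDataInput, 2 = output1, 3 = output2, 4 = output3.
    static List<String> demo() {
        List<Node> nodes = List.of(
                new Node("stage3", List.of(3), List.of(4)),
                new Node("stage1", List.of(0), List.of(2)),
                new Node("stage2", List.of(2, 1), List.of(3)));
        return executionOrder(nodes, List.of(0, 1));
    }

    public static void main(String[] args) {
        // Prints [stage1, stage2, stage3].
        System.out.println(demo());
    }
}
```

Because readiness depends only on which tables exist, the order in which stages were added does not matter, and a graph whose stages can never become ready is reported as an error instead of looping forever.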

A Graph can be constructed via the GraphBuilder class, which provides methods like addAlgoOperator or addEstimator to help add stages to a graph. Flink ML also introduces the TableId class to represent the input or output of a stage and to express the relationships between stages in a graph, allowing users to construct the DAG before the concrete tables are available.

The example code below shows how to build a Graph.

// Suppose SumModel is a concrete subclass of Model.

GraphBuilder builder = new GraphBuilder();
// Creates nodes.
SumModel stage1 = new SumModel().setModelData(tEnv.fromValues(1));
SumModel stage2 = new SumModel();
SumModel stage3 = new SumModel().setModelData(tEnv.fromValues(3));
// Creates inputs and modelDataInputs.
TableId input = builder.createTableId();
TableId modelDataInput = builder.createTableId();
// Feeds inputs and gets outputs.
TableId output1 = builder.addAlgoOperator(stage1, input)[0];
TableId output2 = builder.addAlgoOperator(stage2, output1)[0];
builder.setModelDataOnModel(stage2, modelDataInput);
TableId output3 = builder.addAlgoOperator(stage3, output2)[0];
TableId modelDataOutput = builder.getModelDataFromModel(stage3)[0];

// Builds a Model from the graph.
TableId[] inputs = new TableId[] {input};
TableId[] outputs = new TableId[] {output3};
TableId[] modelDataInputs = new TableId[] {modelDataInput};
TableId[] modelDataOutputs = new TableId[] {modelDataOutput};
Model<?> model = builder.buildModel(inputs, outputs, modelDataInputs, modelDataOutputs);

The code above constructs a Graph like the following.

graph LR
  empty0[ ] --> |input| stage1
  stage1 --> |output1| stage2
  empty1[ ] --> |modelDataInput| stage2
  stage2 --> |output2| stage3
  stage3 --> |output3| empty3[ ]
  stage3 --> |modelDataOutput| empty2[ ]
  style empty0 fill:#FFFFFF, stroke:#FFFFFF;
  style empty1 fill:#FFFFFF, stroke:#FFFFFF;
  style empty2 fill:#FFFFFF, stroke:#FFFFFF;
  style empty3 fill:#FFFFFF, stroke:#FFFFFF;

Parameter #

Flink ML’s Stage extends WithParams, which provides a uniform API to get and set parameters.

A Param is the definition of a parameter, including its name, class, description, default value, and validator.

To set a parameter of an algorithm, users can use either of the following ways.

  • Invoke the parameter’s specific set method. For example, to set K, the number of clusters, of a K-means algorithm, users can directly invoke the setK() method on the KMeans instance.
  • Pass a parameter map containing new values to the stage through the ParamUtils.updateExistingParams() method.

If a Model is generated through an Estimator’s fit() method, the Model inherits the Estimator object’s parameters. There is therefore no need to set the parameters a second time unless they need to change.
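
As a rough illustration of the ideas in this section, the plain-Java sketch below models a parameter definition and the two ways of setting a value. It is not Flink ML's API: ToyParam, ToyKMeans and updateExisting are hypothetical names, and the default value, the validator, and the rule that a map update only touches parameters the stage already defines are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class ParamSketch {
    // Hypothetical definition: name, value class, description, default value, validator.
    static final class ToyParam<T> {
        final String name;
        final Class<T> clazz;
        final String description;
        final T defaultValue;
        final Predicate<T> validator;

        ToyParam(String name, Class<T> clazz, String description,
                 T defaultValue, Predicate<T> validator) {
            this.name = name;
            this.clazz = clazz;
            this.description = description;
            this.defaultValue = defaultValue;
            this.validator = validator;
        }
    }

    // Assumed definition of K: defaults to 2 and must be greater than 1.
    static final ToyParam<Integer> K =
            new ToyParam<>("k", Integer.class, "The number of clusters.", 2, value -> value > 1);

    // Hypothetical stage that stores parameter values in a map keyed by definition.
    static final class ToyKMeans {
        private final Map<ToyParam<?>, Object> paramMap = new HashMap<>();

        ToyKMeans() {
            paramMap.put(K, K.defaultValue);
        }

        <T> ToyKMeans set(ToyParam<T> param, T value) {
            if (!param.validator.test(value)) {
                throw new IllegalArgumentException(
                        "Invalid value for " + param.name + ": " + value);
            }
            paramMap.put(param, value);
            return this;
        }

        <T> T get(ToyParam<T> param) {
            return param.clazz.cast(paramMap.get(param));
        }

        // Way 1: a parameter-specific setter.
        ToyKMeans setK(int k) {
            return set(K, k);
        }

        int getK() {
            return get(K);
        }

        // Way 2: apply a map of overrides, skipping parameters this stage does not define.
        ToyKMeans updateExisting(Map<ToyParam<?>, Object> overrides) {
            for (Map.Entry<ToyParam<?>, Object> entry : overrides.entrySet()) {
                if (paramMap.containsKey(entry.getKey())) {
                    setChecked(entry.getKey(), entry.getValue());
                }
            }
            return this;
        }

        // Casts the untyped override to the parameter's class before validating it.
        private <T> void setChecked(ToyParam<T> param, Object value) {
            set(param, param.clazz.cast(value));
        }
    }

    public static void main(String[] args) {
        ToyKMeans kmeans = new ToyKMeans();
        System.out.println(kmeans.getK()); // prints 2 (the default)
        kmeans.setK(5);
        Map<ToyParam<?>, Object> overrides = new HashMap<>();
        overrides.put(K, 8);
        kmeans.updateExisting(overrides);
        System.out.println(kmeans.getK()); // prints 8
    }
}
```

Keeping the validator on the definition rather than on the stage means both ways of setting a value go through the same check, so an invalid value is rejected regardless of how it arrives.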