This documentation is for an unreleased version of Apache Flink Machine Learning Library. We recommend you use the latest stable version.
Overview #
This document provides a brief introduction to the basic concepts in Flink ML.
Table API #
Flink ML’s API is based on Flink’s Table API. The Table API is a language-integrated query API for Java, Scala, and Python that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.
Table API allows the usage of a wide range of data types. Flink Document Data
Types
page provides a list of supported types. In addition to these types, Flink ML
also provides support for Vector
Type.
The Table API integrates seamlessly with Flink’s DataStream API. You can easily
switch between all APIs and libraries which build upon them. Please refer to
Flink’s document for how to convert between Table
and DataStream
, as well as
other usage of Flink Table API.
Stage #
A Stage
is a node in a Pipeline
or Graph
. It is the fundamental component
in Flink ML. This interface is only a concept, and does not have any actual
functionality. Its subclasses include the follows.
-
Estimator
: AnEstimator
is aStage
that is reponsible for the training process in machine learning algorithms. It implements afit()
method that takes a list of tables and produces aModel
. -
AlgoOperator
: AnAlgoOperator
is aStage
that is used to encode generic multi-input multi-output computation logic. It implements atransform()
method, which applies certain computation logic on the given input tables and returns a list of result tables. -
Transformer
: ATransformer
is anAlgoOperator
with the semantic difference that it encodes the Transformation logic, such that a record in the output typically corresponds to one record in the input. In contrast, anAlgoOperator
is a better fit to express aggregation logic where a record in the output could be computed from an arbitrary number of records in the input. -
Model
: AModel
is aTransformer
with the extra APIs to set and get model data. It is typically generated by fitting anEstimator
on a list of tables. It providesgetModelData()
andsetModelData()
, which allows users to explicitly read or write model data tables to the transformer. Each table could be an unbounded stream of model data changes.
A typical usage of Stage
is to create an Estimator
instance first, trigger
its training process by invoking its fit()
method, and to perform predictions
with the resulting Model
instance. This example usage is shown in the code
below.
// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
Table trainData = ...;
Table predictData = ...;
SumEstimator estimator = new SumEstimator();
SumModel model = estimator.fit(trainData);
Table predictResult = model.transform(predictData)[0];
Builders #
In order to organize Flink ML stages into more complexed format so as to achieve
advanced functionalities, like chaining data processing and machine learning
algorithms together, Flink ML provides APIs that help to manage the relationship
and structure of stages in Flink jobs. The entry of these APIs includes
Pipeline
and Graph
.
Pipeline #
A Pipeline
acts as an Estimator
. It consists of an ordered list of stages,
each of which could be an Estimator
, Model
, Transformer
or AlgoOperator
.
Its fit()
method goes through all stages of this pipeline in order and does
the following on each stage until the last Estimator
(inclusive).
- If a stage is an
Estimator
, it would invoke the stage’sfit()
method with the input tables to generage aModel
. And if there isEstimator
after this stage, it would transform the input tables using the generatedModel
to get result tables, then pass the result tables to the next stage as inputs. - If a stage is an
AlgoOperator
AND there isEstimator
after this stage, it would transform the input tables using this stage to get result tables, then pass the result tables to the next stage as inputs.
After all the Estimators
are trained to fit their input tables, a new
PipelineModel
will be created with the same stages in this pipeline, except
that all the Estimator
s in the PipelineModel
are replaced with the models
generated in the above process.
A PipelineModel
acts as a Model
. It consists of an ordered list of stages,
each of which could be a Model
, Transformer
or AlgoOperator
. Its
transform()
method applies all stages in this PipelineModel
on the input
tables in order. The output of one stage is used as the input of the next stage
(if any). The output of the last stage is returned as the result of this method.
A Pipeline
can be created by passing a list of Stage
s to Pipeline’s
constructor. For example,
// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
Model modelA = new SumModel().setModelData(tEnv.fromValues(10));
Estimator estimatorA = new SumEstimator();
Model modelB = new SumModel().setModelData(tEnv.fromValues(30));
List<Stage<?>> stages = Arrays.asList(modelA, estimatorA, modelB);
Estimator<?, ?> estimator = new Pipeline(stages);
The commands above creates a Pipeline like follows.
graph LR empty0[ ] --> modelA --> estimatorA --> modelB --> empty1[ ] style empty0 fill:#FFFFFF, stroke:#FFFFFF; style empty1 fill:#FFFFFF, stroke:#FFFFFF;
Graph #
A Graph
acts as an Estimator
. A Graph
consists of a DAG of stages, each of
which could be an Estimator
, Model
, Transformer
or AlgoOperator
. When
Graph::fit
is called, the stages are executed in a topologically-sorted order.
If a stage is an Estimator
, its Estimator::fit
method will be called on the
input tables (from the input edges) to fit a Model
. Then the Model
will be
used to transform the input tables and produce output tables to the output
edges. If a stage is an AlgoOperator
, its AlgoOperator::transform
method
will be called on the input tables and produce output tables to the output
edges. The GraphModel
fitted from a Graph
consists of the fitted Models
and AlgoOperators
, corresponding to the Graph
’s stages.
A GraphModel
acts as a Model
. A GraphModel
consists of a DAG of stages,
each of which could be an Estimator
, Model
, Transformer
or AlgoOperator
.
When GraphModel::transform
is called, the stages are executed in a
topologically-sorted order. When a stage is executed, its
AlgoOperator::transform
method will be called on the input tables (from the
input edges) and produce output tables to the output edges.
A Graph
can be constructed via the GraphBuilder
class, which provides
methods like addAlgoOperator
or addEstimator
to help adding stages to a
graph. Flink ML also introduces TableId
class to represent the input/output of
a stage and to help express the relationship between stages in a graph, thus
allowing users to construct the DAG before they have the concrete tables
available.
The example codes below shows how to build a Graph
.
// Suppose SumModel is a concrete subclass of Model.
GraphBuilder builder = new GraphBuilder();
// Creates nodes.
SumModel stage1 = new SumModel().setModelData(tEnv.fromValues(1));
SumModel stage2 = new SumModel();
SumModel stage3 = new SumModel().setModelData(tEnv.fromValues(3));
// Creates inputs and modelDataInputs.
TableId input = builder.createTableId();
TableId modelDataInput = builder.createTableId();
// Feeds inputs and gets outputs.
TableId output1 = builder.addAlgoOperator(stage1, input)[0];
TableId output2 = builder.addAlgoOperator(stage2, output1)[0];
builder.setModelDataOnModel(stage2, modelDataInput);
TableId output3 = builder.addAlgoOperator(stage3, output2)[0];
TableId modelDataOutput = builder.getModelDataFromModel(stage3)[0];
// Builds a Model from the graph.
TableId[] inputs = new TableId[] {input};
TableId[] outputs = new TableId[] {output3};
TableId[] modelDataInputs = new TableId[] {modelDataInput};
TableId[] modelDataOutputs = new TableId[] {modelDataOutput};
Model<?> model = builder.buildModel(inputs, outputs, modelDataInputs, modelDataOutputs);
The code above constructs a Graph
like follows.
graph LR empty0[ ] --> |input| stage1 stage1 --> |output1| stage2 empty1[ ] --> |modelDataInput| stage2 stage2 --> |output2| stage3 stage3 --> |output3| empty3[ ] stage3 --> |modelDataOutput| empty2[ ] style empty0 fill:#FFFFFF, stroke:#FFFFFF; style empty1 fill:#FFFFFF, stroke:#FFFFFF; style empty2 fill:#FFFFFF, stroke:#FFFFFF; style empty3 fill:#FFFFFF, stroke:#FFFFFF;
Parameter #
Flink ML Stage
is a subclass of WithParams
, which provides a uniform API to
get and set parameters.
A Param
is the definition of a parameter, including name, class, description,
default value and the validator.
In order to set the parameter of an algorithm, users can use any of the following ways.
- Invoke the parameter’s specific set method. For example, in order to set
K
, the number of clusters, of a K-means algorithm, users can directly invokesetK()
method on thatKMeans
instance. - Pass a parameter map containing new values to the stage through
ParamUtils.updateExistingParams()
method.
If a Model
is generated through an Estimator
’s fit()
method, the Model
would inherit the Estimator
object’s parameters. Thus there is no need to set
the parameters for a second time if the parameters are not changed.