The ability to chain together different transformers and predictors is an important feature for any Machine Learning (ML) library. In FlinkML we wanted to provide an intuitive API and, at the same time, utilize the capabilities of the Scala language to provide type-safe implementations of our pipelines. What we hope to achieve, then, is an easy-to-use API that protects users from type errors at pre-flight time (before the job is launched). This eliminates cases where long-running jobs are submitted to the cluster only to fail because of some error in the series of data transformations that commonly happen in an ML pipeline.
In this guide we describe the choices we made during the implementation of chainable transformers and predictors in FlinkML. We also provide guidelines on how developers can create their own algorithms that make use of these capabilities.
So what do we mean by “ML pipelines”? Pipelines in the ML context can be thought of as chains of operations that take some data as input and perform a number of transformations on that data. They then output the transformed data, either as the input (features) of a predictor function, such as a learning model, or on its own, to be used in some other task. The end learner can of course be a part of the pipeline as well. ML pipelines can often be complicated sets of operations (in-depth explanation) and can become sources of errors for end-to-end learning systems.
The purpose of ML pipelines is, then, to create a framework that can be used to manage the complexity introduced by these chains of operations. Pipelines should make it easy for developers to define chained transformations that can be applied to the training data to create the end features used to train a learning model. They should then make it just as easy to apply the same set of transformations to unlabeled (test) data. Pipelines should also simplify cross-validation and model selection on these chains of operations.
Finally, by ensuring that the consecutive links in the pipeline chain “fit together”, we also avoid costly type errors. Since each step in a pipeline can be a computationally heavy operation, we want to avoid running a pipelined job unless we are sure that all the input/output pairs in the pipeline “fit”.
The building blocks for pipelines in FlinkML can be found in FlinkML's pipeline package. FlinkML follows an API inspired by sklearn, which means that we have Estimator, Transformer and Predictor interfaces. For an in-depth look at the design of the sklearn API, the interested reader is referred to this paper.
In short, the Estimator is the base class from which Transformer and Predictor inherit. Estimator defines a fit method, Transformer also defines a transform method, and Predictor defines a predict method.
The fit method of the Estimator performs the actual training of the model. Examples include finding the correct weights in a linear regression task, or the mean and standard deviation of the data in a feature scaler.
As the naming suggests, classes that implement Transformer are transform operations like scaling the input, and Predictor implementations are learning algorithms such as Multiple Linear Regression.
Pipelines can be created by chaining together a number of Transformers, and the final link in a pipeline can be a Predictor or another Transformer.
Pipelines that end with a Predictor cannot be chained any further.
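For example, one or more feature transformers, such as a scaler, can be chained in front of a learning algorithm such as Multiple Linear Regression. Calling fit on such a pipeline trains its stages on the labeled training data. Calling predict then applies the same transformations to unlabeled test data before the predictor computes its predictions.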
As we mentioned, FlinkML pipelines are type-safe. Suppose we tried to chain a transformer with output of type A to another with input of type B. If A and B differ, we would get an error at pre-flight time. FlinkML achieves this kind of type-safety through the use of Scala’s implicits.
If you are not familiar with Scala’s implicits, we recommend this excerpt from Martin Odersky’s “Programming in Scala”. In short, implicit conversions allow for ad-hoc polymorphism in Scala by providing conversions from one type to another. Implicit values provide the compiler with default values that can be supplied to function calls through implicit parameters. The combination of implicit conversions and implicit parameters is what allows us to chain transform and predict operations together in a type-safe manner.
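Here is a small, self-contained illustration of both mechanisms in plain Scala. It is unrelated to FlinkML's own classes, and Describe and Celsius are hypothetical names. The compiler fills in an implicit parameter from a companion object and applies an implicit conversion from Double:

```scala
import scala.language.implicitConversions

// A "type class": behaviour for T that the compiler supplies through an implicit parameter.
trait Describe[T] {
  def describe(value: T): String
}

final case class Celsius(degrees: Double)

object Celsius {
  // Implicit value: found because the compiler searches the companion object of Celsius
  // whenever it needs a Describe[Celsius].
  implicit val describeCelsius: Describe[Celsius] = new Describe[Celsius] {
    def describe(value: Celsius): String = s"${value.degrees} C"
  }

  // Implicit conversion: lets a Double be used where a Celsius is expected.
  implicit def fromDouble(degrees: Double): Celsius = Celsius(degrees)
}

// The second parameter list is filled in by the compiler. Calling show with a type
// that has no Describe instance is rejected at compile time.
def show[T](value: T)(implicit d: Describe[T]): String = d.describe(value)

val explicitShown = show(Celsius(21.5))

// The expected type Celsius triggers the implicit conversion from Double.
val converted: Celsius = 30.0
val convertedShown = show(converted)
```

The same lookup rule, implicit values placed in the companion object of a type, is the one the pipeline operations rely on.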
As we mentioned, the trait (abstract class) Estimator defines a fit method. The method has two parameter lists (i.e. it is a curried function). The first parameter list takes the input (training) DataSet and the parameters for the estimator. The second parameter list takes one implicit parameter, of type FitOperation. FitOperation is a class that also defines a fit method, and this is where the actual logic of training the concrete Estimators should be implemented. The fit method of Estimator is essentially a wrapper around the fit method of FitOperation. The predict method of Predictor and the transform method of Transformer are designed in a similar manner, each with a respective operation class.
In these methods the operation object is provided as an implicit parameter. Scala will look for implicits in the companion object of a type, so classes that implement these interfaces should provide these objects as implicit objects inside the companion object.
As an example we can look at the StandardScaler class. StandardScaler extends Transformer, so it has access to its fit and transform functions. These two functions expect objects of FitOperation and TransformOperation as implicit parameters, for the fit and transform methods respectively. StandardScaler provides these objects in its companion object.
Note that StandardScaler does not override the fit method of Estimator or the transform method of Transformer. Rather, its implementations of FitOperation and TransformOperation override their respective fit and transform methods. These are then called by the fit and transform methods of Estimator and Transformer. Similarly, a class that implements Predictor should define an implicit PredictOperation object inside its companion object.
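Here is a minimal, self-contained sketch of this pattern. It is not FlinkML code. Seq stands in for DataSet and parameter maps are left out. Estimator, Transformer, FitOperation and TransformOperation are simplified stand-ins named after the FlinkML types. ToyStandardScaler is a hypothetical class. Like StandardScaler, it overrides neither fit nor transform and instead supplies implicit operations in its companion object:

```scala
// Simplified stand-ins for FlinkML's interfaces: Seq replaces DataSet, no parameter maps.
trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}

trait TransformOperation[Self, Input, Output] {
  def transform(instance: Self, input: Seq[Input]): Seq[Output]
}

trait Estimator[Self] { this: Self =>
  // Thin wrapper: the training logic lives in the implicit FitOperation.
  def fit[Training](input: Seq[Training])(implicit op: FitOperation[Self, Training]): Unit =
    op.fit(this, input)
}

trait Transformer[Self] extends Estimator[Self] { this: Self =>
  // Thin wrapper: the transformation logic lives in the implicit TransformOperation.
  def transform[Input, Output](input: Seq[Input])(
      implicit op: TransformOperation[Self, Input, Output]): Seq[Output] =
    op.transform(this, input)
}

// Hypothetical scaler; it overrides neither fit nor transform.
class ToyStandardScaler extends Transformer[ToyStandardScaler] {
  // State written by the FitOperation and read by the TransformOperation.
  var meanAndStd: Option[(Double, Double)] = None
}

object ToyStandardScaler {
  implicit val fitDoubles: FitOperation[ToyStandardScaler, Double] =
    new FitOperation[ToyStandardScaler, Double] {
      def fit(instance: ToyStandardScaler, input: Seq[Double]): Unit = {
        val mean = input.sum / input.size
        // Population variance (divides by n).
        val variance = input.map(x => (x - mean) * (x - mean)).sum / input.size
        instance.meanAndStd = Some((mean, math.sqrt(variance)))
      }
    }

  implicit val transformDoubles: TransformOperation[ToyStandardScaler, Double, Double] =
    new TransformOperation[ToyStandardScaler, Double, Double] {
      def transform(instance: ToyStandardScaler, input: Seq[Double]): Seq[Double] =
        instance.meanAndStd match {
          case Some((mean, std)) => input.map(x => (x - mean) / std)
          case None => throw new IllegalStateException("ToyStandardScaler has not been fitted.")
        }
    }
}

val scaler = new ToyStandardScaler
scaler.fit(Seq(1.0, 2.0, 3.0, 4.0, 5.0))
val scaled = scaler.transform(Seq(1.0, 3.0, 5.0))
```

The calls scaler.fit and scaler.transform resolve their operation objects from ToyStandardScaler's companion without any import at the call site.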
Apart from the fit and transform operations that we listed above, the StandardScaler also provides fit and transform operations for input of type LabeledVector. This allows us to use the algorithm for input that is labeled or unlabeled. The choice happens automatically, depending on the type of the input that we give to the fit and transform operations: the compiler selects the correct implicit operation for that input type.
If we try to call the fit or transform methods with types that are not supported, we will get a runtime error before the job is launched. It would be possible to catch these kinds of errors at compile time as well. However, the error messages that we could then provide to the user would be much less informative, which is why we chose to throw runtime exceptions instead.
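The following self-contained sketch shows both behaviours under simplifying assumptions. The compiler picks an operation by element type, and a catch-all fallback turns an unsupported type into a descriptive error at run time. DenseVec and LabeledVec stand in for DenseVector and LabeledVector. FeatureCounter is hypothetical. The fallback is modelled here with ClassTag; this is an assumption, not necessarily how FlinkML implements its own fallback:

```scala
import scala.reflect.ClassTag
import scala.util.Try

// Stand-ins for DenseVector and LabeledVector.
final case class DenseVec(values: Vector[Double])
final case class LabeledVec(label: Double, features: DenseVec)

trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}

object FitOperation {
  // Fallback: matches any Self/Training pair, but a more specific implicit value
  // (such as those in FeatureCounter's companion) wins whenever one exists.
  // It defers the failure to run time so the message can name the offending types.
  implicit def fallbackFitOperation[Self, Training](
      implicit selfTag: ClassTag[Self],
      trainingTag: ClassTag[Training]): FitOperation[Self, Training] =
    new FitOperation[Self, Training] {
      def fit(instance: Self, input: Seq[Training]): Unit =
        throw new RuntimeException(
          s"There is no FitOperation defined for ${selfTag.runtimeClass.getName} " +
            s"which trains on a Seq[${trainingTag.runtimeClass.getName}]")
    }
}

trait Estimator[Self] { this: Self =>
  def fit[Training](input: Seq[Training])(implicit op: FitOperation[Self, Training]): Unit =
    op.fit(this, input)
}

// Hypothetical estimator that records what it was last trained on.
class FeatureCounter extends Estimator[FeatureCounter] {
  var lastFit: String = "nothing"
}

object FeatureCounter {
  // One operation per supported input type; the compiler picks by element type.
  implicit val fitDense: FitOperation[FeatureCounter, DenseVec] =
    new FitOperation[FeatureCounter, DenseVec] {
      def fit(instance: FeatureCounter, input: Seq[DenseVec]): Unit =
        instance.lastFit = s"dense: ${input.size}"
    }

  implicit val fitLabeled: FitOperation[FeatureCounter, LabeledVec] =
    new FitOperation[FeatureCounter, LabeledVec] {
      def fit(instance: FeatureCounter, input: Seq[LabeledVec]): Unit =
        instance.lastFit = s"labeled: ${input.size}"
    }
}

val counter = new FeatureCounter
counter.fit(Seq(DenseVec(Vector(1.0, 2.0)), DenseVec(Vector(3.0, 4.0))))
val afterDense = counter.lastFit
counter.fit(Seq(LabeledVec(1.0, DenseVec(Vector(0.5)))))
val afterLabeled = counter.lastFit

// Compiles (the fallback matches), but fails as soon as fit is executed.
val unsupported = Try(counter.fit(Seq("not a vector")))
```

The specific implicit values win over the fallback because a monomorphic implicit value is more specific than a polymorphic implicit method. The fallback only takes effect when nothing else matches.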
Chaining is achieved by calling chainTransformer or chainPredictor on an object of a class that implements Transformer. These methods return a ChainedTransformer or a ChainedPredictor object, respectively. As we mentioned, ChainedTransformer objects can be chained further, while ChainedPredictor objects cannot. These classes take care of applying fit, transform, and predict operations for a pair of successive transformers, or for a transformer and a predictor. They also act recursively if the chain is longer than two. This works because every ChainedTransformer defines a transform and a fit operation that can be further chained with more transformers or a predictor.
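The recursive structure can be sketched with plain Scala collections. This is a simplified model rather than FlinkML's code. Seq stands in for DataSet, parameter maps are omitted, and Center and Shift are hypothetical stages. The implicit definitions in ChainedTransformer's companion object derive the fit and transform operations of a chain from the operations of its two halves. The compiler therefore resolves a chain of any length one pair at a time. A ChainedPredictor would presumably follow the same pattern, with a predict operation for its right-hand side.

```scala
trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}
trait TransformOperation[Self, In, Out] {
  def transform(instance: Self, input: Seq[In]): Seq[Out]
}

trait Transformer[Self] { this: Self =>
  def fit[I](input: Seq[I])(implicit op: FitOperation[Self, I]): Unit = op.fit(this, input)
  def transform[I, O](input: Seq[I])(implicit op: TransformOperation[Self, I, O]): Seq[O] =
    op.transform(this, input)
  // Chaining only builds a description; no data is touched until fit/transform run.
  def chainTransformer[R](right: R): ChainedTransformer[Self, R] =
    new ChainedTransformer[Self, R](this, right)
}

class ChainedTransformer[L, R](val left: L, val right: R)
    extends Transformer[ChainedTransformer[L, R]]

object ChainedTransformer {
  // Fit the left stage, push the data through it, then fit the right stage on its output.
  implicit def chainedFit[L, R, I, T](
      implicit leftFit: FitOperation[L, I],
      leftTransform: TransformOperation[L, I, T],
      rightFit: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] =
    new FitOperation[ChainedTransformer[L, R], I] {
      def fit(instance: ChainedTransformer[L, R], input: Seq[I]): Unit = {
        leftFit.fit(instance.left, input)
        rightFit.fit(instance.right, leftTransform.transform(instance.left, input))
      }
    }

  // Transform with the left stage, then feed its output (type T) into the right stage.
  implicit def chainedTransform[L, R, I, T, O](
      implicit leftTransform: TransformOperation[L, I, T],
      rightTransform: TransformOperation[R, T, O]): TransformOperation[ChainedTransformer[L, R], I, O] =
    new TransformOperation[ChainedTransformer[L, R], I, O] {
      def transform(instance: ChainedTransformer[L, R], input: Seq[I]): Seq[O] =
        rightTransform.transform(instance.right, leftTransform.transform(instance.left, input))
    }
}

// Stateless stage: adds a constant; fitting learns nothing.
class Shift(val by: Double) extends Transformer[Shift]
object Shift {
  implicit val fitShift: FitOperation[Shift, Double] = new FitOperation[Shift, Double] {
    def fit(instance: Shift, input: Seq[Double]): Unit = ()
  }
  implicit val transformShift: TransformOperation[Shift, Double, Double] =
    new TransformOperation[Shift, Double, Double] {
      def transform(instance: Shift, input: Seq[Double]): Seq[Double] = input.map(_ + instance.by)
    }
}

// Stateful stage: learns the mean in fit and subtracts it in transform.
class Center extends Transformer[Center] { var mean: Option[Double] = None }
object Center {
  implicit val fitCenter: FitOperation[Center, Double] = new FitOperation[Center, Double] {
    def fit(instance: Center, input: Seq[Double]): Unit = instance.mean = Some(input.sum / input.size)
  }
  implicit val transformCenter: TransformOperation[Center, Double, Double] =
    new TransformOperation[Center, Double, Double] {
      def transform(instance: Center, input: Seq[Double]): Seq[Double] = {
        val m = instance.mean.getOrElse(throw new IllegalStateException("Center has not been fitted."))
        input.map(_ - m)
      }
    }
}

val center = new Center
// Three stages: ChainedTransformer[ChainedTransformer[Center, Shift], Shift].
val pipeline = center.chainTransformer(new Shift(10.0)).chainTransformer(new Shift(1.0))
pipeline.fit(Seq(1.0, 2.0, 3.0))
val out: Seq[Double] = pipeline.transform(Seq(2.0, 5.0))
```

This sketch defines no fallback operation. Chaining a stage whose input type does not match the previous stage's output therefore makes the call to fit fail to compile with a missing-implicit error. FlinkML, as described above, instead fails at pre-flight time.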
It is important to note that developers and users do not need to worry about chaining when implementing their algorithms; all of this is handled automatically by FlinkML.
In order to support FlinkML’s pipelining, algorithms have to adhere to a certain design pattern, which we will describe in this section.
Let’s assume that we want to implement a pipeline operator which changes the mean of your data. Since centering data is a common pre-processing step in many analysis pipelines, we will implement it as a Transformer. Therefore, we first create a MeanTransformer class which inherits from Transformer.
Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter.
Parameters are defined in the companion object of the transformer class and extend the Parameter class. Since the parameter instances are supposed to act as immutable keys for a parameter map, they should be implemented as case objects. The default value will be used if no other value has been set by the user of this component. If no default value has been specified, meaning that defaultValue = None, then the algorithm has to handle this situation accordingly.
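Here is a self-contained sketch of such a parameter, assuming simplified stand-ins for FlinkML's Parameter and parameter map. The setter name setMean and the map's add/get methods are illustrative assumptions. The case object Mean is a singleton with stable equality, which is what makes it usable as an immutable map key:

```scala
import scala.collection.mutable

// Stand-in for a parameter: a typed key with an optional default value.
trait Parameter[T] {
  val defaultValue: Option[T]
}

// Stand-in for a parameter map: values keyed by parameter objects.
class ParameterMap {
  private val values = mutable.Map.empty[Parameter[_], Any]

  def add[T](key: Parameter[T], value: T): ParameterMap = {
    values(key) = value
    this
  }

  // The user-set value if there is one, otherwise the parameter's default (possibly None).
  def get[T](key: Parameter[T]): Option[T] =
    values.get(key).map(_.asInstanceOf[T]).orElse(key.defaultValue)
}

class MeanTransformer {
  val parameters = new ParameterMap

  def setMean(mean: Double): this.type = {
    parameters.add(MeanTransformer.Mean, mean)
    this
  }
}

object MeanTransformer {
  // Parameters live in the companion object and are case objects.
  case object Mean extends Parameter[Double] {
    override val defaultValue: Option[Double] = Some(0.0)
  }

  def apply(): MeanTransformer = new MeanTransformer
}

val defaulted = MeanTransformer().parameters.get(MeanTransformer.Mean)
val configured = MeanTransformer().setMean(1.5).parameters.get(MeanTransformer.Mean)
```

Because get returns an Option, a parameter declared with defaultValue = None surfaces as None. The algorithm must then handle that case explicitly.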
We can now instantiate a
MeanTransformer object and set the mean value of the transformed data.
But we still have to implement how the transformation works.
The workflow can be separated into two phases.
Within the first phase, the transformer learns the mean of the given training data.
This knowledge can then be used in the second phase to transform the provided data with respect to the configured resulting mean value.
The learning of the mean can be implemented within the fit operation of our Transformer, which it inherited from Estimator. Within the fit operation, a pipeline component is trained with respect to the given training data. The algorithm is, however, not implemented by overriding the fit method. Instead, we provide an implementation of a corresponding FitOperation for the correct type.
Taking a look at the definition of the fit method in Estimator, which is the parent class of Transformer, reveals why this is the case.
We see that the fit method is called with an input data set of type Training and an optional parameter map. Its second parameter list takes an implicit parameter of type FitOperation. Within the body of the function, some machine learning types are first registered, and then the fit method of the FitOperation parameter is called. The instance passes itself, the parameter map and the training data set as parameters to that method. Thus, all the program logic takes place within the FitOperation.
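The shape of this definition can be sketched as follows. Seq replaces DataSet and a plain Map replaces the parameter map, so this is a simplified model rather than FlinkML's source. RecordingEstimator is hypothetical. The sketch shows the default-valued parameter map in the first parameter list, the implicit FitOperation in the second, and the estimator handing itself to the operation:

```scala
// Simplified stand-ins: Seq replaces DataSet and a plain Map replaces the parameter map.
type ParameterMap = Map[String, Any]
val EmptyParameters: ParameterMap = Map.empty

trait FitOperation[Self, Training] {
  def fit(instance: Self, fitParameters: ParameterMap, input: Seq[Training]): Unit
}

trait Estimator[Self] { this: Self =>
  // First parameter list: the data and an optional parameter map.
  // Second parameter list: the implicit FitOperation holding the actual training logic.
  def fit[Training](training: Seq[Training], fitParameters: ParameterMap = EmptyParameters)(
      implicit fitOperation: FitOperation[Self, Training]): Unit = {
    // FlinkML registers some machine learning types at this point; this sketch has none.
    fitOperation.fit(this, fitParameters, training)
  }
}

// Hypothetical estimator that only records what it was given.
class RecordingEstimator extends Estimator[RecordingEstimator] {
  var elementsSeen: Int = 0
  var lastParameters: ParameterMap = EmptyParameters
}

object RecordingEstimator {
  implicit val fitInts: FitOperation[RecordingEstimator, Int] =
    new FitOperation[RecordingEstimator, Int] {
      def fit(instance: RecordingEstimator, fitParameters: ParameterMap, input: Seq[Int]): Unit = {
        instance.elementsSeen += input.size
        instance.lastParameters = fitParameters
      }
    }
}

val estimator = new RecordingEstimator
estimator.fit(Seq(1, 2, 3))                       // uses the default, empty parameter map
estimator.fit(Seq(4, 5), Map("iterations" -> 10)) // passes parameters through to the operation
```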
The FitOperation has two type parameters. The first defines the pipeline operator type for which this FitOperation shall work, and the second defines the type of the data set elements. If we first wanted to implement the MeanTransformer to work on DenseVector, we would, thus, have to provide an implementation for FitOperation[MeanTransformer, DenseVector].
A FitOperation[T, I] has a fit method which is called with an instance of type T, a parameter map and an input DataSet[I]. In our case, T = MeanTransformer and I = DenseVector.
The parameter map is necessary if our fit step depends on parameter values which were not given directly when the Transformer was created. The FitOperation of the MeanTransformer sums up the DenseVector instances of the given input data set and divides the result by the total number of vectors. That way, we obtain a DataSet[DenseVector] with a single element, which is the mean value.
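The computation itself can be sketched with plain Scala collections. This assumes Vector[Double] as a stand-in for DenseVector and Seq for DataSet, and the function names are hypothetical. Each vector is paired with a count of 1, and a single reduce produces both the element-wise sum and the number of vectors:

```scala
// Stand-in for DenseVector.
type DenseVec = Vector[Double]

// Element-wise sum of two vectors of equal length.
def addVectors(a: DenseVec, b: DenseVec): DenseVec =
  a.zip(b).map { case (x, y) => x + y }

def meanOf(input: Seq[DenseVec]): Seq[DenseVec] = {
  // Pair every vector with a count of 1, then reduce to (sum of vectors, number of vectors).
  val (sum, count) = input
    .map(v => (v, 1))
    .reduce((left, right) => (addVectors(left._1, right._1), left._2 + right._2))
  // Like the DataSet version, return a collection holding the single mean vector.
  Seq(sum.map(_ / count))
}

val means = meanOf(Seq(Vector(1.0, 2.0), Vector(3.0, 6.0)))
```

Counting inside the same reduce avoids a second pass over the data just to obtain the number of vectors.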
But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere.
If we want to use this knowledge in a later step to adjust the mean of some other input, we have to keep it around.
And here is where the parameter of type
MeanTransformer which is given to the
fit method comes into play.
We can use this instance to store state, which is used by a subsequent
transform operation which works on the same object.
But first we have to extend MeanTransformer with a member field and then adjust the FitOperation implementation so that it stores the computed mean in that field.
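If we look at the transform method in Transformer, we will see that we also need an implementation of TransformOperation. A possible mean-transforming implementation subtracts the training mean stored by the fit operation from each input vector and adds the configured resulting mean. If the transformer has not been fitted yet, it fails with an error.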
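Here is a self-contained sketch of the stateful fit operation together with such a transform operation. It uses simplified stand-ins: Seq replaces DataSet, Vector[Double] replaces DenseVector, and a plain field replaces the Mean parameter. The operations are already placed as implicit values in the companion object, for the reason explained below:

```scala
// Simplified stand-ins: Seq replaces DataSet, Vector[Double] replaces DenseVector,
// and a plain field replaces the parameter map.
type DenseVec = Vector[Double]

trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}

trait TransformOperation[Self, In, Out] {
  def transform(instance: Self, input: Seq[In]): Seq[Out]
}

trait Transformer[Self] { this: Self =>
  def fit[T](input: Seq[T])(implicit op: FitOperation[Self, T]): Unit = op.fit(this, input)
  def transform[I, O](input: Seq[I])(implicit op: TransformOperation[Self, I, O]): Seq[O] =
    op.transform(this, input)
}

class MeanTransformer extends Transformer[MeanTransformer] {
  // Configured mean of the output (the Mean parameter in the text).
  var resultingMean: Double = 0.0
  // Training mean, written by the fit operation and read by the transform operation.
  var meanOption: Option[DenseVec] = None

  def setMean(mean: Double): this.type = {
    resultingMean = mean
    this
  }
}

object MeanTransformer {
  def apply(): MeanTransformer = new MeanTransformer

  implicit val denseVectorMeanFitOperation: FitOperation[MeanTransformer, DenseVec] =
    new FitOperation[MeanTransformer, DenseVec] {
      def fit(instance: MeanTransformer, input: Seq[DenseVec]): Unit = {
        val (sum, count) = input
          .map(v => (v, 1))
          .reduce((l, r) => (l._1.zip(r._1).map { case (a, b) => a + b }, l._2 + r._2))
        // Store the result on the instance instead of discarding it.
        instance.meanOption = Some(sum.map(_ / count))
      }
    }

  implicit val denseVectorMeanTransformOperation: TransformOperation[MeanTransformer, DenseVec, DenseVec] =
    new TransformOperation[MeanTransformer, DenseVec, DenseVec] {
      def transform(instance: MeanTransformer, input: Seq[DenseVec]): Seq[DenseVec] =
        instance.meanOption match {
          case Some(trainingMean) =>
            // Shift every vector element-wise: x - trainingMean + resultingMean.
            input.map(_.zip(trainingMean).map { case (x, m) => x - m + instance.resultingMean })
          case None =>
            throw new RuntimeException("MeanTransformer has not been fitted to data.")
        }
    }
}

val training: Seq[DenseVec] = Seq(Vector(1.0, 10.0), Vector(3.0, 20.0))
val meanTransformer = MeanTransformer().setMean(1.0)
meanTransformer.fit(training)
val shifted = meanTransformer.transform(training)
```

Keeping the learned mean on the instance is what lets a later transform call, possibly on different data, reuse the result of fit.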
Now we have everything implemented to fit our
MeanTransformer to a training data set of
DenseVector instances and to transform them.
However, when we execute the fit operation on a DataSet[DenseVector], we receive the following error at runtime:
"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]".
The reason is that the Scala compiler could not find a fitting FitOperation value with the right type parameters for the implicit parameter of the fit method.
Therefore, it chose a fallback implicit value, which produces this error message at runtime.
In order to make the compiler aware of our implementation, we have to define it as an implicit value and put it in the scope of the
MeanTransformer's companion object.
Now we can call fit and transform of our MeanTransformer with a DataSet[DenseVector] as input.
Furthermore, we can now use this transformer as part of an analysis pipeline where DenseVector is both the input and the expected output.
It is noteworthy that there is no additional code needed to enable chaining. The system automatically constructs the pipeline logic using the operations of the individual components.
So far everything works fine with DenseVector.
But what happens if we call our transformer with a DataSet[LabeledVector] instead?
As before we see the following exception upon execution of the program:
"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]".
It is noteworthy that this exception is thrown in the pre-flight phase, which means that the job has not yet been submitted to the runtime system.
This has the advantage that you won’t see a job which runs for a couple of days and then fails because of an incompatible pipeline component.
Type compatibility is, thus, checked at the very beginning for the complete job.
In order to make the
MeanTransformer work on
LabeledVector as well, we have to provide the corresponding operations.
Consequently, we have to define a
FitOperation[MeanTransformer, LabeledVector] and
TransformOperation[MeanTransformer, LabeledVector, LabeledVector] as implicit values in the scope of
MeanTransformer’s companion object.
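Here is a self-contained sketch of a companion object offering both the dense and the labeled operations. It uses the same simplified stand-ins as before (Seq for DataSet, Vector[Double] for DenseVector) plus a hypothetical LabeledVec case class in place of LabeledVector. The labeled operations reuse the dense logic on the feature vectors and pass the labels through unchanged:

```scala
// Simplified stand-ins: Seq replaces DataSet, Vector[Double] replaces DenseVector,
// and LabeledVec plays the role of LabeledVector.
type DenseVec = Vector[Double]
final case class LabeledVec(label: Double, vector: DenseVec)

trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}
trait TransformOperation[Self, In, Out] {
  def transform(instance: Self, input: Seq[In]): Seq[Out]
}
trait Transformer[Self] { this: Self =>
  def fit[T](input: Seq[T])(implicit op: FitOperation[Self, T]): Unit = op.fit(this, input)
  def transform[I, O](input: Seq[I])(implicit op: TransformOperation[Self, I, O]): Seq[O] =
    op.transform(this, input)
}

class MeanTransformer extends Transformer[MeanTransformer] {
  var resultingMean: Double = 0.0
  var meanOption: Option[DenseVec] = None
}

object MeanTransformer {
  // Shared helpers, so the dense and labeled operations cannot drift apart.
  private def meanOf(vectors: Seq[DenseVec]): DenseVec =
    vectors.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }).map(_ / vectors.size)

  private def shift(instance: MeanTransformer, v: DenseVec): DenseVec = {
    val trainingMean = instance.meanOption.getOrElse(
      throw new RuntimeException("MeanTransformer has not been fitted to data."))
    v.zip(trainingMean).map { case (x, m) => x - m + instance.resultingMean }
  }

  implicit val denseVectorFitOperation: FitOperation[MeanTransformer, DenseVec] =
    new FitOperation[MeanTransformer, DenseVec] {
      def fit(instance: MeanTransformer, input: Seq[DenseVec]): Unit =
        instance.meanOption = Some(meanOf(input))
    }

  implicit val denseVectorTransformOperation: TransformOperation[MeanTransformer, DenseVec, DenseVec] =
    new TransformOperation[MeanTransformer, DenseVec, DenseVec] {
      def transform(instance: MeanTransformer, input: Seq[DenseVec]): Seq[DenseVec] =
        input.map(shift(instance, _))
    }

  // Labeled input: learn from and shift only the feature vectors; labels are unchanged.
  implicit val labeledVectorFitOperation: FitOperation[MeanTransformer, LabeledVec] =
    new FitOperation[MeanTransformer, LabeledVec] {
      def fit(instance: MeanTransformer, input: Seq[LabeledVec]): Unit =
        instance.meanOption = Some(meanOf(input.map(_.vector)))
    }

  implicit val labeledVectorTransformOperation: TransformOperation[MeanTransformer, LabeledVec, LabeledVec] =
    new TransformOperation[MeanTransformer, LabeledVec, LabeledVec] {
      def transform(instance: MeanTransformer, input: Seq[LabeledVec]): Seq[LabeledVec] =
        input.map(lv => lv.copy(vector = shift(instance, lv.vector)))
    }
}

val labeled = Seq(LabeledVec(1.0, Vector(2.0, 4.0)), LabeledVec(0.0, Vector(4.0, 8.0)))
val meanTransformer = new MeanTransformer
meanTransformer.fit(labeled)
val shiftedLabeled = meanTransformer.transform(labeled)
```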
If we wanted to implement a Predictor instead of a Transformer, then we would have to provide a FitOperation, too.
Moreover, a Predictor requires a PredictOperation, which implements how predictions are calculated from testing data.
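Here is a self-contained sketch of a predictor following this pattern. It rests on simplifying assumptions: Seq replaces DataSet, and Labeled is a hypothetical case class pairing a label with a single feature. SimpleLinearRegression is a hypothetical one-feature least-squares model, not FlinkML's Multiple Linear Regression. The companion object supplies a FitOperation that learns the weights and a PredictOperation that applies them to unlabeled data:

```scala
// Simplified stand-ins: Seq replaces DataSet; Labeled pairs a label with a single feature.
final case class Labeled(label: Double, x: Double)

trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}
trait PredictOperation[Self, Testing, Prediction] {
  def predict(instance: Self, input: Seq[Testing]): Seq[Prediction]
}

trait Predictor[Self] { this: Self =>
  def fit[T](input: Seq[T])(implicit op: FitOperation[Self, T]): Unit = op.fit(this, input)
  def predict[T, P](input: Seq[T])(implicit op: PredictOperation[Self, T, P]): Seq[P] =
    op.predict(this, input)
}

// Hypothetical least-squares fit of label = slope * x + intercept.
class SimpleLinearRegression extends Predictor[SimpleLinearRegression] {
  var weights: Option[(Double, Double)] = None // (slope, intercept), set by fit
}

object SimpleLinearRegression {
  implicit val fitLabeled: FitOperation[SimpleLinearRegression, Labeled] =
    new FitOperation[SimpleLinearRegression, Labeled] {
      def fit(instance: SimpleLinearRegression, input: Seq[Labeled]): Unit = {
        val n = input.size.toDouble
        val meanX = input.map(_.x).sum / n
        val meanY = input.map(_.label).sum / n
        val covXY = input.map(p => (p.x - meanX) * (p.label - meanY)).sum
        val varX = input.map(p => (p.x - meanX) * (p.x - meanX)).sum
        val slope = covXY / varX
        instance.weights = Some((slope, meanY - slope * meanX))
      }
    }

  // How predictions are computed from unlabeled test data.
  implicit val predictDoubles: PredictOperation[SimpleLinearRegression, Double, Labeled] =
    new PredictOperation[SimpleLinearRegression, Double, Labeled] {
      def predict(instance: SimpleLinearRegression, input: Seq[Double]): Seq[Labeled] =
        instance.weights match {
          case Some((slope, intercept)) => input.map(x => Labeled(slope * x + intercept, x))
          case None => throw new RuntimeException("SimpleLinearRegression has not been fitted.")
        }
    }
}

val regression = new SimpleLinearRegression
regression.fit(Seq(Labeled(1.0, 0.0), Labeled(3.0, 1.0), Labeled(5.0, 2.0)))
val predictions = regression.predict(Seq(10.0, 0.5))
```

Returning each prediction paired with its input keeps the original feature next to the predicted label.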