
Alternating Least Squares

Description

The alternating least squares (ALS) algorithm factorizes a given matrix $R$ into two factors $U$ and $V$ such that $R \approx U^TV$. The unknown row dimension of $U$ and $V$ is given as a parameter to the algorithm and is called the number of latent factors. Since matrix factorization can be used in the context of recommendation, the matrices $U$ and $V$ can be called the user and item matrix, respectively. The $i$th column of the user matrix is denoted by $u_i$ and the $i$th column of the item matrix by $v_i$. The matrix $R$ can be called the ratings matrix with $(R)_{i,j} = r_{i,j}$.

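In order to find the user and item matrix, the following problem is solved:

$$\arg\min_{U,V} \sum_{\{i,j \mid r_{i,j} \neq 0\}} \left(r_{i,j} - u_{i}^Tv_{j}\right)^2 + \lambda \left(\sum_{i} n_{u_i} \left\lVert u_i \right\rVert^2 + \sum_{j} n_{v_j} \left\lVert v_j \right\rVert^2 \right)$$

with $\lambda$ being the regularization factor, $n_{u_i}$ being the number of items the user $i$ has rated and $n_{v_j}$ being the number of times the item $j$ has been rated. This regularization scheme to avoid overfitting is called weighted-$\lambda$-regularization. Details can be found in the work of Zhou et al.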
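
As a rough illustration of this objective, the following plain-Scala sketch evaluates the squared error over the observed ratings plus the weighted-$\lambda$ penalty, in which the squared norm of each factor vector is scaled by its rating count. It is not Flink's implementation: the object name AlsObjective, the function cost and the map-based storage of factor vectors are hypothetical choices made for this example.

```scala
object AlsObjective {
  // (user index i, item index j, rating r); hypothetical alias for this sketch
  type Rating = (Int, Int, Double)

  private def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  /** Squared error over observed ratings plus weighted-lambda regularization. */
  def cost(
      ratings: Seq[Rating],
      users: Map[Int, Array[Double]],
      items: Map[Int, Array[Double]],
      lambda: Double): Double = {
    val squaredError = ratings.map { case (i, j, r) =>
      val err = r - dot(users(i), items(j))
      err * err
    }.sum

    // Number of items rated by each user and number of ratings per item
    val userCounts = ratings.groupBy(_._1).map { case (i, rs) => i -> rs.size }
    val itemCounts = ratings.groupBy(_._2).map { case (j, rs) => j -> rs.size }

    val userPenalty = userCounts.map { case (i, n) => n * dot(users(i), users(i)) }.sum
    val itemPenalty = itemCounts.map { case (j, n) => n * dot(items(j), items(j)) }.sum

    squaredError + lambda * (userPenalty + itemPenalty)
  }
}
```

Because a user with many ratings contributes many error terms, scaling that user's penalty by the rating count keeps the regularization proportionate to the amount of data behind each factor vector.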

By fixing one of the matrices $U$ or $V$, we obtain a quadratic form which can be solved directly. The solution of the modified problem is guaranteed to monotonically decrease the overall cost function. By applying this step alternately to the matrices $U$ and $V$, we can iteratively improve the matrix factorization.
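
To make the directly solvable step concrete: with $V$ fixed, the terms of the cost function that involve a single user vector $u_i$ form a regularized least-squares problem. Assuming the weighted-$\lambda$ objective of Zhou et al. (squared error over the observed ratings plus the penalty $\lambda n_{u_i} \lVert u_i \rVert^2$, where $n_{u_i}$ is the number of items rated by user $i$), setting the gradient with respect to $u_i$ to zero yields the linear system

$$\left(\sum_{j \,:\, r_{i,j} \neq 0} v_j v_j^T + \lambda n_{u_i} I\right) u_i = \sum_{j \,:\, r_{i,j} \neq 0} r_{i,j} v_j$$

where $I$ is the identity matrix whose size equals the number of latent factors. The update of an item vector $v_j$ with $U$ fixed is symmetric, with the sums running over the users who rated item $j$ and $n_{v_j}$ in place of $n_{u_i}$.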

The matrix $R$ is given in its sparse representation as a set of tuples $(i, j, r)$, where $i$ denotes the row index, $j$ the column index and $r$ the matrix value at position $(i,j)$.
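
The sparse representation can be illustrated with a small plain-Scala sketch that turns a dense matrix into such tuples. The names SparseRatings and toTuples are hypothetical, and two assumptions are made here that the text does not state: indices are zero-based, and a zero entry means that no rating was observed.

```scala
object SparseRatings {
  /** Converts a dense matrix to (row, column, value) tuples, skipping zero entries. */
  def toTuples(dense: Array[Array[Double]]): Seq[(Int, Int, Double)] =
    for {
      (row, i) <- dense.toSeq.zipWithIndex
      (value, j) <- row.toSeq.zipWithIndex
      if value != 0.0 // assumption: zero marks an unobserved rating
    } yield (i, j, value)
}
```

For example, under these assumptions a 2x2 matrix with ratings only on the diagonal yields exactly two tuples, one per observed rating.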

Operations

ALS is a Predictor. As such, it supports the fit and predict operations.

Fit

ALS is trained on the sparse representation of the rating matrix:

  • fit: DataSet[(Int, Int, Double)] => Unit

Predict

For each tuple of row and column index, ALS predicts the rating:

  • predict: DataSet[(Int, Int)] => DataSet[(Int, Int, Double)]
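
Since the factorization approximates $R$ by $U^TV$, a predicted rating for the pair $(i, j)$ is the inner product of the user vector $u_i$ and the item vector $v_j$. The following plain-Scala sketch shows that computation on in-memory maps; it is not Flink's implementation. The names AlsPrediction and predict are hypothetical, and dropping pairs whose user or item has no factor vector is an assumption of this sketch, not documented behavior.

```scala
object AlsPrediction {
  /** Predicts a rating for each (user, item) pair as the inner product of their factor vectors. */
  def predict(
      users: Map[Int, Array[Double]],
      items: Map[Int, Array[Double]])(
      pairs: Seq[(Int, Int)]): Seq[(Int, Int, Double)] =
    pairs.flatMap { case (i, j) =>
      // Assumption: pairs with an unknown user or item are skipped
      for {
        u <- users.get(i)
        v <- items.get(j)
      } yield (i, j, u.zip(v).map { case (a, b) => a * b }.sum)
    }
}
```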

Parameters

The alternating least squares implementation can be controlled by the following parameters:

Parameters Description
NumFactors

The number of latent factors to use for the underlying model. It is equivalent to the dimension of the calculated user and item vectors. (Default value: 10)

Lambda

Regularization factor. Tune this value in order to avoid overfitting or poor performance due to strong generalization. (Default value: 1)

Iterations

The maximum number of iterations. (Default value: 10)

Blocks

The number of blocks into which the user and item matrices are grouped. The fewer blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger update messages, which have to be stored on the heap. If the algorithm fails because of an OutOfMemoryError, try increasing the number of blocks. (Default value: None)

Seed

Random seed used to generate the initial item matrix for the algorithm. (Default value: 0)

TemporaryPath

Path to a temporary directory in which intermediate results are stored. If this value is set, the algorithm is split into two preprocessing steps, the ALS iteration, and a post-processing step which calculates a last ALS half-step. The preprocessing steps calculate the OutBlockInformation and InBlockInformation for the given rating matrix. The results of the individual steps are stored in the specified directory. By splitting the algorithm into multiple smaller steps, Flink does not have to split the available memory amongst too many operators. This allows the system to process bigger individual messages and improves the overall performance. (Default value: None)

Examples

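// Imports assume the Flink Scala API and flink-ml dependencies are on the classpath
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.ParameterMap
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// Read input data set from a csv file
// (pathToTrainingFile is a placeholder for the location of the training data)
val inputDS: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)](
  pathToTrainingFile)

// Set up the ALS learner
val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(100)
  .setTemporaryPath("hdfs://tempPath")

// Set the other parameters via a parameter map
val parameters = ParameterMap()
  .add(ALS.Lambda, 0.9)
  .add(ALS.Seed, 42L)

// Calculate the factorization
als.fit(inputDS, parameters)

// Read the testing data set from a csv file
// (pathToData is a placeholder for the location of the (row, column) index pairs)
val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)

// Calculate the ratings according to the matrix factorization
val predictedRatings = als.predict(testingDS)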