Min Max Scaler
This documentation is for an unreleased version of Apache Flink Machine Learning Library. We recommend you use the latest stable version.

Min Max Scaler #

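Min Max Scaler is an algorithm that rescales feature values to a common range [min, max] specified by the user. The minimum and maximum values used for the rescaling are learned from the training data when the model is fit.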

Input Columns #

Param name Type Default Description
inputCol Vector "input" Features to be scaled.

Output Columns #

Param name Type Default Description
outputCol Vector "output" Scaled features.

Parameters #

Key Default Type Required Description
inputCol "input" String no Input column name.
outputCol "output" String no Output column name.
min 0.0 Double no Lower bound of the output feature range.
max 1.0 Double no Upper bound of the output feature range.

Examples #

import org.apache.flink.ml.feature.minmaxscaler.MinMaxScaler;
import org.apache.flink.ml.feature.minmaxscaler.MinMaxScalerModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

/** Simple program that trains a MinMaxScaler model and uses it for feature engineering. */
public class MinMaxScalerExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input training and prediction data.
        DataStream<Row> trainStream =
                env.fromElements(
                        Row.of(Vectors.dense(0.0, 3.0)),
                        Row.of(Vectors.dense(2.1, 0.0)),
                        Row.of(Vectors.dense(4.1, 5.1)),
                        Row.of(Vectors.dense(6.1, 8.1)),
                        Row.of(Vectors.dense(200, 400)));
        Table trainTable = tEnv.fromDataStream(trainStream).as("input");

        DataStream<Row> predictStream =
                env.fromElements(
                        Row.of(Vectors.dense(150.0, 90.0)),
                        Row.of(Vectors.dense(50.0, 40.0)),
                        Row.of(Vectors.dense(100.0, 50.0)));
        Table predictTable = tEnv.fromDataStream(predictStream).as("input");

        // Creates a MinMaxScaler object. No parameters are set, so the defaults apply:
        // input column "input", output column "output", and target range [0.0, 1.0].
        MinMaxScaler minMaxScaler = new MinMaxScaler();

        // Trains the MinMaxScaler Model.
        MinMaxScalerModel minMaxScalerModel = minMaxScaler.fit(trainTable);

        // Uses the MinMaxScaler Model for predictions.
        Table outputTable = minMaxScalerModel.transform(predictTable)[0];

        // The column names do not change between rows, so look them up once.
        String inputCol = minMaxScaler.getInputCol();
        String outputCol = minMaxScaler.getOutputCol();

        // Extracts and displays the results. collect() returns a CloseableIterator, so it is
        // closed with try-with-resources once every row has been read (or on failure).
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                DenseVector inputValue = (DenseVector) row.getField(inputCol);
                DenseVector outputValue = (DenseVector) row.getField(outputCol);
                System.out.printf(
                        "Input Value: %-15s\tOutput Value: %s\n", inputValue, outputValue);
            }
        } catch (RuntimeException e) {
            // Propagate unchecked failures unchanged.
            throw e;
        } catch (Exception e) {
            // CloseableIterator#close() declares a checked Exception; rethrow it unchecked
            // (with the cause preserved) so that main keeps its original signature.
            throw new IllegalStateException("Failed to close the result iterator.", e);
        }
    }
}

# Simple program that trains a MinMaxScaler model and uses it for feature
# engineering.
#
# Before executing this program, please make sure you have followed Flink ML's
# quick start guideline to set up Flink ML and Flink environment. The guideline
# can be found at
#
# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler
from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input training and prediction data
train_data = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense(0.0, 3.0),),
        (Vectors.dense(2.1, 0.0),),
        (Vectors.dense(4.1, 5.1),),
        (Vectors.dense(6.1, 8.1),),
        (Vectors.dense(200, 400),),
    ],
        type_info=Types.ROW_NAMED(
            ['input'],
            [DenseVectorTypeInfo()])
    ))

predict_data = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense(150.0, 90.0),),
        (Vectors.dense(50.0, 40.0),),
        (Vectors.dense(100.0, 50.0),),
    ],
        type_info=Types.ROW_NAMED(
            ['input'],
            [DenseVectorTypeInfo()])
    ))

# create a min-max-scaler object; no parameters are set, so the defaults
# apply (input column "input", output column "output", range [0.0, 1.0])
min_max_scaler = MinMaxScaler()

# train the min-max-scaler model
model = min_max_scaler.fit(train_data)

# use the min-max-scaler model for predictions
output = model.transform(predict_data)[0]

# extract and display the results
field_names = output.get_schema().get_field_names()
# the column positions are the same for every row, so resolve them once
input_index = field_names.index(min_max_scaler.get_input_col())
output_index = field_names.index(min_max_scaler.get_output_col())
# execute_and_collect() returns a closeable iterator; the with-statement
# closes it once all rows have been read (or if an error occurs)
with t_env.to_data_stream(output).execute_and_collect() as results:
    for result in results:
        input_value = result[input_index]
        output_value = result[output_index]
        print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))