RobustScaler #
RobustScaler is an algorithm that scales features using statistics that are robust to outliers.
This Scaler removes the median and scales the data according to the quantile range (defaults to IQR: Interquartile Range). The IQR is the range between the 1st quartile (25th quantile) and the 3rd quartile (75th quantile) but can be configured.
Centering and scaling happen independently on each feature by computing the relevant statistics on the samples in the training set. Median and quantile range are then stored to be used on later data using the transform method.
Standardization of a dataset is a common requirement for many machine learning estimators. Typically this is done by removing the mean and scaling to unit variance. However, outliers can often influence the sample mean / variance in a negative way. In such cases, the median and the interquartile range often give better results.
Note that NaN values are ignored in the computation of medians and ranges.
Input Columns #
Param name | Type | Default | Description |
---|---|---|---|
inputCol | Vector | "input" |
Features to be scaled. |
Output Columns #
Param name | Type | Default | Description |
---|---|---|---|
outputCol | Vector | "output" |
Scaled features. |
Parameters #
Below are the parameters required by RobustScalerModel
.
Key | Default | Type | Required | Description |
---|---|---|---|---|
inputCol | "input" |
String | no | Input column name. |
outputCol | "output" |
String | no | Output column name. |
withCentering | false |
Boolean | no | Whether to center the data with median before scaling. |
withScaling | true |
Boolean | no | Whether to scale the data to quantile range. |
RobustScaler
needs parameters above and also below.
Key | Default | Type | Required | Description |
---|---|---|---|---|
lower | 0.25 |
Double | no | Lower quantile to calculate quantile range. |
upper | 0.75 |
Double | no | Upper quantile to calculate quantile range. |
relativeError | 0.001 |
Double | no | The relative target precision for the approximate quantile algorithm. |
Examples #
import org.apache.flink.ml.feature.robustscaler.RobustScaler;
import org.apache.flink.ml.feature.robustscaler.RobustScalerModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/** Simple program that trains a {@link RobustScaler} model and uses it for feature selection. */
public class RobustScalerExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input training and prediction data.
DataStream<Row> trainStream =
env.fromElements(
Row.of(1, Vectors.dense(0.0, 0.0)),
Row.of(2, Vectors.dense(1.0, -1.0)),
Row.of(3, Vectors.dense(2.0, -2.0)),
Row.of(4, Vectors.dense(3.0, -3.0)),
Row.of(5, Vectors.dense(4.0, -4.0)),
Row.of(6, Vectors.dense(5.0, -5.0)),
Row.of(7, Vectors.dense(6.0, -6.0)),
Row.of(8, Vectors.dense(7.0, -7.0)),
Row.of(9, Vectors.dense(8.0, -8.0)));
Table trainTable = tEnv.fromDataStream(trainStream).as("id", "input");
// Creates a RobustScaler object and initializes its parameters.
RobustScaler robustScaler =
new RobustScaler()
.setLower(0.25)
.setUpper(0.75)
.setRelativeError(0.001)
.setWithScaling(true)
.setWithCentering(true);
// Trains the RobustScaler model.
RobustScalerModel model = robustScaler.fit(trainTable);
// Uses the RobustScaler model for predictions.
Table outputTable = model.transform(trainTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
DenseVector inputValue = (DenseVector) row.getField(robustScaler.getInputCol());
DenseVector outputValue = (DenseVector) row.getField(robustScaler.getOutputCol());
System.out.printf("Input Value: %-15s\tOutput Value: %s\n", inputValue, outputValue);
}
}
}
# Simple program that creates a RobustScaler instance and uses it for feature
# engineering.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.feature.robustscaler import RobustScaler
# Creates a new StreamExecutionEnvironment.
env = StreamExecutionEnvironment.get_execution_environment()
# Creates a StreamTableEnvironment.
t_env = StreamTableEnvironment.create(env)
# Generates input training and prediction data.
train_data = t_env.from_data_stream(
env.from_collection([
(1, Vectors.dense(0.0, 0.0),),
(2, Vectors.dense(1.0, -1.0),),
(3, Vectors.dense(2.0, -2.0),),
(4, Vectors.dense(3.0, -3.0),),
(5, Vectors.dense(4.0, -4.0),),
(6, Vectors.dense(5.0, -5.0),),
(7, Vectors.dense(6.0, -6.0),),
(8, Vectors.dense(7.0, -7.0),),
(9, Vectors.dense(8.0, -8.0),),
],
type_info=Types.ROW_NAMED(
['id', 'input'],
[Types.INT(), DenseVectorTypeInfo()])
))
# Creates an RobustScaler object and initializes its parameters.
robust_scaler = RobustScaler()\
.set_lower(0.25)\
.set_upper(0.75)\
.set_relative_error(0.001)\
.set_with_scaling(True)\
.set_with_centering(True)
# Trains the RobustScaler Model.
model = robust_scaler.fit(train_data)
# Uses the RobustScaler Model for predictions.
output = model.transform(train_data)[0]
# Extracts and displays the results.
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
input_index = field_names.index(robust_scaler.get_input_col())
output_index = field_names.index(robust_scaler.get_output_col())
print('Input Value: ' + str(result[input_index]) +
'\tOutput Value: ' + str(result[output_index]))