UnivariateFeatureSelector

UnivariateFeatureSelector #

UnivariateFeatureSelector is an algorithm that selects features based on univariate statistical tests against labels.

Currently, Flink supports three UnivariateFeatureSelectors: chi-squared, ANOVA F-test and F-value. User can choose UnivariateFeatureSelector by setting featureType and labelType, and Flink will pick the score function based on the specified featureType and labelType.

The following combination of featureType and labelType are supported:

  • `featureType` `categorical` and `labelType` `categorical`: Flink uses chi-squared, i.e. chi2 in sklearn.
  • `featureType` `continuous` and `labelType` `categorical`: Flink uses ANOVA F-test, i.e. f_classif in sklearn.
  • `featureType` `continuous` and `labelType` `continuous`: Flink uses F-value, i.e. f_regression in sklearn.

UnivariateFeatureSelector supports different selection modes:

  • numTopFeatures: chooses a fixed number of top features according to a hypothesis.
  • percentile: similar to numTopFeatures but chooses a fraction of all features instead of a fixed number.
  • fpr: chooses all features whose p-value are below a threshold, thus controlling the false positive rate of selection.
  • fdr: uses the Benjamini-Hochberg procedure to choose all features whose false discovery rate is below a threshold.
  • fwe: chooses all features whose p-values are below a threshold. The threshold is scaled by 1/numFeatures, thus controlling the family-wise error rate of selection.

By default, the selection mode is numTopFeatures.

Input Columns #

Param name Type Default Description
featuresCol Vector "features" Feature vector.
labelCol Number "label" Label of the features.

Output Columns #

Param name Type Default Description
outputCol Vector "output" Selected features.

Parameters #

Below are the parameters required by UnivariateFeatureSelectorModel.

Key Default Type Required Description
featuresCol "features" String no Features column name.
outputCol "output" String no Output column name.

UnivariateFeatureSelector needs parameters above and also below.

Key Default Type Required Description
labelCol "label" String no Label column name.
featureType null String yes The feature type. Supported values: ‘categorical’, ‘continuous’.
labelType null String yes The label type. Supported values: ‘categorical’, ‘continuous’.
selectionMode "numTopFeatures" String no The feature selection mode. Supported values: ‘numTopFeatures’, ‘percentile’, ‘fpr’, ‘fdr’, ‘fwe’.
selectionThreshold null Number no The upper bound of the features that selector will select. If not set, it will be replaced with a meaningful value according to different selection modes at runtime. When the mode is numTopFeatures, it will be replaced with 50; when the mode is percentile, it will be replaced with 0.1; otherwise, it will be replaced with 0.05.

Examples #

import org.apache.flink.ml.feature.univariatefeatureselector.UnivariateFeatureSelector;
import org.apache.flink.ml.feature.univariatefeatureselector.UnivariateFeatureSelectorModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

/**
 * Simple program that trains a {@link UnivariateFeatureSelector} model and uses it for feature
 * selection.
 */
public class UnivariateFeatureSelectorExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input training and prediction data.
        DataStream<Row> trainStream =
                env.fromElements(
                        Row.of(Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
                        Row.of(Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
                        Row.of(Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 1.0),
                        Row.of(Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
                        Row.of(Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
                        Row.of(Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0));
        Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");

        // Creates a UnivariateFeatureSelector object and initializes its parameters.
        UnivariateFeatureSelector univariateFeatureSelector =
                new UnivariateFeatureSelector()
                        .setFeaturesCol("features")
                        .setLabelCol("label")
                        .setFeatureType("continuous")
                        .setLabelType("categorical")
                        .setSelectionThreshold(1);

        // Trains the UnivariateFeatureSelector model.
        UnivariateFeatureSelectorModel model = univariateFeatureSelector.fit(trainTable);

        // Uses the UnivariateFeatureSelector model for predictions.
        Table outputTable = model.transform(trainTable)[0];

        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();
            DenseVector inputValue =
                    (DenseVector) row.getField(univariateFeatureSelector.getFeaturesCol());
            DenseVector outputValue =
                    (DenseVector) row.getField(univariateFeatureSelector.getOutputCol());
            System.out.printf("Input Value: %-15s\tOutput Value: %s\n", inputValue, outputValue);
        }
    }
}

# Simple program that creates a UnivariateFeatureSelector instance and uses it for feature
# engineering.

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.feature.univariatefeatureselector import UnivariateFeatureSelector
from pyflink.table import StreamTableEnvironment

from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo

env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create(env)

# Generates input training and prediction data.
input_table = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0,),
        (Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0,),
        (Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 1.0,),
        (Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0,),
        (Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0,),
        (Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0,),
    ],
        type_info=Types.ROW_NAMED(
            ['features', 'label'],
            [DenseVectorTypeInfo(), Types.FLOAT()])
    ))

# Creates an UnivariateFeatureSelector object and initializes its parameters.
univariate_feature_selector = UnivariateFeatureSelector() \
    .set_features_col('features') \
    .set_label_col('label') \
    .set_feature_type('continuous') \
    .set_label_type('categorical') \
    .set_selection_threshold(1)

# Trains the UnivariateFeatureSelector Model.
model = univariate_feature_selector.fit(input_table)

# Uses the UnivariateFeatureSelector Model for predictions.
output = model.transform(input_table)[0]

# Extracts and displays the results.
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
    input_index = field_names.index(univariate_feature_selector.get_features_col())
    output_index = field_names.index(univariate_feature_selector.get_output_col())
    print('Input Value: ' + str(result[input_index]) +
          '\tOutput Value: ' + str(result[output_index]))