VectorIndexer

VectorIndexer #

VectorIndexer is an algorithm that implements the vector indexing algorithm. A vector indexer maps each column of the input vector into a continuous/categorical feature. Whether one feature is transformed into a continuous or categorical feature depends on the number of distinct values in this column. If the number of distinct values in one column is greater than a specified parameter (i.e., maxCategories), the corresponding output column is unchanged. Otherwise, it is transformed into a categorical value. For categorical outputs, the indices are in [0, numDistinctValuesInThisColumn].

The output model is organized in ascending order except that 0.0 is always mapped to 0 (for sparsity).

Input Columns #

Param name Type Default Description
inputCol Vector "input" Vectors to be indexed.

Output Columns #

Param name Type Default Description
outputCol Vector "output" Indexed vectors.

Parameters #

Below are the parameters required by VectorIndexerModel.

Key Default Type Required Description
inputCol "input" String no Input column name.
outputCol "output" String no Output column name.
handleInvalid "error" String no Strategy to handle invalid entries. Supported values: 'error', 'skip', 'keep'.

VectorIndexer needs parameters above and also below.

Key Default Type Required Description
maxCategories 20 Integer no Threshold for the number of values a categorical feature can take (>= 2). If a feature is found to have > maxCategories values, then it is declared continuous.

Examples #

import org.apache.flink.ml.common.param.HasHandleInvalid;
import org.apache.flink.ml.feature.vectorindexer.VectorIndexer;
import org.apache.flink.ml.feature.vectorindexer.VectorIndexerModel;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;
import java.util.List;

/** Simple program that creates a VectorIndexer instance and uses it for feature engineering. */
public class VectorIndexerExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

		// Generates input data.
		List<Row> trainInput =
			Arrays.asList(
				Row.of(Vectors.dense(1, 1)),
				Row.of(Vectors.dense(2, -1)),
				Row.of(Vectors.dense(3, 1)),
				Row.of(Vectors.dense(4, 0)),
				Row.of(Vectors.dense(5, 0)));

		List<Row> predictInput =
			Arrays.asList(
				Row.of(Vectors.dense(0, 2)),
				Row.of(Vectors.dense(0, 0)),
				Row.of(Vectors.dense(0, -1)));

		Table trainTable = tEnv.fromDataStream(env.fromCollection(trainInput)).as("input");
		Table predictTable = tEnv.fromDataStream(env.fromCollection(predictInput)).as("input");

		// Creates a VectorIndexer object and initializes its parameters.
		VectorIndexer vectorIndexer =
			new VectorIndexer()
				.setInputCol("input")
				.setOutputCol("output")
				.setHandleInvalid(HasHandleInvalid.KEEP_INVALID)
				.setMaxCategories(3);

		// Trains the VectorIndexer Model.
		VectorIndexerModel model = vectorIndexer.fit(trainTable);

		// Uses the VectorIndexer Model for predictions.
		Table outputTable = model.transform(predictTable)[0];

		// Extracts and displays the results.
		for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
			Row row = it.next();
			System.out.printf(
				"Input Value: %s \tOutput Value: %s\n",
				row.getField(vectorIndexer.getInputCol()),
				row.getField(vectorIndexer.getOutputCol()));
		}
	}
}

# Simple program that trains a VectorIndexer model and uses it for feature
# engineering.

from pyflink.common import Types
from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.feature.vectorindexer import VectorIndexer
from pyflink.table import StreamTableEnvironment

# Creates a new StreamExecutionEnvironment.
env = StreamExecutionEnvironment.get_execution_environment()

# Creates a StreamTableEnvironment.
t_env = StreamTableEnvironment.create(env)

# Generates input training and prediction data.
train_table = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense(1, 1),),
        (Vectors.dense(2, -1),),
        (Vectors.dense(3, 1),),
        (Vectors.dense(4, 0),),
        (Vectors.dense(5, 0),)
    ],
        type_info=Types.ROW_NAMED(
            ['input', ],
            [DenseVectorTypeInfo(), ])))

predict_table = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense(0, 2),),
        (Vectors.dense(0, 0),),
        (Vectors.dense(0, -1),),
    ],
        type_info=Types.ROW_NAMED(
            ['input', ],
            [DenseVectorTypeInfo(), ])))

# Creates a VectorIndexer object and initializes its parameters.
vector_indexer = VectorIndexer() \
    .set_input_col('input') \
    .set_output_col('output') \
    .set_handle_invalid('keep') \
    .set_max_categories(3)

# Trains the VectorIndexer Model.
model = vector_indexer.fit(train_table)

# Uses the VectorIndexer Model for predictions.
output = model.transform(predict_table)[0]

# Extracts and displays the results.
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
    print('Input Value: ' + str(result[field_names.index(vector_indexer.get_input_col())])
          + '\tOutput Value: ' + str(result[field_names.index(vector_indexer.get_output_col())]))