String Indexer

String Indexer #

String Indexer maps one or more columns (string/numerical values) of the input to one or more indexed output columns (integer values). Two data points receive the same output index if and only if their values in the corresponding input column are the same. The indices are in [0, numDistinctValuesInThisColumn].

IndexToStringModel transforms input index column(s) to string column(s) using the model data computed by StringIndexer. It is a reverse operation of StringIndexerModel.

Input Columns #

Param name Type Default Description
inputCols Number/String null string/numerical values to be indexed.

Output Columns #

Param name Type Default Description
outputCols Double null Indices of string/numerical values.

Parameters #

Below are the parameters required by StringIndexerModel.

Key Default Type Required Description
inputCols null String yes Input column names.
outputCols null String yes Output column names.
handleInvalid HasHandleInvalid.ERROR_INVALID String no Strategy to handle invalid entries.

StringIndexer needs parameters above and also below.

Key Default Type Required Description
stringOrderType StringIndexerParams.ARBITRARY_ORDER String no How to order strings of each column.

Examples #

import org.apache.flink.ml.feature.stringindexer.StringIndexer;
import org.apache.flink.ml.feature.stringindexer.StringIndexerModel;
import org.apache.flink.ml.feature.stringindexer.StringIndexerParams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

/**
 * Simple program that trains a StringIndexer model and uses it for feature engineering.
 *
 * <p>The program builds a small training table and a prediction table with two columns each,
 * fits a {@code StringIndexer} on the training table, transforms the prediction table with the
 * resulting model and prints the input values next to the computed indices.
 */
public class StringIndexerExample {
    /**
     * Runs the example.
     *
     * @param args command-line arguments; not used
     * @throws Exception if closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input training and prediction data.
        DataStream<Row> trainStream =
                env.fromElements(
                        Row.of("a", 1.0),
                        Row.of("b", 1.0),
                        Row.of("b", 2.0),
                        Row.of("c", 0.0),
                        Row.of("d", 2.0),
                        Row.of("a", 2.0),
                        Row.of("b", 2.0),
                        Row.of("b", -1.0),
                        Row.of("a", -1.0),
                        Row.of("c", -1.0));
        Table trainTable = tEnv.fromDataStream(trainStream).as("inputCol1", "inputCol2");

        DataStream<Row> predictStream =
                env.fromElements(Row.of("a", 2.0), Row.of("b", 1.0), Row.of("c", 2.0));
        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");

        // Creates a StringIndexer object and initializes its parameters.
        StringIndexer stringIndexer =
                new StringIndexer()
                        .setStringOrderType(StringIndexerParams.ALPHABET_ASC_ORDER)
                        .setInputCols("inputCol1", "inputCol2")
                        .setOutputCols("outputCol1", "outputCol2");

        // Trains the StringIndexer Model.
        StringIndexerModel model = stringIndexer.fit(trainTable);

        // Uses the StringIndexer Model for predictions.
        Table outputTable = model.transform(predictTable)[0];

        // The column names do not change while iterating, so look them up once.
        String[] inputCols = stringIndexer.getInputCols();
        String[] outputCols = stringIndexer.getOutputCols();

        // Extracts and displays the results. The iterator is managed by try-with-resources so
        // that it is closed even when processing a row throws.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();

                Object[] inputValues = new Object[inputCols.length];
                double[] outputValues = new double[inputCols.length];
                for (int i = 0; i < inputValues.length; i++) {
                    inputValues[i] = row.getField(inputCols[i]);
                    outputValues[i] = (double) row.getField(outputCols[i]);
                }

                System.out.printf(
                        "Input Values: %s \tOutput Values: %s\n",
                        Arrays.toString(inputValues), Arrays.toString(outputValues));
            }
        }
    }
}

# Simple program that trains a StringIndexer model and uses it for feature
# engineering.
#
# Before executing this program, please make sure you have followed Flink ML's
# quick start guideline to set up Flink ML and Flink environment. The guideline
# can be found at
#
# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.lib.feature.stringindexer import StringIndexer
from pyflink.table import StreamTableEnvironment

# Set up the stream execution environment and the table environment on top of it.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)


def _rows_to_table(rows):
    """Build a table with columns input_col1 (string) and input_col2 (double) from tuples."""
    row_type = Types.ROW_NAMED(
        ['input_col1', 'input_col2'],
        [Types.STRING(), Types.DOUBLE()])
    return t_env.from_data_stream(env.from_collection(rows, type_info=row_type))


# Training data used to fit the indexer.
train_table = _rows_to_table([
    ('a', 1.0),
    ('b', 1.0),
    ('b', 2.0),
    ('c', 0.0),
    ('d', 2.0),
    ('a', 2.0),
    ('b', 2.0),
    ('b', -1.0),
    ('a', -1.0),
    ('c', -1.0),
])

# Data to be transformed by the fitted model.
predict_table = _rows_to_table([
    ('a', 2.0),
    ('b', 1.0),
    ('c', 2.0),
])

# Configure the string indexer: ordering strategy plus input/output column names.
string_indexer = (
    StringIndexer()
    .set_string_order_type('alphabetAsc')
    .set_input_cols('input_col1', 'input_col2')
    .set_output_cols('output_col1', 'output_col2')
)

# Fit the model on the training table.
model = string_indexer.fit(train_table)

# Apply the fitted model; transform() returns a sequence of tables, the first is the result.
output = model.transform(predict_table)[0]

# Resolve the position of every input/output column in the result schema once,
# then print the matching values for each result row.
field_names = output.get_schema().get_field_names()
input_cols = string_indexer.get_input_cols()
output_cols = string_indexer.get_output_cols()
input_positions = [field_names.index(input_cols[i]) for i in range(len(input_cols))]
output_positions = [field_names.index(output_cols[i]) for i in range(len(input_cols))]

for result in t_env.to_data_stream(output).execute_and_collect():
    input_values = [result[pos] for pos in input_positions]
    output_values = [result[pos] for pos in output_positions]
    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))

Index To String #

IndexToStringModel transforms input index column(s) to string column(s) using the model data computed by StringIndexer. It is a reverse operation of StringIndexerModel.

Input Columns #

Param name Type Default Description
inputCols Integer null Indices to be transformed to string.

Output Columns #

Param name Type Default Description
outputCols String null Transformed strings.

Parameters #

Below are the parameters required by IndexToStringModel.

Key Default Type Required Description
inputCols null String yes Input column names.
outputCols null String yes Output column names.

Examples #

import org.apache.flink.ml.feature.stringindexer.IndexToStringModel;
import org.apache.flink.ml.feature.stringindexer.StringIndexerModelData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

/**
 * Simple program that creates an IndexToStringModel instance and uses it for feature
 * engineering.
 *
 * <p>The program builds the model data by hand (one string array per input column), wraps it in
 * a table, configures an {@code IndexToStringModel} with it, transforms a small table of integer
 * indices and prints each input next to the resulting strings.
 */
public class IndexToStringModelExample {
    /**
     * Runs the example.
     *
     * @param args command-line arguments; not used
     * @throws Exception if closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Creates model data for IndexToStringModel.
        StringIndexerModelData modelData =
                new StringIndexerModelData(
                        new String[][] {{"a", "b", "c", "d"}, {"-1.0", "0.0", "1.0", "2.0"}});
        Table modelTable = tEnv.fromDataStream(env.fromElements(modelData)).as("stringArrays");

        // Generates input data.
        DataStream<Row> predictStream = env.fromElements(Row.of(0, 3), Row.of(1, 2));
        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");

        // Creates an indexToStringModel object and initializes its parameters.
        IndexToStringModel indexToStringModel =
                new IndexToStringModel()
                        .setInputCols("inputCol1", "inputCol2")
                        .setOutputCols("outputCol1", "outputCol2")
                        .setModelData(modelTable);

        // Uses the indexToStringModel object for feature transformations.
        Table outputTable = indexToStringModel.transform(predictTable)[0];

        // The column names do not change while iterating, so look them up once.
        String[] inputCols = indexToStringModel.getInputCols();
        String[] outputCols = indexToStringModel.getOutputCols();

        // Extracts and displays the results. The iterator is managed by try-with-resources so
        // that it is closed even when processing a row throws.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();

                int[] inputValues = new int[inputCols.length];
                String[] outputValues = new String[inputCols.length];
                for (int i = 0; i < inputValues.length; i++) {
                    inputValues[i] = (int) row.getField(inputCols[i]);
                    outputValues[i] = (String) row.getField(outputCols[i]);
                }

                System.out.printf(
                        "Input Values: %s \tOutput Values: %s\n",
                        Arrays.toString(inputValues), Arrays.toString(outputValues));
            }
        }
    }
}

# Simple program that creates an IndexToStringModel instance and uses it
# for feature engineering.
#
# Before executing this program, please make sure you have followed Flink ML's
# quick start guideline to set up Flink ML and Flink environment. The guideline
# can be found at
#
# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.lib.feature.stringindexer import IndexToStringModel
from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input data: two integer index columns to be mapped back to strings
predict_table = t_env.from_data_stream(
    env.from_collection([
        (0, 3),
        (1, 2),
    ],
        type_info=Types.ROW_NAMED(
            ['input_col1', 'input_col2'],
            [Types.INT(), Types.INT()])
    ))

# create the model data for the index-to-string model
#
# The single row holds one inner list per input column (presumably position i
# is the string for index i -- confirm against the model implementation).
# All entries are written as string literals so that they agree with the
# declared element type Types.STRING() below; the second list previously used
# float literals, which did not match that declared type.
model_data_table = t_env.from_data_stream(
    env.from_collection([
        ([['a', 'b', 'c', 'd'], ['-1.0', '0.0', '1.0', '2.0']],),
    ],
        type_info=Types.ROW_NAMED(
            ['stringArrays'],
            [Types.OBJECT_ARRAY(Types.OBJECT_ARRAY(Types.STRING()))])
    ))

# create an index-to-string model and initialize its parameters and model data
model = IndexToStringModel() \
    .set_input_cols('input_col1', 'input_col2') \
    .set_output_cols('output_col1', 'output_col2') \
    .set_model_data(model_data_table)

# use the index-to-string model for feature engineering;
# transform() returns a sequence of tables and the first one is the result
output = model.transform(predict_table)[0]

# extract and display the results
field_names = output.get_schema().get_field_names()
input_values = [None for _ in model.get_input_cols()]
output_values = [None for _ in model.get_input_cols()]
for result in t_env.to_data_stream(output).execute_and_collect():
    for i in range(len(model.get_input_cols())):
        # look up each configured column by name in the result schema
        input_values[i] = result[field_names.index(model.get_input_cols()[i])]
        output_values[i] = result[field_names.index(model.get_output_cols()[i])]
    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))