StringIndexer #
StringIndexer maps one or more columns (string/numerical value) of the input to one or more indexed output columns (integer value). The output indices of two data points are the same iff their corresponding input columns are the same. The indices are in [0, numDistinctValuesInThisColumn].
IndexToStringModel transforms input index column(s) to string column(s) using the model data computed by StringIndexer. It is a reverse operation of StringIndexerModel.
Input Columns #
Param name | Type | Default | Description |
---|---|---|---|
inputCols | Number/String | null | String/Numerical values to be indexed. |
Output Columns #
Param name | Type | Default | Description |
---|---|---|---|
outputCols | Double | null | Indices of string/numerical values. |
Parameters #
Below are the parameters required by StringIndexerModel.
Key | Default | Type | Required | Description |
---|---|---|---|---|
inputCols | null | String[] | yes | Input column names. |
outputCols | null | String[] | yes | Output column names. |
handleInvalid | "error" | String | no | Strategy to handle invalid entries. Supported values: ‘error’, ‘skip’, ‘keep’. |
StringIndexer needs the parameters above as well as the parameters below.
Key | Default | Type | Required | Description |
---|---|---|---|---|
stringOrderType | "arbitrary" | String | no | How to order strings of each column. Supported values: ‘arbitrary’, ‘frequencyDesc’, ‘frequencyAsc’, ‘alphabetDesc’, ‘alphabetAsc’. |
maxIndexNum | 2147483647 | Integer | no | The max number of indices for each column. It only works when ‘stringOrderType’ is set as ‘frequencyDesc’. |
Examples #
import org.apache.flink.ml.feature.stringindexer.StringIndexer;
import org.apache.flink.ml.feature.stringindexer.StringIndexerModel;
import org.apache.flink.ml.feature.stringindexer.StringIndexerParams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/**
 * Simple program that trains a StringIndexer model and uses it for feature engineering.
 *
 * <p>The program fits a {@link StringIndexer} on a small in-memory training table with two
 * columns, applies the resulting {@link StringIndexerModel} to a prediction table, and prints the
 * input values next to the indices produced for them.
 */
public class StringIndexerExample {
    /**
     * Runs the example end to end: builds the tables, fits the model, transforms the prediction
     * data and prints one line per output row.
     *
     * @param args command-line arguments; not used
     * @throws Exception if closing the result iterator fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input training and prediction data.
        DataStream<Row> trainStream =
                env.fromElements(
                        Row.of("a", 1.0),
                        Row.of("b", 1.0),
                        Row.of("b", 2.0),
                        Row.of("c", 0.0),
                        Row.of("d", 2.0),
                        Row.of("a", 2.0),
                        Row.of("b", 2.0),
                        Row.of("b", -1.0),
                        Row.of("a", -1.0),
                        Row.of("c", -1.0));
        Table trainTable = tEnv.fromDataStream(trainStream).as("inputCol1", "inputCol2");

        DataStream<Row> predictStream =
                env.fromElements(Row.of("a", 2.0), Row.of("b", 1.0), Row.of("c", 2.0));
        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");

        // Creates a StringIndexer object and initializes its parameters.
        StringIndexer stringIndexer =
                new StringIndexer()
                        .setStringOrderType(StringIndexerParams.ALPHABET_ASC_ORDER)
                        .setInputCols("inputCol1", "inputCol2")
                        .setOutputCols("outputCol1", "outputCol2");

        // Trains the StringIndexer Model.
        StringIndexerModel model = stringIndexer.fit(trainTable);

        // Uses the StringIndexer Model for predictions.
        Table outputTable = model.transform(predictTable)[0];

        // The column names do not change while iterating, so look them up once.
        String[] inputCols = stringIndexer.getInputCols();
        String[] outputCols = stringIndexer.getOutputCols();

        // Extracts and displays the results. The iterator is opened in try-with-resources so
        // that it is closed even if reading or printing a row throws.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                Object[] inputValues = new Object[inputCols.length];
                double[] outputValues = new double[inputCols.length];
                for (int i = 0; i < inputValues.length; i++) {
                    inputValues[i] = row.getField(inputCols[i]);
                    outputValues[i] = (double) row.getField(outputCols[i]);
                }
                System.out.printf(
                        "Input Values: %s \tOutput Values: %s\n",
                        Arrays.toString(inputValues), Arrays.toString(outputValues));
            }
        }
    }
}
# Example script: fit a StringIndexer on a small training table, apply the
# fitted model to a prediction table, and print inputs next to their indices.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.feature.stringindexer import StringIndexer
from pyflink.table import StreamTableEnvironment

# set up the stream and table environments
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)


def _rows_to_table(rows):
    # wrap (string, double) tuples in a table with the two named input columns
    row_type = Types.ROW_NAMED(
        ['input_col1', 'input_col2'],
        [Types.STRING(), Types.DOUBLE()])
    return t_env.from_data_stream(
        env.from_collection(rows, type_info=row_type))


# training data
train_table = _rows_to_table([
    ('a', 1.0),
    ('b', 1.0),
    ('b', 2.0),
    ('c', 0.0),
    ('d', 2.0),
    ('a', 2.0),
    ('b', 2.0),
    ('b', -1.0),
    ('a', -1.0),
    ('c', -1.0),
])

# prediction data
predict_table = _rows_to_table([
    ('a', 2.0),
    ('b', 1.0),
    ('c', 2.0),
])

# configure the string indexer
string_indexer = (
    StringIndexer()
    .set_string_order_type('alphabetAsc')
    .set_input_cols('input_col1', 'input_col2')
    .set_output_cols('output_col1', 'output_col2')
)

# fit the model on the training table
model = string_indexer.fit(train_table)

# apply the fitted model to the prediction table
output = model.transform(predict_table)[0]

# resolve the position of every input/output column in the result schema once
field_names = output.get_schema().get_field_names()
input_positions = [
    field_names.index(name) for name in string_indexer.get_input_cols()]
output_positions = [
    field_names.index(name) for name in string_indexer.get_output_cols()]

# print one line per result row
for result in t_env.to_data_stream(output).execute_and_collect():
    input_values = [result[pos] for pos in input_positions]
    output_values = [result[pos] for pos in output_positions]
    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))