This documentation is for an unreleased version of Apache Flink Machine Learning Library. We recommend you use the latest stable version.
String Indexer #
String Indexer maps one or more columns (string/numerical value) of the input to one or more indexed output columns (integer value). The output indices of two data points are the same if and only if their corresponding input columns are the same. The indices are in [0, numDistinctValuesInThisColumn].
IndexToStringModel transforms input index column(s) to string column(s) using the model data computed by StringIndexer. It is a reverse operation of StringIndexerModel.
Input Columns #
Param name | Type | Default | Description |
---|---|---|---|
inputCols | Number/String | null | String/Numerical values to be indexed. |
Output Columns #
Param name | Type | Default | Description |
---|---|---|---|
outputCols | Double | null | Indices of string/numerical values. |
Parameters #
Below are the parameters required by StringIndexerModel.
Key | Default | Type | Required | Description |
---|---|---|---|---|
inputCols | null | String[] | yes | Input column names. |
outputCols | null | String[] | yes | Output column names. |
handleInvalid | "error" | String | no | Strategy to handle invalid entries. Supported values: 'error', 'skip', 'keep'. |
StringIndexer needs the parameters above and also the parameters below.
Key | Default | Type | Required | Description |
---|---|---|---|---|
stringOrderType | "arbitrary" | String | no | How to order strings of each column. Supported values: 'arbitrary', 'frequencyDesc', 'frequencyAsc', 'alphabetDesc', 'alphabetAsc'. |
Examples #
import org.apache.flink.ml.feature.stringindexer.StringIndexer;
import org.apache.flink.ml.feature.stringindexer.StringIndexerModel;
import org.apache.flink.ml.feature.stringindexer.StringIndexerParams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/**
 * Simple program that trains a StringIndexer model and uses it for feature engineering.
 *
 * <p>The indexer is fitted on a small two-column table (one string column, one numerical column),
 * then applied to a prediction table. Each output row is printed as the input values of the
 * configured input columns alongside the indices produced for the corresponding output columns.
 */
public class StringIndexerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input training and prediction data.
        DataStream<Row> trainStream =
                env.fromElements(
                        Row.of("a", 1.0),
                        Row.of("b", 1.0),
                        Row.of("b", 2.0),
                        Row.of("c", 0.0),
                        Row.of("d", 2.0),
                        Row.of("a", 2.0),
                        Row.of("b", 2.0),
                        Row.of("b", -1.0),
                        Row.of("a", -1.0),
                        Row.of("c", -1.0));
        Table trainTable = tEnv.fromDataStream(trainStream).as("inputCol1", "inputCol2");

        DataStream<Row> predictStream =
                env.fromElements(Row.of("a", 2.0), Row.of("b", 1.0), Row.of("c", 2.0));
        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");

        // Creates a StringIndexer object and initializes its parameters.
        StringIndexer stringIndexer =
                new StringIndexer()
                        .setStringOrderType(StringIndexerParams.ALPHABET_ASC_ORDER)
                        .setInputCols("inputCol1", "inputCol2")
                        .setOutputCols("outputCol1", "outputCol2");

        // Trains the StringIndexer Model.
        StringIndexerModel model = stringIndexer.fit(trainTable);

        // Uses the StringIndexer Model for predictions.
        Table outputTable = model.transform(predictTable)[0];

        // Extracts and displays the results. The column names do not change per row, so they are
        // looked up once instead of on every row and column.
        String[] inputCols = stringIndexer.getInputCols();
        String[] outputCols = stringIndexer.getOutputCols();

        // collect() returns a CloseableIterator; try-with-resources guarantees it is closed even
        // if an exception is thrown while reading or printing the rows.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                Object[] inputValues = new Object[inputCols.length];
                double[] outputValues = new double[inputCols.length];
                for (int i = 0; i < inputValues.length; i++) {
                    inputValues[i] = row.getField(inputCols[i]);
                    // Output columns are documented as Double; the cast unboxes the value.
                    outputValues[i] = (double) row.getField(outputCols[i]);
                }
                System.out.printf(
                        "Input Values: %s \tOutput Values: %s\n",
                        Arrays.toString(inputValues), Arrays.toString(outputValues));
            }
        }
    }
}
# Simple program that trains a StringIndexer model and uses it for feature
# engineering.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.feature.stringindexer import StringIndexer
from pyflink.table import StreamTableEnvironment
# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input training and prediction data
train_table = t_env.from_data_stream(
    env.from_collection([
        ('a', 1.),
        ('b', 1.),
        ('b', 2.),
        ('c', 0.),
        ('d', 2.),
        ('a', 2.),
        ('b', 2.),
        ('b', -1.),
        ('a', -1.),
        ('c', -1.),
    ],
        type_info=Types.ROW_NAMED(
            ['input_col1', 'input_col2'],
            [Types.STRING(), Types.DOUBLE()])
    ))

predict_table = t_env.from_data_stream(
    env.from_collection([
        ('a', 2.),
        ('b', 1.),
        ('c', 2.),
    ],
        type_info=Types.ROW_NAMED(
            ['input_col1', 'input_col2'],
            [Types.STRING(), Types.DOUBLE()])
    ))

# create a string-indexer object and initialize its parameters
string_indexer = StringIndexer() \
    .set_string_order_type('alphabetAsc') \
    .set_input_cols('input_col1', 'input_col2') \
    .set_output_cols('output_col1', 'output_col2')

# train the string-indexer model
model = string_indexer.fit(train_table)

# use the string-indexer model for feature engineering
output = model.transform(predict_table)[0]

# extract and display the results
field_names = output.get_schema().get_field_names()
# the column names do not change per row, so look them up only once
input_cols = string_indexer.get_input_cols()
output_cols = string_indexer.get_output_cols()
# execute_and_collect() returns a closeable iterator; the with-statement closes it
# even if an error is raised while reading the results
with t_env.to_data_stream(output).execute_and_collect() as results:
    for result in results:
        input_values = [result[field_names.index(col)] for col in input_cols]
        output_values = [result[field_names.index(col)] for col in output_cols]
        print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
Index To String #
IndexToStringModel transforms input index column(s) to string column(s) using the model data computed by StringIndexer. It is a reverse operation of StringIndexerModel.
Input Columns #
Param name | Type | Default | Description |
---|---|---|---|
inputCols | Integer | null | Indices to be transformed to string. |
Output Columns #
Param name | Type | Default | Description |
---|---|---|---|
outputCols | String | null | Transformed strings. |
Parameters #
Below are the parameters required by IndexToStringModel.
Key | Default | Type | Required | Description |
---|---|---|---|---|
inputCols | null | String[] | yes | Input column names. |
outputCols | null | String[] | yes | Output column names. |
Examples #
import org.apache.flink.ml.feature.stringindexer.IndexToStringModel;
import org.apache.flink.ml.feature.stringindexer.StringIndexerModelData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/**
 * Simple program that creates an IndexToStringModel instance and uses it for feature engineering.
 *
 * <p>The model data is built by hand (one string array per input column) instead of being produced
 * by a fitted StringIndexer. Each output row is printed as the input indices alongside the strings
 * they were mapped back to.
 */
public class IndexToStringModelExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Creates model data for IndexToStringModel.
        StringIndexerModelData modelData =
                new StringIndexerModelData(
                        new String[][] {{"a", "b", "c", "d"}, {"-1.0", "0.0", "1.0", "2.0"}});
        Table modelTable = tEnv.fromDataStream(env.fromElements(modelData)).as("stringArrays");

        // Generates input data.
        DataStream<Row> predictStream = env.fromElements(Row.of(0, 3), Row.of(1, 2));
        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");

        // Creates an indexToStringModel object and initializes its parameters.
        IndexToStringModel indexToStringModel =
                new IndexToStringModel()
                        .setInputCols("inputCol1", "inputCol2")
                        .setOutputCols("outputCol1", "outputCol2")
                        .setModelData(modelTable);

        // Uses the indexToStringModel object for feature transformations.
        Table outputTable = indexToStringModel.transform(predictTable)[0];

        // Extracts and displays the results. The column names do not change per row, so they are
        // looked up once instead of on every row and column.
        String[] inputCols = indexToStringModel.getInputCols();
        String[] outputCols = indexToStringModel.getOutputCols();

        // collect() returns a CloseableIterator; try-with-resources guarantees it is closed even
        // if an exception is thrown while reading or printing the rows.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                int[] inputValues = new int[inputCols.length];
                String[] outputValues = new String[inputCols.length];
                for (int i = 0; i < inputValues.length; i++) {
                    inputValues[i] = (int) row.getField(inputCols[i]);
                    outputValues[i] = (String) row.getField(outputCols[i]);
                }
                System.out.printf(
                        "Input Values: %s \tOutput Values: %s\n",
                        Arrays.toString(inputValues), Arrays.toString(outputValues));
            }
        }
    }
}
# Simple program that creates an IndexToStringModel instance and uses it
# for feature engineering.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.feature.stringindexer import IndexToStringModel
from pyflink.table import StreamTableEnvironment
# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input data
predict_table = t_env.from_data_stream(
    env.from_collection([
        (0, 3),
        (1, 2),
    ],
        type_info=Types.ROW_NAMED(
            ['input_col1', 'input_col2'],
            [Types.INT(), Types.INT()])
    ))

# create an index-to-string model and initialize its parameters and model data.
# 'stringArrays' is declared as an array of string arrays, so the values of the
# numerical column are written as strings ('-1.0', ...) rather than Python floats.
model_data_table = t_env.from_data_stream(
    env.from_collection([
        ([['a', 'b', 'c', 'd'], ['-1.0', '0.0', '1.0', '2.0']],),
    ],
        type_info=Types.ROW_NAMED(
            ['stringArrays'],
            [Types.OBJECT_ARRAY(Types.OBJECT_ARRAY(Types.STRING()))])
    ))

model = IndexToStringModel() \
    .set_input_cols('input_col1', 'input_col2') \
    .set_output_cols('output_col1', 'output_col2') \
    .set_model_data(model_data_table)

# use the index-to-string model for feature engineering
output = model.transform(predict_table)[0]

# extract and display the results
field_names = output.get_schema().get_field_names()
# the column names do not change per row, so look them up only once
input_cols = model.get_input_cols()
output_cols = model.get_output_cols()
# execute_and_collect() returns a closeable iterator; the with-statement closes it
# even if an error is raised while reading the results
with t_env.to_data_stream(output).execute_and_collect() as results:
    for result in results:
        input_values = [result[field_names.index(col)] for col in input_cols]
        output_values = [result[field_names.index(col)] for col in output_cols]
        print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))