Interaction takes vector or numerical columns, and generates a single vector column that contains
the product of all combinations of one value from each input column.
For example, when the input feature values are Double(2) and Vector(3, 4), the output would be
Vector(6, 8). When the input feature values are Vector(1, 2) and Vector(3, 4), the output would
be Vector(3, 4, 6, 8). If you change the position of these two input Vectors, the output would
be Vector(3, 6, 4, 8).
importorg.apache.flink.ml.feature.interaction.Interaction;importorg.apache.flink.ml.linalg.Vector;importorg.apache.flink.ml.linalg.Vectors;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.types.Row;importorg.apache.flink.util.CloseableIterator;importjava.util.Arrays;/** Simple program that creates an Interaction instance and uses it for feature engineering. */publicclassInteractionExample{publicstaticvoidmain(String[]args){StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);// Generates input data.
DataStream<Row>inputStream=env.fromElements(Row.of(0,Vectors.dense(1.1,3.2),Vectors.dense(2,3)),Row.of(1,Vectors.dense(2.1,3.1),Vectors.dense(1,3)));TableinputTable=tEnv.fromDataStream(inputStream).as("f0","f1","f2");// Creates an Interaction object and initializes its parameters.
Interactioninteraction=newInteraction().setInputCols("f0","f1","f2").setOutputCol("outputVec");// Transforms input data.
TableoutputTable=interaction.transform(inputTable)[0];// Extracts and displays the results.
for(CloseableIterator<Row>it=outputTable.execute().collect();it.hasNext();){Rowrow=it.next();Object[]inputValues=newObject[interaction.getInputCols().length];for(inti=0;i<inputValues.length;i++){inputValues[i]=row.getField(interaction.getInputCols()[i]);}VectoroutputValue=(Vector)row.getField(interaction.getOutputCol());System.out.printf("Input Values: %s \tOutput Value: %s\n",Arrays.toString(inputValues),outputValue);}}}
# Simple program that creates an Interaction instance and uses it for feature# engineering.frompyflink.commonimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.ml.linalgimportVectors,DenseVectorTypeInfofrompyflink.ml.feature.interactionimportInteractionfrompyflink.tableimportStreamTableEnvironment# create a new StreamExecutionEnvironmentenv=StreamExecutionEnvironment.get_execution_environment()# create a StreamTableEnvironmentt_env=StreamTableEnvironment.create(env)# generate input datainput_data_table=t_env.from_data_stream(env.from_collection([(1,Vectors.dense(1,2),Vectors.dense(3,4)),(2,Vectors.dense(2,8),Vectors.dense(3,4))],type_info=Types.ROW_NAMED(['f0','f1','f2'],[Types.INT(),DenseVectorTypeInfo(),DenseVectorTypeInfo()])))# create an interaction object and initialize its parametersinteraction=Interaction() \
.set_input_cols('f0','f1','f2') \
.set_output_col('interaction_vec')# use the interaction for feature engineeringoutput=interaction.transform(input_data_table)[0]# extract and display the resultsfield_names=output.get_schema().get_field_names()input_values=[Nonefor_ininteraction.get_input_cols()]forresultint_env.to_data_stream(output).execute_and_collect():foriinrange(len(interaction.get_input_cols())):input_values[i]=result[field_names.index(interaction.get_input_cols()[i])]output_value=result[field_names.index(interaction.get_output_col())]print('Input Values: '+str(input_values)+'\tOutput Value: '+str(output_value))