Functions

Functions #

Flink ML provides users with some built-in table functions for data transformations. This page gives a brief overview of them.

vectorToArray #

This function converts a column of Flink ML sparse/dense vectors into a column of double arrays.

import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;
import java.util.List;

import static org.apache.flink.ml.Functions.vectorToArray;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Simple program that converts a column of dense/sparse vectors into a column of double arrays.
 *
 * <p>The program builds a two-row input table holding one dense and one sparse vector, applies
 * {@code vectorToArray} to the "vector" column, and prints each input vector next to the
 * resulting array.
 */
public class VectorToArrayExample {
    /**
     * Runs the example.
     *
     * @param args command line arguments; not used.
     * @throws Exception if closing the result iterator fails.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input vector data.
        List<Vector> vectors =
                Arrays.asList(
                        Vectors.dense(0.0, 0.0),
                        Vectors.sparse(2, new int[] {1}, new double[] {1.0}));
        Table inputTable =
                tEnv.fromDataStream(env.fromCollection(vectors, VectorTypeInfo.INSTANCE))
                        .as("vector");

        // Converts each vector to a double array, keeping the input column for display.
        Table outputTable = inputTable.select($("vector"), vectorToArray($("vector")).as("array"));

        // Extracts and displays the results. The iterator is closed via try-with-resources so
        // that whatever it holds on to is released even if printing a row throws.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                Vector vector = row.getFieldAs("vector");
                Double[] doubleArray = row.getFieldAs("array");
                System.out.printf(
                        "Input vector: %s\tOutput double array: %s\n",
                        vector, Arrays.toString(doubleArray));
            }
        }
    }
}
# Simple program that converts a column of dense/sparse vectors into a column of double arrays.

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

from pyflink.ml.linalg import Vectors, VectorTypeInfo

from pyflink.ml.functions import vector_to_array
from pyflink.table.expressions import col

# Set up the streaming environment and the table environment layered on top of it.
exec_env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(exec_env)

# Input data: one single-field row per vector (a dense one and a sparse one).
input_rows = [
    (Vectors.dense(0.0, 0.0),),
    (Vectors.sparse(2, [1], [1.0]),),
]
row_type = Types.ROW_NAMED(['vector'], [VectorTypeInfo()])
source_stream = exec_env.from_collection(input_rows, type_info=row_type)
source_table = table_env.from_data_stream(source_stream)

# Apply vector_to_array to the 'vector' column; the result column is named 'array'.
array_column = vector_to_array(col('vector')).alias('array')
result_table = source_table.select(array_column)

# Run the job and gather every output row into a local list.
result_stream = table_env.to_data_stream(result_table).map(lambda r: r)
collected = list(result_stream.execute_and_collect())

# Order the rows by their first field so they can be paired with the inputs by position.
collected.sort(key=lambda row: row[0])

# Print each input vector next to the array produced for it.
column_names = result_table.get_schema().get_field_names()
for idx, row in enumerate(collected):
    source_vector = input_rows[idx][0]
    converted = row[column_names.index("array")]
    print("Input vector: %s \t output double array: %s" % (source_vector, converted))

arrayToVector #

This function converts a column of arrays of numeric type into a column of DenseVector instances.

import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;
import java.util.List;

import static org.apache.flink.ml.Functions.arrayToVector;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Simple program that converts a column of double arrays into a column of dense vectors.
 *
 * <p>The program builds a two-row input table of double arrays, applies {@code arrayToVector} to
 * the "array" column, and prints each input array next to the resulting vector.
 */
public class ArrayToVectorExample {
    /**
     * Runs the example.
     *
     * @param args command line arguments; not used.
     * @throws Exception if closing the result iterator fails.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Generates input double array data.
        List<double[]> doubleArrays =
                Arrays.asList(new double[] {0.0, 0.0}, new double[] {0.0, 1.0});
        Table inputTable = tEnv.fromDataStream(env.fromCollection(doubleArrays)).as("array");

        // Converts each double array to a dense vector, keeping the input column for display.
        Table outputTable = inputTable.select($("array"), arrayToVector($("array")).as("vector"));

        // Extracts and displays the results. The iterator is closed via try-with-resources so
        // that whatever it holds on to is released even if printing a row throws.
        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                Double[] doubleArray = row.getFieldAs("array");
                Vector vector = row.getFieldAs("vector");
                System.out.printf(
                        "Input double array: %s\tOutput vector: %s\n",
                        Arrays.toString(doubleArray), vector);
            }
        }
    }
}
# Simple program that converts a column of double arrays into a column of dense vectors.

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.functions import array_to_vector
from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import col

# Set up the streaming environment and the table environment layered on top of it.
exec_env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(exec_env)

# Input data: one single-field row per double array.
input_rows = [
    ([0.0, 0.0],),
    ([0.0, 1.0],),
]
row_type = Types.ROW_NAMED(['array'], [Types.PRIMITIVE_ARRAY(Types.DOUBLE())])
source_stream = exec_env.from_collection(input_rows, type_info=row_type)
source_table = table_env.from_data_stream(source_stream)

# Apply array_to_vector to the 'array' column; the result column is named 'vector'.
vector_column = array_to_vector(col('array')).alias('vector')
result_table = source_table.select(vector_column)

# Run the job and keep only the 'vector' field of every output row.
column_names = result_table.get_schema().get_field_names()
result_vectors = []
for row in table_env.to_data_stream(result_table).execute_and_collect():
    result_vectors.append(row[column_names.index('vector')])

# Order the vectors by their element at index 1 so they can be paired with the inputs by position.
result_vectors.sort(key=lambda vec: vec.get(1))

# Print each input array next to the vector produced for it.
for idx, converted in enumerate(result_vectors):
    source_array = input_rows[idx][0]
    print("Input double array: %s \t output vector: %s" % (source_array, converted))