SQLTransformer
This documentation is for an unreleased version of Apache Flink Machine Learning Library. We recommend you use the latest stable version.

SQLTransformer #

SQLTransformer implements the transformations that are defined by a SQL statement.

Currently, only statements of the form SELECT ... FROM __THIS__ ... are supported, where __THIS__ represents the input table and cannot be modified.

The select clause specifies the fields, constants, and expressions to display in the output. Except for the cases described in the note below, it can be any select clause that Flink SQL supports. Users can also use Flink SQL built-in functions and UDFs to operate on the selected columns.

For example, SQLTransformer supports statements like:

  • SELECT a, a + b AS a_b FROM __THIS__
  • SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ WHERE a > 5
  • SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

Note: This operator only generates an append-only/insert-only table as its output. If the output table could contain retract messages (e.g., when performing a SELECT ... FROM __THIS__ GROUP BY ... operation on a table in streaming mode), this operator aggregates all changelogs and outputs only the final state.

Parameters #

Key Default Type Required Description
statement null String yes SQL statement.

Examples #

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.feature.sqltransformer.SQLTransformer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

/**
 * Example program showing how a {@link SQLTransformer} can be used for feature engineering.
 *
 * <p>It builds a two-row input table with the columns {@code id}, {@code v1} and {@code v2},
 * applies a SQL statement that keeps every input column and adds {@code v3 = v1 + v2} and
 * {@code v4 = v1 * v2}, and prints the resulting table.
 */
public class SQLTransformerExample {

    /** SQL statement handed to the transformer; {@code __THIS__} stands for the input table. */
    private static final String STATEMENT =
            "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__";

    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Prepare the sample input.
        Table source = createInputTable(streamEnv, tableEnv);

        // Configure the transformer with the SQL statement to apply.
        SQLTransformer transformer = new SQLTransformer().setStatement(STATEMENT);

        // Run the transformation; the first element of the returned array is used as the result.
        Table result = transformer.transform(source)[0];

        // Execute the job and print the rows of the result table.
        result.execute().print();
    }

    /**
     * Builds the sample input table with the columns {@code id}, {@code v1} and {@code v2}.
     *
     * @param streamEnv environment used to create the underlying data stream
     * @param tableEnv table environment used to convert the stream into a table
     * @return the input table containing the two sample rows
     */
    private static Table createInputTable(
            StreamExecutionEnvironment streamEnv, StreamTableEnvironment tableEnv) {
        RowTypeInfo rowType = new RowTypeInfo(Types.INT, Types.DOUBLE, Types.DOUBLE);
        DataStream<Row> rows =
                streamEnv.fromCollection(
                        Arrays.asList(Row.of(0, 1.0, 3.0), Row.of(2, 2.0, 5.0)), rowType);
        return tableEnv.fromDataStream(rows).as("id", "v1", "v2");
    }
}
# Simple program that creates a SQLTransformer instance and uses it for feature
# engineering.

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.feature.sqltransformer import SQLTransformer
from pyflink.table import StreamTableEnvironment

# Set up the streaming environment and the table environment built on top of it.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Sample input: two rows, each with an integer id and two double values.
rows = [
    (0, 1.0, 3.0),
    (2, 2.0, 5.0),
]
row_type = Types.ROW_NAMED(
    ['id', 'v1', 'v2'],
    [Types.INT(), Types.DOUBLE(), Types.DOUBLE()])
input_stream = env.from_collection(rows, type_info=row_type)
input_data_table = t_env.from_data_stream(input_stream)

# Configure the transformer: keep every input column and add v3 = v1 + v2 and
# v4 = v1 * v2. __THIS__ stands for the input table.
sql_transformer = SQLTransformer()
sql_transformer = sql_transformer.set_statement(
    'SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')

# Run the transformation; the first returned table is used as the result.
output_tables = sql_transformer.transform(input_data_table)
output_table = output_tables[0]

# Execute the job and print the rows of the result table.
output_table.execute().print()