Source code for pyflink.table.udf

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
import collections
import functools
import inspect
from typing import Union, List, Type, Callable, TypeVar, Generic, Iterable

from pyflink.java_gateway import get_gateway
from pyflink.metrics import MetricGroup
from pyflink.table import Expression
from pyflink.table.types import DataType, _to_java_data_type
from pyflink.util import java_utils

__all__ = ['FunctionContext', 'AggregateFunction', 'ScalarFunction', 'TableFunction',
           'TableAggregateFunction', 'udf', 'udtf', 'udaf', 'udtaf']


class FunctionContext(object):
    """
    Used to obtain global runtime information about the context in which the
    user-defined function is executed. The information includes the metric group,
    and global job parameters, etc.
    """

    def __init__(self, base_metric_group):
        self._base_metric_group = base_metric_group

    def get_metric_group(self) -> MetricGroup:
        """
        Returns the metric group for this parallel subtask.

        .. versionadded:: 1.11.0
        """
        if self._base_metric_group is None:
            raise RuntimeError("Metric has not been enabled. You can enable "
                               "metric with the 'python.metric.enabled' configuration.")
        return self._base_metric_group


class UserDefinedFunction(abc.ABC):
    """
    Base interface for user-defined function.

    .. versionadded:: 1.10.0
    """

    def open(self, function_context: FunctionContext):
        """
        Initialization method for the function. It is called before the actual working methods
        and thus suitable for one time setup work.

        :param function_context: the context of the function
        :type function_context: FunctionContext
        """
        pass

    def close(self):
        """
        Tear-down method for the user code. It is called after the last call to the main
        working methods.
        """
        pass

    def is_deterministic(self) -> bool:
        """
        Returns information about the determinism of the function's results.
        It returns true if and only if a call to this function is guaranteed to
        always return the same result given the same parameters. true is assumed by default.
        If the function is not pure functional like random(), date(), now(),
        this method must return false.

        :return: the determinism of the function's results.
        """
        return True


[docs]class ScalarFunction(UserDefinedFunction): """ Base interface for user-defined scalar function. A user-defined scalar functions maps zero, one, or multiple scalar values to a new scalar value. .. versionadded:: 1.10.0 """ @abc.abstractmethod def eval(self, *args): """ Method which defines the logic of the scalar function. """ pass
[docs]class TableFunction(UserDefinedFunction): """ Base interface for user-defined table function. A user-defined table function creates zero, one, or multiple rows to a new row value. .. versionadded:: 1.11.0 """ @abc.abstractmethod def eval(self, *args): """ Method which defines the logic of the table function. """ pass
T = TypeVar('T') ACC = TypeVar('ACC') class ImperativeAggregateFunction(UserDefinedFunction, Generic[T, ACC]): """ Base interface for user-defined aggregate function and table aggregate function. This class is used for unified handling of imperative aggregating functions. Concrete implementations should extend from :class:`~pyflink.table.AggregateFunction` or :class:`~pyflink.table.TableAggregateFunction`. .. versionadded:: 1.13.0 """ @abc.abstractmethod def create_accumulator(self) -> ACC: """ Creates and initializes the accumulator for this AggregateFunction. :return: the accumulator with the initial value """ pass @abc.abstractmethod def accumulate(self, accumulator: ACC, *args): """ Processes the input values and updates the provided accumulator instance. :param accumulator: the accumulator which contains the current aggregated results :param args: the input value (usually obtained from new arrived data) """ pass def retract(self, accumulator: ACC, *args): """ Retracts the input values from the accumulator instance.The current design assumes the inputs are the values that have been previously accumulated. :param accumulator: the accumulator which contains the current aggregated results :param args: the input value (usually obtained from new arrived data). """ raise RuntimeError("Method retract is not implemented") def merge(self, accumulator: ACC, accumulators): """ Merges a group of accumulator instances into one accumulator instance. This method must be implemented for unbounded session window grouping aggregates and bounded grouping aggregates. :param accumulator: the accumulator which will keep the merged aggregate results. It should be noted that the accumulator may contain the previous aggregated results. Therefore user should not replace or clean this instance in the custom merge method. :param accumulators: a group of accumulators that will be merged. """ raise RuntimeError("Method merge is not implemented") def get_result_type(self) -> DataType: """ Returns the DataType of the AggregateFunction's result. :return: The :class:`~pyflink.table.types.DataType` of the AggregateFunction's result. """ raise RuntimeError("Method get_result_type is not implemented") def get_accumulator_type(self) -> DataType: """ Returns the DataType of the AggregateFunction's accumulator. :return: The :class:`~pyflink.table.types.DataType` of the AggregateFunction's accumulator. """ raise RuntimeError("Method get_accumulator_type is not implemented")
[docs]class AggregateFunction(ImperativeAggregateFunction): """ Base interface for user-defined aggregate function. A user-defined aggregate function maps scalar values of multiple rows to a new scalar value. .. versionadded:: 1.12.0 """ @abc.abstractmethod def get_value(self, accumulator: ACC) -> T: """ Called every time when an aggregation result should be materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation. :param accumulator: the accumulator which contains the current intermediate results :return: the aggregation result """ pass
[docs]class TableAggregateFunction(ImperativeAggregateFunction): """ Base class for a user-defined table aggregate function. A user-defined table aggregate function maps scalar values of multiple rows to zero, one, or multiple rows (or structured types). If an output record consists of only one field, the structured record can be omitted, and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime. .. versionadded:: 1.13.0 """ @abc.abstractmethod def emit_value(self, accumulator: ACC) -> Iterable[T]: """ Called every time when an aggregation result should be materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation. :param accumulator: the accumulator which contains the current aggregated results. :return: multiple aggregated result """ pass
class DelegatingScalarFunction(ScalarFunction): """ Helper scalar function implementation for lambda expression and python function. It's for internal use only. """ def __init__(self, func): self.func = func def eval(self, *args): return self.func(*args) class DelegationTableFunction(TableFunction): """ Helper table function implementation for lambda expression and python function. It's for internal use only. """ def __init__(self, func): self.func = func def eval(self, *args): return self.func(*args) class DelegatingPandasAggregateFunction(AggregateFunction): """ Helper pandas aggregate function implementation for lambda expression and python function. It's for internal use only. """ def __init__(self, func): self.func = func def get_value(self, accumulator): return accumulator[0] def create_accumulator(self): return [] def accumulate(self, accumulator, *args): accumulator.append(self.func(*args)) class PandasAggregateFunctionWrapper(object): """ Wrapper for Pandas Aggregate function. """ def __init__(self, func: AggregateFunction): self.func = func def open(self, function_context: FunctionContext): self.func.open(function_context) def eval(self, *args): accumulator = self.func.create_accumulator() self.func.accumulate(accumulator, *args) return self.func.get_value(accumulator) def close(self): self.func.close() class UserDefinedFunctionWrapper(object): """ Base Wrapper for Python user-defined function. It handles things like converting lambda functions to user-defined functions, creating the Java user-defined function representation, etc. It's for internal use only. """ def __init__(self, func, input_types, func_type, deterministic=None, name=None): if inspect.isclass(func) or ( not isinstance(func, UserDefinedFunction) and not callable(func)): raise TypeError( "Invalid function: not a function or callable (__call__ is not defined): {0}" .format(type(func))) if input_types is not None: from pyflink.table.types import RowType if not isinstance(input_types, collections.abc.Iterable) \ or isinstance(input_types, RowType): input_types = [input_types] for input_type in input_types: if not isinstance(input_type, DataType): raise TypeError( "Invalid input_type: input_type should be DataType but contains {}".format( input_type)) self._func = func self._input_types = input_types self._name = name or ( func.__name__ if hasattr(func, '__name__') else func.__class__.__name__) if deterministic is not None and isinstance(func, UserDefinedFunction) and deterministic \ != func.is_deterministic(): raise ValueError("Inconsistent deterministic: {} and {}".format( deterministic, func.is_deterministic())) # default deterministic is True self._deterministic = deterministic if deterministic is not None else ( func.is_deterministic() if isinstance(func, UserDefinedFunction) else True) self._func_type = func_type self._judf_placeholder = None self._takes_row_as_input = False def __call__(self, *args) -> Expression: from pyflink.table import expressions as expr return expr.call(self, *args) def alias(self, *alias_names: str): self._alias_names = alias_names return self def _set_takes_row_as_input(self): self._takes_row_as_input = True return self def _java_user_defined_function(self): if self._judf_placeholder is None: gateway = get_gateway() def get_python_function_kind(): JPythonFunctionKind = gateway.jvm.org.apache.flink.table.functions.python. \ PythonFunctionKind if self._func_type == "general": return JPythonFunctionKind.GENERAL elif self._func_type == "pandas": return JPythonFunctionKind.PANDAS else: raise TypeError("Unsupported func_type: %s." % self._func_type) if self._input_types is not None: j_input_types = java_utils.to_jarray( gateway.jvm.DataType, [_to_java_data_type(i) for i in self._input_types]) else: j_input_types = None j_function_kind = get_python_function_kind() func = self._func if not isinstance(self._func, UserDefinedFunction): func = self._create_delegate_function() import cloudpickle serialized_func = cloudpickle.dumps(func) self._judf_placeholder = \ self._create_judf(serialized_func, j_input_types, j_function_kind) return self._judf_placeholder def _create_delegate_function(self) -> UserDefinedFunction: pass def _create_judf(self, serialized_func, j_input_types, j_function_kind): pass class UserDefinedScalarFunctionWrapper(UserDefinedFunctionWrapper): """ Wrapper for Python user-defined scalar function. """ def __init__(self, func, input_types, result_type, func_type, deterministic, name): super(UserDefinedScalarFunctionWrapper, self).__init__( func, input_types, func_type, deterministic, name) if not isinstance(result_type, DataType): raise TypeError( "Invalid returnType: returnType should be DataType but is {}".format(result_type)) self._result_type = result_type self._judf_placeholder = None def _create_judf(self, serialized_func, j_input_types, j_function_kind): gateway = get_gateway() j_result_type = _to_java_data_type(self._result_type) PythonScalarFunction = gateway.jvm \ .org.apache.flink.table.functions.python.PythonScalarFunction j_scalar_function = PythonScalarFunction( self._name, bytearray(serialized_func), j_input_types, j_result_type, j_function_kind, self._deterministic, self._takes_row_as_input, _get_python_env()) return j_scalar_function def _create_delegate_function(self) -> UserDefinedFunction: return DelegatingScalarFunction(self._func) class UserDefinedTableFunctionWrapper(UserDefinedFunctionWrapper): """ Wrapper for Python user-defined table function. """ def __init__(self, func, input_types, result_types, deterministic=None, name=None): super(UserDefinedTableFunctionWrapper, self).__init__( func, input_types, "general", deterministic, name) from pyflink.table.types import RowType if not isinstance(result_types, collections.abc.Iterable) \ or isinstance(result_types, RowType): result_types = [result_types] for result_type in result_types: if not isinstance(result_type, DataType): raise TypeError( "Invalid result_type: result_type should be DataType but contains {}".format( result_type)) self._result_types = result_types def _create_judf(self, serialized_func, j_input_types, j_function_kind): gateway = get_gateway() j_result_types = java_utils.to_jarray(gateway.jvm.DataType, [_to_java_data_type(i) for i in self._result_types]) j_result_type = gateway.jvm.DataTypes.ROW(j_result_types) PythonTableFunction = gateway.jvm \ .org.apache.flink.table.functions.python.PythonTableFunction j_table_function = PythonTableFunction( self._name, bytearray(serialized_func), j_input_types, j_result_type, j_function_kind, self._deterministic, self._takes_row_as_input, _get_python_env()) return j_table_function def _create_delegate_function(self) -> UserDefinedFunction: return DelegationTableFunction(self._func) class UserDefinedAggregateFunctionWrapper(UserDefinedFunctionWrapper): """ Wrapper for Python user-defined aggregate function or user-defined table aggregate function. """ def __init__(self, func, input_types, result_type, accumulator_type, func_type, deterministic, name, is_table_aggregate=False): super(UserDefinedAggregateFunctionWrapper, self).__init__( func, input_types, func_type, deterministic, name) if accumulator_type is None and func_type == "general": accumulator_type = func.get_accumulator_type() if result_type is None: result_type = func.get_result_type() if not isinstance(result_type, DataType): raise TypeError( "Invalid returnType: returnType should be DataType but is {}".format(result_type)) from pyflink.table.types import MapType if func_type == 'pandas' and isinstance(result_type, MapType): raise TypeError( "Invalid returnType: Pandas UDAF doesn't support DataType type {} currently" .format(result_type)) if accumulator_type is not None and not isinstance(accumulator_type, DataType): raise TypeError( "Invalid accumulator_type: accumulator_type should be DataType but is {}".format( accumulator_type)) self._result_type = result_type self._accumulator_type = accumulator_type self._is_table_aggregate = is_table_aggregate def _create_judf(self, serialized_func, j_input_types, j_function_kind): if self._func_type == "pandas": from pyflink.table.types import DataTypes self._accumulator_type = DataTypes.ARRAY(self._result_type) if j_input_types is not None: gateway = get_gateway() j_input_types = java_utils.to_jarray( gateway.jvm.DataType, [_to_java_data_type(i) for i in self._input_types]) j_result_type = _to_java_data_type(self._result_type) j_accumulator_type = _to_java_data_type(self._accumulator_type) gateway = get_gateway() if self._is_table_aggregate: PythonAggregateFunction = gateway.jvm \ .org.apache.flink.table.functions.python.PythonTableAggregateFunction else: PythonAggregateFunction = gateway.jvm \ .org.apache.flink.table.functions.python.PythonAggregateFunction j_aggregate_function = PythonAggregateFunction( self._name, bytearray(serialized_func), j_input_types, j_result_type, j_accumulator_type, j_function_kind, self._deterministic, self._takes_row_as_input, _get_python_env()) return j_aggregate_function def _create_delegate_function(self) -> UserDefinedFunction: assert self._func_type == 'pandas' return DelegatingPandasAggregateFunction(self._func) # TODO: support to configure the python execution environment def _get_python_env(): gateway = get_gateway() exec_type = gateway.jvm.org.apache.flink.table.functions.python.PythonEnv.ExecType.PROCESS return gateway.jvm.org.apache.flink.table.functions.python.PythonEnv(exec_type) def _create_udf(f, input_types, result_type, func_type, deterministic, name): return UserDefinedScalarFunctionWrapper( f, input_types, result_type, func_type, deterministic, name) def _create_udtf(f, input_types, result_types, deterministic, name): return UserDefinedTableFunctionWrapper(f, input_types, result_types, deterministic, name) def _create_udaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name): return UserDefinedAggregateFunctionWrapper( f, input_types, result_type, accumulator_type, func_type, deterministic, name) def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name): return UserDefinedAggregateFunctionWrapper( f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)
[docs]def udf(f: Union[Callable, ScalarFunction, Type] = None, input_types: Union[List[DataType], DataType] = None, result_type: DataType = None, deterministic: bool = None, name: str = None, func_type: str = "general", udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]: """ Helper method for creating a user-defined function. Example: :: >>> add_one = udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()) >>> # The input_types is optional. >>> @udf(result_type=DataTypes.BIGINT()) ... def add(i, j): ... return i + j >>> class SubtractOne(ScalarFunction): ... def eval(self, i): ... return i - 1 >>> subtract_one = udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()) :param f: lambda function or user-defined function. :param input_types: optional, the input data types. :param result_type: the result data type. :param deterministic: the determinism of the function's results. True if and only if a call to this function is guaranteed to always return the same result given the same parameters. (default True) :param name: the function name. :param func_type: the type of the python function, available value: general, pandas, (default: general) :param udf_type: the type of the python function, available value: general, pandas, (default: general) :return: UserDefinedScalarFunctionWrapper or function. .. versionadded:: 1.10.0 """ if udf_type: import warnings warnings.warn("The param udf_type is deprecated in 1.12. Use func_type instead.") func_type = udf_type if func_type not in ('general', 'pandas'): raise ValueError("The func_type must be one of 'general, pandas', got %s." % func_type) # decorator if f is None: return functools.partial(_create_udf, input_types=input_types, result_type=result_type, func_type=func_type, deterministic=deterministic, name=name) else: return _create_udf(f, input_types, result_type, func_type, deterministic, name)
[docs]def udtf(f: Union[Callable, TableFunction, Type] = None, input_types: Union[List[DataType], DataType] = None, result_types: Union[List[DataType], DataType] = None, deterministic: bool = None, name: str = None) -> Union[UserDefinedTableFunctionWrapper, Callable]: """ Helper method for creating a user-defined table function. Example: :: >>> # The input_types is optional. >>> @udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()]) ... def range_emit(s, e): ... for i in range(e): ... yield s, i >>> class MultiEmit(TableFunction): ... def eval(self, i): ... return range(i) >>> multi_emit = udtf(MultiEmit(), DataTypes.BIGINT(), DataTypes.BIGINT()) :param f: user-defined table function. :param input_types: optional, the input data types. :param result_types: the result data types. :param name: the function name. :param deterministic: the determinism of the function's results. True if and only if a call to this function is guaranteed to always return the same result given the same parameters. (default True) :return: UserDefinedTableFunctionWrapper or function. .. versionadded:: 1.11.0 """ # decorator if f is None: return functools.partial(_create_udtf, input_types=input_types, result_types=result_types, deterministic=deterministic, name=name) else: return _create_udtf(f, input_types, result_types, deterministic, name)
[docs]def udaf(f: Union[Callable, AggregateFunction, Type] = None, input_types: Union[List[DataType], DataType] = None, result_type: DataType = None, accumulator_type: DataType = None, deterministic: bool = None, name: str = None, func_type: str = "general") -> Union[UserDefinedAggregateFunctionWrapper, Callable]: """ Helper method for creating a user-defined aggregate function. Example: :: >>> # The input_types is optional. >>> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") ... def mean_udaf(v): ... return v.mean() :param f: user-defined aggregate function. :param input_types: optional, the input data types. :param result_type: the result data type. :param accumulator_type: optional, the accumulator data type. :param deterministic: the determinism of the function's results. True if and only if a call to this function is guaranteed to always return the same result given the same parameters. (default True) :param name: the function name. :param func_type: the type of the python function, available value: general, pandas, (default: general) :return: UserDefinedAggregateFunctionWrapper or function. .. versionadded:: 1.12.0 """ if func_type not in ('general', 'pandas'): raise ValueError("The func_type must be one of 'general, pandas', got %s." % func_type) # decorator if f is None: return functools.partial(_create_udaf, input_types=input_types, result_type=result_type, accumulator_type=accumulator_type, func_type=func_type, deterministic=deterministic, name=name) else: return _create_udaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name)
[docs]def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None, input_types: Union[List[DataType], DataType] = None, result_type: DataType = None, accumulator_type: DataType = None, deterministic: bool = None, name: str = None, func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]: """ Helper method for creating a user-defined table aggregate function. Example: :: >>> # The input_types is optional. >>> class Top2(TableAggregateFunction): ... def emit_value(self, accumulator): ... yield Row(accumulator[0]) ... yield Row(accumulator[1]) ... ... def create_accumulator(self): ... return [None, None] ... ... def accumulate(self, accumulator, *args): ... if args[0] is not None: ... if accumulator[0] is None or args[0] > accumulator[0]: ... accumulator[1] = accumulator[0] ... accumulator[0] = args[0] ... elif accumulator[1] is None or args[0] > accumulator[1]: ... accumulator[1] = args[0] ... ... def retract(self, accumulator, *args): ... accumulator[0] = accumulator[0] - 1 ... ... def merge(self, accumulator, accumulators): ... for other_acc in accumulators: ... self.accumulate(accumulator, other_acc[0]) ... self.accumulate(accumulator, other_acc[1]) ... ... def get_accumulator_type(self): ... return DataTypes.ARRAY(DataTypes.BIGINT()) ... ... def get_result_type(self): ... return DataTypes.ROW( ... [DataTypes.FIELD("a", DataTypes.BIGINT())]) >>> top2 = udtaf(Top2()) :param f: user-defined table aggregate function. :param input_types: optional, the input data types. :param result_type: the result data type. :param accumulator_type: optional, the accumulator data type. :param deterministic: the determinism of the function's results. True if and only if a call to this function is guaranteed to always return the same result given the same parameters. (default True) :param name: the function name. :param func_type: the type of the python function, available value: general (default: general) :return: UserDefinedAggregateFunctionWrapper or function. .. versionadded:: 1.13.0 """ if func_type != 'general': raise ValueError("The func_type must be 'general', got %s." % func_type) if f is None: return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type, accumulator_type=accumulator_type, func_type=func_type, deterministic=deterministic, name=name) else: return _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name)