################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from enum import Enum
from typing import Union, TypeVar, Generic, Any
from pyflink import add_version_doc
from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataType, DataTypes, _to_java_data_type
from pyflink.util.java_utils import to_jarray
__all__ = [
'Expression',
'TimeIntervalUnit',
'TimePointUnit',
'JsonExistsOnError',
'JsonValueOnEmptyOrError'
]
_aggregation_doc = """
{op_desc}
Example:
::
>>> tab \\
>>> .group_by(col("a")) \\
>>> .select(col("a"),
>>> col("b").sum.alias("d"),
>>> col("b").sum0.alias("e"),
>>> col("b").min.alias("f"),
>>> col("b").max.alias("g"),
>>> col("b").count.alias("h"),
>>> col("b").avg.alias("i"),
>>> col("b").stddev_pop.alias("j"),
>>> col("b").stddev_samp.alias("k"),
>>> col("b").var_pop.alias("l"),
>>> col("b").var_samp.alias("m"),
>>> col("b").collect.alias("n"))
.. seealso:: :py:attr:`~Expression.sum`, :py:attr:`~Expression.sum0`, :py:attr:`~Expression.min`,
:py:attr:`~Expression.max`, :py:attr:`~Expression.count`, :py:attr:`~Expression.avg`,
:py:attr:`~Expression.stddev_pop`, :py:attr:`~Expression.stddev_samp`,
:py:attr:`~Expression.var_pop`, :py:attr:`~Expression.var_samp`,
:py:attr:`~Expression.collect`
"""
_math_log_doc = """
{op_desc}
.. seealso:: :py:attr:`~Expression.log10`, :py:attr:`~Expression.log2`, :py:attr:`~Expression.ln`,
:func:`~Expression.log`
"""
_math_trigonometric_doc = """
Calculates the {op_desc} of a given number.
.. seealso:: :py:attr:`~Expression.sin`, :py:attr:`~Expression.cos`, :py:attr:`~Expression.sinh`,
:py:attr:`~Expression.cosh`, :py:attr:`~Expression.tan`, :py:attr:`~Expression.cot`,
:py:attr:`~Expression.asin`, :py:attr:`~Expression.acos`, :py:attr:`~Expression.atan`,
:py:attr:`~Expression.tanh`
"""
_string_doc_seealso = """
.. seealso:: :func:`~Expression.trim_leading`, :func:`~Expression.trim_trailing`,
:func:`~Expression.trim`, :func:`~Expression.replace`,
:py:attr:`~Expression.char_length`, :py:attr:`~Expression.upper_case`,
:py:attr:`~Expression.lower_case`, :py:attr:`~Expression.init_cap`,
:func:`~Expression.like`, :func:`~Expression.similar`,
:func:`~Expression.position`, :func:`~Expression.lpad`, :func:`~Expression.rpad`,
:func:`~Expression.overlay`, :func:`~Expression.regexp_replace`,
:func:`~Expression.regexp_extract`, :func:`~Expression.substring`,
:py:attr:`~Expression.from_base64`, :py:attr:`~Expression.to_base64`,
:py:attr:`~Expression.ltrim`, :py:attr:`~Expression.rtrim`, :func:`~Expression.repeat`
"""
_temporal_doc_seealso = """
.. seealso:: :py:attr:`~Expression.to_date`, :py:attr:`~Expression.to_time`,
:py:attr:`~Expression.to_timestamp`, :func:`~Expression.extract`,
:func:`~Expression.floor`, :func:`~Expression.ceil`
"""
_time_doc = """
Creates an interval of the given number of {op_desc}.
The produced expression is of type :func:`~DataTypes.INTERVAL`.
.. seealso:: :py:attr:`~Expression.year`, :py:attr:`~Expression.years`,
:py:attr:`~Expression.quarter`, :py:attr:`~Expression.quarters`,
:py:attr:`~Expression.month`, :py:attr:`~Expression.months`,
:py:attr:`~Expression.week`, :py:attr:`~Expression.weeks`, :py:attr:`~Expression.day`,
:py:attr:`~Expression.days`, :py:attr:`~Expression.hour`, :py:attr:`~Expression.hours`,
:py:attr:`~Expression.minute`, :py:attr:`~Expression.minutes`,
:py:attr:`~Expression.second`, :py:attr:`~Expression.seconds`,
:py:attr:`~Expression.milli`, :py:attr:`~Expression.millis`
"""
_hash_doc = """
Returns the {op_desc} hash of the string argument; null if string is null.
:return: string of {bit} hexadecimal digits or null.
.. seealso:: :py:attr:`~Expression.md5`, :py:attr:`~Expression.sha1`, :py:attr:`~Expression.sha224`,
:py:attr:`~Expression.sha256`, :py:attr:`~Expression.sha384`,
:py:attr:`~Expression.sha512`, :py:attr:`~Expression.sha2`
"""
def _make_math_log_doc():
math_log_funcs = {
Expression.log10: "Calculates the base 10 logarithm of the given value.",
Expression.log2: "Calculates the base 2 logarithm of the given value.",
Expression.ln: "Calculates the natural logarithm of the given value.",
Expression.log: "Calculates the natural logarithm of the given value if base is not "
"specified. Otherwise, calculates the logarithm of the given value to the "
"given base.",
}
for func, op_desc in math_log_funcs.items():
func.__doc__ = _math_log_doc.format(op_desc=op_desc)
def _make_math_trigonometric_doc():
math_trigonometric_funcs = {
Expression.cosh: "hyperbolic cosine",
Expression.sinh: "hyperbolic sine",
Expression.sin: "sine",
Expression.cos: "cosine",
Expression.tan: "tangent",
Expression.cot: "cotangent",
Expression.asin: "arc sine",
Expression.acos: "arc cosine",
Expression.atan: "arc tangent",
Expression.tanh: "hyperbolic tangent",
}
for func, op_desc in math_trigonometric_funcs.items():
func.__doc__ = _math_trigonometric_doc.format(op_desc=op_desc)
def _make_aggregation_doc():
aggregation_funcs = {
Expression.sum: "Returns the sum of the numeric field across all input values. "
"If all values are null, null is returned.",
Expression.sum0: "Returns the sum of the numeric field across all input values. "
"If all values are null, 0 is returned.",
Expression.min: "Returns the minimum value of field across all input values.",
Expression.max: "Returns the maximum value of field across all input values.",
Expression.count: "Returns the number of input rows for which the field is not null.",
Expression.avg: "Returns the average (arithmetic mean) of the numeric field across all "
"input values.",
Expression.stddev_pop: "Returns the population standard deviation of an expression(the "
"square root of var_pop).",
Expression.stddev_samp: "Returns the sample standard deviation of an expression(the square "
"root of var_samp).",
Expression.var_pop: "Returns the population standard variance of an expression.",
Expression.var_samp: "Returns the sample variance of a given expression.",
Expression.collect: "Returns multiset aggregate of a given expression.",
}
for func, op_desc in aggregation_funcs.items():
func.__doc__ = _aggregation_doc.format(op_desc=op_desc)
def _make_string_doc():
string_funcs = [
Expression.substring, Expression.trim_leading, Expression.trim_trailing, Expression.trim,
Expression.replace, Expression.char_length, Expression.upper_case, Expression.lower_case,
Expression.init_cap, Expression.like, Expression.similar, Expression.position,
Expression.lpad, Expression.rpad, Expression.overlay, Expression.regexp_replace,
Expression.regexp_extract, Expression.from_base64, Expression.to_base64,
Expression.ltrim, Expression.rtrim, Expression.repeat
]
for func in string_funcs:
func.__doc__ = func.__doc__.replace(' ', '') + _string_doc_seealso
def _make_temporal_doc():
temporal_funcs = [
Expression.to_date, Expression.to_time, Expression.to_timestamp, Expression.extract,
Expression.floor, Expression.ceil
]
for func in temporal_funcs:
func.__doc__ = func.__doc__.replace(' ', '') + _temporal_doc_seealso
def _make_time_doc():
time_funcs = {
Expression.year: "years",
Expression.years: "years",
Expression.quarter: "quarters",
Expression.quarters: "quarters",
Expression.month: "months",
Expression.months: "months",
Expression.week: "weeks",
Expression.weeks: "weeks",
Expression.day: "days",
Expression.days: "days",
Expression.hour: "hours",
Expression.hours: "hours",
Expression.minute: "minutes",
Expression.minutes: "minutes",
Expression.second: "seconds",
Expression.seconds: "seconds",
Expression.milli: "millis",
Expression.millis: "millis"
}
for func, op_desc in time_funcs.items():
func.__doc__ = _time_doc.format(op_desc=op_desc)
def _make_hash_doc():
hash_funcs = {
Expression.md5: ("MD5", 32),
Expression.sha1: ("SHA-1", 40),
Expression.sha224: ("SHA-224", 56),
Expression.sha256: ("SHA-256", 64),
Expression.sha384: ("SHA-384", 96),
Expression.sha512: ("SHA-512", 128)
}
for func, (op_desc, bit) in hash_funcs.items():
func.__doc__ = _hash_doc.format(op_desc=op_desc, bit=bit)
def _add_version_doc():
for func_name in dir(Expression):
if not func_name.startswith("_"):
add_version_doc(getattr(Expression, func_name), "1.12.0")
def _get_java_expression(expr, to_expr: bool = False):
"""
Returns the Java expression for the given expr. If expr is a Python expression, returns the
underlying Java expression, otherwise, convert it to a Java expression if to_expr is true.
"""
if isinstance(expr, Expression):
return expr._j_expr
elif to_expr:
gateway = get_gateway()
return gateway.jvm.Expressions.lit(expr)
else:
return expr
def _get_or_create_java_expression(expr: Union["Expression", str]):
if isinstance(expr, Expression):
return expr._j_expr
elif isinstance(expr, str):
from pyflink.table.expressions import col
return col(expr)._j_expr
else:
raise TypeError(
"Invalid argument: expected Expression or string, got {0}.".format(type(expr)))
def _unary_op(op_name: str):
def _(self) -> 'Expression':
return Expression(getattr(self._j_expr, op_name)())
return _
def _binary_op(op_name: str, reverse: bool = False):
def _(self, other) -> 'Expression':
if reverse:
return Expression(getattr(_get_java_expression(other, True), op_name)(self._j_expr))
else:
return Expression(getattr(self._j_expr, op_name)(_get_java_expression(other)))
return _
def _ternary_op(op_name: str):
def _(self, first, second) -> 'Expression':
return Expression(getattr(self._j_expr, op_name)(
_get_java_expression(first), _get_java_expression(second)))
return _
def _varargs_op(op_name: str):
def _(self, *args) -> 'Expression':
return Expression(
getattr(self._j_expr, op_name)(*[_get_java_expression(arg) for arg in args]))
return _
def _expressions_op(op_name: str):
def _(self, *args) -> 'Expression':
from pyflink.table import expressions
return getattr(expressions, op_name)(self, *[_get_java_expression(arg) for arg in args])
return _
class TimeIntervalUnit(Enum):
"""
Units for working with time intervals.
.. versionadded:: 1.12.0
"""
YEAR = 0,
YEAR_TO_MONTH = 1,
QUARTER = 2,
MONTH = 3,
WEEK = 4,
DAY = 5,
DAY_TO_HOUR = 6,
DAY_TO_MINUTE = 7,
DAY_TO_SECOND = 8,
HOUR = 9,
SECOND = 10,
HOUR_TO_MINUTE = 11,
HOUR_TO_SECOND = 12,
MINUTE = 13,
MINUTE_TO_SECOND = 14
def _to_j_time_interval_unit(self):
gateway = get_gateway()
JTimeIntervalUnit = gateway.jvm.org.apache.flink.table.expressions.TimeIntervalUnit
return getattr(JTimeIntervalUnit, self.name)
class TimePointUnit(Enum):
"""
Units for working with points in time.
.. versionadded:: 1.12.0
"""
YEAR = 0,
MONTH = 1,
DAY = 2,
HOUR = 3,
MINUTE = 4,
SECOND = 5,
QUARTER = 6,
WEEK = 7,
MILLISECOND = 8,
MICROSECOND = 9
def _to_j_time_point_unit(self):
gateway = get_gateway()
JTimePointUnit = gateway.jvm.org.apache.flink.table.expressions.TimePointUnit
return getattr(JTimePointUnit, self.name)
class JsonExistsOnError(Enum):
"""
Behavior in case of errors for json_exists().
"""
TRUE = 0,
FALSE = 1,
UNKNOWN = 2,
ERROR = 3
def _to_j_json_exists_on_error(self):
gateway = get_gateway()
JJsonExistsOnError = gateway.jvm.org.apache.flink.table.api.JsonExistsOnError
return getattr(JJsonExistsOnError, self.name)
class JsonValueOnEmptyOrError(Enum):
"""
Behavior in case of emptiness or errors for json_value().
"""
NULL = 0,
ERROR = 1,
DEFAULT = 2
def _to_j_json_value_on_empty_or_error(self):
gateway = get_gateway()
JJsonValueOnEmptyOrError = gateway.jvm.org.apache.flink.table.api.JsonValueOnEmptyOrError
return getattr(JJsonValueOnEmptyOrError, self.name)
T = TypeVar('T')
[docs]class Expression(Generic[T]):
"""
Expressions represent a logical tree for producing a computation result.
Expressions might be literal values, function calls, or field references.
.. versionadded:: 1.12.0
"""
def __init__(self, j_expr_or_property_name):
self._j_expr_or_property_name = j_expr_or_property_name
__abs__ = _unary_op("abs")
# comparison functions
__eq__ = _binary_op("isEqual")
__ne__ = _binary_op("isNotEqual")
__lt__ = _binary_op("isLess")
__gt__ = _binary_op("isGreater")
__le__ = _binary_op("isLessOrEqual")
__ge__ = _binary_op("isGreaterOrEqual")
# logic functions
__and__ = _binary_op("and")
__or__ = _binary_op("or")
__invert__ = _unary_op('isNotTrue')
__rand__ = _binary_op("and")
__ror__ = _binary_op("or")
# arithmetic functions
__add__ = _binary_op("plus")
__sub__ = _binary_op("minus")
__mul__ = _binary_op("times")
__truediv__ = _binary_op("dividedBy")
__mod__ = _binary_op("mod")
__pow__ = _binary_op("power")
__neg__ = _expressions_op("negative")
__radd__ = _binary_op("plus", True)
__rsub__ = _binary_op("minus", True)
__rmul__ = _binary_op("times")
__rtruediv__ = _binary_op("dividedBy", True)
__rmod__ = _binary_op("mod", True)
__rpow__ = _binary_op("power", True)
def __str__(self):
return self._j_expr.asSummaryString()
def __getattr__(self, name):
if name == '_j_expr':
if isinstance(self._j_expr_or_property_name, str):
gateway = get_gateway()
return getattr(gateway.jvm.Expressions, self._j_expr_or_property_name)
else:
return self._j_expr_or_property_name
return self.get(name)
def __getitem__(self, index):
return self.at(index)
# ---------------------------- arithmetic functions ----------------------------------
@property
def exp(self) -> 'Expression[float]':
"""
Calculates the Euler's number raised to the given power.
"""
return _unary_op("exp")(self)
@property
def log10(self) -> 'Expression[float]':
return _unary_op("log10")(self)
@property
def log2(self) -> 'Expression[float]':
return _unary_op("log2")(self)
@property
def ln(self) -> 'Expression[float]':
return _unary_op("ln")(self)
[docs] def log(self, base=None) -> 'Expression[float]':
if base is None:
return _unary_op("log")(self)
else:
return _binary_op("log")(self, base)
@property
def cosh(self) -> 'Expression[float]':
return _unary_op("cosh")(self)
@property
def sinh(self) -> 'Expression[float]':
return _unary_op("sinh")(self)
@property
def sin(self) -> 'Expression[float]':
return _unary_op("sin")(self)
@property
def cos(self) -> 'Expression[float]':
return _unary_op("cos")(self)
@property
def tan(self) -> 'Expression[float]':
return _unary_op("tan")(self)
@property
def cot(self) -> 'Expression[float]':
return _unary_op("cot")(self)
@property
def asin(self) -> 'Expression[float]':
return _unary_op("asin")(self)
@property
def acos(self) -> 'Expression[float]':
return _unary_op("acos")(self)
@property
def atan(self) -> 'Expression[float]':
return _unary_op("atan")(self)
@property
def tanh(self) -> 'Expression[float]':
return _unary_op("tanh")(self)
@property
def degrees(self) -> 'Expression[float]':
"""
Converts numeric from radians to degrees.
.. seealso:: :py:attr:`~Expression.radians`
"""
return _unary_op("degrees")(self)
@property
def radians(self) -> 'Expression[float]':
"""
Converts numeric from degrees to radians.
.. seealso:: :py:attr:`~Expression.degrees`
"""
return _unary_op("radians")(self)
@property
def sqrt(self) -> 'Expression[float]':
"""
Calculates the square root of a given value.
"""
return _unary_op("sqrt")(self)
@property
def abs(self) -> 'Expression[T]':
"""
Calculates the absolute value of given value.
"""
return _unary_op("abs")(self)
@property
def sign(self) -> 'Expression[T]':
"""
Calculates the signum of a given number.
e.g. `lit(1.23).sign` leads to `1.00`, `lit(-1.23).sign` leads to `-1.00`.
"""
return _unary_op("sign")(self)
[docs] def round(self, places: Union[int, 'Expression[int]']):
"""
Rounds the given number to integer places right to the decimal point.
e.g. `lit(646.646).round(2)` leads to `646.65`, `lit(646.646).round(3)` leads to `646.646`,
`lit(646.646).round(0)` leads to `647`, `lit(646.646).round(-2)` leads to `600`.
"""
return _binary_op("round")(self, places)
[docs] def between(self, lower_bound, upper_bound) -> 'Expression[bool]':
"""
Returns true if the given expression is between lower_bound and upper_bound
(both inclusive). False otherwise. The parameters must be numeric types or identical
comparable types.
e.g. `lit(2.1).between(2.1, 2.1)` leads to `true`,
`lit("2018-05-05").to_date.between(lit("2018-05-01").to_date, lit("2018-05-10").to_date)`
leads to `true`.
:param lower_bound: numeric or comparable expression
:param upper_bound: numeric or comparable expression
.. seealso:: :func:`~Expression.not_between`
"""
return _ternary_op("between")(self, lower_bound, upper_bound)
[docs] def not_between(self, lower_bound, upper_bound) -> 'Expression[bool]':
"""
Returns true if the given expression is not between lower_bound and upper_bound
(both inclusive). False otherwise. The parameters must be numeric types or identical
comparable types.
e.g. `lit(2.1).not_between(2.1, 2.1)` leads to `false`,
`lit("2018-05-05").to_date.not_between(lit("2018-05-01").to_date,
lit("2018-05-10").to_date)` leads to `false`.
:param lower_bound: numeric or comparable expression
:param upper_bound: numeric or comparable expression
.. seealso:: :func:`~Expression.between`
"""
return _ternary_op("notBetween")(self, lower_bound, upper_bound)
[docs] def then(self, if_true, if_false) -> 'Expression':
"""
Ternary conditional operator that decides which of two other expressions should be evaluated
based on a evaluated boolean condition.
e.g. lit(42).is_greater(5).then("A", "B") leads to "A"
:param if_true: expression to be evaluated if condition holds
:param if_false: expression to be evaluated if condition does not hold
"""
return _ternary_op("then")(self, if_true, if_false)
[docs] def if_null(self, null_replacement) -> 'Expression':
"""
Returns null_replacement if the given expression is null; otherwise the expression is
returned.
This function returns a data type that is very specific in terms of nullability. The
returned type is the common type of both arguments but only nullable if the
null_replacement is nullable.
The function allows to pass nullable columns into a function or table that is declared
with a NOT NULL constraint.
e.g. col("nullable_column").if_null(5) returns never null.
"""
return _binary_op("ifNull")(self, null_replacement)
@property
def is_null(self) -> 'Expression[bool]':
"""
Returns true if the given expression is null.
.. seealso:: :py:attr:`~Expression.is_not_null`
"""
return _unary_op("isNull")(self)
@property
def is_not_null(self) -> 'Expression[bool]':
"""
Returns true if the given expression is not null.
.. seealso:: :py:attr:`~Expression.is_null`
"""
return _unary_op("isNotNull")(self)
@property
def is_true(self) -> 'Expression[bool]':
"""
Returns true if given boolean expression is true. False otherwise (for null and false).
.. seealso:: :py:attr:`~Expression.is_false`, :py:attr:`~Expression.is_not_true`,
:py:attr:`~Expression.is_not_false`
"""
return _unary_op("isTrue")(self)
@property
def is_false(self) -> 'Expression[bool]':
"""
Returns true if given boolean expression is false. False otherwise (for null and true).
.. seealso:: :py:attr:`~Expression.is_true`, :py:attr:`~Expression.is_not_true`,
:py:attr:`~Expression.is_not_false`
"""
return _unary_op("isFalse")(self)
@property
def is_not_true(self) -> 'Expression[bool]':
"""
Returns true if given boolean expression is not true (for null and false). False otherwise.
.. seealso:: :py:attr:`~Expression.is_true`, :py:attr:`~Expression.is_false`,
:py:attr:`~Expression.is_not_false`
"""
return _unary_op("isNotTrue")(self)
@property
def is_not_false(self) -> 'Expression[bool]':
"""
Returns true if given boolean expression is not false (for null and true). False otherwise.
.. seealso:: :py:attr:`~Expression.is_true`, :py:attr:`~Expression.is_false`,
:py:attr:`~Expression.is_not_true`
"""
return _unary_op("isNotFalse")(self)
@property
def distinct(self) -> 'Expression':
"""
Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an
aggregation function is only applied on distinct input values.
Example:
::
>>> tab \\
>>> .group_by(col("a")) \\
>>> .select(col("a"), col("b").sum.distinct.alias("d"))
"""
return _unary_op("distinct")(self)
@property
def sum(self) -> 'Expression':
return _unary_op("sum")(self)
@property
def sum0(self) -> 'Expression':
return _unary_op("sum0")(self)
@property
def min(self) -> 'Expression':
return _unary_op("min")(self)
@property
def max(self) -> 'Expression':
return _unary_op("max")(self)
@property
def count(self) -> 'Expression':
return _unary_op("count")(self)
@property
def avg(self) -> 'Expression':
return _unary_op("avg")(self)
@property
def stddev_pop(self) -> 'Expression':
return _unary_op("stddevPop")(self)
@property
def stddev_samp(self) -> 'Expression':
return _unary_op("stddevSamp")(self)
@property
def var_pop(self) -> 'Expression':
return _unary_op("varPop")(self)
@property
def var_samp(self) -> 'Expression':
return _unary_op("varSamp")(self)
@property
def collect(self) -> 'Expression':
return _unary_op("collect")(self)
[docs] def alias(self, name: str, *extra_names: str) -> 'Expression[T]':
"""
Specifies a name for an expression i.e. a field.
Example:
::
>>> tab.select(col('a').alias('b'))
:param name: name for one field.
:param extra_names: additional names if the expression expands to multiple fields
"""
gateway = get_gateway()
return _ternary_op("as")(self, name, to_jarray(gateway.jvm.String, extra_names))
[docs] def cast(self, data_type: DataType) -> 'Expression':
"""
Converts a value to a given data type.
e.g. lit("42").cast(DataTypes.INT()) leads to 42.
"""
return _binary_op("cast")(self, _to_java_data_type(data_type))
@property
def asc(self) -> 'Expression':
"""
Specifies ascending order of an expression i.e. a field for order_by.
Example:
::
>>> tab.order_by(col('a').asc)
.. seealso:: :py:attr:`~Expression.desc`
"""
return _unary_op("asc")(self)
@property
def desc(self) -> 'Expression':
"""
Specifies descending order of an expression i.e. a field for order_by.
Example:
::
>>> tab.order_by(col('a').desc)
.. seealso:: :py:attr:`~Expression.asc`
"""
return _unary_op("desc")(self)
[docs] def in_(self, first_element_or_table, *remaining_elements) -> 'Expression':
"""
If first_element_or_table is a Table, Returns true if an expression exists in a given table
sub-query. The sub-query table must consist of one column. This column must have the same
data type as the expression.
.. note::
This operation is not supported in a streaming environment yet if
first_element_or_table is a Table.
Otherwise, Returns true if an expression exists in a given list of expressions. This is a
shorthand for multiple OR conditions.
If the testing set contains null, the result will be null if the element can not be found
and true if it can be found. If the element is null, the result is always null.
e.g. lit("42").in(1, 2, 3) leads to false.
Example:
::
>>> tab.where(col("a").in_(1, 2, 3))
>>> table_a.where(col("x").in_(table_b.select("y")))
"""
from pyflink.table import Table
if isinstance(first_element_or_table, Table):
assert len(remaining_elements) == 0
return _binary_op("in")(self, first_element_or_table._j_table)
else:
gateway = get_gateway()
ApiExpressionUtils = gateway.jvm.org.apache.flink.table.expressions.ApiExpressionUtils
remaining_elements = (first_element_or_table, *remaining_elements)
exprs = [ApiExpressionUtils.objectToExpression(_get_java_expression(e))
for e in remaining_elements]
return _binary_op("in")(self, to_jarray(gateway.jvm.Object, exprs))
@property
def start(self) -> 'Expression':
"""
Returns the start time (inclusive) of a window when applied on a window reference.
Example:
::
>>> tab.window(Tumble
>>> .over(row_interval(2))
>>> .on(col("a"))
>>> .alias("w")) \\
>>> .group_by(col("c"), col("w")) \\
>>> .select(col("c"), col("w").start, col("w").end, col("w").proctime)
.. seealso:: :py:attr:`~Expression.end`
"""
return _unary_op("start")(self)
@property
def end(self) -> 'Expression':
"""
Returns the end time (exclusive) of a window when applied on a window reference.
e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
Example:
::
>>> orders.window(Tumble
>>> .over(row_interval(2))
>>> .on(col("a"))
>>> .alias("w")) \\
>>> .group_by(col("c"), col("w")) \\
>>> .select(col("c"), col("w").start, col("w").end, col("w").proctime)
.. seealso:: :py:attr:`~Expression.start`
"""
return _unary_op("end")(self)
@property
def bin(self) -> 'Expression[str]':
"""
Returns a string representation of an integer numeric value in binary format. Returns null
if numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
.. seealso:: :py:attr:`~Expression.hex`
"""
return _unary_op("bin")(self)
@property
def hex(self) -> 'Expression[str]':
"""
Returns a string representation of an integer numeric value or a string in hex format.
Returns null if numeric or string is null.
E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world"
leads to "68656c6c6f2c776f726c64".
.. seealso:: :py:attr:`~Expression.bin`
"""
return _unary_op("hex")(self)
[docs] def truncate(self, n: Union[int, 'Expression[int]'] = 0) -> 'Expression[T]':
"""
Returns a number of truncated to n decimal places.
If n is 0, the result has no decimal point or fractional part.
n can be negative to cause n digits left of the decimal point of the value to become zero.
E.g. truncate(42.345, 2) to 42.34, 42.truncate(-1) to 40
"""
return _binary_op("truncate")(self, n)
# ---------------------------- string functions ----------------------------------
[docs] def substring(self,
begin_index: Union[int, 'Expression[int]'],
length: Union[int, 'Expression[int]'] = None) -> 'Expression[str]':
"""
Creates a substring of the given string at given index for a given length.
:param begin_index: first character of the substring (starting at 1, inclusive)
:param length: number of characters of the substring
"""
if length is None:
return _binary_op("substring")(self, begin_index)
else:
return _ternary_op("substring")(self, begin_index, length)
[docs] def trim_leading(self, character: Union[str, 'Expression[str]'] = None) -> 'Expression[str]':
"""
Removes leading space characters from the given string if character is None.
Otherwise, removes leading specified characters from the given string.
"""
if character is None:
return _unary_op("trimLeading")(self)
else:
return _binary_op("trimLeading")(self, character)
[docs] def trim_trailing(self, character: Union[str, 'Expression[str]'] = None) -> 'Expression[str]':
"""
Removes trailing space characters from the given string if character is None.
Otherwise, removes trailing specified characters from the given string.
"""
if character is None:
return _unary_op("trimTrailing")(self)
else:
return _binary_op("trimTrailing")(self, character)
[docs] def trim(self, character: Union[str, 'Expression[str]'] = None) -> 'Expression[str]':
"""
Removes leading and trailing space characters from the given string if character
is None. Otherwise, removes leading and trailing specified characters from the given string.
"""
if character is None:
return _unary_op("trim")(self)
else:
return _binary_op("trim")(self, character)
[docs] def replace(self,
search: Union[str, 'Expression[str]'] = None,
replacement: Union[str, 'Expression[str]'] = None) -> 'Expression[str]':
"""
Returns a new string which replaces all the occurrences of the search target
with the replacement string (non-overlapping).
e.g. `lit('This is a test String.').replace(' ', '_')` leads to `This_is_a_test_String.`
"""
return _ternary_op("replace")(self, search, replacement)
@property
def char_length(self) -> 'Expression[int]':
"""
Returns the length of a string.
"""
return _unary_op("charLength")(self)
@property
def upper_case(self) -> 'Expression[str]':
"""
Returns all of the characters in a string in upper case using the rules of the default
locale.
"""
return _unary_op("upperCase")(self)
@property
def lower_case(self) -> 'Expression[str]':
"""
Returns all of the characters in a string in lower case using the rules of the default
locale.
"""
return _unary_op("lowerCase")(self)
@property
def init_cap(self) -> 'Expression[str]':
"""
Converts the initial letter of each word in a string to uppercase. Assumes a
string containing only [A-Za-z0-9], everything else is treated as whitespace.
"""
return _unary_op("initCap")(self)
[docs] def like(self, pattern: Union[str, 'Expression[str]'] = None) -> 'Expression[bool]':
"""
Returns true, if a string matches the specified LIKE pattern.
e.g. 'Jo_n%' matches all strings that start with 'Jo(arbitrary letter)n'
"""
return _binary_op("like")(self, pattern)
[docs] def similar(self, pattern: Union[str, 'Expression[str]'] = None) -> 'Expression[bool]':
"""
Returns true, if a string matches the specified SQL regex pattern.
e.g. 'A+' matches all strings that consist of at least one A
"""
return _binary_op("similar")(self, pattern)
[docs] def position(self, haystack: Union[str, 'Expression[str]'] = None) -> 'Expression[int]':
"""
Returns the position of string in an other string starting at 1.
Returns 0 if string could not be found. e.g. lit('a').position('bbbbba') leads to 6.
"""
return _binary_op("position")(self, haystack)
[docs] def lpad(self,
length: Union[int, 'Expression[int]'],
pad: Union[str, 'Expression[str]']) -> 'Expression[str]':
"""
Returns a string left-padded with the given pad string to a length of len characters.
If the string is longer than len, the return value is shortened to len characters.
e.g. lit('hi').lpad(4, '??') returns '??hi', lit('hi').lpad(1, '??') returns 'h'
"""
return _ternary_op("lpad")(self, length, pad)
[docs] def rpad(self,
length: Union[int, 'Expression[int]'],
pad: Union[str, 'Expression[str]']) -> 'Expression[str]':
"""
Returns a string right-padded with the given pad string to a length of len characters.
If the string is longer than len, the return value is shortened to len characters.
e.g. lit('hi').rpad(4, '??') returns 'hi??', lit('hi').rpad(1, '??') returns 'h'
"""
return _ternary_op("rpad")(self, length, pad)
[docs] def overlay(self,
new_string: Union[str, 'Expression[str]'],
starting: Union[int, 'Expression[int]'],
length: Union[int, 'Expression[int]'] = None) -> 'Expression[str]':
"""
Replaces a substring of string with a string starting at a position
(starting at 1). e.g. lit('xxxxxtest').overlay('xxxx', 6) leads to 'xxxxxxxxx'
lit('xxxxxtest').overlay('xxxx', 6, 2) leads to 'xxxxxxxxxst'
"""
if length is None:
return _ternary_op("overlay")(self, new_string, starting)
else:
j_expr_new_string = new_string._j_expr \
if isinstance(new_string, Expression) else new_string
j_expr_starting = starting._j_expr \
if isinstance(starting, Expression) else starting
j_expr_length = length._j_expr \
if isinstance(length, Expression) else length
return Expression(getattr(self._j_expr, "overlay")(
j_expr_new_string, j_expr_starting, j_expr_length))
[docs] def regexp_replace(self,
regex: Union[str, 'Expression[str]'],
replacement: Union[str, 'Expression[str]']) -> 'Expression[str]':
"""
Returns a string with all substrings that match the regular expression
consecutively being replaced.
"""
return _ternary_op("regexpReplace")(self, regex, replacement)
@property
def from_base64(self) -> 'Expression[str]':
"""
Returns the base string decoded with base64.
"""
return _unary_op("fromBase64")(self)
@property
def to_base64(self) -> 'Expression[str]':
"""
Returns the base64-encoded result of the input string.
"""
return _unary_op("toBase64")(self)
@property
def ltrim(self) -> 'Expression[str]':
"""
Returns a string that removes the left whitespaces from the given string.
"""
return _unary_op("ltrim")(self)
@property
def rtrim(self) -> 'Expression[str]':
"""
Returns a string that removes the right whitespaces from the given string.
"""
return _unary_op("rtrim")(self)
[docs] def repeat(self, n: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
Returns a string that repeats the base string n times.
"""
return _binary_op("repeat")(self, n)
[docs] def over(self, alias) -> 'Expression':
"""
Defines an aggregation to be used for a previously specified over window.
Example:
::
>>> tab.window(Over
>>> .partition_by(col('c'))
>>> .order_by(col('rowtime'))
>>> .preceding(row_interval(2))
>>> .following(CURRENT_ROW)
>>> .alias("w")) \\
>>> .select(col('c'), col('a'), col('a').count.over(col('w')))
"""
return _binary_op("over")(self, alias)
# ---------------------------- temporal functions ----------------------------------
@property
def to_date(self) -> 'Expression':
"""
Parses a date string in the form "yyyy-MM-dd" to a SQL Date. It's equivalent to
`col.cast(DataTypes.DATE())`.
Example:
::
>>> lit("2016-06-15").to_date
"""
return _unary_op("toDate")(self)
@property
def to_time(self) -> 'Expression':
"""
Parses a time string in the form "HH:mm:ss" to a SQL Time. It's equivalent to
`col.cast(DataTypes.TIME())`.
Example:
::
>>> lit("3:30:00").to_time
"""
return _unary_op("toTime")(self)
@property
def to_timestamp(self) -> 'Expression':
"""
Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
It's equivalent to `col.cast(DataTypes.TIMESTAMP(3))`.
Example:
::
>>> lit('2016-06-15 3:30:00.001').to_timestamp
"""
return _unary_op("toTimestamp")(self)
[docs] def floor(self, time_interval_unit: TimeIntervalUnit = None) -> 'Expression':
"""
If time_interval_unit is specified, it rounds down a time point to the given
unit, e.g. `lit("12:44:31").to_date.floor(TimeIntervalUnit.MINUTE)` leads to
`12:44:00`. Otherwise, it calculates the largest integer less than or equal to a
given number.
"""
if time_interval_unit is None:
return _unary_op("floor")(self)
else:
return _binary_op("floor")(
self, time_interval_unit._to_j_time_interval_unit())
[docs] def ceil(self, time_interval_unit: TimeIntervalUnit = None) -> 'Expression':
"""
If time_interval_unit is specified, it rounds up a time point to the given unit,
e.g. `lit("12:44:31").to_date.floor(TimeIntervalUnit.MINUTE)` leads to 12:45:00.
Otherwise, it calculates the smallest integer greater than or equal to a given number.
"""
if time_interval_unit is None:
return _unary_op("ceil")(self)
else:
return _binary_op("ceil")(
self, time_interval_unit._to_j_time_interval_unit())
# ---------------------------- advanced type helper functions -----------------------------
[docs] def get(self, name_or_index: Union[str, int]) -> 'Expression':
"""
Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name or index
and returns it's value.
:param name_or_index: name or index of the field (similar to Flink's field expressions)
.. seealso:: :py:attr:`~Expression.flatten`
"""
return _binary_op("get")(self, name_or_index)
@property
def flatten(self) -> 'Expression':
"""
Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
into a flat representation where every subtype is a separate field.
.. seealso:: :func:`~Expression.get`
"""
return _unary_op("flatten")(self)
[docs] def at(self, index) -> 'Expression':
"""
Accesses the element of an array or map based on a key or an index (starting at 1).
:param index: index key or position of the element (array index starting at 1)
.. seealso:: :py:attr:`~Expression.cardinality`, :py:attr:`~Expression.element`
"""
return _binary_op("at")(self, index)
@property
def cardinality(self) -> 'Expression':
"""
Returns the number of elements of an array or number of entries of a map.
.. seealso:: :func:`~Expression.at`, :py:attr:`~Expression.element`
"""
return _unary_op("cardinality")(self)
@property
def element(self) -> 'Expression':
"""
Returns the sole element of an array with a single element. Returns null if the array is
empty. Throws an exception if the array has more than one element.
.. seealso:: :func:`~Expression.at`, :py:attr:`~Expression.cardinality`
"""
return _unary_op("element")(self)
# ---------------------------- time definition functions -----------------------------
@property
def rowtime(self) -> 'Expression':
"""
Declares a field as the rowtime attribute for indicating, accessing, and working in
Flink's event time.
.. seealso:: :py:attr:`~Expression.rowtime`
"""
return _unary_op("rowtime")(self)
@property
def proctime(self) -> 'Expression':
"""
Declares a field as the proctime attribute for indicating, accessing, and working in
Flink's processing time.
.. seealso:: :py:attr:`~Expression.proctime`
"""
return _unary_op("proctime")(self)
@property
def year(self) -> 'Expression':
return _unary_op("year")(self)
@property
def years(self) -> 'Expression':
return _unary_op("years")(self)
@property
def quarter(self) -> 'Expression':
return _unary_op("quarter")(self)
@property
def quarters(self) -> 'Expression':
return _unary_op("quarters")(self)
@property
def month(self) -> 'Expression':
return _unary_op("month")(self)
@property
def months(self) -> 'Expression':
return _unary_op("months")(self)
@property
def week(self) -> 'Expression':
return _unary_op("week")(self)
@property
def weeks(self) -> 'Expression':
return _unary_op("weeks")(self)
@property
def day(self) -> 'Expression':
return _unary_op("day")(self)
@property
def days(self) -> 'Expression':
return _unary_op("days")(self)
@property
def hour(self) -> 'Expression':
return _unary_op("hour")(self)
@property
def hours(self) -> 'Expression':
return _unary_op("hours")(self)
@property
def minute(self) -> 'Expression':
return _unary_op("minute")(self)
@property
def minutes(self) -> 'Expression':
return _unary_op("minutes")(self)
@property
def second(self) -> 'Expression':
return _unary_op("second")(self)
@property
def seconds(self) -> 'Expression':
return _unary_op("seconds")(self)
@property
def milli(self) -> 'Expression':
return _unary_op("milli")(self)
@property
def millis(self) -> 'Expression':
return _unary_op("millis")(self)
# ---------------------------- hash functions -----------------------------
@property
def md5(self) -> 'Expression[str]':
return _unary_op("md5")(self)
@property
def sha1(self) -> 'Expression[str]':
return _unary_op("sha1")(self)
@property
def sha224(self) -> 'Expression[str]':
return _unary_op("sha224")(self)
@property
def sha256(self) -> 'Expression[str]':
return _unary_op("sha256")(self)
@property
def sha384(self) -> 'Expression[str]':
return _unary_op("sha384")(self)
@property
def sha512(self) -> 'Expression[str]':
return _unary_op("sha512")(self)
[docs] def sha2(self, hash_length: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
Returns the hash for the given string expression using the SHA-2 family of hash
functions (SHA-224, SHA-256, SHA-384, or SHA-512).
:param hash_length: bit length of the result (either 224, 256, 384, or 512)
:return: string or null if one of the arguments is null.
.. seealso:: :py:attr:`~Expression.md5`, :py:attr:`~Expression.sha1`,
:py:attr:`~Expression.sha224`, :py:attr:`~Expression.sha256`,
:py:attr:`~Expression.sha384`, :py:attr:`~Expression.sha512`
"""
return _binary_op("sha2")(self, hash_length)
# ---------------------------- JSON functions -----------------------------
[docs] def json_exists(self, path: str, on_error: JsonExistsOnError = None) -> 'Expression[bool]':
"""
Determines whether a JSON string satisfies a given search criterion.
This follows the ISO/IEC TR 19075-6 specification for JSON support in SQL.
Examples:
::
>>> lit('{"a": true}').json_exists('$.a') // True
>>> lit('{"a": true}').json_exists('$.b') // False
>>> lit('{"a": [{ "b": 1 }]}').json_exists('$.a[0].b') // True
>>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.TRUE) // True
>>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.FALSE) // False
"""
if on_error is None:
return _binary_op("jsonExists")(self, path)
else:
return _ternary_op("jsonExists")(self, path, on_error._to_j_json_exists_on_error())
[docs] def json_value(self,
path: str,
returning_type: DataType = DataTypes.STRING(),
on_empty: JsonValueOnEmptyOrError = JsonValueOnEmptyOrError.NULL,
default_on_empty: Any = None,
on_error: JsonValueOnEmptyOrError = JsonValueOnEmptyOrError.NULL,
default_on_error: Any = None) -> 'Expression':
"""
Extracts a scalar from a JSON string.
This method searches a JSON string for a given path expression and returns the value if the
value at that path is scalar. Non-scalar values cannot be returned. By default, the value is
returned as `DataTypes.STRING()`. Using `returningType` a different type can be chosen, with
the following types being supported:
* `STRING`
* `BOOLEAN`
* `INT`
* `DOUBLE`
For empty path expressions or errors a behavior can be defined to either return `null`,
raise an error or return a defined default value instead.
Examples:
::
>>> lit('{"a": true}').json_value('$.a')
>>> lit('{"a": true}').json_value('$.a', DataTypes.BOOLEAN())
>>> lit('{"a": true}').json_value('lax $.b', \
JsonValueOnEmptyOrError.DEFAULT, False)
>>> lit('{"a": true}').json_value('strict $.b', \
JsonValueOnEmptyOrError.NULL, None, JsonValueOnEmptyOrError.DEFAULT, False)
"""
return _varargs_op("jsonValue")(self, path, _to_java_data_type(returning_type),
on_empty._to_j_json_value_on_empty_or_error(),
default_on_empty,
on_error._to_j_json_value_on_empty_or_error(),
default_on_error)
# add the docs
_make_math_log_doc()
_make_math_trigonometric_doc()
_make_aggregation_doc()
_make_string_doc()
_make_temporal_doc()
_make_time_doc()
_make_hash_doc()
# add the version docs
_add_version_doc()