################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import datetime
import warnings
from py4j.compat import long
from typing import Tuple
from pyflink.common.configuration import Configuration
from pyflink.java_gateway import get_gateway
from pyflink.table.sql_dialect import SqlDialect
__all__ = ['TableConfig']
from pyflink.util.java_utils import add_jars_to_context_class_loader
[docs]class TableConfig(object):
"""
Configuration for the current :class:`TableEnvironment` session to adjust Table & SQL API
programs.
This class is a pure API class that abstracts configuration from various sources. Currently,
configuration can be set in any of the following layers (in the given order):
- flink-conf.yaml
- CLI parameters
- :class:`~pyflink.datastream.StreamExecutionEnvironment` when bridging to DataStream API
- :func:`~EnvironmentSettings.Builder.with_configuration`
- :func:`~TableConfig.set`
The latter two represent the application-specific part of the configuration. They initialize
and directly modify :func:`~TableConfig.get_configuration`. Other layers represent the
configuration of the execution context and are immutable.
The getter :func:`~TableConfig.get` gives read-only access to the full configuration. However,
application-specific configuration has precedence. Configuration of outer layers is used for
defaults and fallbacks. The setter :func:`~TableConfig.set` will only affect
application-specific configuration.
For common or important configuration options, this class provides getters and setters methods
with detailed inline documentation.
For more advanced configuration, users can directly access the underlying key-value map via
:func:`~pyflink.table.TableConfig.get_configuration`.
Example:
::
>>> table_config = t_env.get_config()
>>> config = Configuration()
>>> config.set_string("parallelism.default", "128") \\
... .set_string("pipeline.auto-watermark-interval", "800ms") \\
... .set_string("execution.checkpointing.interval", "30s")
>>> table_config.add_configuration(config)
.. note::
Because options are read at different point in time when performing operations, it is
recommended to set configuration options early after instantiating a table environment.
"""
def __init__(self, j_table_config=None):
gateway = get_gateway()
if j_table_config is None:
self._j_table_config = gateway.jvm.TableConfig.getDefault()
else:
self._j_table_config = j_table_config
def get(self, key: str, default_value: str) -> str:
"""
Returns the value associated with the given key as a string.
:param key: The key pointing to the associated value.
:param default_value: The default value which is returned in case there is no value
associated with the given key.
:return: The (default) value associated with the given key.
"""
if self.get_configuration().contains_key(key):
return self.get_configuration().get_string(key, default_value)
else:
return self._j_table_config.getRootConfiguration().getString(key, default_value)
def set(self, key: str, value: str) -> 'TableConfig':
"""
Sets a string-based value for the given string-based key.
The value will be parsed by the framework on access.
"""
self._j_table_config.set(key, value)
jvm = get_gateway().jvm
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
add_jars_to_context_class_loader(value.split(";"))
return self
def get_local_timezone(self) -> str:
"""
Returns the local timezone id for timestamp with local time zone, either an abbreviation
such as "PST", a full name such as "America/Los_Angeles", or a custom timezone_id such
as "GMT-08:00".
"""
return self._j_table_config.getLocalTimeZone().getId()
def set_local_timezone(self, timezone_id: str):
"""
Sets the local timezone id for timestamp with local time zone.
:param timezone_id: The timezone id, either an abbreviation such as "PST", a full name
such as "America/Los_Angeles", or a custom timezone_id such as
"GMT-08:00".
"""
if timezone_id is not None and isinstance(timezone_id, str):
j_timezone = get_gateway().jvm.java.time.ZoneId.of(timezone_id)
self._j_table_config.setLocalTimeZone(j_timezone)
else:
raise Exception("TableConfig.timezone should be a string!")
def get_null_check(self) -> bool:
"""
A boolean value, "True" enables NULL check and "False" disables NULL check.
.. note:: Deprecated in 1.15 and will be removed in next release.
"""
warnings.warn("Deprecated in 1.15.", DeprecationWarning)
return self._j_table_config.getNullCheck()
def set_null_check(self, null_check: bool):
"""
Sets the NULL check. If enabled, all fields need to be checked for NULL first.
.. note:: Deprecated in 1.15 and will be removed in next release.
"""
warnings.warn("Deprecated in 1.15.", DeprecationWarning)
if null_check is not None and isinstance(null_check, bool):
self._j_table_config.setNullCheck(null_check)
else:
raise Exception("TableConfig.null_check should be a bool value!")
def get_max_generated_code_length(self) -> int:
"""
The current threshold where generated code will be split into sub-function calls. Java has
a maximum method length of 64 KB. This setting allows for finer granularity if necessary.
Default is 64000.
"""
return self._j_table_config.getMaxGeneratedCodeLength()
def set_max_generated_code_length(self, max_generated_code_length: int):
"""
Returns the current threshold where generated code will be split into sub-function calls.
Java has a maximum method length of 64 KB. This setting allows for finer granularity if
necessary. Default is 64000.
"""
if max_generated_code_length is not None and isinstance(max_generated_code_length, int):
self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
else:
raise Exception("TableConfig.max_generated_code_length should be a int value!")
def set_idle_state_retention_time(self,
min_time: datetime.timedelta,
max_time: datetime.timedelta):
"""
Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
was not updated, will be retained.
State will never be cleared until it was idle for less than the minimum time and will never
be kept if it was idle for more than the maximum time.
When new data arrives for previously cleaned-up state, the new data will be handled as if it
was the first data. This can result in previous results being overwritten.
Set to 0 (zero) to never clean-up the state.
Example:
::
>>> table_config = TableConfig() \\
... .set_idle_state_retention_time(datetime.timedelta(days=1),
... datetime.timedelta(days=3))
.. note::
Cleaning up state requires additional bookkeeping which becomes less expensive for
larger differences of minTime and maxTime. The difference between minTime and maxTime
must be at least 5 minutes.
Method set_idle_state_retention_time is deprecated now. The suggested way to set idle
state retention time is :func:`~pyflink.table.TableConfig.set_idle_state_retention`
Currently, setting max_time will not work and the max_time is directly derived from the
min_time as 1.5 x min_time.
:param min_time: The minimum time interval for which idle state is retained. Set to
0 (zero) to never clean-up the state.
:param max_time: The maximum time interval for which idle state is retained. Must be at
least 5 minutes greater than minTime. Set to
0 (zero) to never clean-up the state.
"""
j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
self._j_table_config.setIdleStateRetentionTime(j_min_time, j_max_time)
def set_idle_state_retention(self, duration: datetime.timedelta):
"""
Specifies a retention time interval for how long idle state, i.e., state which
was not updated, will be retained.
State will never be cleared until it was idle for less than the duration and will never
be kept if it was idle for more than the 1.5 x duration.
When new data arrives for previously cleaned-up state, the new data will be handled as if it
was the first data. This can result in previous results being overwritten.
Set to 0 (zero) to never clean-up the state.
Example:
::
>>> table_config.set_idle_state_retention(datetime.timedelta(days=1))
.. note::
Cleaning up state requires additional bookkeeping which becomes less expensive for
larger differences of minTime and maxTime. The difference between minTime and maxTime
must be at least 5 minutes.
:param duration: The retention time interval for which idle state is retained. Set to
0 (zero) to never clean-up the state.
"""
j_duration_class = get_gateway().jvm.java.time.Duration
j_duration = j_duration_class.ofMillis(long(round(duration.total_seconds() * 1000)))
self._j_table_config.setIdleStateRetention(j_duration)
def get_min_idle_state_retention_time(self) -> int:
"""
State might be cleared and removed if it was not updated for the defined period of time.
.. note::
Currently the concept of min/max idle state retention has been deprecated and only
idle state retention time is supported. The min idle state retention is regarded as
idle state retention and the max idle state retention is derived from idle state
retention as 1.5 x idle state retention.
:return: The minimum time until state which was not updated will be retained.
"""
return self._j_table_config.getMinIdleStateRetentionTime()
def get_max_idle_state_retention_time(self) -> int:
"""
State will be cleared and removed if it was not updated for the defined period of time.
.. note::
Currently the concept of min/max idle state retention has been deprecated and only
idle state retention time is supported. The min idle state retention is regarded as
idle state retention and the max idle state retention is derived from idle state
retention as 1.5 x idle state retention.
:return: The maximum time until state which was not updated will be retained.
"""
return self._j_table_config.getMaxIdleStateRetentionTime()
def get_idle_state_retention(self) -> datetime.timedelta:
"""
:return: The duration until state which was not updated will be retained.
"""
return datetime.timedelta(
milliseconds=self._j_table_config.getIdleStateRetention().toMillis())
def set_decimal_context(self, precision: int, rounding_mode: str):
"""
Sets the default context for decimal division calculation.
(precision=34, rounding_mode=HALF_EVEN) by default.
The precision is the number of digits to be used for an operation. A value of 0 indicates
that unlimited precision (as many digits as are required) will be used. Note that leading
zeros (in the coefficient of a number) are never significant.
The rounding mode is the rounding algorithm to be used for an operation. It could be:
**UP**, **DOWN**, **CEILING**, **FLOOR**, **HALF_UP**, **HALF_DOWN**, **HALF_EVEN**,
**UNNECESSARY**
The table below shows the results of rounding input to one digit with the given rounding
mode:
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| Input | UP | DOWN | CEILING | FLOOR | HALF_UP | HALF_DOWN | HALF_EVEN | UNNECESSARY |
+=======+====+======+=========+=======+=========+===========+===========+=============+
| 5.5 | 6 | 5 | 6 | 5 | 6 | 5 | 6 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 2.5 | 3 | 2 | 3 | 2 | 3 | 2 | 2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 1.6 | 2 | 1 | 2 | 1 | 2 | 2 | 2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 1.1 | 2 | 1 | 2 | 1 | 1 | 1 | 1 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 1.0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| -1.0 | -1 | -1 | -1 | -1 | -1 | -1 | -1 | -1 |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| -1.1 | -2 | -1 | -1 | -2 | -1 | -1 | -1 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| -1.6 | -2 | -1 | -1 | -2 | -2 | -2 | -2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 2.5 | -3 | -2 | -2 | -3 | -3 | -2 | -2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 5.5 | -6 | -5 | -5 | -6 | -6 | -5 | -6 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
:param precision: The precision of the decimal context.
:param rounding_mode: The rounding mode of the decimal context.
.. note:: Deprecated in 1.15 and will be removed in next release.
"""
warnings.warn("Deprecated in 1.15.", DeprecationWarning)
if rounding_mode not in (
"UP",
"DOWN",
"CEILING",
"FLOOR",
"HALF_UP",
"HALF_DOWN",
"HALF_EVEN",
"UNNECESSARY"):
raise ValueError("Unsupported rounding_mode: %s" % rounding_mode)
gateway = get_gateway()
j_rounding_mode = getattr(gateway.jvm.java.math.RoundingMode, rounding_mode)
j_math_context = gateway.jvm.java.math.MathContext(precision, j_rounding_mode)
self._j_table_config.setDecimalContext(j_math_context)
def get_decimal_context(self) -> Tuple[int, str]:
"""
Returns current context for decimal division calculation,
(precision=34, rounding_mode=HALF_EVEN) by default.
.. seealso:: :func:`set_decimal_context`
:return: the current context for decimal division calculation.
.. note:: Deprecated in 1.15 and will be removed in next release.
"""
warnings.warn("Deprecated in 1.15.", DeprecationWarning)
j_math_context = self._j_table_config.getDecimalContext()
precision = j_math_context.getPrecision()
rounding_mode = j_math_context.getRoundingMode().name()
return precision, rounding_mode
def get_configuration(self) -> Configuration:
"""
Gives direct access to the underlying key-value map for advanced configuration.
:return: Entire key-value configuration.
"""
return Configuration(j_configuration=self._j_table_config.getConfiguration())
def add_configuration(self, configuration: Configuration):
"""
Adds the given key-value configuration to the underlying configuration. It overwrites
existing keys.
:param configuration: Key-value configuration to be added.
"""
self._j_table_config.addConfiguration(configuration._j_configuration)
def get_sql_dialect(self) -> SqlDialect:
"""
Returns the current SQL dialect.
"""
return SqlDialect._from_j_sql_dialect(self._j_table_config.getSqlDialect())
def set_sql_dialect(self, sql_dialect: SqlDialect):
"""
Sets the current SQL dialect to parse a SQL query. Flink's SQL behavior by default.
:param sql_dialect: The given SQL dialect.
"""
self._j_table_config.setSqlDialect(SqlDialect._to_j_sql_dialect(sql_dialect))
def set_python_executable(self, python_exec: str):
"""
Sets the path of the python interpreter which is used to execute the python udf workers.
e.g. "/usr/local/bin/python3".
If python UDF depends on a specific python version which does not exist in the cluster,
the method :func:`pyflink.table.TableEnvironment.add_python_archive` can be used to upload
a virtual environment. The path of the python interpreter contained in the uploaded
environment can be specified via this method.
Example:
::
# command executed in shell
# assume that the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env
# python code
>>> table_env.add_python_archive("py_env.zip")
>>> table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
.. note::
Please make sure the uploaded python environment matches the platform that the cluster
is running on and that the python version must be 3.6 or higher.
.. note::
The python udf worker depends on Apache Beam (version == 2.27.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
.. versionadded:: 1.10.0
"""
jvm = get_gateway().jvm
self.set(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec)
def get_python_executable(self) -> str:
"""
Gets the path of the python interpreter which is used to execute the python udf workers.
If no path is specified before, it will return a None value.
:return: The path of the python interpreter which is used to execute the python udf workers.
.. versionadded:: 1.10.0
"""
jvm = get_gateway().jvm
return self.get_configuration().get_string(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), None)
@staticmethod
def get_default() -> 'TableConfig':
"""
:return: A TableConfig object with default settings.
"""
return TableConfig(get_gateway().jvm.TableConfig.getDefault())