Source code for pyflink.table.environment_settings

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings

from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import create_url_class_loader

from pyflink.common import Configuration

__all__ = ['EnvironmentSettings']


class EnvironmentSettings(object):
    """
    Defines all parameters that initialize a table environment. Those parameters are used only
    during instantiation of a :class:`~pyflink.table.TableEnvironment` and cannot be changed
    afterwards.

    Example:
    ::

        >>> EnvironmentSettings.new_instance() \\
        ...     .in_streaming_mode() \\
        ...     .with_built_in_catalog_name("my_catalog") \\
        ...     .with_built_in_database_name("my_database") \\
        ...     .build()

    :func:`~EnvironmentSettings.in_streaming_mode` or :func:`~EnvironmentSettings.in_batch_mode`
    might be convenient as shortcuts.
    """

    class Builder(object):
        """
        A builder for :class:`~EnvironmentSettings`.
        """

        def __init__(self):
            gateway = get_gateway()
            self._j_builder = gateway.jvm.EnvironmentSettings.Builder()

        def with_configuration(self, config: Configuration) -> 'EnvironmentSettings.Builder':
            """
            Creates the EnvironmentSetting with specified Configuration.

            :return: EnvironmentSettings.
            """
            self._j_builder = self._j_builder.withConfiguration(config._j_configuration)
            return self

        def in_batch_mode(self) -> 'EnvironmentSettings.Builder':
            """
            Sets that the components should work in a batch mode. Streaming mode by default.

            :return: This object.
            """
            self._j_builder = self._j_builder.inBatchMode()
            return self

        def in_streaming_mode(self) -> 'EnvironmentSettings.Builder':
            """
            Sets that the components should work in a streaming mode. Enabled by default.

            :return: This object.
            """
            self._j_builder = self._j_builder.inStreamingMode()
            return self

        def with_built_in_catalog_name(self, built_in_catalog_name: str) \
                -> 'EnvironmentSettings.Builder':
            """
            Specifies the name of the initial catalog to be created when instantiating
            a :class:`~pyflink.table.TableEnvironment`.

            This catalog is an in-memory catalog that will be used to store all temporary objects
            (e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
            :func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
            be persisted because they have no serializable representation.

            It will also be the initial value for the current catalog which can be altered via
            :func:`~pyflink.table.TableEnvironment.use_catalog`.

            Default: "default_catalog".

            :param built_in_catalog_name: The specified built-in catalog name.
            :return: This object.
            """
            self._j_builder = self._j_builder.withBuiltInCatalogName(built_in_catalog_name)
            return self

        def with_built_in_database_name(self, built_in_database_name: str) \
                -> 'EnvironmentSettings.Builder':
            """
            Specifies the name of the default database in the initial catalog to be
            created when instantiating a :class:`~pyflink.table.TableEnvironment`.

            This database is an in-memory database that will be used to store all temporary
            objects (e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
            :func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
            be persisted because they have no serializable representation.

            It will also be the initial value for the current catalog which can be altered via
            :func:`~pyflink.table.TableEnvironment.use_catalog`.

            Default: "default_database".

            :param built_in_database_name: The specified built-in database name.
            :return: This object.
            """
            self._j_builder = self._j_builder.withBuiltInDatabaseName(built_in_database_name)
            return self

        def build(self) -> 'EnvironmentSettings':
            """
            Returns an immutable instance of EnvironmentSettings.

            :return: an immutable instance of EnvironmentSettings.
            """
            gateway = get_gateway()
            context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
            new_classloader = create_url_class_loader([], context_classloader)
            gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
            return EnvironmentSettings(self._j_builder.build())

    def __init__(self, j_environment_settings):
        self._j_environment_settings = j_environment_settings

[docs] def get_built_in_catalog_name(self) -> str: """ Gets the specified name of the initial catalog to be created when instantiating a :class:`~pyflink.table.TableEnvironment`. :return: The specified name of the initial catalog to be created. """ return self._j_environment_settings.getBuiltInCatalogName()
[docs] def get_built_in_database_name(self) -> str: """ Gets the specified name of the default database in the initial catalog to be created when instantiating a :class:`~pyflink.table.TableEnvironment`. :return: The specified name of the default database in the initial catalog to be created. """ return self._j_environment_settings.getBuiltInDatabaseName()
[docs] def is_streaming_mode(self) -> bool: """ Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming mode. :return: True if the TableEnvironment should work in a streaming mode, false otherwise. """ return self._j_environment_settings.isStreamingMode()
[docs] def to_configuration(self) -> Configuration: """ Convert to `pyflink.common.Configuration`. :return: Configuration with specified value. .. note:: Deprecated in 1.15. Please use :func:`EnvironmentSettings.get_configuration` instead. """ warnings.warn("Deprecated in 1.15.", DeprecationWarning) return Configuration(j_configuration=self._j_environment_settings.toConfiguration())
[docs] def get_configuration(self) -> Configuration: """ Get the underlying `pyflink.common.Configuration`. :return: Configuration with specified value. """ return Configuration(j_configuration=self._j_environment_settings.getConfiguration())
[docs] @staticmethod def new_instance() -> 'EnvironmentSettings.Builder': """ Creates a builder for creating an instance of EnvironmentSettings. :return: A builder of EnvironmentSettings. """ return EnvironmentSettings.Builder()
[docs] @staticmethod def from_configuration(config: Configuration) -> 'EnvironmentSettings': """ Creates the EnvironmentSetting with specified Configuration. :return: EnvironmentSettings. .. note:: Deprecated in 1.15. Please use :func:`EnvironmentSettings.Builder.with_configuration` instead. """ warnings.warn("Deprecated in 1.15.", DeprecationWarning) gateway = get_gateway() context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader() new_classloader = create_url_class_loader([], context_classloader) gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader) return EnvironmentSettings( get_gateway().jvm.EnvironmentSettings.fromConfiguration(config._j_configuration))
[docs] @staticmethod def in_streaming_mode() -> 'EnvironmentSettings': """ Creates a default instance of EnvironmentSettings in streaming execution mode. In this mode, both bounded and unbounded data streams can be processed. This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for advanced settings. :return: EnvironmentSettings. """ return EnvironmentSettings.new_instance().in_streaming_mode().build()
[docs] @staticmethod def in_batch_mode() -> 'EnvironmentSettings': """ Creates a default instance of EnvironmentSettings in batch execution mode. This mode is highly optimized for batch scenarios. Only bounded data streams can be processed in this mode. This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for advanced settings. :return: EnvironmentSettings. """ return EnvironmentSettings.new_instance().in_batch_mode().build()