Source code for pyflink.table.environment_settings

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings

from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import create_url_class_loader

from pyflink.common import Configuration

__all__ = ['EnvironmentSettings']


class EnvironmentSettings(object):
    """
    Defines all parameters that initialize a table environment. Those parameters are used only
    during instantiation of a :class:`~pyflink.table.TableEnvironment` and cannot be changed
    afterwards.

    Example:
    ::

        >>> EnvironmentSettings.new_instance() \\
        ...     .in_streaming_mode() \\
        ...     .with_built_in_catalog_name("my_catalog") \\
        ...     .with_built_in_database_name("my_database") \\
        ...     .build()

    :func:`~EnvironmentSettings.in_streaming_mode` or :func:`~EnvironmentSettings.in_batch_mode`
    might be convenient as shortcuts.
    """

    class Builder(object):
        """
        A builder for :class:`~EnvironmentSettings`.

        Every ``with_*`` / ``in_*_mode`` method forwards to the wrapped Java builder and returns
        this Python builder, so calls can be chained.
        """

        def __init__(self):
            gateway = get_gateway()
            self._j_builder = gateway.jvm.EnvironmentSettings.Builder()

        def with_configuration(self, config: Configuration) -> 'EnvironmentSettings.Builder':
            """
            Passes the specified Configuration to the underlying Java builder.

            :param config: The configuration handed to the Java builder.
            :return: This object.
            """
            self._j_builder = self._j_builder.withConfiguration(config._j_configuration)
            return self

        def in_batch_mode(self) -> 'EnvironmentSettings.Builder':
            """
            Sets that the components should work in a batch mode. Streaming mode by default.

            :return: This object.
            """
            self._j_builder = self._j_builder.inBatchMode()
            return self

        def in_streaming_mode(self) -> 'EnvironmentSettings.Builder':
            """
            Sets that the components should work in a streaming mode. Enabled by default.

            :return: This object.
            """
            self._j_builder = self._j_builder.inStreamingMode()
            return self

        def with_built_in_catalog_name(self, built_in_catalog_name: str) \
                -> 'EnvironmentSettings.Builder':
            """
            Specifies the name of the initial catalog to be created when instantiating
            a :class:`~pyflink.table.TableEnvironment`.

            This catalog is an in-memory catalog that will be used to store all temporary objects
            (e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
            :func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
            be persisted because they have no serializable representation.

            It will also be the initial value for the current catalog which can be altered via
            :func:`~pyflink.table.TableEnvironment.use_catalog`.

            Default: "default_catalog".

            :param built_in_catalog_name: The specified built-in catalog name.
            :return: This object.
            """
            self._j_builder = self._j_builder.withBuiltInCatalogName(built_in_catalog_name)
            return self

        def with_built_in_database_name(self, built_in_database_name: str) \
                -> 'EnvironmentSettings.Builder':
            """
            Specifies the name of the default database in the initial catalog to be created when
            instantiating a :class:`~pyflink.table.TableEnvironment`.

            This database is an in-memory database that will be used to store all temporary
            objects (e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
            :func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
            be persisted because they have no serializable representation.

            It will also be the initial value for the current database which can be altered via
            :func:`~pyflink.table.TableEnvironment.use_database`.

            Default: "default_database".

            :param built_in_database_name: The specified built-in database name.
            :return: This object.
            """
            self._j_builder = self._j_builder.withBuiltInDatabaseName(built_in_database_name)
            return self

        def build(self) -> 'EnvironmentSettings':
            """
            Returns an immutable instance of EnvironmentSettings.

            :return: an immutable instance of EnvironmentSettings.
            """
            # The context class loader is replaced before the Java builder is invoked,
            # preserving the original statement order.
            EnvironmentSettings._install_url_class_loader(get_gateway())
            return EnvironmentSettings(self._j_builder.build())

    def __init__(self, j_environment_settings):
        # Handle to the Java EnvironmentSettings object that every accessor delegates to.
        self._j_environment_settings = j_environment_settings

    @staticmethod
    def _install_url_class_loader(gateway) -> None:
        """
        Replaces the context class loader of the current JVM thread with a new URL class loader.

        The new loader is obtained from ``create_url_class_loader`` with an empty URL list and
        the thread's existing context class loader (presumably used as its parent; confirm
        against ``pyflink.util.java_utils``).

        :param gateway: The Java gateway whose JVM thread is updated.
        """
        j_thread = gateway.jvm.Thread.currentThread()
        new_classloader = create_url_class_loader([], j_thread.getContextClassLoader())
        gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)

    def get_built_in_catalog_name(self) -> str:
        """
        Gets the specified name of the initial catalog to be created when instantiating a
        :class:`~pyflink.table.TableEnvironment`.

        :return: The specified name of the initial catalog to be created.
        """
        return self._j_environment_settings.getBuiltInCatalogName()

    def get_built_in_database_name(self) -> str:
        """
        Gets the specified name of the default database in the initial catalog to be created
        when instantiating a :class:`~pyflink.table.TableEnvironment`.

        :return: The specified name of the default database in the initial catalog to be created.
        """
        return self._j_environment_settings.getBuiltInDatabaseName()

    def is_streaming_mode(self) -> bool:
        """
        Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming
        mode.

        :return: True if the TableEnvironment should work in a streaming mode, False otherwise.
        """
        return self._j_environment_settings.isStreamingMode()

    def to_configuration(self) -> Configuration:
        """
        Convert to `pyflink.common.Configuration`.

        :return: Configuration with specified value.

        .. note:: Deprecated in 1.15. Please use
                :func:`EnvironmentSettings.get_configuration` instead.
        """
        # stacklevel=2 attributes the warning to the caller, so Python's default warning
        # filters can surface it in user scripts instead of hiding it as library-internal.
        warnings.warn("Deprecated in 1.15.", DeprecationWarning, stacklevel=2)
        return Configuration(j_configuration=self._j_environment_settings.toConfiguration())

    def get_configuration(self) -> Configuration:
        """
        Get the underlying `pyflink.common.Configuration`.

        :return: Configuration with specified value.
        """
        return Configuration(j_configuration=self._j_environment_settings.getConfiguration())

    @staticmethod
    def new_instance() -> 'EnvironmentSettings.Builder':
        """
        Creates a builder for creating an instance of EnvironmentSettings.

        :return: A builder of EnvironmentSettings.
        """
        return EnvironmentSettings.Builder()

    @staticmethod
    def from_configuration(config: Configuration) -> 'EnvironmentSettings':
        """
        Creates the EnvironmentSetting with specified Configuration.

        :param config: The configuration to create the settings from.
        :return: EnvironmentSettings.

        .. note:: Deprecated in 1.15. Please use
                :func:`EnvironmentSettings.Builder.with_configuration` instead.
        """
        # stacklevel=2 attributes the warning to the caller rather than to this module.
        warnings.warn("Deprecated in 1.15.", DeprecationWarning, stacklevel=2)

        gateway = get_gateway()
        EnvironmentSettings._install_url_class_loader(gateway)
        return EnvironmentSettings(
            gateway.jvm.EnvironmentSettings.fromConfiguration(config._j_configuration))

    @staticmethod
    def in_streaming_mode() -> 'EnvironmentSettings':
        """
        Creates a default instance of EnvironmentSettings in streaming execution mode.

        In this mode, both bounded and unbounded data streams can be processed.

        This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with
        little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for
        advanced settings.

        :return: EnvironmentSettings.
        """
        return EnvironmentSettings.new_instance().in_streaming_mode().build()

    @staticmethod
    def in_batch_mode() -> 'EnvironmentSettings':
        """
        Creates a default instance of EnvironmentSettings in batch execution mode.

        This mode is highly optimized for batch scenarios. Only bounded data streams can be
        processed in this mode.

        This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with
        little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for
        advanced settings.

        :return: EnvironmentSettings.
        """
        return EnvironmentSettings.new_instance().in_batch_mode().build()