Source code for pyflink.table.environment_settings
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import create_url_class_loader
from pyflink.common import Configuration
__all__ = ['EnvironmentSettings']
class EnvironmentSettings(object):
"""
Defines all parameters that initialize a table environment. Those parameters are used only
during instantiation of a :class:`~pyflink.table.TableEnvironment` and cannot be changed
afterwards.
Example:
::
>>> EnvironmentSettings.new_instance() \\
... .in_streaming_mode() \\
... .with_built_in_catalog_name("my_catalog") \\
... .with_built_in_database_name("my_database") \\
... .build()
:func:`~EnvironmentSettings.in_streaming_mode` or :func:`~EnvironmentSettings.in_batch_mode`
might be convenient as shortcuts.
"""
class Builder(object):
"""
A builder for :class:`~EnvironmentSettings`.
"""
def __init__(self):
gateway = get_gateway()
self._j_builder = gateway.jvm.EnvironmentSettings.Builder()
def with_configuration(self, config: Configuration) -> 'EnvironmentSettings.Builder':
"""
Creates the EnvironmentSetting with specified Configuration.
:return: EnvironmentSettings.
"""
self._j_builder = self._j_builder.withConfiguration(config._j_configuration)
return self
def in_batch_mode(self) -> 'EnvironmentSettings.Builder':
"""
Sets that the components should work in a batch mode. Streaming mode by default.
:return: This object.
"""
self._j_builder = self._j_builder.inBatchMode()
return self
def in_streaming_mode(self) -> 'EnvironmentSettings.Builder':
"""
Sets that the components should work in a streaming mode. Enabled by default.
:return: This object.
"""
self._j_builder = self._j_builder.inStreamingMode()
return self
def with_built_in_catalog_name(self, built_in_catalog_name: str) \
-> 'EnvironmentSettings.Builder':
"""
Specifies the name of the initial catalog to be created when instantiating
a :class:`~pyflink.table.TableEnvironment`.
This catalog is an in-memory catalog that will be used to store all temporary objects
(e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
:func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
be persisted because they have no serializable representation.
It will also be the initial value for the current catalog which can be altered via
:func:`~pyflink.table.TableEnvironment.use_catalog`.
Default: "default_catalog".
:param built_in_catalog_name: The specified built-in catalog name.
:return: This object.
"""
self._j_builder = self._j_builder.withBuiltInCatalogName(built_in_catalog_name)
return self
def with_built_in_database_name(self, built_in_database_name: str) \
-> 'EnvironmentSettings.Builder':
"""
Specifies the name of the default database in the initial catalog to be
created when instantiating a :class:`~pyflink.table.TableEnvironment`.
This database is an in-memory database that will be used to store all temporary
objects (e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
:func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
be persisted because they have no serializable representation.
It will also be the initial value for the current catalog which can be altered via
:func:`~pyflink.table.TableEnvironment.use_catalog`.
Default: "default_database".
:param built_in_database_name: The specified built-in database name.
:return: This object.
"""
self._j_builder = self._j_builder.withBuiltInDatabaseName(built_in_database_name)
return self
def build(self) -> 'EnvironmentSettings':
"""
Returns an immutable instance of EnvironmentSettings.
:return: an immutable instance of EnvironmentSettings.
"""
gateway = get_gateway()
context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
new_classloader = create_url_class_loader([], context_classloader)
gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
return EnvironmentSettings(self._j_builder.build())
def __init__(self, j_environment_settings):
self._j_environment_settings = j_environment_settings
[docs] def get_built_in_catalog_name(self) -> str:
"""
Gets the specified name of the initial catalog to be created when instantiating a
:class:`~pyflink.table.TableEnvironment`.
:return: The specified name of the initial catalog to be created.
"""
return self._j_environment_settings.getBuiltInCatalogName()
[docs] def get_built_in_database_name(self) -> str:
"""
Gets the specified name of the default database in the initial catalog to be created when
instantiating a :class:`~pyflink.table.TableEnvironment`.
:return: The specified name of the default database in the initial catalog to be created.
"""
return self._j_environment_settings.getBuiltInDatabaseName()
[docs] def is_streaming_mode(self) -> bool:
"""
Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming
mode.
:return: True if the TableEnvironment should work in a streaming mode, false otherwise.
"""
return self._j_environment_settings.isStreamingMode()
[docs] def to_configuration(self) -> Configuration:
"""
Convert to `pyflink.common.Configuration`.
:return: Configuration with specified value.
.. note:: Deprecated in 1.15. Please use
:func:`EnvironmentSettings.get_configuration` instead.
"""
warnings.warn("Deprecated in 1.15.", DeprecationWarning)
return Configuration(j_configuration=self._j_environment_settings.toConfiguration())
[docs] def get_configuration(self) -> Configuration:
"""
Get the underlying `pyflink.common.Configuration`.
:return: Configuration with specified value.
"""
return Configuration(j_configuration=self._j_environment_settings.getConfiguration())
[docs] @staticmethod
def new_instance() -> 'EnvironmentSettings.Builder':
"""
Creates a builder for creating an instance of EnvironmentSettings.
:return: A builder of EnvironmentSettings.
"""
return EnvironmentSettings.Builder()
[docs] @staticmethod
def from_configuration(config: Configuration) -> 'EnvironmentSettings':
"""
Creates the EnvironmentSetting with specified Configuration.
:return: EnvironmentSettings.
.. note:: Deprecated in 1.15. Please use
:func:`EnvironmentSettings.Builder.with_configuration` instead.
"""
warnings.warn("Deprecated in 1.15.", DeprecationWarning)
gateway = get_gateway()
context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
new_classloader = create_url_class_loader([], context_classloader)
gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
return EnvironmentSettings(
get_gateway().jvm.EnvironmentSettings.fromConfiguration(config._j_configuration))
[docs] @staticmethod
def in_streaming_mode() -> 'EnvironmentSettings':
"""
Creates a default instance of EnvironmentSettings in streaming execution mode.
In this mode, both bounded and unbounded data streams can be processed.
This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with
little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for
advanced settings.
:return: EnvironmentSettings.
"""
return EnvironmentSettings.new_instance().in_streaming_mode().build()
[docs] @staticmethod
def in_batch_mode() -> 'EnvironmentSettings':
"""
Creates a default instance of EnvironmentSettings in batch execution mode.
This mode is highly optimized for batch scenarios. Only bounded data streams can be
processed in this mode.
This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with
little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for
advanced settings.
:return: EnvironmentSettings.
"""
return EnvironmentSettings.new_instance().in_batch_mode().build()