################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
from abc import ABCMeta
from collections import OrderedDict
from py4j.java_gateway import get_method
from typing import Dict, Union
from pyflink.java_gateway import get_gateway
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import _to_java_type, DataType
__all__ = [
'Rowtime',
'Schema',
'OldCsv',
'FileSystem',
'Kafka',
'Elasticsearch',
'HBase',
'Csv',
'Avro',
'Json',
'ConnectTableDescriptor',
'StreamTableDescriptor',
'BatchTableDescriptor',
'CustomConnectorDescriptor',
'CustomFormatDescriptor'
]
class Descriptor(object, metaclass=ABCMeta):
"""
Base class of the descriptors that adds a set of string-based, normalized properties for
describing DDL information.
Typical characteristics of a descriptor are:
- descriptors have a default constructor
- descriptors themselves contain very little logic
- corresponding validators validate the correctness (goal: have a single point of validation)
A descriptor is similar to a builder in a builder pattern, thus, mutable for building
properties.
"""
def __init__(self, j_descriptor):
self._j_descriptor = j_descriptor
def to_properties(self) -> Dict:
"""
Converts this descriptor into a dict of properties.
        :return: A dict containing all of the current properties.
"""
return dict(self._j_descriptor.toProperties())
class Rowtime(Descriptor):
"""
Rowtime descriptor for describing an event time attribute in the schema.
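
    Example (a minimal usage sketch; the field name and delay are assumptions):
    ::

        # convert the existing "event_time" field into a rowtime attribute with
        # a bounded out-of-orderness watermark of one minute
        rowtime = (Rowtime()
                   .timestamps_from_field("event_time")
                   .watermarks_periodic_bounded(60000))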
"""
def __init__(self):
gateway = get_gateway()
self._j_rowtime = gateway.jvm.Rowtime()
super(Rowtime, self).__init__(self._j_rowtime)
    def timestamps_from_field(self, field_name: str) -> 'Rowtime':
"""
Sets a built-in timestamp extractor that converts an existing LONG or TIMESTAMP field into
the rowtime attribute.
:param field_name: The field to convert into a rowtime attribute.
:return: This rowtime descriptor.
"""
self._j_rowtime = self._j_rowtime.timestampsFromField(field_name)
return self
    def timestamps_from_source(self) -> 'Rowtime':
"""
Sets a built-in timestamp extractor that converts the assigned timestamps from a DataStream
API record into the rowtime attribute and thus preserves the assigned timestamps from the
source.
.. note::
This extractor only works in streaming environments.
:return: This rowtime descriptor.
"""
self._j_rowtime = self._j_rowtime.timestampsFromSource()
return self
    def watermarks_periodic_ascending(self) -> 'Rowtime':
"""
Sets a built-in watermark strategy for ascending rowtime attributes.
Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a
timestamp equal to the max timestamp are not late.
:return: This rowtime descriptor.
"""
self._j_rowtime = self._j_rowtime.watermarksPeriodicAscending()
return self
    def watermarks_periodic_bounded(self, delay: int) -> 'Rowtime':
"""
Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a
bounded time interval.
Emits watermarks which are the maximum observed timestamp minus the specified delay.
:param delay: Delay in milliseconds.
:return: This rowtime descriptor.
"""
self._j_rowtime = self._j_rowtime.watermarksPeriodicBounded(delay)
return self
    def watermarks_from_source(self) -> 'Rowtime':
"""
Sets a built-in watermark strategy which indicates the watermarks should be preserved from
the underlying DataStream API and thus preserves the assigned watermarks from the source.
:return: This rowtime descriptor.
"""
self._j_rowtime = self._j_rowtime.watermarksFromSource()
return self
    def watermarks_from_strategy(self, strategy: str) -> 'Rowtime':
"""
Sets a custom watermark strategy to be used for the rowtime attribute.
        :param strategy: The java fully-qualified class name of the WatermarkStrategy. The
                         WatermarkStrategy must have a public no-argument constructor and must
                         be loadable by the current Java classloader.
:return: This rowtime descriptor.
"""
gateway = get_gateway()
self._j_rowtime = self._j_rowtime.watermarksFromStrategy(
gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(strategy)
.newInstance())
return self
class Schema(Descriptor):
"""
Describes a schema of a table.
.. note::
Field names are matched by the exact name by default (case sensitive).
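
    Example (a minimal usage sketch; field names and types are assumptions, and
    DataTypes is assumed to be imported from pyflink.table):
    ::

        schema = (Schema()
                  .field("id", DataTypes.INT())
                  .field("name", DataTypes.STRING()))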
"""
def __init__(self, schema=None, fields=None, rowtime=None):
"""
Constructor of Schema descriptor.
:param schema: The :class:`TableSchema` object.
        :param fields: Dict mapping field names to data types or type strings.
        :param rowtime: A :class:`Rowtime` that specifies the previously defined field as an
                        event-time attribute.
"""
gateway = get_gateway()
self._j_schema = gateway.jvm.org.apache.flink.table.descriptors.Schema()
super(Schema, self).__init__(self._j_schema)
if schema is not None:
self.schema(schema)
if fields is not None:
self.fields(fields)
if rowtime is not None:
self.rowtime(rowtime)
    def schema(self, table_schema: 'TableSchema') -> 'Schema':
"""
Sets the schema with field names and the types. Required.
This method overwrites existing fields added with
:func:`~pyflink.table.descriptors.Schema.field`.
:param table_schema: The :class:`TableSchema` object.
:return: This schema object.
"""
self._j_schema = self._j_schema.schema(table_schema._j_table_schema)
return self
    def field(self, field_name: str, field_type: Union[DataType, str]) -> 'Schema':
"""
Adds a field with the field name and the data type or type string. Required.
        This method can be called multiple times. The call order of this method also defines
        the order of the fields in a row. Here is a document that introduces the type strings:
https://nightlies.apache.org/flink/flink-docs-stable/dev/table/connect.html#type-strings
:param field_name: The field name.
:param field_type: The data type or type string of the field.
:return: This schema object.
"""
if isinstance(field_type, str):
self._j_schema = self._j_schema.field(field_name, field_type)
else:
self._j_schema = self._j_schema.field(field_name, _to_java_type(field_type))
return self
    def fields(self, fields: Dict[str, Union[DataType, str]]) -> 'Schema':
        """
        Adds a set of fields with the field name and the data type or type string stored in a
        dict.
        :param fields: Dict mapping field names to data types or type strings.
                       E.g., {'int_field': DataTypes.INT(), 'string_field': DataTypes.STRING()}
                       or, on Python 3.5 and earlier, an OrderedDict of such entries.
:return: This schema object.
.. versionadded:: 1.11.0
"""
if sys.version_info[:2] <= (3, 5) and not isinstance(fields, OrderedDict):
raise TypeError("Must use OrderedDict type in python3.5 or older version to key the "
"schema in insert order.")
elif sys.version_info[:2] > (3, 5) and not isinstance(fields, (OrderedDict, dict)):
raise TypeError("fields must be stored in a dict or OrderedDict")
for field_name, field_type in fields.items():
self.field(field_name=field_name, field_type=field_type)
return self
    def from_origin_field(self, origin_field_name: str) -> 'Schema':
"""
Specifies the origin of the previously defined field. The origin field is defined by a
connector or format.
E.g. field("myString", Types.STRING).from_origin_field("CSV_MY_STRING")
.. note::
Field names are matched by the exact name by default (case sensitive).
:param origin_field_name: The origin field name.
:return: This schema object.
"""
self._j_schema = get_method(self._j_schema, "from")(origin_field_name)
return self
    def proctime(self) -> 'Schema':
"""
Specifies the previously defined field as a processing-time attribute.
E.g. field("proctime", Types.SQL_TIMESTAMP_LTZ).proctime()
:return: This schema object.
"""
self._j_schema = self._j_schema.proctime()
return self
    def rowtime(self, rowtime: Rowtime) -> 'Schema':
"""
Specifies the previously defined field as an event-time attribute.
E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
        :param rowtime: A :class:`Rowtime`.
:return: This schema object.
"""
self._j_schema = self._j_schema.rowtime(rowtime._j_rowtime)
return self
class FormatDescriptor(Descriptor, metaclass=ABCMeta):
"""
Describes the format of data.
"""
def __init__(self, j_format_descriptor):
self._j_format_descriptor = j_format_descriptor
super(FormatDescriptor, self).__init__(self._j_format_descriptor)
class OldCsv(FormatDescriptor):
"""
Format descriptor for comma-separated values (CSV).
.. note::
This descriptor describes Flink's non-standard CSV table source/sink. In the future, the
descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv`
format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use
the old one for stream/batch filesystem operations for now.
.. note::
Deprecated: use the RFC-compliant `Csv` format instead when writing to Kafka.
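
    Example (a minimal usage sketch; the delimiter and field definitions are assumptions,
    and DataTypes is assumed to be imported from pyflink.table):
    ::

        csv_format = (OldCsv()
                      .field_delimiter("|")
                      .field("id", DataTypes.INT())
                      .field("name", DataTypes.STRING()))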
"""
def __init__(self, schema=None, field_delimiter=None, line_delimiter=None, quote_character=None,
comment_prefix=None, ignore_parse_errors=False, ignore_first_line=False):
"""
Constructor of OldCsv format descriptor.
:param schema: Data type from :class:`DataTypes` that describes the schema.
:param field_delimiter: The field delimiter character.
:param line_delimiter: The line delimiter.
:param quote_character: The quote character.
:param comment_prefix: The prefix to indicate comments.
        :param ignore_parse_errors: Whether to skip records with parse errors instead of failing
                                    (an exception is thrown by default).
        :param ignore_first_line: Whether to ignore the first line (not skipped by default).
"""
gateway = get_gateway()
self._j_csv = gateway.jvm.OldCsv()
super(OldCsv, self).__init__(self._j_csv)
if schema is not None:
self.schema(schema)
if field_delimiter is not None:
self.field_delimiter(field_delimiter)
if line_delimiter is not None:
self.line_delimiter(line_delimiter)
if quote_character is not None:
self.quote_character(quote_character)
if comment_prefix is not None:
self.comment_prefix(comment_prefix)
if ignore_parse_errors:
self.ignore_parse_errors()
if ignore_first_line:
self.ignore_first_line()
    def field_delimiter(self, delimiter: str) -> 'OldCsv':
"""
Sets the field delimiter, "," by default.
:param delimiter: The field delimiter.
:return: This :class:`OldCsv` object.
"""
self._j_csv = self._j_csv.fieldDelimiter(delimiter)
return self
    def line_delimiter(self, delimiter: str) -> 'OldCsv':
r"""
Sets the line delimiter, "\\n" by default.
:param delimiter: The line delimiter.
:return: This :class:`OldCsv` object.
"""
self._j_csv = self._j_csv.lineDelimiter(delimiter)
return self
    def schema(self, table_schema: 'TableSchema') -> 'OldCsv':
"""
Sets the schema with field names and the types. Required.
This method overwrites existing fields added with
:func:`~pyflink.table.descriptors.OldCsv.field`.
:param table_schema: The :class:`TableSchema` object.
:return: This :class:`OldCsv` object.
"""
self._j_csv = self._j_csv.schema(table_schema._j_table_schema)
return self
    def field(self, field_name: str, field_type: Union[DataType, str]) -> 'OldCsv':
        """
        Adds a format field with the field name and the data type or type string. Required.
        This method can be called multiple times. The call order of this method also defines
        the order of the fields in the format.
:param field_name: The field name.
:param field_type: The data type or type string of the field.
:return: This :class:`OldCsv` object.
"""
if isinstance(field_type, str):
self._j_csv = self._j_csv.field(field_name, field_type)
else:
self._j_csv = self._j_csv.field(field_name, _to_java_type(field_type))
return self
    def quote_character(self, quote_character: str) -> 'OldCsv':
"""
Sets a quote character for String values, null by default.
:param quote_character: The quote character.
:return: This :class:`OldCsv` object.
"""
self._j_csv = self._j_csv.quoteCharacter(quote_character)
return self
    def ignore_parse_errors(self) -> 'OldCsv':
        """
        Skips records with parse errors instead of failing. An exception is thrown by default.
:return: This :class:`OldCsv` object.
"""
self._j_csv = self._j_csv.ignoreParseErrors()
return self
    def ignore_first_line(self) -> 'OldCsv':
        """
        Ignores the first line. By default, the first line is not skipped.
:return: This :class:`OldCsv` object.
"""
self._j_csv = self._j_csv.ignoreFirstLine()
return self
class Csv(FormatDescriptor):
"""
Format descriptor for comma-separated values (CSV).
This descriptor aims to comply with RFC-4180 ("Common Format and MIME Type for
Comma-Separated Values (CSV) Files") proposed by the Internet Engineering Task Force (IETF).
.. note::
This descriptor does not describe Flink's old non-standard CSV table
source/sink. Currently, this descriptor can be used when writing to Kafka. The old one is
still available as :class:`OldCsv` for stream/batch filesystem operations.
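
    Example (a minimal usage sketch; the chosen options are assumptions):
    ::

        csv_format = (Csv()
                      .field_delimiter(";")
                      .ignore_parse_errors()
                      .derive_schema())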
"""
def __init__(self, schema=None, field_delimiter=None, line_delimiter=None, quote_character=None,
allow_comments=False, ignore_parse_errors=False, array_element_delimiter=None,
escape_character=None, null_literal=None):
"""
Constructor of Csv format descriptor.
:param schema: Data type from :class:`DataTypes` that describes the schema.
:param field_delimiter: The field delimiter character.
:param line_delimiter: The line delimiter.
:param quote_character: The quote character.
        :param allow_comments: Whether to ignore comment lines that start with '#' (disabled by
                               default).
        :param ignore_parse_errors: Whether to skip records with parse errors instead of failing
                                    (an exception is thrown by default).
:param array_element_delimiter: The array element delimiter.
:param escape_character: Escaping character (e.g. backslash).
:param null_literal: The null literal string.
"""
gateway = get_gateway()
self._j_csv = gateway.jvm.Csv()
super(Csv, self).__init__(self._j_csv)
if schema is not None:
self.schema(schema)
if field_delimiter is not None:
self.field_delimiter(field_delimiter)
if line_delimiter is not None:
self.line_delimiter(line_delimiter)
if quote_character is not None:
self.quote_character(quote_character)
if allow_comments:
self.allow_comments()
if ignore_parse_errors:
self.ignore_parse_errors()
if array_element_delimiter is not None:
self.array_element_delimiter(array_element_delimiter)
if escape_character is not None:
self.escape_character(escape_character)
if null_literal is not None:
self.null_literal(null_literal)
    def field_delimiter(self, delimiter: str) -> 'Csv':
"""
Sets the field delimiter character (',' by default).
:param delimiter: The field delimiter character.
:return: This :class:`Csv` object.
"""
if not isinstance(delimiter, str) or len(delimiter) != 1:
raise TypeError("Only one-character string is supported!")
self._j_csv = self._j_csv.fieldDelimiter(delimiter)
return self
    def line_delimiter(self, delimiter: str) -> 'Csv':
r"""
Sets the line delimiter ("\\n" by default; otherwise "\\r" or "\\r\\n" are allowed).
:param delimiter: The line delimiter.
:return: This :class:`Csv` object.
"""
self._j_csv = self._j_csv.lineDelimiter(delimiter)
return self
    def quote_character(self, quote_character: str) -> 'Csv':
        """
        Sets the quote character for enclosing field values ('"' by default).
:param quote_character: The quote character.
:return: This :class:`Csv` object.
"""
if not isinstance(quote_character, str) or len(quote_character) != 1:
raise TypeError("Only one-character string is supported!")
self._j_csv = self._j_csv.quoteCharacter(quote_character)
return self
    def ignore_parse_errors(self) -> 'Csv':
        """
        Skips records with parse errors instead of failing. An exception is thrown by default.
:return: This :class:`Csv` object.
"""
self._j_csv = self._j_csv.ignoreParseErrors()
return self
    def array_element_delimiter(self, delimiter: str) -> 'Csv':
"""
Sets the array element delimiter string for separating array or row element
values (";" by default).
:param delimiter: The array element delimiter.
:return: This :class:`Csv` object.
"""
self._j_csv = self._j_csv.arrayElementDelimiter(delimiter)
return self
    def escape_character(self, escape_character: str) -> 'Csv':
"""
Sets the escape character for escaping values (disabled by default).
:param escape_character: Escaping character (e.g. backslash).
:return: This :class:`Csv` object.
"""
if not isinstance(escape_character, str) or len(escape_character) != 1:
raise TypeError("Only one-character string is supported!")
self._j_csv = self._j_csv.escapeCharacter(escape_character)
return self
    def null_literal(self, null_literal: str) -> 'Csv':
"""
Sets the null literal string that is interpreted as a null value (disabled by default).
:param null_literal: The null literal string.
:return: This :class:`Csv` object.
"""
self._j_csv = self._j_csv.nullLiteral(null_literal)
return self
    def schema(self, schema_data_type: DataType) -> 'Csv':
"""
Sets the format schema with field names and the types. Required if schema is not derived.
:param schema_data_type: Data type from :class:`DataTypes` that describes the schema.
:return: This :class:`Csv` object.
"""
self._j_csv = self._j_csv.schema(_to_java_type(schema_data_type))
return self
    def derive_schema(self) -> 'Csv':
"""
Derives the format schema from the table's schema. Required if no format schema is defined.
This allows for defining schema information only once.
The names, types, and fields' order of the format are determined by the table's
schema. Time attributes are ignored if their origin is not a field. A "from" definition
is interpreted as a field renaming in the format.
:return: This :class:`Csv` object.
"""
self._j_csv = self._j_csv.deriveSchema()
return self
class Avro(FormatDescriptor):
"""
Format descriptor for Apache Avro records.
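
    Example (a minimal usage sketch; the Avro schema string is an assumption):
    ::

        avro_format = Avro().avro_schema(
            '{"type": "record", "name": "example",'
            ' "fields": [{"name": "id", "type": "long"}]}')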
"""
def __init__(self, record_class=None, avro_schema=None):
"""
Constructor of Avro format descriptor.
:param record_class: The java fully-qualified class name of the Avro record.
:param avro_schema: Avro schema string.
"""
gateway = get_gateway()
self._j_avro = gateway.jvm.Avro()
super(Avro, self).__init__(self._j_avro)
if record_class is not None:
self.record_class(record_class)
if avro_schema is not None:
self.avro_schema(avro_schema)
    def record_class(self, record_class: str) -> 'Avro':
"""
Sets the class of the Avro specific record.
:param record_class: The java fully-qualified class name of the Avro record.
:return: This object.
"""
gateway = get_gateway()
clz = gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(record_class)
self._j_avro = self._j_avro.recordClass(clz)
return self
    def avro_schema(self, avro_schema: str) -> 'Avro':
"""
Sets the Avro schema for specific or generic Avro records.
:param avro_schema: Avro schema string.
:return: This object.
"""
self._j_avro = self._j_avro.avroSchema(avro_schema)
return self
class Json(FormatDescriptor):
"""
Format descriptor for JSON.
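
    Example (a minimal usage sketch; the chosen options are assumptions):
    ::

        json_format = (Json()
                       .fail_on_missing_field(False)
                       .derive_schema())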
"""
def __init__(self, json_schema=None, schema=None, derive_schema=False):
"""
Constructor of Json format descriptor.
:param json_schema: The JSON schema string.
:param schema: Sets the schema using :class:`DataTypes` that describes the schema.
        :param derive_schema: Whether to derive the format schema from the table's schema.
"""
gateway = get_gateway()
self._j_json = gateway.jvm.Json()
super(Json, self).__init__(self._j_json)
if json_schema is not None:
self.json_schema(json_schema)
if schema is not None:
self.schema(schema)
if derive_schema:
self.derive_schema()
    def fail_on_missing_field(self, fail_on_missing_field: bool) -> 'Json':
        """
        Sets the flag that specifies whether to fail if a field is missing.
:param fail_on_missing_field: If set to ``True``, the operation fails if there is a missing
field.
If set to ``False``, a missing field is set to null.
:return: This object.
"""
if not isinstance(fail_on_missing_field, bool):
raise TypeError("Only bool value is supported!")
self._j_json = self._j_json.failOnMissingField(fail_on_missing_field)
return self
    def ignore_parse_errors(self, ignore_parse_errors: bool) -> 'Json':
        """
        Sets the flag that specifies whether to ignore parse errors.
        :param ignore_parse_errors: If set to ``True``, the operation ignores parse errors.
                                    If set to ``False``, the operation fails when parsing JSON
                                    fails.
:return: This object.
"""
if not isinstance(ignore_parse_errors, bool):
raise TypeError("Only bool value is supported!")
self._j_json = self._j_json.ignoreParseErrors(ignore_parse_errors)
return self
    def json_schema(self, json_schema: str) -> 'Json':
"""
Sets the JSON schema string with field names and the types according to the JSON schema
specification: http://json-schema.org/specification.html
The schema might be nested.
:param json_schema: The JSON schema string.
:return: This object.
"""
self._j_json = self._j_json.jsonSchema(json_schema)
return self
    def schema(self, schema_data_type: DataType) -> 'Json':
"""
Sets the schema using :class:`DataTypes`.
JSON objects are represented as ROW types.
The schema might be nested.
:param schema_data_type: Data type that describes the schema.
:return: This object.
"""
self._j_json = self._j_json.schema(_to_java_type(schema_data_type))
return self
    def derive_schema(self) -> 'Json':
        """
        Derives the format schema from the table's schema.
This allows for defining schema information only once.
The names, types, and fields' order of the format are determined by the table's
schema. Time attributes are ignored if their origin is not a field. A "from" definition
is interpreted as a field renaming in the format.
:return: This object.
"""
self._j_json = self._j_json.deriveSchema()
return self
class ConnectorDescriptor(Descriptor, metaclass=ABCMeta):
"""
    Describes a connector to another system.
"""
def __init__(self, j_connector_descriptor):
self._j_connector_descriptor = j_connector_descriptor
super(ConnectorDescriptor, self).__init__(self._j_connector_descriptor)
class FileSystem(ConnectorDescriptor):
"""
Connector descriptor for a file system.
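
    Example (a minimal usage sketch; the path is an assumption):
    ::

        connector = FileSystem().path("/tmp/input.csv")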
"""
def __init__(self, path=None):
"""
Constructor of FileSystem descriptor.
:param path: The path of a file or directory.
"""
gateway = get_gateway()
self._j_file_system = gateway.jvm.FileSystem()
super(FileSystem, self).__init__(self._j_file_system)
if path is not None:
self.path(path)
    def path(self, path_str: str) -> 'FileSystem':
"""
Sets the path to a file or directory in a file system.
:param path_str: The path of a file or directory.
:return: This :class:`FileSystem` object.
"""
self._j_file_system = self._j_file_system.path(path_str)
return self
class Kafka(ConnectorDescriptor):
"""
Connector descriptor for the Apache Kafka message queue.
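
    Example (a minimal usage sketch; the version, topic, and broker address are assumptions):
    ::

        connector = (Kafka()
                     .version("0.11")
                     .topic("user_events")
                     .property("bootstrap.servers", "localhost:9092")
                     .start_from_earliest())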
"""
def __init__(self, version=None, topic=None, properties=None, start_from_earliest=False,
start_from_latest=False, start_from_group_offsets=True,
start_from_specific_offsets_dict=None, start_from_timestamp=None,
sink_partitioner_fixed=None, sink_partitioner_round_robin=None,
custom_partitioner_class_name=None):
"""
Constructor of Kafka descriptor.
:param version: Kafka version. E.g., "0.8", "0.11", etc.
:param topic: The topic from which the table is read.
:param properties: The dict object contains configuration properties for the Kafka
consumer. Both the keys and values should be strings.
:param start_from_earliest: Specifies the consumer to start reading from the earliest offset
for all partitions.
:param start_from_latest: Specifies the consumer to start reading from the latest offset for
all partitions.
:param start_from_group_offsets: Specifies the consumer to start reading from any committed
group offsets found in Zookeeper / Kafka brokers.
        :param start_from_specific_offsets_dict: Dict of specific offsets: the key is an int-type
                                                 partition id and the value is an int-type offset.
:param start_from_timestamp: Specifies the consumer to start reading partitions from a
specified timestamp.
        :param sink_partitioner_fixed: Whether to use the fixed partitioner that maps each Flink
                                       partition into at most one Kafka partition.
        :param sink_partitioner_round_robin: Whether to distribute records to Kafka partitions in
                                             a round-robin fashion.
        :param custom_partitioner_class_name: The java canonical class name of a custom
                                              FlinkKafkaPartitioner used to partition records.
"""
gateway = get_gateway()
self._j_kafka = gateway.jvm.Kafka()
super(Kafka, self).__init__(self._j_kafka)
if version is not None:
self.version(version)
if topic is not None:
self.topic(topic)
if properties is not None:
self.properties(properties)
if start_from_earliest:
self.start_from_earliest()
if start_from_latest:
self.start_from_latest()
if start_from_group_offsets:
self.start_from_group_offsets()
if start_from_specific_offsets_dict is not None:
self.start_from_specific_offsets(start_from_specific_offsets_dict)
if start_from_timestamp is not None:
self.start_from_timestamp(start_from_timestamp)
        if sink_partitioner_fixed is True:
            self.sink_partitioner_fixed()
        if sink_partitioner_round_robin is True:
            self.sink_partitioner_round_robin()
if custom_partitioner_class_name is not None:
self.sink_partitioner_custom(custom_partitioner_class_name)
    def version(self, version: str) -> 'Kafka':
"""
Sets the Kafka version to be used.
:param version: Kafka version. E.g., "0.8", "0.11", etc.
:return: This object.
"""
if not isinstance(version, str):
version = str(version)
self._j_kafka = self._j_kafka.version(version)
return self
    def topic(self, topic: str) -> 'Kafka':
"""
Sets the topic from which the table is read.
:param topic: The topic from which the table is read.
:return: This object.
"""
self._j_kafka = self._j_kafka.topic(topic)
return self
    def properties(self, property_dict: Dict[str, str]) -> 'Kafka':
"""
Sets the configuration properties for the Kafka consumer. Resets previously set properties.
:param property_dict: The dict object contains configuration properties for the Kafka
consumer. Both the keys and values should be strings.
:return: This object.
"""
gateway = get_gateway()
properties = gateway.jvm.java.util.Properties()
for key in property_dict:
properties.setProperty(key, property_dict[key])
self._j_kafka = self._j_kafka.properties(properties)
return self
    def property(self, key: str, value: str) -> 'Kafka':
        """
        Adds a configuration property for the Kafka consumer.
:param key: Property key string for the Kafka consumer.
:param value: Property value string for the Kafka consumer.
:return: This object.
"""
self._j_kafka = self._j_kafka.property(key, value)
return self
    def start_from_earliest(self) -> 'Kafka':
"""
Specifies the consumer to start reading from the earliest offset for all partitions.
This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
:return: This object.
"""
self._j_kafka = self._j_kafka.startFromEarliest()
return self
    def start_from_latest(self) -> 'Kafka':
"""
Specifies the consumer to start reading from the latest offset for all partitions.
This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
:return: This object.
"""
self._j_kafka = self._j_kafka.startFromLatest()
return self
    def start_from_group_offsets(self) -> 'Kafka':
"""
Specifies the consumer to start reading from any committed group offsets found
in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration
properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
set in the configuration properties will be used for the partition.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
:return: This object.
"""
self._j_kafka = self._j_kafka.startFromGroupOffsets()
return self
    def start_from_specific_offsets(self, specific_offsets_dict: Dict[int, int]) -> 'Kafka':
"""
Specifies the consumer to start reading partitions from specific offsets, set independently
for each partition. The specified offset should be the offset of the next record that will
be read from partitions. This lets the consumer ignore any committed group offsets in
Zookeeper / Kafka brokers.
If the provided map of offsets contains entries whose partition is not subscribed by the
consumer, the entry will be ignored. If the consumer subscribes to a partition that does
        not exist in the provided map of offsets, the consumer will fall back to the default group
        offset behaviour (see :func:`pyflink.table.descriptors.Kafka.start_from_group_offsets`)
for that particular partition.
If the specified offset for a partition is invalid, or the behaviour for that partition is
defaulted to group offsets but still no group offset could be found for it, then the
"auto.offset.reset" behaviour set in the configuration properties will be used for the
partition.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
        :param specific_offsets_dict: Dict of specific offsets: the key is an int-type partition
                                      id and the value is an int-type offset.
:return: This object.
"""
for key in specific_offsets_dict:
self.start_from_specific_offset(key, specific_offsets_dict[key])
return self
    def start_from_specific_offset(self, partition: int, specific_offset: int) -> 'Kafka':
"""
Configures to start reading partitions from specific offsets and specifies the given offset
for the given partition.
see :func:`pyflink.table.descriptors.Kafka.start_from_specific_offsets`
:param partition: Partition id.
:param specific_offset: Specified offset in given partition.
:return: This object.
"""
self._j_kafka = self._j_kafka.startFromSpecificOffset(int(partition), int(specific_offset))
return self
    def start_from_timestamp(self, timestamp: int) -> 'Kafka':
"""
Specifies the consumer to start reading partitions from a specified timestamp.
The specified timestamp must be before the current timestamp.
This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
The consumer will look up the earliest offset whose timestamp is greater than or equal
        to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
        latest offset to read data from Kafka.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
        :param timestamp: Timestamp for the startup offsets, as milliseconds from epoch.
:return: This object.
.. versionadded:: 1.11.0
"""
self._j_kafka = self._j_kafka.startFromTimestamp(int(timestamp))
return self
    def sink_partitioner_fixed(self) -> 'Kafka':
"""
Configures how to partition records from Flink's partitions into Kafka's partitions.
This strategy ensures that each Flink partition ends up in one Kafka partition.
.. note::
One Kafka partition can contain multiple Flink partitions. Examples:
            More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
            the output of more than one Flink partition:
| Flink Sinks --------- Kafka Partitions
| 1 ----------------> 1
| 2 --------------/
| 3 -------------/
| 4 ------------/
Fewer Flink partitions than Kafka partitions:
| Flink Sinks --------- Kafka Partitions
| 1 ----------------> 1
| 2 ----------------> 2
| ................ 3
| ................ 4
| ................ 5
:return: This object.
"""
self._j_kafka = self._j_kafka.sinkPartitionerFixed()
return self
    def sink_partitioner_round_robin(self) -> 'Kafka':
"""
Configures how to partition records from Flink's partitions into Kafka's partitions.
This strategy ensures that records will be distributed to Kafka partitions in a
round-robin fashion.
.. note::
This strategy is useful to avoid an unbalanced partitioning. However, it will cause a
lot of network connections between all the Flink instances and all the Kafka brokers.
:return: This object.
"""
self._j_kafka = self._j_kafka.sinkPartitionerRoundRobin()
return self
    def sink_partitioner_custom(self, partitioner_class_name: str) -> 'Kafka':
"""
Configures how to partition records from Flink's partitions into Kafka's partitions.
This strategy allows for a custom partitioner by providing an implementation
of ``FlinkKafkaPartitioner``.
        :param partitioner_class_name: The java canonical class name of the FlinkKafkaPartitioner.
                                       The FlinkKafkaPartitioner must have a public no-argument
                                       constructor and must be loadable by the current Java
                                       classloader.
:return: This object.
"""
gateway = get_gateway()
self._j_kafka = self._j_kafka.sinkPartitionerCustom(
gateway.jvm.Thread.currentThread().getContextClassLoader()
.loadClass(partitioner_class_name))
return self
class Elasticsearch(ConnectorDescriptor):
"""
Connector descriptor for the Elasticsearch search engine.
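
    Example (a minimal usage sketch; the host, index, and document type are assumptions):
    ::

        connector = (Elasticsearch()
                     .version("6")
                     .host("localhost", 9200, "http")
                     .index("user_events")
                     .document_type("_doc"))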
"""
def __init__(self, version=None, hostname=None, port=None, protocol=None, index=None,
document_type=None, key_delimiter=None, key_null_literal=None,
failure_handler_fail=False, failure_handler_ignore=False,
failure_handler_retry_rejected=False, custom_failure_handler_class_name=None,
disable_flush_on_checkpoint=False, bulk_flush_max_actions=None,
bulk_flush_max_size=None, bulk_flush_interval=None,
bulk_flush_backoff_constant=False, bulk_flush_backoff_exponential=False,
bulk_flush_backoff_max_retries=None, bulk_flush_backoff_delay=None,
connection_max_retry_timeout=None, connection_path_prefix=None):
"""
Constructor of Elasticsearch descriptor.
:param version: Elasticsearch version. E.g., "6".
:param hostname: Connection hostname.
:param port: Connection port.
:param protocol: Connection protocol; e.g. "http".
:param index: Elasticsearch index.
:param document_type: Elasticsearch document type.
:param key_delimiter: Key delimiter; e.g., "$" would result in IDs "KEY1$KEY2$KEY3".
:param key_null_literal: key null literal string; e.g. "N/A" would result in IDs
"KEY1_N/A_KEY3".
        :param failure_handler_fail: Whether to throw an exception if a request to Elasticsearch
                                     fails.
        :param failure_handler_ignore: Whether to ignore failures and drop the request if a
                                       request to Elasticsearch fails.
        :param failure_handler_retry_rejected: Whether to re-add requests that have failed due to
                                               queue capacity saturation.
        :param custom_failure_handler_class_name: The java fully-qualified class name of a custom
                                                  ActionRequestFailureHandler used in case a
                                                  request to Elasticsearch fails.
:param disable_flush_on_checkpoint: Disables flushing on checkpoint
:param bulk_flush_max_actions: the maximum number of actions to buffer per bulk request.
        :param bulk_flush_max_size: The maximum size. E.g., "42 mb"; only MB granularity is
                                    supported.
:param bulk_flush_interval: Bulk flush interval (in milliseconds).
        :param bulk_flush_backoff_constant: Whether to use a constant backoff type when flushing
                                            bulk requests.
        :param bulk_flush_backoff_exponential: Whether to use an exponential backoff type when
                                               flushing bulk requests.
:param bulk_flush_backoff_max_retries: The maximum number of retries.
:param bulk_flush_backoff_delay: Delay between each backoff attempt (in milliseconds).
:param connection_max_retry_timeout: Maximum timeout (in milliseconds).
:param connection_path_prefix: Prefix string to be added to every REST communication.
"""
gateway = get_gateway()
self._j_elasticsearch = gateway.jvm.Elasticsearch()
super(Elasticsearch, self).__init__(self._j_elasticsearch)
if version is not None:
self.version(version)
if hostname is not None:
self.host(hostname, port, protocol)
if index is not None:
self.index(index)
if document_type is not None:
self.document_type(document_type)
if key_delimiter is not None:
self.key_delimiter(key_delimiter)
if key_null_literal is not None:
self.key_null_literal(key_null_literal)
if failure_handler_fail:
self.failure_handler_fail()
if failure_handler_ignore:
self.failure_handler_ignore()
if failure_handler_retry_rejected:
self.failure_handler_retry_rejected()
if custom_failure_handler_class_name is not None:
self.failure_handler_custom(custom_failure_handler_class_name)
if disable_flush_on_checkpoint:
self.disable_flush_on_checkpoint()
if bulk_flush_max_actions is not None:
self.bulk_flush_max_actions(bulk_flush_max_actions)
if bulk_flush_max_size is not None:
self.bulk_flush_max_size(bulk_flush_max_size)
if bulk_flush_interval is not None:
self.bulk_flush_interval(bulk_flush_interval)
if bulk_flush_backoff_constant:
self.bulk_flush_backoff_constant()
if bulk_flush_backoff_exponential:
self.bulk_flush_backoff_exponential()
if bulk_flush_backoff_max_retries is not None:
self.bulk_flush_backoff_max_retries(bulk_flush_backoff_max_retries)
if bulk_flush_backoff_delay is not None:
self.bulk_flush_backoff_delay(bulk_flush_backoff_delay)
if connection_max_retry_timeout is not None:
self.connection_max_retry_timeout(connection_max_retry_timeout)
if connection_path_prefix is not None:
self.connection_path_prefix(connection_path_prefix)
    def version(self, version: str) -> 'Elasticsearch':
"""
Sets the Elasticsearch version to be used. Required.
:param version: Elasticsearch version. E.g., "6".
:return: This object.
"""
if not isinstance(version, str):
version = str(version)
self._j_elasticsearch = self._j_elasticsearch.version(version)
return self
    def host(self, hostname: str, port: Union[int, str], protocol: str) -> 'Elasticsearch':
"""
Adds an Elasticsearch host to connect to. Required.
Multiple hosts can be declared by calling this method multiple times.
:param hostname: Connection hostname.
:param port: Connection port.
:param protocol: Connection protocol; e.g. "http".
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.host(hostname, int(port), protocol)
return self
    def index(self, index: str) -> 'Elasticsearch':
"""
Declares the Elasticsearch index for every record. Required.
:param index: Elasticsearch index.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.index(index)
return self
    def document_type(self, document_type: str) -> 'Elasticsearch':
"""
Declares the Elasticsearch document type for every record. Required.
:param document_type: Elasticsearch document type.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.documentType(document_type)
return self
    def key_delimiter(self, key_delimiter: str) -> 'Elasticsearch':
"""
Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from
multiple fields. Optional.
:param key_delimiter: Key delimiter; e.g., "$" would result in IDs "KEY1$KEY2$KEY3".
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.keyDelimiter(key_delimiter)
return self
    def key_null_literal(self, key_null_literal: str) -> 'Elasticsearch':
"""
Sets a custom representation for null fields in keys. Optional.
:param key_null_literal: key null literal string; e.g. "N/A" would result in IDs
"KEY1_N/A_KEY3".
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.keyNullLiteral(key_null_literal)
return self
    def failure_handler_fail(self) -> 'Elasticsearch':
"""
Configures a failure handling strategy in case a request to Elasticsearch fails.
This strategy throws an exception if a request fails and thus causes a job failure.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.failureHandlerFail()
return self
    def failure_handler_ignore(self) -> 'Elasticsearch':
"""
Configures a failure handling strategy in case a request to Elasticsearch fails.
This strategy ignores failures and drops the request.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.failureHandlerIgnore()
return self
    def failure_handler_retry_rejected(self) -> 'Elasticsearch':
"""
Configures a failure handling strategy in case a request to Elasticsearch fails.
This strategy re-adds requests that have failed due to queue capacity saturation.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.failureHandlerRetryRejected()
return self
    def failure_handler_custom(self, failure_handler_class_name: str) -> 'Elasticsearch':
        """
        Configures a failure handling strategy in case a request to Elasticsearch fails.
        This strategy allows for custom failure handling using an ``ActionRequestFailureHandler``.
        :param failure_handler_class_name: The java fully-qualified class name of the
                                           ``ActionRequestFailureHandler``.
:return: This object.
"""
gateway = get_gateway()
self._j_elasticsearch = self._j_elasticsearch.failureHandlerCustom(
gateway.jvm.Thread.currentThread().getContextClassLoader()
.loadClass(failure_handler_class_name))
return self
    def disable_flush_on_checkpoint(self) -> 'Elasticsearch':
"""
Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action
requests to be acknowledged by Elasticsearch on checkpoints.
.. note::
            If flushing on checkpoint is disabled, an Elasticsearch sink does NOT
provide any strong guarantees for at-least-once delivery of action requests.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.disableFlushOnCheckpoint()
return self
    def bulk_flush_max_actions(self, max_actions_num: int) -> 'Elasticsearch':
"""
Configures how to buffer elements before sending them in bulk to the cluster for
efficiency.
Sets the maximum number of actions to buffer for each bulk request.
:param max_actions_num: the maximum number of actions to buffer per bulk request.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxActions(int(max_actions_num))
return self
    def bulk_flush_max_size(self, max_size: int) -> 'Elasticsearch':
"""
Configures how to buffer elements before sending them in bulk to the cluster for
efficiency.
Sets the maximum size of buffered actions per bulk request (using the syntax of
MemorySize).
        :param max_size: The maximum size. E.g., "42 mb"; only MB granularity is supported.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxSize(max_size)
return self
    def bulk_flush_interval(self, interval: int) -> 'Elasticsearch':
"""
Configures how to buffer elements before sending them in bulk to the cluster for
efficiency.
Sets the bulk flush interval (in milliseconds).
:param interval: Bulk flush interval (in milliseconds).
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.bulkFlushInterval(int(interval))
return self
    def bulk_flush_backoff_constant(self) -> 'Elasticsearch':
"""
Configures how to buffer elements before sending them in bulk to the cluster for
efficiency.
Sets a constant backoff type to use when flushing bulk requests.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffConstant()
return self
    def bulk_flush_backoff_exponential(self) -> 'Elasticsearch':
"""
Configures how to buffer elements before sending them in bulk to the cluster for
efficiency.
Sets an exponential backoff type to use when flushing bulk requests.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffExponential()
return self
    def bulk_flush_backoff_max_retries(self, max_retries: int) -> 'Elasticsearch':
"""
Configures how to buffer elements before sending them in bulk to the cluster for
efficiency.
Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
Make sure to enable backoff by selecting a strategy (
:func:`pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_constant` or
:func:`pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_exponential`).
:param max_retries: The maximum number of retries.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffMaxRetries(int(max_retries))
return self
    def bulk_flush_backoff_delay(self, delay: int) -> 'Elasticsearch':
"""
Configures how to buffer elements before sending them in bulk to the cluster for
efficiency.
Sets the amount of delay between each backoff attempt when flushing bulk requests
(in milliseconds).
Make sure to enable backoff by selecting a strategy (
:func:`pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_constant` or
:func:`pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_exponential`).
:param delay: Delay between each backoff attempt (in milliseconds).
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffDelay(int(delay))
return self
    def connection_max_retry_timeout(self, max_retry_timeout: int) -> 'Elasticsearch':
"""
Sets connection properties to be used during REST communication to Elasticsearch.
Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request.
:param max_retry_timeout: Maximum timeout (in milliseconds).
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.connectionMaxRetryTimeout(
int(max_retry_timeout))
return self
    def connection_path_prefix(self, path_prefix: str) -> 'Elasticsearch':
"""
Sets connection properties to be used during REST communication to Elasticsearch.
Adds a path prefix to every REST communication.
:param path_prefix: Prefix string to be added to every REST communication.
:return: This object.
"""
self._j_elasticsearch = self._j_elasticsearch.connectionPathPrefix(path_prefix)
return self
class HBase(ConnectorDescriptor):
"""
Connector descriptor for Apache HBase.
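
    Example (a minimal usage sketch; the version, table name, and quorum address are
    assumptions):
    ::

        connector = (HBase()
                     .version("1.4.3")
                     .table_name("default:user_events")
                     .zookeeper_quorum("localhost:2181"))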
.. versionadded:: 1.11.0
"""
def __init__(self, version=None, table_name=None, zookeeper_quorum=None,
zookeeper_node_parent=None, write_buffer_flush_max_size=None,
write_buffer_flush_max_rows=None, write_buffer_flush_interval=None):
"""
Constructor of HBase descriptor.
:param version: HBase version. E.g., "1.4.3".
:param table_name: Name of HBase table. E.g., "testNamespace:testTable", "testDefaultTable"
:param zookeeper_quorum: zookeeper quorum address to connect the HBase cluster. E.g.,
"localhost:2181,localhost:2182,localhost:2183"
        :param zookeeper_node_parent: zookeeper node path of the HBase cluster. E.g.,
                                      "/hbase/example-root-znode".
        :param write_buffer_flush_max_size: the maximum size (in memory byte size) of buffered
                                            rows at which a flush is triggered.
        :param write_buffer_flush_max_rows: the number of buffered rows that triggers a flush of
                                            the request.
        :param write_buffer_flush_interval: flush interval. The string should be in format
                                            "{length value}{time unit label}", e.g., "123ms",
                                            "1 s"; if no time unit label is specified, it is
                                            interpreted as milliseconds.
"""
gateway = get_gateway()
self._j_hbase = gateway.jvm.HBase()
super(HBase, self).__init__(self._j_hbase)
if version is not None:
self.version(version)
if table_name is not None:
self.table_name(table_name)
if zookeeper_quorum is not None:
self.zookeeper_quorum(zookeeper_quorum)
if zookeeper_node_parent is not None:
self.zookeeper_node_parent(zookeeper_node_parent)
if write_buffer_flush_max_size is not None:
self.write_buffer_flush_max_size(write_buffer_flush_max_size)
if write_buffer_flush_max_rows is not None:
self.write_buffer_flush_max_rows(write_buffer_flush_max_rows)
if write_buffer_flush_interval is not None:
self.write_buffer_flush_interval(write_buffer_flush_interval)
    def version(self, version: str) -> 'HBase':
        """
        Sets the Apache HBase version to be used. Required.
:param version: HBase version. E.g., "1.4.3".
:return: This object.
.. versionadded:: 1.11.0
"""
if not isinstance(version, str):
version = str(version)
self._j_hbase = self._j_hbase.version(version)
return self
    def table_name(self, table_name: str) -> 'HBase':
        """
        Sets the HBase table name. Required.
:param table_name: Name of HBase table. E.g., "testNamespace:testTable", "testDefaultTable"
:return: This object.
.. versionadded:: 1.11.0
"""
self._j_hbase = self._j_hbase.tableName(table_name)
return self
    def zookeeper_quorum(self, zookeeper_quorum: str) -> 'HBase':
        """
        Sets the zookeeper quorum address used to connect to the HBase cluster. Required.
        :param zookeeper_quorum: zookeeper quorum address of the HBase cluster. E.g.,
                                 "localhost:2181,localhost:2182,localhost:2183"
:return: This object.
.. versionadded:: 1.11.0
"""
self._j_hbase = self._j_hbase.zookeeperQuorum(zookeeper_quorum)
return self
    def zookeeper_node_parent(self, zookeeper_node_parent: str) -> 'HBase':
        """
        Sets the zookeeper node parent path of the HBase cluster. Defaults to "/hbase". Optional.
        :param zookeeper_node_parent: zookeeper node path of the HBase cluster. E.g.,
                                      "/hbase/example-root-znode".
        :return: This object.
.. versionadded:: 1.11.0
"""
self._j_hbase = self._j_hbase.zookeeperNodeParent(zookeeper_node_parent)
return self
    def write_buffer_flush_max_size(self, max_size: Union[int, str]) -> 'HBase':
        """
        Sets the threshold, as the memory byte size of the currently buffered rows, at which
        buffered requests are flushed.
:param max_size: the maximum size.
:return: This object.
.. versionadded:: 1.11.0
"""
if not isinstance(max_size, str):
max_size = str(max_size)
self._j_hbase = self._j_hbase.writeBufferFlushMaxSize(max_size)
return self
    def write_buffer_flush_max_rows(self, write_buffer_flush_max_rows: int) -> 'HBase':
        """
        Sets the threshold, as the number of currently buffered rows, at which buffered requests
        are flushed.
        Defaults to not set, i.e. won't flush based on the number of buffered rows. Optional.
        :param write_buffer_flush_max_rows: the number of buffered rows that triggers a flush of
                                            the request.
:return: This object.
.. versionadded:: 1.11.0
"""
self._j_hbase = self._j_hbase.writeBufferFlushMaxRows(write_buffer_flush_max_rows)
return self
    def write_buffer_flush_interval(self, interval: Union[str, int]) -> 'HBase':
        """
        Sets the interval at which buffered requests are flushed, in milliseconds.
        Defaults to not set, i.e. won't flush based on a flush interval. Optional.
        :param interval: flush interval. The string should be in format
                         "{length value}{time unit label}", e.g., "123ms", "1 s"; if no time unit
                         label is specified, it is interpreted as milliseconds.
:return: This object.
.. versionadded:: 1.11.0
"""
if not isinstance(interval, str):
interval = str(interval)
self._j_hbase = self._j_hbase.writeBufferFlushInterval(interval)
return self
class CustomConnectorDescriptor(ConnectorDescriptor):
    """
    Describes a custom connector to another system.
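
    Example (a minimal usage sketch; the connector type and property key are assumptions):
    ::

        connector = (CustomConnectorDescriptor("my-custom-connector", 1, False)
                     .property("connector.debug", "true"))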
"""
def __init__(self, type, version, format_needed):
"""
Constructs a :class:`CustomConnectorDescriptor`.
:param type: String that identifies this connector.
:param version: Property version for backwards compatibility.
:param format_needed: Flag for basic validation of a needed format descriptor.
"""
if not isinstance(type, str):
raise TypeError("type must be of type str.")
if not isinstance(version, int):
raise TypeError("version must be of type int.")
if not isinstance(format_needed, bool):
raise TypeError("format_needed must be of type bool.")
gateway = get_gateway()
super(CustomConnectorDescriptor, self).__init__(
gateway.jvm.CustomConnectorDescriptor(type, version, format_needed))
    def property(self, key: str, value: str) -> 'CustomConnectorDescriptor':
"""
Adds a configuration property for the connector.
:param key: The property key to be set.
:param value: The property value to be set.
:return: This object.
"""
if not isinstance(key, str):
raise TypeError("key must be of type str.")
if not isinstance(value, str):
raise TypeError("value must be of type str.")
self._j_connector_descriptor = self._j_connector_descriptor.property(key, value)
return self
    def properties(self, property_dict: Dict[str, str]) -> 'CustomConnectorDescriptor':
"""
Adds a set of properties for the connector.
:param property_dict: The dict object contains configuration properties for the connector.
Both the keys and values should be strings.
:return: This object.
"""
if not isinstance(property_dict, dict):
raise TypeError("property_dict must be of type dict.")
self._j_connector_descriptor = self._j_connector_descriptor.properties(property_dict)
return self
class ConnectTableDescriptor(Descriptor, metaclass=ABCMeta):
    """
    Common class for tables created with :class:`pyflink.table.TableEnvironment.connect`.
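
    Example (a minimal usage sketch; table_env is assumed to be an existing
    TableEnvironment, and the path, schema, and table path are assumptions):
    ::

        (table_env.connect(FileSystem().path("/tmp/input.csv"))
            .with_schema(Schema().field("id", DataTypes.INT()))
            .create_temporary_table("MyTable"))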
"""
def __init__(self, j_connect_table_descriptor):
self._j_connect_table_descriptor = j_connect_table_descriptor
super(ConnectTableDescriptor, self).__init__(self._j_connect_table_descriptor)
    def with_schema(self, schema: Schema) -> 'ConnectTableDescriptor':
        """
        Specifies the resulting table schema.
        :param schema: The :class:`Schema` object for the resulting table.
:return: This object.
"""
self._j_connect_table_descriptor = \
self._j_connect_table_descriptor.withSchema(schema._j_schema)
return self
    def create_temporary_table(self, path: str) -> 'ConnectTableDescriptor':
"""
Registers the table described by underlying properties in a given path.
There is no distinction between source and sink at the descriptor level anymore as this
method does not perform actual class lookup. It only stores the underlying properties. The
actual source/sink lookup is performed when the table is used.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
it will be inaccessible in the current session. To make the permanent object available
again you can drop the corresponding temporary object.
.. note:: The schema must be explicitly defined.
:param path: path where to register the temporary table
.. versionadded:: 1.10.0
"""
self._j_connect_table_descriptor.createTemporaryTable(path)
return self
class StreamTableDescriptor(ConnectTableDescriptor):
"""
Descriptor for specifying a table source and/or sink in a streaming environment.
.. seealso:: parent class: :class:`ConnectTableDescriptor`
"""
def __init__(self, j_stream_table_descriptor, in_append_mode=False, in_retract_mode=False,
in_upsert_mode=False):
"""
Constructor of StreamTableDescriptor.
:param j_stream_table_descriptor: the corresponding stream table descriptor in java.
:param in_append_mode: Declares the conversion between a dynamic table and external
connector in append mode.
:param in_retract_mode: Declares the conversion between a dynamic table and external
connector in retract mode.
:param in_upsert_mode: Declares the conversion between a dynamic table and external
connector in upsert mode.
"""
self._j_stream_table_descriptor = j_stream_table_descriptor
super(StreamTableDescriptor, self).__init__(self._j_stream_table_descriptor)
if in_append_mode:
self.in_append_mode()
if in_retract_mode:
self.in_retract_mode()
if in_upsert_mode:
self.in_upsert_mode()
    def in_append_mode(self) -> 'StreamTableDescriptor':
"""
Declares how to perform the conversion between a dynamic table and an external connector.
In append mode, a dynamic table and an external connector only exchange INSERT messages.
:return: This object.
"""
self._j_stream_table_descriptor = self._j_stream_table_descriptor.inAppendMode()
return self
    def in_retract_mode(self) -> 'StreamTableDescriptor':
"""
Declares how to perform the conversion between a dynamic table and an external connector.
In retract mode, a dynamic table and an external connector exchange ADD and RETRACT
messages.
An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
the updating (new) row.
        In this mode, a key must not be defined, as opposed to upsert mode. However, every update
        consists of two messages, which is less efficient.
:return: This object.
"""
self._j_stream_table_descriptor = self._j_stream_table_descriptor.inRetractMode()
return self
    def in_upsert_mode(self) -> 'StreamTableDescriptor':
"""
Declares how to perform the conversion between a dynamic table and an external connector.
In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE
messages.
This mode requires a (possibly composite) unique key by which updates can be propagated. The
external connector needs to be aware of the unique key attribute in order to apply messages
correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as
DELETE messages.
The main difference to a retract stream is that UPDATE changes are encoded with a single
message and are therefore more efficient.
:return: This object.
"""
self._j_stream_table_descriptor = self._j_stream_table_descriptor.inUpsertMode()
return self
class BatchTableDescriptor(ConnectTableDescriptor):
"""
Descriptor for specifying a table source and/or sink in a batch environment.
.. seealso:: parent class: :class:`ConnectTableDescriptor`
"""
def __init__(self, j_batch_table_descriptor):
self._j_batch_table_descriptor = j_batch_table_descriptor
super(BatchTableDescriptor, self).__init__(self._j_batch_table_descriptor)