Source code for pyflink.table.sources

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from pyflink.java_gateway import get_gateway
from pyflink.table.types import _to_java_type

__all__ = ['TableSource', 'CsvTableSource']


[docs]class TableSource(object): """ Defines a table from an external system or location. """ def __init__(self, j_table_source): self._j_table_source = j_table_source
[docs]class CsvTableSource(TableSource): """ A :class:`TableSource` for simple CSV files with a (logically) unlimited number of fields. Example: :: >>> CsvTableSource("/csv/file/path", ["a", "b"], [DataTypes.INT(), DataTypes.STRING()]) :param source_path: The path to the CSV file. :type source_path: str :param field_names: The names of the table fields. :type field_names: collections.Iterable[str] :param field_types: The types of the table fields. :type field_types: collections.Iterable[str] :param field_delim: The field delimiter, "," by default. :type field_delim: str, optional :param line_delim: The row delimiter, "\\n" by default. :type line_delim: str, optional :param quote_character: An optional quote character for String values, null by default. :type quote_character: str, optional :param ignore_first_line: Flag to ignore the first line, false by default. :type ignore_first_line: bool, optional :param ignore_comments: An optional prefix to indicate comments, null by default. :type ignore_comments: str, optional :param lenient: Flag to skip records with parse error instead to fail, false by default. :type lenient: bool, optional :param empty_column_as_null: Treat empty column as null, false by default. :type empty_column_as_null: bool, optional """ def __init__( self, source_path, field_names, field_types, field_delim=None, line_delim=None, quote_character=None, ignore_first_line=None, ignore_comments=None, lenient=None, empty_column_as_null=None, ): gateway = get_gateway() builder = gateway.jvm.CsvTableSource.builder() builder.path(source_path) for (field_name, field_type) in zip(field_names, field_types): builder.field(field_name, _to_java_type(field_type)) if field_delim is not None: builder.fieldDelimiter(field_delim) if line_delim is not None: builder.lineDelimiter(line_delim) if quote_character is not None: # Java API has a Character type for this field. At time of writing, # Py4J will convert the Python str to Java Character by taking only # the first character. This results in either: # - Silently truncating a Python str with more than one character # with no further type error from either Py4J or Java # CsvTableSource # - java.lang.StringIndexOutOfBoundsException from Py4J for an # empty Python str. That error can be made more friendly here. if len(quote_character) != 1: raise ValueError( "Expected a single CSV quote character but got '{}'".format(quote_character) ) builder.quoteCharacter(quote_character) if ignore_first_line: builder.ignoreFirstLine() if ignore_comments is not None: builder.commentPrefix(ignore_comments) if lenient: builder.ignoreParseErrors() if empty_column_as_null: builder.emptyColumnAsNull() super(CsvTableSource, self).__init__(builder.build())