################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Optional, TYPE_CHECKING
from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory, \
SerializationSchema, DeserializationSchema
from pyflink.common.typeinfo import TypeInformation
from pyflink.datastream.connectors.file_system import StreamFormat
from pyflink.java_gateway import get_gateway
if TYPE_CHECKING:
from pyflink.table.types import DataType, RowType, NumericType
__all__ = [
'CsvSchema',
'CsvSchemaBuilder',
'CsvReaderFormat',
'CsvBulkWriters',
'CsvRowDeserializationSchema',
'CsvRowSerializationSchema'
]
[docs]class CsvSchema(object):
"""
CsvSchema holds schema information of a csv file, corresponding to Java
``com.fasterxml.jackson.dataformat.csv.CsvSchema`` class.
.. versionadded:: 1.16.0
"""
def __init__(self, j_schema, row_type: 'RowType'):
self._j_schema = j_schema
self._row_type = row_type
self._type_info = None
@staticmethod
def builder() -> 'CsvSchemaBuilder':
"""
Returns a :class:`CsvSchemaBuilder`.
"""
return CsvSchemaBuilder()
def size(self):
return self._j_schema.size()
def __len__(self):
return self.size()
def __str__(self):
return self._j_schema.toString()
def __repr__(self):
return str(self)
[docs]class CsvSchemaBuilder(object):
"""
CsvSchemaBuilder is for building a :class:`~CsvSchema`, corresponding to Java
``com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder`` class.
.. versionadded:: 1.16.0
"""
def __init__(self):
jvm = get_gateway().jvm
self._j_schema_builder = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson \
.dataformat.csv.CsvSchema.builder()
self._fields = []
def build(self) -> 'CsvSchema':
"""
Build the :class:`~CsvSchema`.
"""
from pyflink.table.types import DataTypes
return CsvSchema(self._j_schema_builder.build(), DataTypes.ROW(self._fields))
def add_array_column(self,
name: str,
separator: str = ';',
element_type: Optional['DataType'] = None) \
-> 'CsvSchemaBuilder':
"""
Add an array column to schema, the type of elements could be specified via ``element_type``,
which should be primitive types.
:param name: Name of the column.
:param separator: Text separator of array elements, default to ``;``.
:param element_type: DataType of array elements, default to ``DataTypes.STRING()``.
"""
from pyflink.table.types import DataTypes
if element_type is None:
element_type = DataTypes.STRING()
self._j_schema_builder.addArrayColumn(name, separator)
self._fields.append(DataTypes.FIELD(name, DataTypes.ARRAY(element_type)))
return self
def add_boolean_column(self, name: str) -> 'CsvSchemaBuilder':
"""
Add a boolean column to schema, with type as ``DataTypes.BOOLEAN()``.
:param name: Name of the column.
"""
from pyflink.table.types import DataTypes
self._j_schema_builder.addBooleanColumn(name)
self._fields.append(DataTypes.FIELD(name, DataTypes.BOOLEAN()))
return self
def add_number_column(self, name: str,
number_type: Optional['NumericType'] = None) \
-> 'CsvSchemaBuilder':
"""
Add a number column to schema, the type of number could be specified via ``number_type``.
:param name: Name of the column.
:param number_type: DataType of the number, default to ``DataTypes.BIGINT()``.
"""
from pyflink.table.types import DataTypes
if number_type is None:
number_type = DataTypes.BIGINT()
self._j_schema_builder.addNumberColumn(name)
self._fields.append(DataTypes.FIELD(name, number_type))
return self
def add_string_column(self, name: str) -> 'CsvSchemaBuilder':
"""
Add a string column to schema, with type as ``DataTypes.STRING()``.
:param name: Name of the column.
"""
from pyflink.table.types import DataTypes
self._j_schema_builder.addColumn(name)
self._fields.append(DataTypes.FIELD(name, DataTypes.STRING()))
return self
def add_columns_from(self, schema: 'CsvSchema') -> 'CsvSchemaBuilder':
"""
Add all columns in ``schema`` to current schema.
:param schema: Another :class:`CsvSchema`.
"""
self._j_schema_builder.addColumnsFrom(schema._j_schema)
for field in schema._row_type:
self._fields.append(field)
return self
def clear_columns(self):
"""
Delete all columns in the schema.
"""
self._j_schema_builder.clearColumns()
self._fields.clear()
return self
def set_allow_comments(self, allow: bool = True):
"""
Allow using ``#`` prefixed comments in csv file.
"""
self._j_schema_builder.setAllowComments(allow)
return self
def set_any_property_name(self, name: str):
self._j_schema_builder.setAnyPropertyName(name)
return self
def disable_array_element_separator(self):
"""
Set array element separator to ``""``.
"""
self._j_schema_builder.disableArrayElementSeparator()
return self
def remove_array_element_separator(self, index: int):
"""
Set array element separator of a column specified by ``index`` to ``""``.
"""
self._j_schema_builder.removeArrayElementSeparator(index)
return self
def set_array_element_separator(self, separator: str):
"""
Set global array element separator, default to ``;``.
"""
self._j_schema_builder.setArrayElementSeparator(separator)
return self
def set_column_separator(self, char: str):
"""
Set column separator, ``char`` should be a single char, default to ``,``.
"""
if len(char) != 1:
raise ValueError('Column separator must be a single char, got {}'.format(char))
self._j_schema_builder.setColumnSeparator(char)
return self
def disable_escape_char(self):
"""
Disable escaping in csv file.
"""
self._j_schema_builder.disableEscapeChar()
return self
def set_escape_char(self, char: str):
"""
Set escape char, ``char`` should be a single char, default to no-escaping.
"""
if len(char) != 1:
raise ValueError('Escape char must be a single char, got {}'.format(char))
self._j_schema_builder.setEscapeChar(char)
return self
def set_line_separator(self, separator: str):
"""
Set line separator, default to ``\\n``. This is only configurable for writing, for reading,
``\\n``, ``\\r``, ``\\r\\n`` are recognized.
"""
self._j_schema_builder.setLineSeparator(separator)
return self
def set_null_value(self, null_value: str):
"""
Set literal for null value, default to empty sequence.
"""
self._j_schema_builder.setNullValue(null_value)
def disable_quote_char(self):
"""
Disable quote char.
"""
self._j_schema_builder.disableQuoteChar()
return self
def set_quote_char(self, char: str):
"""
Set quote char, default to ``"``.
"""
if len(char) != 1:
raise ValueError('Quote char must be a single char, got {}'.format(char))
self._j_schema_builder.setQuoteChar(char)
return self
def set_skip_first_data_row(self, skip: bool = True):
"""
Set whether to skip the first row of csv file.
"""
self._j_schema_builder.setSkipFirstDataRow(skip)
return self
def set_strict_headers(self, strict: bool = True):
"""
Set whether to use strict headers, which check column names in the header are consistent
with the schema.
"""
self._j_schema_builder.setStrictHeaders(strict)
return self
def set_use_header(self, use: bool = True):
"""
Set whether to read header.
"""
self._j_schema_builder.setUseHeader(use)
return self
def size(self):
return len(self._fields)
def __len__(self):
return self.size()
[docs]class CsvBulkWriters(object):
"""
CsvBulkWriter is for building :class:`~pyflink.common.serialization.BulkWriterFactory` to write
Rows with a predefined CSV schema to partitioned files in a bulk fashion.
Example:
::
>>> schema = CsvSchema.builder() \\
... .add_number_column('id', number_type=DataTypes.INT()) \\
... .add_string_column('name') \\
... .add_array_column('list', ',', element_type=DataTypes.STRING()) \\
... .set_column_separator('|') \\
... .build()
>>> sink = FileSink.for_bulk_format(
... OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
>>> ds.sink_to(sink)
.. versionadded:: 1.16.0
"""
@staticmethod
def for_schema(schema: 'CsvSchema') -> 'BulkWriterFactory':
"""
Creates a :class:`~pyflink.common.serialization.BulkWriterFactory` for writing records to
files in CSV format.
"""
from pyflink.table.types import _to_java_data_type
jvm = get_gateway().jvm
csv = jvm.org.apache.flink.formats.csv
j_factory = csv.PythonCsvUtils.createCsvBulkWriterFactory(
schema._j_schema,
_to_java_data_type(schema._row_type))
return RowDataBulkWriterFactory(j_factory, schema._row_type)
[docs]class CsvRowDeserializationSchema(DeserializationSchema):
"""
Deserialization schema from CSV to Flink types. Deserializes a byte[] message as a JsonNode and
converts it to Row.
Failure during deserialization are forwarded as wrapped IOException.
"""
def __init__(self, j_deserialization_schema):
super(CsvRowDeserializationSchema, self).__init__(
j_deserialization_schema=j_deserialization_schema)
class Builder(object):
"""
A builder for creating a CsvRowDeserializationSchema.
"""
def __init__(self, type_info: TypeInformation):
if type_info is None:
raise TypeError("Type information must not be None")
self._j_builder = get_gateway().jvm\
.org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder(
type_info.get_java_type_info())
def set_field_delimiter(self, delimiter: str):
self._j_builder = self._j_builder.setFieldDelimiter(delimiter)
return self
def set_allow_comments(self, allow_comments: bool):
self._j_builder = self._j_builder.setAllowComments(allow_comments)
return self
def set_array_element_delimiter(self, delimiter: str):
self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
return self
def set_quote_character(self, c: str):
self._j_builder = self._j_builder.setQuoteCharacter(c)
return self
def set_escape_character(self, c: str):
self._j_builder = self._j_builder.setEscapeCharacter(c)
return self
def set_null_literal(self, null_literal: str):
self._j_builder = self._j_builder.setNullLiteral(null_literal)
return self
def set_ignore_parse_errors(self, ignore_parse_errors: bool):
self._j_builder = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
return self
def build(self):
j_csv_row_deserialization_schema = self._j_builder.build()
return CsvRowDeserializationSchema(
j_deserialization_schema=j_csv_row_deserialization_schema)
[docs]class CsvRowSerializationSchema(SerializationSchema):
"""
Serialization schema that serializes an object of Flink types into a CSV bytes. Serializes the
input row into an ObjectNode and converts it into byte[].
Result byte[] messages can be deserialized using CsvRowDeserializationSchema.
"""
def __init__(self, j_csv_row_serialization_schema):
super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)
class Builder(object):
"""
A builder for creating a CsvRowSerializationSchema.
"""
def __init__(self, type_info: TypeInformation):
if type_info is None:
raise TypeError("Type information must not be None")
self._j_builder = get_gateway().jvm\
.org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder(
type_info.get_java_type_info())
def set_field_delimiter(self, c: str):
self._j_builder = self._j_builder.setFieldDelimiter(c)
return self
def set_line_delimiter(self, delimiter: str):
self._j_builder = self._j_builder.setLineDelimiter(delimiter)
return self
def set_array_element_delimiter(self, delimiter: str):
self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
return self
def disable_quote_character(self):
self._j_builder = self._j_builder.disableQuoteCharacter()
return self
def set_quote_character(self, c: str):
self._j_builder = self._j_builder.setQuoteCharacter(c)
return self
def set_escape_character(self, c: str):
self._j_builder = self._j_builder.setEscapeCharacter(c)
return self
def set_null_literal(self, s: str):
self._j_builder = self._j_builder.setNullLiteral(s)
return self
def build(self):
j_serialization_schema = self._j_builder.build()
return CsvRowSerializationSchema(j_csv_row_serialization_schema=j_serialization_schema)