Source code for pyflink.common.serialization

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from py4j.java_gateway import java_import
from pyflink.common import typeinfo
from pyflink.common.typeinfo import TypeInformation

from pyflink.util.java_utils import load_java_class

from pyflink.java_gateway import get_gateway

# Public API of this module, in the same order as before.
__all__ = [
    'SerializationSchema',
    'DeserializationSchema',
    'SimpleStringSchema',
    'JsonRowSerializationSchema',
    'JsonRowDeserializationSchema',
    'CsvRowSerializationSchema',
    'CsvRowDeserializationSchema',
    'AvroRowSerializationSchema',
    'AvroRowDeserializationSchema',
    'Encoder',
]


class SerializationSchema:
    """
    Base class for serialization schemas.

    A serialization schema describes how a data object is turned into a different, serialized
    representation. Most data sinks (for example Apache Kafka) require the data to be handed to
    them in a specific format (for example as byte strings).

    :param j_serialization_schema: The Java-side serialization schema wrapped by this object;
                                   defaults to None.
    """

    def __init__(self, j_serialization_schema=None):
        # Keep a handle to the wrapped Java object; subclasses pass it in.
        self._j_serialization_schema = j_serialization_schema
class DeserializationSchema:
    """
    Base class for deserialization schemas.

    A deserialization schema describes how the byte messages delivered by certain data sources
    (for example Apache Kafka) are turned into data types (Java/Scala objects) that Flink
    processes. It also describes the produced type, which lets Flink create internal serializers
    and structures to handle that type.

    :param j_deserialization_schema: The Java-side deserialization schema wrapped by this
                                     object; defaults to None.
    """

    def __init__(self, j_deserialization_schema=None):
        # Keep a handle to the wrapped Java object; subclasses pass it in.
        self._j_deserialization_schema = j_deserialization_schema
class SimpleStringSchema(SerializationSchema, DeserializationSchema):
    """
    Very simple serialization/deserialization schema for strings.

    A single Java ``SimpleStringSchema`` backs both directions. By default, 'UTF-8' is used for
    the string/byte conversion.

    :param charset: Charset name, resolved on the Java side via ``Charset.forName``.
    """

    def __init__(self, charset: str = 'UTF-8'):
        jvm = get_gateway().jvm
        j_charset = jvm.java.nio.charset.Charset.forName(charset)
        j_schema = jvm.org.apache.flink.api.common.serialization.SimpleStringSchema(j_charset)
        # The same Java object serves as both the serializer and the deserializer, so it is
        # registered with both base classes.
        SerializationSchema.__init__(self, j_serialization_schema=j_schema)
        DeserializationSchema.__init__(self, j_deserialization_schema=j_schema)
class JsonRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from JSON to Flink types.

    Deserializes a byte[] message as a JSON object and reads the specified fields.

    Failures during deserialization are forwarded as wrapped IOExceptions.
    """

    def __init__(self, j_deserialization_schema):
        super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)

    @staticmethod
    def builder():
        """
        A static method to get a Builder for JsonRowDeserializationSchema.
        """
        return JsonRowDeserializationSchema.Builder()

    class Builder(object):
        """
        Builder for JsonRowDeserializationSchema.

        The result type must be configured with either :meth:`type_info` or :meth:`json_schema`
        before calling :meth:`build`.
        """

        def __init__(self):
            self._type_info = None
            self._fail_on_missing_field = False
            self._ignore_parse_errors = False

        def type_info(self, type_info: TypeInformation):
            """
            Creates a JSON deserialization schema for the given type information.

            :param type_info: Type information describing the result type. The field names of
                              Row are used to parse the JSON properties.
            :return: This builder, for call chaining.
            """
            self._type_info = type_info
            return self

        def json_schema(self, json_schema: str):
            """
            Creates a JSON deserialization schema for the given JSON schema.

            :param json_schema: JSON schema describing the result type.
            :return: This builder, for call chaining.
            :raises TypeError: If json_schema is None.
            """
            if json_schema is None:
                raise TypeError("The json_schema must not be None.")
            j_type_info = get_gateway().jvm \
                .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
            self._type_info = typeinfo._from_java_type(j_type_info)
            return self

        def fail_on_missing_field(self):
            """
            Configures the schema to fail if a JSON field is missing.

            By default, a missing field is ignored and the field is set to null.

            :return: This builder, for call chaining.
            """
            self._fail_on_missing_field = True
            return self

        def ignore_parse_errors(self):
            """
            Configures the schema to ignore parse errors instead of failing.

            By default, an exception is thrown when parsing JSON fails. Once this is called,
            the Java builder's ``ignoreParseErrors()`` is applied in :meth:`build`.

            :return: This builder, for call chaining.
            """
            self._ignore_parse_errors = True
            return self

        def build(self):
            """
            Creates the JsonRowDeserializationSchema from the configured settings.

            :return: A JsonRowDeserializationSchema wrapping the Java schema.
            :raises TypeError: If no type information was configured via :meth:`type_info` or
                               :meth:`json_schema`.
            """
            # Fail with a clear error instead of an AttributeError on None (consistent with
            # JsonRowSerializationSchema.Builder.build).
            if self._type_info is None:
                raise TypeError("Typeinfo should be set.")
            JBuilder = get_gateway().jvm.org.apache.flink.formats.json.JsonRowDeserializationSchema\
                .Builder
            j_builder = JBuilder(self._type_info.get_java_type_info())

            if self._fail_on_missing_field:
                # The Java method is failOnMissingField(); it was previously misspelled as
                # fialOnMissingField(), so enabling this option always failed.
                j_builder = j_builder.failOnMissingField()

            if self._ignore_parse_errors:
                j_builder = j_builder.ignoreParseErrors()

            j_deserialization_schema = j_builder.build()
            return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)
class JsonRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that turns an object of Flink types into JSON bytes.

    The input Flink object is serialized into a JSON string, which is then converted into
    byte[]. The resulting byte[] message can be read back with JsonRowDeserializationSchema.
    """

    def __init__(self, j_serialization_schema):
        super().__init__(j_serialization_schema)

    @staticmethod
    def builder():
        """
        Returns a new Builder for JsonRowSerializationSchema.
        """
        return JsonRowSerializationSchema.Builder()

    class Builder:
        """
        Builder for JsonRowSerializationSchema.

        Type information must be supplied via :meth:`with_type_info` before :meth:`build`.
        """

        def __init__(self):
            self._type_info = None

        def with_type_info(self, type_info: TypeInformation):
            """
            Sets the type information the JSON serialization schema is created for.

            :param type_info: Type information describing the type of the rows to serialize.
            :return: This builder, for call chaining.
            """
            self._type_info = type_info
            return self

        def build(self):
            """
            Creates the JsonRowSerializationSchema from the configured type information.

            :return: A JsonRowSerializationSchema wrapping the Java schema.
            :raises TypeError: If no type information has been set.
            """
            type_info = self._type_info
            if type_info is None:
                raise TypeError("Typeinfo should be set.")
            JSchema = get_gateway().jvm.org.apache.flink.formats.json.JsonRowSerializationSchema
            j_schema = JSchema.builder().withTypeInfo(type_info.get_java_type_info()).build()
            return JsonRowSerializationSchema(j_serialization_schema=j_schema)
class CsvRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from CSV to Flink types.

    A byte[] message is read as a JsonNode and converted to a Row. Failures during
    deserialization are forwarded as wrapped IOExceptions.
    """

    def __init__(self, j_deserialization_schema):
        super().__init__(j_deserialization_schema=j_deserialization_schema)

    class Builder:
        """
        Builder that configures and creates a CsvRowDeserializationSchema.

        Each setter forwards to the same-named method of the wrapped Java builder and returns
        this builder, so calls can be chained.

        :param type_info: Type information of the rows to produce; must not be None.
        """

        def __init__(self, type_info: TypeInformation):
            if type_info is None:
                raise TypeError("Type information must not be None")
            csv_package = get_gateway().jvm.org.apache.flink.formats.csv
            self._j_builder = csv_package.CsvRowDeserializationSchema.Builder(
                type_info.get_java_type_info())

        def _chain(self, j_builder):
            # Keep the Java builder returned by the last setter and hand back self for chaining.
            self._j_builder = j_builder
            return self

        def set_field_delimiter(self, delimiter: str):
            """Forwards to the Java builder's setFieldDelimiter."""
            return self._chain(self._j_builder.setFieldDelimiter(delimiter))

        def set_allow_comments(self, allow_comments: bool):
            """Forwards to the Java builder's setAllowComments."""
            return self._chain(self._j_builder.setAllowComments(allow_comments))

        def set_array_element_delimiter(self, delimiter: str):
            """Forwards to the Java builder's setArrayElementDelimiter."""
            return self._chain(self._j_builder.setArrayElementDelimiter(delimiter))

        def set_quote_character(self, c: str):
            """Forwards to the Java builder's setQuoteCharacter."""
            return self._chain(self._j_builder.setQuoteCharacter(c))

        def set_escape_character(self, c: str):
            """Forwards to the Java builder's setEscapeCharacter."""
            return self._chain(self._j_builder.setEscapeCharacter(c))

        def set_null_literal(self, null_literal: str):
            """Forwards to the Java builder's setNullLiteral."""
            return self._chain(self._j_builder.setNullLiteral(null_literal))

        def set_ignore_parse_errors(self, ignore_parse_errors: bool):
            """Forwards to the Java builder's setIgnoreParseErrors."""
            return self._chain(self._j_builder.setIgnoreParseErrors(ignore_parse_errors))

        def build(self):
            """Creates the CsvRowDeserializationSchema from the current configuration."""
            return CsvRowDeserializationSchema(
                j_deserialization_schema=self._j_builder.build())
class CsvRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that turns an object of Flink types into CSV bytes.

    The input row is serialized into an ObjectNode and converted into byte[]. The resulting
    byte[] messages can be read back with CsvRowDeserializationSchema.
    """

    def __init__(self, j_csv_row_serialization_schema):
        super().__init__(j_csv_row_serialization_schema)

    class Builder:
        """
        Builder that configures and creates a CsvRowSerializationSchema.

        Each setter forwards to the same-named method of the wrapped Java builder and returns
        this builder, so calls can be chained.

        :param type_info: Type information of the rows to serialize; must not be None.
        """

        def __init__(self, type_info: TypeInformation):
            if type_info is None:
                raise TypeError("Type information must not be None")
            csv_package = get_gateway().jvm.org.apache.flink.formats.csv
            self._j_builder = csv_package.CsvRowSerializationSchema.Builder(
                type_info.get_java_type_info())

        def _chain(self, j_builder):
            # Keep the Java builder returned by the last setter and hand back self for chaining.
            self._j_builder = j_builder
            return self

        def set_field_delimiter(self, c: str):
            """Forwards to the Java builder's setFieldDelimiter."""
            return self._chain(self._j_builder.setFieldDelimiter(c))

        def set_line_delimiter(self, delimiter: str):
            """Forwards to the Java builder's setLineDelimiter."""
            return self._chain(self._j_builder.setLineDelimiter(delimiter))

        def set_array_element_delimiter(self, delimiter: str):
            """Forwards to the Java builder's setArrayElementDelimiter."""
            return self._chain(self._j_builder.setArrayElementDelimiter(delimiter))

        def disable_quote_character(self):
            """Forwards to the Java builder's disableQuoteCharacter."""
            return self._chain(self._j_builder.disableQuoteCharacter())

        def set_quote_character(self, c: str):
            """Forwards to the Java builder's setQuoteCharacter."""
            return self._chain(self._j_builder.setQuoteCharacter(c))

        def set_escape_character(self, c: str):
            """Forwards to the Java builder's setEscapeCharacter."""
            return self._chain(self._j_builder.setEscapeCharacter(c))

        def set_null_literal(self, s: str):
            """Forwards to the Java builder's setNullLiteral."""
            return self._chain(self._j_builder.setNullLiteral(s))

        def build(self):
            """Creates the CsvRowSerializationSchema from the current configuration."""
            return CsvRowSerializationSchema(
                j_csv_row_serialization_schema=self._j_builder.build())
class AvroRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from Avro bytes to Row.

    Deserializes byte[] messages into (nested) Flink rows, converting Avro types into types that
    are compatible with Flink's Table & SQL API.

    Projects with Avro records containing logical date/time types need to add a JodaTime
    dependency.
    """

    def __init__(self, record_class: str = None, avro_schema_string: str = None):
        """
        Creates an Avro deserialization schema for either a specific record class or an Avro
        schema string. Having the concrete Avro record class might improve performance.

        :param record_class: Java class name of the Avro record class used to deserialize
                             Avro's record to Flink's row. Used in preference to
                             avro_schema_string when both are given.
        :param avro_schema_string: Avro schema string used to deserialize Avro's record to
                                   Flink's row.
        :raises TypeError: If neither record_class nor avro_schema_string is given.
        """
        if record_class is None and avro_schema_string is None:
            raise TypeError("record_class or avro_schema_string should be specified.")

        gateway = get_gateway()
        if record_class is not None:
            # Import the record class into the JVM view and resolve it to a Java Class object.
            java_import(gateway.jvm, record_class)
            schema_source = load_java_class(record_class)
        else:
            schema_source = avro_schema_string

        JSchema = gateway.jvm.org.apache.flink.formats.avro.AvroRowDeserializationSchema
        super().__init__(JSchema(schema_source))
class AvroRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that serializes Flink rows to the Avro binary format.
    """

    def __init__(self, record_class: str = None, avro_schema_string: str = None):
        """
        Creates an Avro serialization schema from either a specific record class or an Avro
        schema string.

        :param record_class: Java class name of the Avro record class used to serialize
                             Flink's row to Avro's record. Used in preference to
                             avro_schema_string when both are given.
        :param avro_schema_string: Avro schema string used to serialize Flink's row to Avro's
                                   record.
        :raises TypeError: If neither record_class nor avro_schema_string is given.
        """
        if record_class is None and avro_schema_string is None:
            raise TypeError("record_class or avro_schema_string should be specified.")

        gateway = get_gateway()
        if record_class is not None:
            # Import the record class into the JVM view and resolve it to a Java Class object.
            java_import(gateway.jvm, record_class)
            schema_source = load_java_class(record_class)
        else:
            schema_source = avro_schema_string

        JSchema = gateway.jvm.org.apache.flink.formats.avro.AvroRowSerializationSchema
        super().__init__(JSchema(schema_source))
class Encoder:
    """
    Encoder used by the file sink to write incoming elements to the files of a bucket.

    :param j_encoder: The Java encoder object wrapped by this instance.
    """

    def __init__(self, j_encoder):
        self._j_encoder = j_encoder

    @staticmethod
    def simple_string_encoder(charset_name: str = "UTF-8") -> 'Encoder':
        """
        Creates an Encoder backed by Java's SimpleStringEncoder. It writes each element's
        toString() to the output bucket file, separated by newlines.

        :param charset_name: Name of the charset passed to the Java SimpleStringEncoder.
        :return: An Encoder wrapping the Java SimpleStringEncoder.
        """
        serialization_pkg = get_gateway().jvm.org.apache.flink.api.common.serialization
        return Encoder(serialization_pkg.SimpleStringEncoder(charset_name))