Source code for pyflink.datastream.formats.avro

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from py4j.java_gateway import get_java_class, JavaObject, java_import

from pyflink.common.io import InputFormat
from pyflink.common.serialization import BulkWriterFactory, SerializationSchema, \
    DeserializationSchema
from pyflink.common.typeinfo import TypeInformation

from pyflink.datastream.utils import ResultTypeQueryable
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import get_field_value, load_java_class

__all__ = [
    'AvroSchema',
    'GenericRecordAvroTypeInfo',
    'AvroInputFormat',
    'AvroBulkWriters',
    'AvroRowDeserializationSchema',
    'AvroRowSerializationSchema'
]


[docs]class AvroSchema(object): """ Avro Schema class contains Java org.apache.avro.Schema. .. versionadded:: 1.16.0 """ def __init__(self, j_schema): self._j_schema = j_schema self._schema_string = None def __str__(self): if self._schema_string is None: self._schema_string = get_field_value(self._j_schema, 'schema').toString() return self._schema_string @staticmethod def parse_string(json_schema: str) -> 'AvroSchema': """ Parse JSON string as Avro Schema. :param json_schema: JSON represented schema string. :return: the Avro Schema. """ JSchema = get_gateway().jvm.org.apache.flink.avro.shaded.org.apache.avro.Schema return AvroSchema(JSchema.Parser().parse(json_schema)) @staticmethod def parse_file(file_path: str) -> 'AvroSchema': """ Parse a schema definition file as Avro Schema. :param file_path: path to schema definition file. :return: the Avro Schema. """ jvm = get_gateway().jvm j_file = jvm.java.io.File(file_path) JSchema = jvm.org.apache.flink.avro.shaded.org.apache.avro.Schema return AvroSchema(JSchema.Parser().parse(j_file))
[docs]class GenericRecordAvroTypeInfo(TypeInformation): """ A :class:`TypeInformation` of Avro's GenericRecord, including the schema. This is a wrapper of Java org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo. Note that this type cannot be used as the type_info of data in :meth:`StreamExecutionEnvironment.from_collection`. .. versionadded::1.16.0 """ def __init__(self, schema: 'AvroSchema'): super(GenericRecordAvroTypeInfo, self).__init__() self._schema = schema self._j_typeinfo = get_gateway().jvm.org.apache.flink.formats.avro.typeutils \ .GenericRecordAvroTypeInfo(schema._j_schema) def get_java_type_info(self) -> JavaObject: return self._j_typeinfo
[docs]class AvroInputFormat(InputFormat, ResultTypeQueryable): """ Provides a FileInputFormat for Avro records. Example: :: >>> env = StreamExecutionEnvironment.get_execution_environment() >>> schema = AvroSchema.parse_string(JSON_SCHEMA) >>> ds = env.create_input(AvroInputFormat(FILE_PATH, schema)) .. versionadded:: 1.16.0 """ def __init__(self, path: str, schema: 'AvroSchema'): """ :param path: The path to Avro data file. :param schema: The :class:`AvroSchema` of generic record. """ jvm = get_gateway().jvm j_avro_input_format = jvm.org.apache.flink.formats.avro.AvroInputFormat( jvm.org.apache.flink.core.fs.Path(path), get_java_class(jvm.org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord) ) super(AvroInputFormat, self).__init__(j_avro_input_format) self._type_info = GenericRecordAvroTypeInfo(schema) def get_produced_type(self) -> GenericRecordAvroTypeInfo: return self._type_info
[docs]class AvroBulkWriters(object): """ Convenience builder to create :class:`~pyflink.common.serialization.BulkWriterFactory` for Avro types. .. versionadded:: 1.16.0 """ @staticmethod def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory': """ Creates an AvroWriterFactory that accepts and writes Avro generic types. The Avro writers will use the given schema to build and write the records. Note that to make this works in PyFlink, you need to declare the output type of the predecessor before FileSink to be :class:`~GenericRecordAvroTypeInfo`, and the predecessor cannot be :meth:`StreamExecutionEnvironment.from_collection`, you can add a pass-through map function before the sink, as the example shown below. The Python data records should match the Avro schema, and have the same behavior with vanilla Python data structure, e.g. an object for Avro array should behave like Python list, an object for Avro map should behave like Python dict. Example: :: >>> env = StreamExecutionEnvironment.get_execution_environment() >>> schema = AvroSchema(JSON_SCHEMA) >>> avro_type_info = GenericRecordAvroTypeInfo(schema) >>> ds = env.from_collection([{'array': [1, 2]}], type_info=Types.PICKLED_BYTE_ARRAY()) >>> sink = FileSink.for_bulk_format( ... OUTPUT_DIR, AvroBulkWriters.for_generic_record(schema)).build() >>> # A map to indicate its Avro type info is necessary for serialization >>> ds.map(lambda e: e, output_type=GenericRecordAvroTypeInfo(schema)) \\ ... .sink_to(sink) :param schema: The avro schema. :return: The BulkWriterFactory to write generic records into avro files. """ jvm = get_gateway().jvm j_bulk_writer_factory = jvm.org.apache.flink.formats.avro.AvroWriters.forGenericRecord( schema._j_schema ) return BulkWriterFactory(j_bulk_writer_factory)
[docs]class AvroRowDeserializationSchema(DeserializationSchema): """ Deserialization schema from Avro bytes to Row. Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types into types that are compatible with Flink's Table & SQL API. Projects with Avro records containing logical date/time types need to add a JodaTime dependency. """ def __init__(self, record_class: str = None, avro_schema_string: str = None): """ Creates an Avro deserialization schema for the given specific record class or Avro schema string. Having the concrete Avro record class might improve performance. :param record_class: Avro record class used to deserialize Avro's record to Flink's row. :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's row. """ if avro_schema_string is None and record_class is None: raise TypeError("record_class or avro_schema_string should be specified.") j_deserialization_schema = None if record_class is not None: gateway = get_gateway() java_import(gateway.jvm, record_class) j_record_class = load_java_class(record_class) JAvroRowDeserializationSchema = get_gateway().jvm \ .org.apache.flink.formats.avro.AvroRowDeserializationSchema j_deserialization_schema = JAvroRowDeserializationSchema(j_record_class) elif avro_schema_string is not None: JAvroRowDeserializationSchema = get_gateway().jvm \ .org.apache.flink.formats.avro.AvroRowDeserializationSchema j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string) super(AvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
[docs]class AvroRowSerializationSchema(SerializationSchema): """ Serialization schema that serializes to Avro binary format. """ def __init__(self, record_class: str = None, avro_schema_string: str = None): """ Creates AvroSerializationSchema that serializes SpecificRecord using provided schema or record class. :param record_class: Avro record class used to serialize Flink's row to Avro's record. :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record. """ if avro_schema_string is None and record_class is None: raise TypeError("record_class or avro_schema_string should be specified.") j_serialization_schema = None if record_class is not None: gateway = get_gateway() java_import(gateway.jvm, record_class) j_record_class = load_java_class(record_class) JAvroRowSerializationSchema = get_gateway().jvm \ .org.apache.flink.formats.avro.AvroRowSerializationSchema j_serialization_schema = JAvroRowSerializationSchema(j_record_class) elif avro_schema_string is not None: JAvroRowSerializationSchema = get_gateway().jvm \ .org.apache.flink.formats.avro.AvroRowSerializationSchema j_serialization_schema = JAvroRowSerializationSchema(avro_schema_string) super(AvroRowSerializationSchema, self).__init__(j_serialization_schema)