################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from py4j.java_gateway import get_java_class, JavaObject, java_import
from pyflink.common.io import InputFormat
from pyflink.common.serialization import BulkWriterFactory, SerializationSchema, \
    DeserializationSchema
from pyflink.common.typeinfo import TypeInformation
from pyflink.datastream.utils import ResultTypeQueryable
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import get_field_value, load_java_class

__all__ = [
    'AvroSchema',
    'GenericRecordAvroTypeInfo',
    'AvroInputFormat',
    'AvroBulkWriters',
    'AvroRowDeserializationSchema',
    'AvroRowSerializationSchema'
]


class AvroSchema(object):
    """
    Avro Schema class contains Java org.apache.avro.Schema.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_schema):
        self._j_schema = j_schema
        self._schema_string = None

    def __str__(self):
        if self._schema_string is None:
            self._schema_string = get_field_value(self._j_schema, 'schema').toString()
        return self._schema_string

    @staticmethod
    def parse_string(json_schema: str) -> 'AvroSchema':
        """
        Parse JSON string as Avro Schema.

        :param json_schema: JSON represented schema string.
        :return: the Avro Schema.
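
        Example (a minimal sketch; the inline schema is a placeholder for your
        own record definition):
        ::

            >>> schema = AvroSchema.parse_string(
            ...     '{"type": "record", "name": "User", "fields":'
            ...     ' [{"name": "name", "type": "string"}]}')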
"""
JSchema = get_gateway().jvm.org.apache.flink.avro.shaded.org.apache.avro.Schema
return AvroSchema(JSchema.Parser().parse(json_schema))

    @staticmethod
    def parse_file(file_path: str) -> 'AvroSchema':
        """
        Parse a schema definition file as Avro Schema.

        :param file_path: path to schema definition file.
        :return: the Avro Schema.
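
        Example (a minimal sketch; ``SCHEMA_FILE_PATH`` is a placeholder for the
        path to an ``.avsc`` schema definition file):
        ::

            >>> schema = AvroSchema.parse_file(SCHEMA_FILE_PATH)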
"""
jvm = get_gateway().jvm
j_file = jvm.java.io.File(file_path)
JSchema = jvm.org.apache.flink.avro.shaded.org.apache.avro.Schema
return AvroSchema(JSchema.Parser().parse(j_file))


class GenericRecordAvroTypeInfo(TypeInformation):
    """
    A :class:`TypeInformation` of Avro's GenericRecord, including the schema. This is a wrapper of
    Java org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo.

    Note that this type cannot be used as the type_info of data in
    :meth:`StreamExecutionEnvironment.from_collection`.
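
    Example (a sketch; ``JSON_SCHEMA`` is a placeholder for a schema string of
    your own):
    ::

        >>> schema = AvroSchema.parse_string(JSON_SCHEMA)
        >>> avro_type_info = GenericRecordAvroTypeInfo(schema)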

    .. versionadded:: 1.16.0
    """

    def __init__(self, schema: 'AvroSchema'):
        super(GenericRecordAvroTypeInfo, self).__init__()
        self._schema = schema
        self._j_typeinfo = get_gateway().jvm.org.apache.flink.formats.avro.typeutils \
            .GenericRecordAvroTypeInfo(schema._j_schema)

    def get_java_type_info(self) -> JavaObject:
        return self._j_typeinfo
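

# AvroInputFormat is exported in __all__ above but its body is missing from this
# section. The following is a minimal sketch reconstructed from the imports used
# here (InputFormat, ResultTypeQueryable, get_java_class); the exact Java-side
# constructor arguments are assumptions.
class AvroInputFormat(InputFormat, ResultTypeQueryable):
    """
    InputFormat that reads Avro data files as GenericRecord values.

    Example:
    ::

        >>> env = StreamExecutionEnvironment.get_execution_environment()
        >>> schema = AvroSchema.parse_file(SCHEMA_FILE_PATH)
        >>> ds = env.create_input(AvroInputFormat(AVRO_FILE_PATH, schema))

    .. versionadded:: 1.16.0
    """

    def __init__(self, path: str, schema: 'AvroSchema'):
        """
        :param path: The path to an Avro data file.
        :param schema: The :class:`AvroSchema` of the generic records in the file.
        """
        jvm = get_gateway().jvm
        j_avro_input_format = jvm.org.apache.flink.formats.avro.AvroInputFormat(
            jvm.org.apache.flink.core.fs.Path(path),
            # Read records as the shaded Avro GenericRecord class (assumed).
            get_java_class(jvm.org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord)
        )
        super(AvroInputFormat, self).__init__(j_avro_input_format)
        self._type_info = GenericRecordAvroTypeInfo(schema)

    def get_produced_type(self) -> GenericRecordAvroTypeInfo:
        return self._type_info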


class AvroBulkWriters(object):
    """
    Convenience builder to create :class:`~pyflink.common.serialization.BulkWriterFactory` for
    Avro types.

    .. versionadded:: 1.16.0
    """

    @staticmethod
    def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory':
        """
        Creates an AvroWriterFactory that accepts and writes Avro generic types. The Avro writers
        will use the given schema to build and write the records.

        Note that to make this work in PyFlink, you need to declare the output type of the
        predecessor before FileSink to be :class:`~GenericRecordAvroTypeInfo`, and the predecessor
        cannot be :meth:`StreamExecutionEnvironment.from_collection`. You can add a pass-through
        map function before the sink, as shown in the example below.

        The Python data records should match the Avro schema and behave like the corresponding
        vanilla Python data structures, e.g. an object for an Avro array should behave like a
        Python list, and an object for an Avro map should behave like a Python dict.

        Example:
        ::

            >>> env = StreamExecutionEnvironment.get_execution_environment()
            >>> schema = AvroSchema.parse_string(JSON_SCHEMA)
            >>> avro_type_info = GenericRecordAvroTypeInfo(schema)
            >>> ds = env.from_collection([{'array': [1, 2]}], type_info=Types.PICKLED_BYTE_ARRAY())
            >>> sink = FileSink.for_bulk_format(
            ...     OUTPUT_DIR, AvroBulkWriters.for_generic_record(schema)).build()
            >>> # A map to indicate its Avro type info is necessary for serialization
            >>> ds.map(lambda e: e, output_type=GenericRecordAvroTypeInfo(schema)) \\
            ...     .sink_to(sink)

        :param schema: The avro schema.
        :return: The BulkWriterFactory to write generic records into avro files.
        """
        jvm = get_gateway().jvm
        j_bulk_writer_factory = jvm.org.apache.flink.formats.avro.AvroWriters.forGenericRecord(
            schema._j_schema
        )
        return BulkWriterFactory(j_bulk_writer_factory)


class AvroRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from Avro bytes to Row. Deserializes the byte[] messages into (nested)
    Flink rows. It converts Avro types into types that are compatible with Flink's Table & SQL
    API. Projects with Avro records containing logical date/time types need to add a JodaTime
    dependency.
    """

    def __init__(self, record_class: str = None, avro_schema_string: str = None):
        """
        Creates an Avro deserialization schema for the given specific record class or Avro schema
        string. Having the concrete Avro record class might improve performance.

        :param record_class: Avro record class used to deserialize Avro's record to Flink's row.
        :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's row.
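
        Example (a sketch; ``AVRO_SCHEMA_STRING`` is a placeholder for a JSON
        schema string, e.g. for records consumed from Kafka):
        ::

            >>> deserialization_schema = AvroRowDeserializationSchema(
            ...     avro_schema_string=AVRO_SCHEMA_STRING)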
"""
if avro_schema_string is None and record_class is None:
raise TypeError("record_class or avro_schema_string should be specified.")
j_deserialization_schema = None
if record_class is not None:
gateway = get_gateway()
java_import(gateway.jvm, record_class)
j_record_class = load_java_class(record_class)
JAvroRowDeserializationSchema = get_gateway().jvm \
.org.apache.flink.formats.avro.AvroRowDeserializationSchema
j_deserialization_schema = JAvroRowDeserializationSchema(j_record_class)
elif avro_schema_string is not None:
JAvroRowDeserializationSchema = get_gateway().jvm \
.org.apache.flink.formats.avro.AvroRowDeserializationSchema
j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string)
super(AvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


class AvroRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that serializes to Avro binary format.
    """

    def __init__(self, record_class: str = None, avro_schema_string: str = None):
        """
        Creates an AvroRowSerializationSchema that serializes Flink rows into Avro records using
        the provided record class or Avro schema string.

        :param record_class: Avro record class used to serialize Flink's row to Avro's record.
        :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record.
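
        Example (a sketch mirroring the deserialization example above;
        ``AVRO_SCHEMA_STRING`` is a placeholder):
        ::

            >>> serialization_schema = AvroRowSerializationSchema(
            ...     avro_schema_string=AVRO_SCHEMA_STRING)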
"""
if avro_schema_string is None and record_class is None:
raise TypeError("record_class or avro_schema_string should be specified.")
j_serialization_schema = None
if record_class is not None:
gateway = get_gateway()
java_import(gateway.jvm, record_class)
j_record_class = load_java_class(record_class)
JAvroRowSerializationSchema = get_gateway().jvm \
.org.apache.flink.formats.avro.AvroRowSerializationSchema
j_serialization_schema = JAvroRowSerializationSchema(j_record_class)
elif avro_schema_string is not None:
JAvroRowSerializationSchema = get_gateway().jvm \
.org.apache.flink.formats.avro.AvroRowSerializationSchema
j_serialization_schema = JAvroRowSerializationSchema(avro_schema_string)
super(AvroRowSerializationSchema, self).__init__(j_serialization_schema)