Source code for pyflink.datastream.formats.parquet

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Optional, TYPE_CHECKING

from pyflink.common import Configuration
from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory
from pyflink.datastream.connectors.file_system import StreamFormat, BulkFormat
from pyflink.datastream.formats.avro import AvroSchema
from pyflink.datastream.utils import create_hadoop_configuration
from pyflink.java_gateway import get_gateway

if TYPE_CHECKING:
    from pyflink.table.types import RowType

__all__ = [
    'AvroParquetReaders',
    'AvroParquetWriters',
    'ParquetColumnarRowInputFormat',
    'ParquetBulkWriters'
]


class AvroParquetReaders(object):
    """
    A convenience builder to create a reader format that reads individual Avro records from a
    Parquet stream. Only GenericRecord is supported in PyFlink.

    .. versionadded:: 1.16.0
    """

    @staticmethod
    def for_generic_record(schema: 'AvroSchema') -> 'StreamFormat':
        """
        Creates a new AvroParquetRecordFormat that reads the Parquet file into Avro
        GenericRecords.

        To read into GenericRecords, this method needs an Avro Schema. That is because Flink
        needs to be able to serialize the results in its data flow, which is very inefficient
        without the schema. And while the Schema is stored in the Avro file header, Flink needs
        this schema during 'pre-flight' time when the data flow is set up and wired, which is
        before there is access to the files.

        Example:
        ::

            >>> env = StreamExecutionEnvironment.get_execution_environment()
            >>> schema = AvroSchema.parse_string(JSON_SCHEMA)
            >>> source = FileSource.for_record_stream_format(
            ...     AvroParquetReaders.for_generic_record(schema),
            ...     PARQUET_FILE_PATH
            ... ).build()
            >>> ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "parquet-source")

        :param schema: the Avro Schema.
        :return: StreamFormat for reading Avro GenericRecords.
        """
        jvm = get_gateway().jvm
        JAvroParquetReaders = jvm.org.apache.flink.formats.parquet.avro.AvroParquetReaders
        return StreamFormat(JAvroParquetReaders.forGenericRecord(schema._j_schema))

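# A minimal end-to-end sketch (illustrative only, not part of this module's API) showing how
# AvroParquetReaders could be wired into a job. The helper name, the schema JSON and the
# "/tmp/input-parquet" path are assumptions; the Parquet files are expected to contain records
# matching the schema.
def _example_read_avro_generic_records():
    from pyflink.common import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSource
    from pyflink.datastream.formats.avro import AvroSchema

    env = StreamExecutionEnvironment.get_execution_environment()
    # Hypothetical Avro schema; Flink needs it at 'pre-flight' time, before any file is opened.
    schema = AvroSchema.parse_string("""
    {
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"}
        ]
    }
    """)
    source = FileSource.for_record_stream_format(
        AvroParquetReaders.for_generic_record(schema),
        "/tmp/input-parquet"  # assumed input directory
    ).build()
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "parquet-source")
    ds.print()
    env.execute()
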
class AvroParquetWriters(object):
    """
    Convenient builder to create Parquet BulkWriterFactory instances for Avro types.
    Only GenericRecord is supported at present.

    .. versionadded:: 1.16.0
    """

    @staticmethod
    def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory':
        """
        Creates a ParquetWriterFactory that accepts and writes Avro generic types. The Parquet
        writers will use the given schema to build and write the columnar data.

        Note that to make this work in PyFlink, the output type of the predecessor of the
        FileSink must be declared as :class:`GenericRecordAvroTypeInfo`, and the predecessor
        cannot be :meth:`StreamExecutionEnvironment.from_collection`. You can add a pass-through
        map function before the sink, as shown in the example below.

        The Python data records should match the Avro schema and behave like vanilla Python data
        structures, e.g. an object for an Avro array should behave like a Python list and an
        object for an Avro map should behave like a Python dict.

        Example:
        ::

            >>> env = StreamExecutionEnvironment.get_execution_environment()
            >>> schema = AvroSchema.parse_string(JSON_SCHEMA)
            >>> avro_type_info = GenericRecordAvroTypeInfo(schema)
            >>> ds = env.from_collection([{'array': [1, 2]}], type_info=Types.PICKLED_BYTE_ARRAY())
            >>> sink = FileSink.for_bulk_format(
            ...     OUTPUT_DIR, AvroParquetWriters.for_generic_record(schema)).build()
            >>> # A map to indicate its Avro type info is necessary for serialization
            >>> ds.map(lambda e: e, output_type=avro_type_info) \\
            ...     .sink_to(sink)

        :param schema: The Avro schema.
        :return: The BulkWriterFactory to write generic records into Parquet files.
        """
        jvm = get_gateway().jvm
        JAvroParquetWriters = jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
        return BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))

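# A minimal writing-side sketch (illustrative only) pairing AvroParquetWriters with the required
# pass-through map. The helper name, the schema JSON and the "/tmp/output-parquet" path are
# assumptions.
def _example_write_avro_generic_records():
    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSink
    from pyflink.datastream.formats.avro import AvroSchema, GenericRecordAvroTypeInfo

    env = StreamExecutionEnvironment.get_execution_environment()
    # Hypothetical schema with a single Avro array field.
    schema = AvroSchema.parse_string("""
    {
        "type": "record",
        "name": "Event",
        "fields": [{"name": "array", "type": {"type": "array", "items": "int"}}]
    }
    """)
    # from_collection cannot feed the sink directly, so the records stay pickled here ...
    ds = env.from_collection([{'array': [1, 2]}], type_info=Types.PICKLED_BYTE_ARRAY())
    sink = FileSink.for_bulk_format(
        "/tmp/output-parquet",  # assumed output directory
        AvroParquetWriters.for_generic_record(schema)
    ).build()
    # ... and a pass-through map declares the Avro type info needed for serialization.
    ds.map(lambda e: e, output_type=GenericRecordAvroTypeInfo(schema)).sink_to(sink)
    env.execute()
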
class ParquetColumnarRowInputFormat(BulkFormat):
    """
    A ParquetVectorizedInputFormat to provide a RowData iterator. It uses ColumnarRowData to
    provide a row view of the column batch. Only **primitive** types are supported for a column;
    composite types such as array and map are not supported.

    Example:
    ::

        >>> row_type = DataTypes.ROW([
        ...     DataTypes.FIELD('a', DataTypes.INT()),
        ...     DataTypes.FIELD('b', DataTypes.STRING()),
        ... ])
        >>> source = FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
        ...     row_type=row_type,
        ...     hadoop_config=Configuration(),
        ...     batch_size=2048,
        ...     is_utc_timestamp=False,
        ...     is_case_sensitive=True,
        ... ), PARQUET_FILE_PATH).build()
        >>> ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "parquet-source")

    .. versionadded:: 1.16.0
    """

    def __init__(self,
                 row_type: 'RowType',
                 hadoop_config: Optional[Configuration] = None,
                 batch_size: int = 2048,
                 is_utc_timestamp: bool = False,
                 is_case_sensitive: bool = True):
        if not hadoop_config:
            hadoop_config = Configuration()

        from pyflink.table.types import _to_java_data_type
        jvm = get_gateway().jvm
        j_row_type = _to_java_data_type(row_type).getLogicalType()
        produced_type_info = jvm.org.apache.flink.table.runtime.typeutils. \
            InternalTypeInfo.of(j_row_type)
        j_parquet_columnar_format = jvm.org.apache.flink.formats.parquet. \
            ParquetColumnarRowInputFormat(create_hadoop_configuration(hadoop_config),
                                          j_row_type, produced_type_info, batch_size,
                                          is_utc_timestamp, is_case_sensitive)
        super().__init__(j_parquet_columnar_format)

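# A minimal reading sketch (illustrative only) for the columnar row input format above. The
# helper name, the row type, the batch size and the "/tmp/input-parquet" path are assumptions;
# the Parquet files are expected to contain matching primitive columns.
def _example_read_columnar_rows():
    from pyflink.common import Configuration, WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSource
    from pyflink.table import DataTypes

    env = StreamExecutionEnvironment.get_execution_environment()
    row_type = DataTypes.ROW([
        DataTypes.FIELD('a', DataTypes.INT()),
        DataTypes.FIELD('b', DataTypes.STRING()),
    ])
    input_format = ParquetColumnarRowInputFormat(
        row_type=row_type,
        hadoop_config=Configuration(),
        batch_size=2048,           # rows per vectorized batch
        is_utc_timestamp=False,    # convert timestamps using the local timezone
        is_case_sensitive=True,    # match column names case-sensitively
    )
    source = FileSource.for_bulk_file_format(
        input_format, "/tmp/input-parquet"  # assumed input directory
    ).build()
    env.from_source(source, WatermarkStrategy.no_watermarks(), "parquet-source").print()
    env.execute()
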
class ParquetBulkWriters(object):
    """
    Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory`
    that writes records with a predefined schema into Parquet files in a batch fashion.

    Example:
    ::

        >>> row_type = DataTypes.ROW([
        ...     DataTypes.FIELD('string', DataTypes.STRING()),
        ...     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
        ... ])
        >>> sink = FileSink.for_bulk_format(
        ...     OUTPUT_DIR, ParquetBulkWriters.for_row_type(
        ...         row_type,
        ...         hadoop_config=Configuration(),
        ...         utc_timestamp=True,
        ...     )
        ... ).build()
        >>> ds.sink_to(sink)

    .. versionadded:: 1.16.0
    """

    @staticmethod
    def for_row_type(row_type: 'RowType',
                     hadoop_config: Optional[Configuration] = None,
                     utc_timestamp: bool = False) -> 'BulkWriterFactory':
        """
        Creates a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
        with a predefined schema into Parquet files in a batch fashion.

        :param row_type: The RowType of the records; it should match the RowTypeInfo of the Row
                         records.
        :param hadoop_config: Hadoop configuration.
        :param utc_timestamp: Whether to use the UTC timezone or the local timezone for the
                              conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x
                              use the local timezone, but Hive 3.x uses the UTC timezone.
        """
        if not hadoop_config:
            hadoop_config = Configuration()

        from pyflink.table.types import _to_java_data_type
        jvm = get_gateway().jvm
        JParquetRowDataBuilder = jvm.org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
        return RowDataBulkWriterFactory(JParquetRowDataBuilder.createWriterFactory(
            _to_java_data_type(row_type).getLogicalType(),
            create_hadoop_configuration(hadoop_config),
            utc_timestamp
        ), row_type)

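# A minimal writing sketch (illustrative only) for ParquetBulkWriters. The helper name, the
# field names and the "/tmp/output-parquet" path are assumptions; the point is that the
# RowTypeInfo of the upstream Row records mirrors the RowType given to the writer factory.
def _example_write_rows_as_parquet():
    from pyflink.common import Configuration, Row, Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSink
    from pyflink.table import DataTypes

    env = StreamExecutionEnvironment.get_execution_environment()
    # Logical schema of the Parquet files.
    row_type = DataTypes.ROW([
        DataTypes.FIELD('name', DataTypes.STRING()),
        DataTypes.FIELD('num', DataTypes.INT())
    ])
    # Matching RowTypeInfo for the Row records flowing through the DataStream.
    row_type_info = Types.ROW_NAMED(['name', 'num'], [Types.STRING(), Types.INT()])
    ds = env.from_collection([Row('hello', 1), Row('world', 2)], type_info=row_type_info)
    sink = FileSink.for_bulk_format(
        "/tmp/output-parquet",  # assumed output directory
        ParquetBulkWriters.for_row_type(
            row_type,
            hadoop_config=Configuration(),
            utc_timestamp=True,  # convert timestamps relative to UTC (Hive 3.x behaviour)
        )
    ).build()
    ds.sink_to(sink)
    env.execute()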