Parquet
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Parquet format #

Flink 支持读取 Parquet 文件并生成 Flink RowData Avro 记录。 要使用 Parquet format,你需要将 flink-parquet 依赖添加到项目中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

要使用 Avro 格式,你需要将 parquet-avro 依赖添加到项目中:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.2</version>
    <optional>true</optional>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>it.unimi.dsi</groupId>
            <artifactId>fastutil</artifactId>
        </exclusion>
    </exclusions>
</dependency>

为了在 PyFlink 作业中使用 Parquet format ,需要添加下列依赖:

PyFlink JAR
Only available for stable releases.
在 PyFlink 中如何添加 JAR 包依赖参见 Python 依赖管理

此格式与新的 Source 兼容,可以同时在批和流模式下使用。 因此,你可使用此格式处理以下两类数据:

  • 有界数据: 列出所有文件并全部读取。
  • 无界数据:监控目录中出现的新文件
当你开启一个 File Source,会被默认为有界读取。 如果你想在连续读取模式下使用 File Source,你必须额外调用 AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)

Vectorized reader


// Parquet rows are decoded in batches
FileSource.forBulkFileFormat(BulkFormat,Path...)
// Monitor the Paths to read data as unbounded data
FileSource.forBulkFileFormat(BulkFormat,Path...)
.monitorContinuously(Duration.ofMillis(5L))
.build();

# Parquet rows are decoded in batches
FileSource.for_bulk_file_format(BulkFormat, Path...)

# Monitor the Paths to read data as unbounded data
FileSource.for_bulk_file_format(BulkFormat, Path...) \
          .monitor_continuously(Duration.of_millis(5)) \
          .build()

Avro Parquet reader


// Parquet rows are decoded in batches
FileSource.forRecordStreamFormat(StreamFormat,Path...)

// Monitor the Paths to read data as unbounded data
FileSource.forRecordStreamFormat(StreamFormat,Path...)
        .monitorContinuously(Duration.ofMillis(5L))
        .build();