
Parquet format

Flink supports reading Parquet files and producing Flink rows (RowData). To use the format, add the Flink Parquet dependency to your project:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-parquet</artifactId>
	<version>1.14.4</version>
</dependency>

This format is compatible with the new Source API (FileSource), which works in both batch and streaming execution modes. Thus, you can use this format in two ways:

  • Bounded read for batch mode
  • Continuous read for streaming mode: monitors a directory for new files that appear

Bounded read example:

In this example we create a DataStream containing Parquet records as Flink rows (RowData). We project the schema to read only three fields (“f7”, “f4” and “f99”) and read records in batches of 500. The first boolean constructor parameter specifies whether timestamp columns should be interpreted as UTC. The second boolean specifies whether the projected Parquet field names should be matched case-sensitively. There is no need for a watermark strategy, as the records do not contain event timestamps.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;

// Types of the projected fields "f7", "f4" and "f99", in projection order.
final LogicalType[] fieldTypes =
        new LogicalType[] {new DoubleType(), new IntType(), new VarCharType()};

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(), // Hadoop configuration
                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                500,    // batch size
                false,  // do not interpret timestamp columns as UTC
                true);  // match projected field names case-sensitively

final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, /* Flink Path */)
                .build();

final DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
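
Each resulting record is a RowData whose field indices follow the projection order. As a minimal sketch, assuming the stream built above, the projected fields could be read like this (the accessors are part of the RowData interface; for nullable columns, checking isNullAt(index) first is advisable):

// Minimal sketch: reading the projected fields out of each RowData record.
// Index 0 is "f7" (DOUBLE), 1 is "f4" (INT), 2 is "f99" (VARCHAR).
stream.map(row -> {
    double f7 = row.getDouble(0);
    int f4 = row.getInt(1);
    String f99 = row.getString(2).toString(); // StringData -> String
    return String.format("f7=%f, f4=%d, f99=%s", f7, f4, f99);
});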

Continuous read example:

In this example we create a DataStream containing Parquet records as Flink rows (RowData) that grows indefinitely as new files are added to the directory, which we monitor for new files every second. As before, we project the schema to read only three fields (“f7”, “f4” and “f99”) and read records in batches of 500. The first boolean constructor parameter specifies whether timestamp columns should be interpreted as UTC. The second boolean specifies whether the projected Parquet field names should be matched case-sensitively. There is no need for a watermark strategy, as the records do not contain event timestamps.

// Same imports as the bounded example, plus:
import java.time.Duration;

final LogicalType[] fieldTypes =
        new LogicalType[] {new DoubleType(), new IntType(), new VarCharType()};

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(), // Hadoop configuration
                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                500,    // batch size
                false,  // do not interpret timestamp columns as UTC
                true);  // match projected field names case-sensitively

final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, /* Flink Path */)
                .monitorContinuously(Duration.ofSeconds(1L))
                .build();

final DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
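
Both examples assume an already-created StreamExecutionEnvironment named env. A minimal sketch of that surrounding boilerplate, with an illustrative job name:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// ... build the FileSource and DataStream as shown above ...

// With monitorContinuously(...) the source is unbounded, so the job runs
// until it is cancelled; without it, the job ends once all files are read.
env.execute("parquet-file-source"); // illustrative job name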