CSV

CSV format

To use the CSV format, add the Flink CSV dependency to your project:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-csv</artifactId>
	<version>1.15.4</version>
</dependency>

Flink supports reading CSV files using CsvReaderFormat. The reader uses the Jackson library and accepts the corresponding configuration for the CSV schema and parsing options.

CsvReaderFormat can be initialized and used like this:

CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source = 
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

In this case, Jackson derives the schema for CSV parsing automatically from the fields of the SomePojo class.

Note: you might need to add the @JsonPropertyOrder({field1, field2, ...}) annotation to your class definition, with the field order exactly matching the column order of the CSV file.
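The effect of the declared order can be illustrated without Flink or Jackson. The following is a minimal sketch in plain Java, assuming cells are assigned to properties purely by position. ColumnOrderSketch and assign are hypothetical names used only for illustration and are not part of either library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: mimics positional column-to-field assignment. Not Flink or Jackson code. */
final class ColumnOrderSketch {

    /** Assigns the i-th cell of a row to the i-th declared property name. */
    static Map<String, String> assign(List<String> propertyOrder, String[] cells) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < propertyOrder.size() && i < cells.length; i++) {
            result.put(propertyOrder.get(i), cells[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] row = "Berlin,Germany".split(",");
        // Declared order matches the file: each value lands in the intended field.
        System.out.println(assign(List.of("city", "country"), row));
        // Declared order differs from the file: the values end up swapped.
        System.out.println(assign(List.of("country", "city"), row));
    }
}
```

With a mismatched order, nothing fails as long as the types happen to be compatible, so the mistake may only show up as wrong data downstream.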

Advanced configuration

If you need more fine-grained control over the CSV schema or the parsing options, use the lower-level static factory method forSchema of CsvReaderFormat:

CsvReaderFormat<T> forSchema(CsvMapper mapper, 
                             CsvSchema schema, 
                             TypeInformation<T> typeInformation) 

Below is an example of reading a POJO with a custom column separator:

// Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city", "lat", "lng", "country", "iso2",
                    "adminName", "capital", "population"})
public static class CityPojo {
    public String city;
    public BigDecimal lat;
    public BigDecimal lng;
    public String country;
    public String iso2;
    public String adminName;
    public String capital;
    public long population;
}

CsvMapper mapper = new CsvMapper();
CsvSchema schema =
        mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');

CsvReaderFormat<CityPojo> csvFormat =
        CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));

FileSource<CityPojo> source =
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

The corresponding CSV file:

Berlin|52.5167|13.3833|Germany|DE|Berlin|primary|3644826
San Francisco|37.7562|-122.443|United States|US|California||3592294
Beijing|39.905|116.3914|China|CN|Beijing|primary|19433000
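To make the expected result concrete, the sketch below parses one such line by hand into typed values. It is an illustration in plain Java, not the reader's implementation: PipeRowSketch, City, and parse are hypothetical names, only four of the eight columns are kept, and treating an empty cell as an empty string is an assumption that may differ from the configured reader's behavior.

```java
import java.math.BigDecimal;
import java.util.regex.Pattern;

/** Illustrative only: a hand-written stand-in for the mapping of one '|'-separated line. */
final class PipeRowSketch {

    /** Hypothetical holder mirroring four of the CityPojo fields. */
    static final class City {
        final String city;
        final BigDecimal lat;
        final String capital;
        final long population;

        City(String city, BigDecimal lat, String capital, long population) {
            this.city = city;
            this.lat = lat;
            this.capital = capital;
            this.population = population;
        }
    }

    static City parse(String line) {
        // '|' is a regex metacharacter, so it is quoted; the limit -1 keeps empty cells.
        String[] cells = line.split(Pattern.quote("|"), -1);
        // Column positions follow the property order: 0=city, 1=lat, 6=capital, 7=population.
        return new City(cells[0], new BigDecimal(cells[1]), cells[6], Long.parseLong(cells[7]));
    }

    public static void main(String[] args) {
        City c = parse("San Francisco|37.7562|-122.443|United States|US|California||3592294");
        System.out.println(c.city + " has population " + c.population);
    }
}
```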

It is also possible to read more complex data types using fine-grained Jackson settings:

public static class ComplexPojo {
    // Public fields, as in CityPojo, so they are accessible without getters and setters
    public long id;
    public int[] array;
}

CsvReaderFormat<ComplexPojo> csvFormat =
        CsvReaderFormat.forSchema(
                new CsvMapper(),
                CsvSchema.builder()
                        .addColumn(
                                new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
                        .addColumn(
                                new CsvSchema.Column(1, "array", CsvSchema.ColumnType.ARRAY)
                                        .withArrayElementSeparator("#"))
                        .build(),
                TypeInformation.of(ComplexPojo.class));

The corresponding CSV file:

0,1#2#3
1,
2,1
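The way these rows split into an id and an array can be sketched in plain Java. This is a minimal illustration of the "#" element separator, not the Jackson implementation: ArrayColumnSketch, parseId, and parseArray are hypothetical names, and mapping the empty cell in the second row to an empty array is an assumption.

```java
import java.util.Arrays;

/** Illustrative only: splits a two-column line into an id and a '#'-separated int array. */
final class ArrayColumnSketch {

    /** Reads the first column as the numeric id. */
    static long parseId(String line) {
        return Long.parseLong(line.split(",", -1)[0]);
    }

    /** Reads the second column and splits it on the array element separator '#'. */
    static int[] parseArray(String line) {
        String[] cells = line.split(",", -1); // the limit -1 keeps a trailing empty cell
        String cell = cells.length > 1 ? cells[1] : "";
        if (cell.isEmpty()) {
            return new int[0]; // assumption: an empty cell yields an empty array
        }
        return Arrays.stream(cell.split("#")).mapToInt(Integer::parseInt).toArray();
    }

    public static void main(String[] args) {
        for (String line : new String[] {"0,1#2#3", "1,", "2,1"}) {
            System.out.println(parseId(line) + " -> " + Arrays.toString(parseArray(line)));
        }
    }
}
```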

Like TextLineInputFormat, CsvReaderFormat can be used in both continuous and batch modes (see TextLineInputFormat for examples).