Avro

Avro formats

Flink has extensive built-in support for Apache Avro, which makes it easy to read Avro files with Flink. Flink's serialization framework can also handle classes generated from Avro schemas. To use this support, add the Flink Avro dependency to the pom.xml of your project.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.13.6</version>
</dependency>

In order to read data from an Avro file, you have to specify an AvroInputFormat.

Example:

// `in` is an org.apache.flink.core.fs.Path to the Avro file; `env` is the StreamExecutionEnvironment.
AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataStream<User> usersDS = env.createInput(users);

Note that User is a POJO generated by Avro. Flink also lets you select keys on these POJOs by field name, given as a string. For example:

usersDS.keyBy("name")

Note that using the GenericData.Record type is possible with Flink, but not recommended. Because each record contains the full schema, it is very data intensive and thus probably slow to use.

Flink’s POJO field selection also works with POJOs generated from Avro. However, this only works if the field types are written correctly to the generated class. If a field is of type Object, you cannot use it as a join or grouping key. Specifying a field in Avro as {"name": "type_double_test", "type": "double"} works fine, but specifying it as a UNION type with only one member ({"name": "type_double_test", "type": ["double"]}) generates a field of type Object. Nullable types ({"name": "type_double_test", "type": ["null", "double"]}) are supported.
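To put the three field declarations side by side, here is a minimal sketch of a complete Avro schema file. The namespace, the record layout, and the field names other than type_double_test are hypothetical and chosen only for illustration; the "doc" attributes restate what the paragraph above says about each declaration.

```json
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "doc": "Hypothetical example schema; names are illustrative only.",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "type_double_test", "type": "double",
     "doc": "Plain type: usable as a join or grouping key."},
    {"name": "type_nullable_test", "type": ["null", "double"],
     "doc": "Nullable type: also possible."},
    {"name": "type_union_test", "type": ["double"],
     "doc": "Single-member union: generated as Object, so not usable as a key."}
  ]
}
```

If a field you need as a key is declared like type_union_test, rewriting it in the plain or nullable form and regenerating the class should avoid the Object-typed field.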