Avro

Avro format

Flink has built-in support for Apache Avro, which makes it easy to read and write Avro data based on an Avro schema. Flink's serialization framework can handle classes generated from Avro schemas. To use the Avro format, add the following dependency to projects that use a build automation tool (such as Maven or SBT).

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.17.2</version>
</dependency>

In order to use the Avro format in PyFlink jobs, the following dependencies are required:

PyFlink JAR
Download
See Python dependency management for more details on how to use JARs in PyFlink.

In order to read data from an Avro file, you have to specify an AvroInputFormat.

Example:

// `in` is an org.apache.flink.core.fs.Path pointing to the Avro file.
AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataStream<User> usersDS = env.createInput(users);

Note that User is a POJO generated by Avro. Flink also supports string-based key selection on these POJOs. For example:

usersDS.keyBy("name");

Note that using the GenericData.Record type is possible with Flink, but not recommended. Because each record contains the full schema, it is very data-intensive and thus probably slow to use.

Flink’s POJO field selection also works with POJOs generated from Avro. However, this only works if the field types are written correctly to the generated class. If a field is of type Object, you cannot use it as a join or grouping key. Declaring a field in Avro as {"name": "type_double_test", "type": "double"} works fine, but declaring it as a union type with only one member ({"name": "type_double_test", "type": ["double"]}) generates a field of type Object. Nullable types ({"name": "type_double_test", "type": ["null", "double"]}) are supported.
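The rule above can be checked mechanically before generating classes. The following is a minimal sketch, not part of Flink or Avro: it uses only Python's standard json module, and the function name classify_field and its three labels are hypothetical. It assumes the schema is a single record with a top-level "fields" array. Treating wider unions such as ["int", "string"] as "object" is an assumption about Avro's Java code generation, not something stated above.

```python
import json


def classify_field(field):
    """Classify an Avro field declaration by how its type is written.

    Returns one of three illustrative labels (hypothetical names):
      "typed"    - a plain type such as "double"
      "nullable" - a two-branch union of "null" and one other type
      "object"   - any other union, e.g. ["double"]
    """
    avro_type = field["type"]
    if not isinstance(avro_type, list):
        # Not a union: the generated field keeps a concrete type.
        return "typed"
    non_null = [t for t in avro_type if t != "null"]
    if len(avro_type) == 2 and len(non_null) == 1:
        # ["null", X] or [X, "null"]: the nullable pattern.
        return "nullable"
    # Single-member union (and, by assumption, any wider union).
    return "object"


schema = json.loads("""
{
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "plain_double", "type": "double"},
        {"name": "single_union", "type": ["double"]},
        {"name": "nullable_double", "type": ["null", "double"]}
    ]
}
""")

report = {f["name"]: classify_field(f) for f in schema["fields"]}
for name, kind in report.items():
    print(name, "->", kind)
```

Fields reported as "object" are the ones that, per the paragraph above, cannot be used as join or grouping keys.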

For Python jobs, an Avro schema must be defined to read from Avro files, and the elements are plain Python objects. For example:

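from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.formats.avro import AvroInputFormat, AvroSchema

schema = AvroSchema.parse_string("""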
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber", "type": ["int", "null"]},
        {"name": "favoriteColor", "type": ["string", "null"]}
    ]
}
""")

env = StreamExecutionEnvironment.get_execution_environment()
# AVRO_FILE_PATH is a placeholder for the path to the Avro file to read.
ds = env.create_input(AvroInputFormat(AVRO_FILE_PATH, schema))

def json_dumps(record):
    import json
    return json.dumps(record)

ds.map(json_dumps).print()
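
To make the json_dumps step concrete without running a job, the sketch below applies the same function to a hand-written dictionary shaped like the User schema. It assumes each record reaches the map function as a plain dict keyed by field name; the sample values are invented for illustration and do not come from any real file.

```python
import json


def json_dumps(record):
    # Same mapping function as in the job above.
    return json.dumps(record)


# Hypothetical record matching the User schema; a None value stands in
# for the "null" branch of the union types.
sample_user = {"name": "Alice", "favoriteNumber": 7, "favoriteColor": None}

encoded = json_dumps(sample_user)
print(encoded)
```

A None value is serialized as JSON null, so the nullable fields of the schema survive the conversion.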