This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
Avro
Avro format #
Flink 内置支持 Apache Avro 格式。在 Flink 中将更容易地读写基于 Avro schema 的 Avro 数据。 Flink 的序列化框架可以处理基于 Avro schemas 生成的类。为了能够使用 Avro format,需要在自动构建工具(例如 Maven 或 SBT)中添加如下依赖到项目中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.4</version>
</dependency>
如果读取 Avro 文件数据,你必须指定 AvroInputFormat
。
示例:
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataStream<User> usersDS = env.createInput(users);
注意,User
是一个通过 Avro schema生成的 POJO 类。Flink 还允许选择 POJO 中字符串类型的键。例如:
usersDS.keyBy("name");
注意,在 Flink 中可以使用 GenericData.Record
类型,但是不推荐使用。因为该类型的记录中包含了完整的 schema,导致数据非常密集,使用起来可能很慢。
Flink 的 POJO 字段选择也适用于从 Avro schema 生成的 POJO 类。但是,只有将字段类型正确写入生成的类时,才能使用。如果字段是 Object
类型,则不能将该字段用作 join 键或 grouping 键。
在 Avro 中如 {"name": "type_double_test", "type": "double"},
这样指定字段是可行的,但是如 ({"name": "type_double_test", "type": ["double"]},
) 这样指定包含一个字段的复合类型就会生成 Object
类型的字段。注意,如 ({"name": "type_double_test", "type": ["null", "double"]},
) 这样指定 nullable 类型字段也是可能产生 Object
类型的!