Avro

Avro format #

Flink 内置支持 Apache Avro 格式。在 Flink 中将更容易地读写基于 Avro schema 的 Avro 数据。 Flink 的序列化框架可以处理基于 Avro schemas 生成的类。为了能够使用 Avro format,需要在自动构建工具(例如 Maven 或 SBT)中添加如下依赖到项目中。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.16.2</version>
</dependency>

为了在 PyFlink 作业中使用 Avro format ,需要添加下列依赖:

PyFlink JAR
Download
在 PyFlink 中如何添加 JAR 包依赖请参考 Python 依赖管理

如果读取 Avro 文件数据,你必须指定 AvroInputFormat

示例

AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataStream<User> usersDS = env.createInput(users);

注意,User 是一个通过 Avro schema生成的 POJO 类。Flink 还允许选择 POJO 中字符串类型的键。例如:

usersDS.keyBy("name");

注意,在 Flink 中可以使用 GenericData.Record 类型,但是不推荐使用。因为该类型的记录中包含了完整的 schema,导致数据非常密集,使用起来可能很慢。

Flink 的 POJO 字段选择也适用于从 Avro schema 生成的 POJO 类。但是,只有将字段类型正确写入生成的类时,才能使用。如果字段是 Object 类型,则不能将该字段用作 join 键或 grouping 键。 在 Avro 中如 {"name": "type_double_test", "type": "double"}, 这样指定字段是可行的,但是如 ({"name": "type_double_test", "type": ["double"]},) 这样指定包含一个字段的复合类型就会生成 Object 类型的字段。注意,如 ({"name": "type_double_test", "type": ["null", "double"]},) 这样指定 nullable 类型字段也是可能产生 Object 类型的!

在 Python 作业中读取 Avro 文件,需要先定义 Avro schema,产生的 DataStream 元素为原生的 Python 对象 Generic。例如:

schema = AvroSchema.parse_string("""
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber",  "type": ["int", "null"]},
        {"name": "favoriteColor", "type": ["string", "null"]}
    ]
}
""")

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.create_input(AvroInputFormat(AVRO_FILE_PATH, schema))

def json_dumps(record):
    import json
    return json.dumps(record)

ds.map(json_dumps).print()