Avro Format #
Format: Serialization Schema Format: Deserialization Schema
Apache Avro format 允许基于 Avro schema 读取和写入 Avro 数据。目前,Avro schema 从 table schema 推导而来。
依赖 #
In order to use the Avro format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Download |
如何使用 Avro format 创建表 #
这是使用 Kafka 连接器和 Avro format 创建表的示例。
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'avro'
)
Format 参数 #
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format |
必要 | (none) | String | 指定使用什么 format,这里应该是 'avro' 。 |
avro.codec |
可选 | (none) | String | 仅用于 filesystem,avro 压缩编解码器。默认 snappy 压缩。目前支持:null, deflate、snappy、bzip2、xz。 |
timestamp_mapping.legacy |
可选 | true | Boolean | 使用 avro 中时间戳的旧映射。 在 1.19 之前,Flink 的默认行为错误地将 SQL TIMESTAMP 和 TIMESTAMP_LTZ 类型映射到 AVRO TIMESTAMP。 正确的行为是 Flink SQL TIMESTAMP 映射 Avro LOCAL TIMESTAMP 和 Flink SQL TIMESTAMP_LTZ 映射 Avro TIMESTAMP,您可以通过禁用此旧映射来获得正确的映射。 出于兼容性考虑,默认使用旧行为。 |
数据类型映射 #
目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。
Flink SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
CHAR / VARCHAR / STRING | string | |
BOOLEAN |
boolean |
|
BINARY / VARBINARY |
bytes |
|
DECIMAL |
fixed |
decimal |
TINYINT |
int |
|
SMALLINT |
int |
|
INT |
int |
|
BIGINT |
long |
|
FLOAT |
float |
|
DOUBLE |
double |
|
DATE |
int |
date |
TIME |
int |
time-millis |
TIMESTAMP |
long |
timestamp-millis |
ARRAY |
array |
|
MAP (key 必须是 string/char/varchar 类型) |
map |
|
MULTISET (元素必须是 string/char/varchar 类型) |
map |
|
ROW |
record |
除了上面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null)
,其中 something
是从 Flink 类型转换的 Avro 类型。
您可以参考 Avro 规范 获取更多有关 Avro 类型的信息。