Avro Format #
Format: Serialization Schema Format: Deserialization Schema
The Apache Avro format allows reading and writing Avro data based on an Avro schema. Currently, the Avro schema is derived from the table schema.
Dependencies #
To use the Avro format, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
 | Download |
How to create a table with Avro format #
The following example creates a table using the Kafka connector and the Avro format.
```sql
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'avro'
)
```
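As a minimal sketch of how such a table is then used: reading from it deserializes Avro records from the topic, and writing to it serializes rows into Avro records. Both statements assume the `user_behavior` table defined above and a Kafka broker reachable at the configured address; the inserted values are made up for illustration.

```sql
-- Deserialization: each Kafka record is decoded from Avro into a row.
SELECT user_id, item_id, behavior, ts
FROM user_behavior
WHERE behavior = 'buy';

-- Serialization: each row is encoded as an Avro record and written to the topic.
-- The values below are illustrative only.
INSERT INTO user_behavior
VALUES (
  CAST(1 AS BIGINT),
  CAST(1001 AS BIGINT),
  CAST(42 AS BIGINT),
  'buy',
  TIMESTAMP '2024-01-01 12:00:00.000'
);
```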
Format Options #
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
format | required | no | (none) | String | Specifies which format to use; here it should be 'avro'. |
avro.encoding | optional | yes | binary | String | Serialization encoding to use. Valid values are: binary, json (reference). Most applications will use the binary encoding, as it results in smaller and more efficient messages, reducing disk and network usage and improving performance for high-throughput data. JSON encoding results in human-readable messages, which can be useful during development and debugging, and for compatibility with systems that cannot process binary-encoded data. |
avro.codec | optional | yes | (none) | String | Filesystem connector only. The compression codec for Avro. Snappy compression is used by default. Valid values are: null, deflate, snappy, bzip2, xz. |
timestamp_mapping.legacy | optional | yes | true | Boolean | Whether to use the legacy mapping of timestamps in Avro. Before 1.19, Flink wrongly mapped both the SQL TIMESTAMP and TIMESTAMP_LTZ types to Avro TIMESTAMP. The correct behavior is that Flink SQL TIMESTAMP maps to Avro LOCAL TIMESTAMP and Flink SQL TIMESTAMP_LTZ maps to Avro TIMESTAMP. You can obtain the correct mapping by disabling this legacy mapping. The legacy behavior is used by default for compatibility. |
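The options above can be combined in a table definition roughly as sketched below. The table names, the topic, and the filesystem path are hypothetical. The sketch also assumes that the timestamp option is written with the same `avro.` prefix as the other format options; check this against your Flink version before relying on it.

```sql
-- Kafka table using JSON-encoded Avro and the corrected timestamp mapping.
CREATE TABLE user_behavior_json (
  user_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'avro',
  'avro.encoding' = 'json',
  -- Assumption: the key takes the 'avro.' prefix like the other format options.
  'avro.timestamp_mapping.legacy' = 'false'
);

-- Filesystem table writing Avro files compressed with deflate
-- instead of the default Snappy codec.
CREATE TABLE user_behavior_files (
  user_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/user_behavior_avro',  -- hypothetical path
  'format' = 'avro',
  'avro.codec' = 'deflate'
);
```

The `avro.codec` option is set only on the filesystem table because, per the table above, it applies to the Filesystem connector only.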
Data Type Mapping #
Currently, the Avro schema is always derived from the table schema; explicitly defining an Avro schema is not supported yet. The following table lists the type mapping from Flink types to Avro types.
Flink SQL type | Avro type | Avro logical type |
---|---|---|
CHAR / VARCHAR / STRING | string | |
BOOLEAN | boolean | |
BINARY / VARBINARY | bytes | |
DECIMAL | fixed | decimal |
TINYINT | int | |
SMALLINT | int | |
INT | int | |
BIGINT | long | |
FLOAT | float | |
DOUBLE | double | |
DATE | int | date |
TIME | int | time-millis |
TIMESTAMP | long | timestamp-millis |
ARRAY | array | |
MAP (key must be string/char/varchar type) | map | |
MULTISET (element must be string/char/varchar type) | map | |
ROW | record | |
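To make the mapping concrete, the sketch below declares one column for several of the Flink types listed above and notes in a comment the Avro type that the table assigns to it. The table name, column names, and path are hypothetical, and every column is declared NOT NULL so that each one corresponds to the plain Avro type.

```sql
CREATE TABLE avro_type_example (
  c_string    STRING NOT NULL,        -- Avro string
  c_boolean   BOOLEAN NOT NULL,       -- Avro boolean
  c_bytes     BYTES NOT NULL,         -- Avro bytes (BYTES is VARBINARY with maximum length)
  c_smallint  SMALLINT NOT NULL,      -- Avro int
  c_bigint    BIGINT NOT NULL,        -- Avro long
  c_double    DOUBLE NOT NULL,        -- Avro double
  c_date      DATE NOT NULL,          -- Avro int, logical type date
  c_time      TIME NOT NULL,          -- Avro int, logical type time-millis
  c_timestamp TIMESTAMP(3) NOT NULL,  -- Avro long, logical type timestamp-millis
  c_array     ARRAY<STRING NOT NULL> NOT NULL,              -- Avro array
  c_map       MAP<STRING NOT NULL, INT NOT NULL> NOT NULL,  -- Avro map (string keys)
  c_row       ROW<city STRING NOT NULL, zip STRING NOT NULL> NOT NULL  -- Avro record
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/avro_type_example',  -- hypothetical path
  'format' = 'avro'
);
```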
In addition to the types listed above, Flink supports reading/writing nullable types. Flink maps nullable types to Avro union(something, null), where something is the Avro type converted from the Flink type.
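As a small illustration of the nullable case, the sketch below contrasts a NOT NULL column with a nullable one. The table name, column names, and path are hypothetical.

```sql
CREATE TABLE avro_nullable_example (
  id           BIGINT NOT NULL,  -- non-nullable: Avro long
  comment_text STRING            -- nullable (the SQL default): Avro union of string and null
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/avro_nullable_example',  -- hypothetical path
  'format' = 'avro'
);
```

Because SQL columns are nullable unless declared otherwise, most columns in a typical table definition end up as unions in the derived Avro schema.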
Refer to the Avro Specification for more information about Avro types.