CSV Format #
Format: Serialization Schema | Format: Deserialization Schema
The CSV format allows reading and writing CSV data based on a CSV schema. Currently, the CSV schema is derived from the table schema.
Dependencies #
In order to use the CSV format, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
flink-csv | Built-in |
How to create a table with CSV format #
Here is an example of creating a table using the Kafka connector and the CSV format.
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)
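Once the table is registered, the CSV format decodes each Kafka record into a row that can be queried directly. A minimal usage sketch (the 'buy' literal is an assumed sample value, not from this page):

-- filter decoded rows; 'buy' is an assumed sample value for the behavior column
SELECT user_id, behavior, ts
FROM user_behavior
WHERE behavior = 'buy'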
Format Options #
Option | Required | Forwarded | Default | Type | Description |
---|---|---|---|---|---|
format | required | no | (none) | String | Specify what format to use; here it should be 'csv'. |
csv.field-delimiter | optional | yes | , | String | Field delimiter character (',' by default); must be a single character. You can use a backslash to specify special characters, e.g. '\t' represents the tab character. You can also use Unicode to specify them in plain SQL, e.g. 'csv.field-delimiter' = U&'\0001' represents the 0x01 character. |
csv.disable-quote-character | optional | yes | false | Boolean | Disable the quote character for enclosing field values (false by default). If true, the option 'csv.quote-character' cannot be set. |
csv.quote-character | optional | yes | " | String | Quote character for enclosing field values (" by default). |
csv.allow-comments | optional | yes | false | Boolean | Ignore comment lines that start with '#' (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. |
csv.ignore-parse-errors | optional | no | false | Boolean | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
csv.array-element-delimiter | optional | yes | ; | String | Array element delimiter string for separating array and row element values (';' by default). |
csv.escape-character | optional | yes | (none) | String | Escape character for escaping values (disabled by default). |
csv.null-literal | optional | yes | (none) | String | Null literal string that is interpreted as a null value (disabled by default). |
csv.write-bigdecimal-in-scientific-notation | optional | yes | true | Boolean | Enables representation of BigDecimal values in scientific notation (enabled by default). For example, 100000 is encoded as 1E+5 by default and is written as 100000 if this option is set to false. Note: only values that are non-zero and a multiple of 10 are converted to scientific notation. |
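Several of these options can be combined in a single table definition. A minimal sketch, assuming a local filesystem sink (the table name and path are hypothetical, not from this page):

CREATE TABLE orders_csv (
  order_id BIGINT,
  price DECIMAL(10, 2),
  note STRING
) WITH (
  'connector' = 'filesystem',      -- any CSV-capable connector works
  'path' = 'file:///tmp/orders',   -- hypothetical path
  'format' = 'csv',
  'csv.field-delimiter' = '\t',    -- tab-separated fields
  'csv.null-literal' = 'N/A',      -- "N/A" is read and written as SQL NULL
  'csv.write-bigdecimal-in-scientific-notation' = 'false'  -- write 100000 instead of 1E+5
)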
Data Type Mapping #
Currently, the CSV schema is always derived from the table schema. Explicitly defining a CSV schema is not supported yet.
The Flink CSV format uses the jackson databind API to parse and generate CSV strings.
The following table lists the type mapping from Flink types to CSV types.
Flink SQL type | CSV type |
---|---|
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
INTERVAL | number |
ARRAY | array |
ROW | object |
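To make the mapping concrete, here is a hedged sketch of a table using a few of the types above, together with the CSV line a sample row would roughly serialize to under the default option values (the connector choice, path, and sample values are assumptions, not from this page):

CREATE TABLE type_demo (
  id BIGINT,            -- CSV: number
  born DATE,            -- CSV: string with format: date
  tags ARRAY<STRING>    -- CSV: array; elements joined by 'csv.array-element-delimiter'
) WITH (
  'connector' = 'filesystem',        -- hypothetical sink choice
  'path' = 'file:///tmp/type_demo',  -- hypothetical path
  'format' = 'csv'
)

-- A row (7, DATE '2024-01-15', ARRAY['a', 'b']) would be written roughly as:
-- 7,2024-01-15,a;b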