Flink’s Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Parquet, or ORC.
This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source or sink has been registered, it can be accessed by Table API & SQL statements.
Attention If you want to implement your own custom table source or sink, have a look at the user-defined sources & sinks page.
The following table list all available connectors and formats. Their mutual compatibility is tagged in the corresponding sections for table connectors and table formats. The following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Name | Version | Maven dependency | SQL Client JAR |
---|---|---|---|
Filesystem | Built-in | Built-in | |
Apache Kafka | 0.8 | flink-connector-kafka-0.8 |
Not available |
Apache Kafka | 0.9 | flink-connector-kafka-0.9 |
Download |
Apache Kafka | 0.10 | flink-connector-kafka-0.10 |
Download |
Apache Kafka | 0.11 | flink-connector-kafka-0.11 |
Download |
Name | Maven dependency | SQL Client JAR |
---|---|---|
CSV | Built-in | Built-in |
JSON | flink-json |
Download |
Apache Avro | flink-avro |
Download |
Beginning from Flink 1.6, the declaration of a connection to an external system is separated from the actual implementation.
Connections can be specified either
Descriptor
under org.apache.flink.table.descriptors
for Table & SQL APIThis allows not only for better unification of APIs and SQL Client but also for better extensibility in case of custom implementations without changing the actual declaration.
Every declaration is similar to a SQL CREATE TABLE
statement. One can define the name of the table, the schema of the table, a connector, and a data format upfront for connecting to an external system.
The connector describes the external system that stores the data of a table. Storage systems such as Apacha Kafka or a regular file system can be declared here. The connector might already provide a fixed format with fields and schema.
Some systems support different data formats. For example, a table that is stored in Kafka or in files can encode its rows with CSV, JSON, or Avro. A database connector might need the table schema here. Whether or not a storage system requires the definition of a format, is documented for every connector. Different systems also require different types of formats (e.g., column-oriented formats vs. row-oriented formats). The documentation states which format types and connectors are compatible.
The table schema defines the schema of a table that is exposed to SQL queries. It describes how a source maps the data format to the table schema and a sink vice versa. The schema has access to fields defined by the connector or format. It can use one or more fields for extracting or inserting time attributes. If input fields have no determinstic field order, the schema clearly defines column names, their order, and origin.
The subsequent sections will cover each definition part (connector, format, and schema) in more detail. The following example shows how to pass them:
The table’s type (source
, sink
, or both
) determines how a table is registered. In case of table type both
, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS.
For streaming queries, an update mode declares how to communicate between a dynamic table and the storage system for continous queries.
The following code shows a full example of how to connect to Kafka for reading Avro records.
In both ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called table factories create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java’s Service Provider Interfaces (SPI) are taken into account when searching for exactly-one matching table factory.
If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.
The table schema defines the names and types of columns similar to the column definitions of a SQL CREATE TABLE
statement. In addition, one can specify how columns are mapped from and to fields of the format in which the table data is encoded. The origin of a field might be important if the name of the column should differ from the input/output format. For instance, a column user_name
should reference the field $$-user-name
from a JSON format. Additionally, the schema is needed to map types from an external system to Flink’s representation. In case of a table sink, it ensures that only data with valid schema is written to an external system.
The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.
For each field, the following properties can be declared in addition to the column’s name and type:
Time attributes are essential when working with unbounded streaming tables. Therefore both processing-time and event-time (also known as “rowtime”) attributes can be defined as part of the schema.
For more information about time handling in Flink and especially event-time, we recommend the general event-time section.
In order to control the event-time behavior for tables, Flink provides predefined timestamp extractors and watermark strategies.
The following timestamp extractors are supported:
The following watermark strategies are supported:
Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
Because type information is only available in a programming language, the following type strings are supported for being defined in a YAML file:
For streaming queries, it is required to declare how to perform the conversion between a dynamic table and an external connector. The update mode specifies which kind of messages should be exchanged with the external system:
Append Mode: In append mode, a dynamic table and an external connector only exchange INSERT messages.
Retract Mode: In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for the updating (new) row. In this mode, a key must not be defined as opposed to upsert mode. However, every update consists of two messages which is less efficient.
Upsert Mode: In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. This mode requires a (possibly composite) unique key by which updates can be propagated. The external connector needs to be aware of the unique key attribute in order to apply messages correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as DELETE messages. The main difference to a retract stream is that UPDATE changes are encoded with a single message and are therefore more efficient.
Attention The documentation of each connector states which update modes are supported.
See also the general streaming concepts documentation for more information.
Flink provides a set of connectors for connecting to external systems.
Please note that not all connectors are available in both batch and streaming yet. Furthermore, not every streaming connector supports every streaming mode. Therefore, each connector is tagged accordingly. A format tag indicates that the connector requires a certain type of format.
Source: Batch Source: Streaming Append Mode Sink: Batch Sink: Streaming Append Mode Format: CSV-only
The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
Attention Make sure to include Flink File System specific dependencies.
Attention File system sources and sinks for streaming are only experimental. In the future, we will support actual streaming use cases, i.e., directory monitoring and bucket output.
Source: Streaming Append Mode Sink: Streaming Append Mode Format: Serialization Schema Format: Deserialization Schema
The Kafka connector allows for reading and writing from and to an Apache Kafka topic. It can be defined as follows:
Specify the start reading position: By default, the Kafka source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions, which correspond to the configurations in section Kafka Consumers Start Position Configuration.
Flink-Kafka Sink Partitioning: By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The round-robin partitioner is useful to avoid an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
Consistency guarantees: By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.
Kafka 0.10+ Timestamps: Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a rowtime attribute by selecting timestamps: from-source
in YAML and timestampsFromSource()
in Java/Scala respectively.
Make sure to add the version-specific Kafka dependency. In addition, a corresponding format needs to be specified for reading and writing rows from and to Kafka.
Flink provides a set of table formats that can be used with table connectors.
A format tag indicates the format type for matching with a connector.
The CSV format allows to read and write comma-separated rows.
The CSV format is included in Flink and does not require additional dependencies.
Attention The CSV format for writing rows is limited at the moment. Only a custom field delimiter is supported as optional parameter.
Format: Serialization Schema Format: Deserialization Schema
The JSON format allows to read and write JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink type, as a JSON schema, or derived from the desired table schema. A Flink type enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and field order of the format are determined by the table’s schema. Time attributes are ignored if their origin is not a field. A from
definition in the table schema is interpreted as a field renaming in the format.
The following table shows the mapping of JSON schema types to Flink SQL types:
JSON schema | Flink SQL |
---|---|
object |
ROW |
boolean |
BOOLEAN |
array |
ARRAY[_] |
number |
DECIMAL |
integer |
DECIMAL |
string |
VARCHAR |
string with format: date-time |
TIMESTAMP |
string with format: date |
DATE |
string with format: time |
TIME |
string with encoding: base64 |
ARRAY[TINYINT] |
null |
NULL (unsupported yet) |
Currently, Flink supports only a subset of the JSON schema specification draft-07
. Union types (as well as allOf
, anyOf
, not
) are not supported yet. oneOf
and arrays of types are only supported for specifying nullability.
Simple references that link to a common definition in the document are supported as shown in the more complex example below:
Missing Field Handling: By default, a missing JSON field is set to null
. You can enable strict JSON parsing that will cancel the source (and query) if a field is missing.
Make sure to add the JSON format as a dependency.
Format: Serialization Schema Format: Deserialization Schema
The Apache Avro format allows to read and write Avro data that corresponds to a given format schema. The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath during runtime.
Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability otherwise they are converted to an ANY
type. The following table shows the mapping:
Avro schema | Flink SQL |
---|---|
record |
ROW |
enum |
VARCHAR |
array |
ARRAY[_] |
map |
MAP[VARCHAR, _] |
union |
non-null type or ANY |
fixed |
ARRAY[TINYINT] |
string |
VARCHAR |
bytes |
ARRAY[TINYINT] |
int |
INT |
long |
BIGINT |
float |
FLOAT |
double |
DOUBLE |
boolean |
BOOLEAN |
int with logicalType: date |
DATE |
int with logicalType: time-millis |
TIME |
int with logicalType: time-micros |
INT |
long with logicalType: timestamp-millis |
TIMESTAMP |
long with logicalType: timestamp-micros |
BIGINT |
bytes with logicalType: decimal |
DECIMAL |
fixed with logicalType: decimal |
DECIMAL |
null |
NULL (unsupported yet) |
Avro uses Joda-Time for representing logical date and time types in specific record classes. The Joda-Time dependency is not part of Flink’s distribution. Therefore, make sure that Joda-Time is in your classpath together with your specific record class during runtime. Avro formats specified via a schema string do not require Joda-Time to be present.
Make sure to add the Apache Avro dependency.
The following table sources and sinks have not yet been migrated (or have not been migrated entirely) to the new unified interfaces.
These are the additional TableSource
s which are provided with Flink:
Class name | Maven dependency | Batch? | Streaming? | Description |
OrcTableSource |
flink-orc |
Y | N | A TableSource for ORC files. |
These are the additional TableSink
s which are provided with Flink:
Class name | Maven dependency | Batch? | Streaming? | Description |
CsvTableSink |
flink-table |
Y | Append | A simple sink for CSV files. |
JDBCAppendTableSink |
flink-jdbc |
Y | Append | Writes a Table to a JDBC table. |
CassandraAppendTableSink |
flink-connector-cassandra |
N | Append | Writes a Table to a Cassandra table. |
The OrcTableSource
reads ORC files. ORC is a file format for structured data and stores the data in a compressed, columnar representation. ORC is very storage efficient and supports projection and filter push-down.
An OrcTableSource
is created as shown below:
Note: The OrcTableSource
does not support ORC’s Union
type yet.
The CsvTableSink
emits a Table
to one or more CSV files.
The sink only supports append-only streaming tables. It cannot be used to emit a Table
that is continuously updated. See the documentation on Table to Stream conversions for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled) and the CsvTableSink
does not split output files into bucket files but continuously writes to the same files.
The JDBCAppendTableSink
emits a Table
to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a Table
that is continuously updated. See the documentation on Table to Stream conversions for details.
The JDBCAppendTableSink
inserts each Table
row at least once into the database table (if checkpointing is enabled). However, you can specify the insertion query using REPLACE
or INSERT OVERWRITE
to perform upsert writes to the database.
To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc
) to your project. Then you can create the sink using JDBCAppendSinkBuilder
:
Similar to using JDBCOutputFormat
, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
The CassandraAppendTableSink
emits a Table
to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a Table
that is continuously updated. See the documentation on Table to Stream conversions for details.
The CassandraAppendTableSink
inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as upsert query.
To use the CassandraAppendTableSink
, you have to add the Cassandra connector dependency (flink-connector-cassandra
) to your project. The example below shows how to use the CassandraAppendTableSink
.