Flink’s Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Avro, Parquet, or ORC.
This page describes how to register table sources and table sinks in Flink using the natively supported connectors. After a source or sink has been registered, it can be accessed by Table API & SQL statements.
NOTE If you want to implement your own custom table source or sink, have a look at the user-defined sources & sinks page.
Attention Flink Table & SQL introduced a new set of connector options in version 1.11.0. If you are using the legacy connector options, please refer to the legacy documentation.
Flink natively supports various connectors. The following table lists all available connectors.
Name | Version | Source | Sink |
---|---|---|---|
Filesystem | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink |
Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
Apache HBase | 1.4.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
Flink supports using the SQL CREATE TABLE statement to register a table. One can define the table name, the table schema, and the table options for connecting to an external system.
The following code shows a full example of how to connect to Kafka for reading JSON records.
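A minimal sketch of such a statement, assuming a topic named `user_behavior` on a local broker (the table name, column names, topic, and broker address are illustrative):

```sql
CREATE TABLE MyUserTable (
  -- physical columns, mapped from the fields of the JSON records
  `user_id` BIGINT,
  `message` STRING,
  `ts` TIMESTAMP(3),
  -- declare `ts` as an event-time (rowtime) attribute
  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
  -- declare the external system to connect to
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  -- declare a format for this system
  'format' = 'json'
)
```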
In this way, the desired connection properties are converted into string-based key-value pairs. So-called table factories create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java’s Service Provider Interfaces (SPI) are taken into account when searching for exactly one matching table factory.
If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.
The body clause of a SQL CREATE TABLE statement defines the names and types of columns, constraints, and watermarks. Flink doesn’t hold the data, thus the schema definition only declares how to map types from an external system to Flink’s representation. The mapping may not be by field name; it depends on the implementation of formats and connectors. For example, a MySQL database table is mapped by field names (not case sensitive), while a CSV filesystem table is mapped by field order (field names can be arbitrary). This is explained in each connector’s documentation.
The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.
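A minimal sketch of such a schema, here backed by a CSV file source (the table name, field names, and path are illustrative):

```sql
CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/my_table',
  'format' = 'csv'
)
```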
A primary key constraint declares that a column or a set of columns of a table is unique and does not contain nulls. A primary key uniquely identifies a row in a table.
The primary key of a source table is metadata information used for optimization. The primary key of a sink table is usually used by the sink implementation for upserting.
The SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls whether the constraint checks are performed on the incoming/outgoing data. Because Flink does not own the data, the only mode it supports is NOT ENFORCED. It is up to the user to ensure that the query enforces key integrity.
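A minimal sketch of a primary key declaration, here on a JDBC sink so the key drives upserts (the table name, field names, and connection options are illustrative):

```sql
CREATE TABLE MyUserSink (
  user_id BIGINT,
  user_name STRING,
  -- Flink only accepts primary keys in NOT ENFORCED mode
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
)
```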
Time attributes are essential when working with unbounded streaming tables. Therefore both proctime and rowtime attributes can be defined as part of the schema.
For more information about time handling in Flink and especially event-time, we recommend the general event-time section.
In order to declare a proctime attribute in the schema, you can use the Computed Column syntax to declare a computed column which is generated from the PROCTIME() built-in function.
The computed column is a virtual column which is not stored in the physical data.
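A minimal sketch, here using the datagen connector so the statement is self-contained (the table and field names are illustrative):

```sql
CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  -- declares a proctime attribute as a virtual computed column
  MyField3 AS PROCTIME()
) WITH (
  'connector' = 'datagen'
)
```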
In order to control the event-time behavior for tables, Flink provides predefined timestamp extractors and watermark strategies.
Please refer to CREATE TABLE statements for more information about defining time attributes in DDL.
The following timestamp extractors are supported:
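As a sketch in DDL terms (field names, paths, and the watermark delay are illustrative), the rowtime attribute can either be an existing TIMESTAMP(3) column or be computed from another field, for example a string timestamp:

```sql
-- use an existing TIMESTAMP(3) column as the rowtime attribute
CREATE TABLE MyTable (
  ts_field TIMESTAMP(3),
  WATERMARK FOR ts_field AS ts_field - INTERVAL '5' SECOND
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/my_table',
  'format' = 'csv'
)

-- derive the rowtime from a string column via an expression
CREATE TABLE MyOtherTable (
  log_ts STRING,
  ts_field AS TO_TIMESTAMP(log_ts),
  WATERMARK FOR ts_field AS ts_field - INTERVAL '5' SECOND
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/my_other_table',
  'format' = 'csv'
)
```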
The following watermark strategies are supported:
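As a sketch (field names and options are illustrative), common strategies are expressed through the WATERMARK clause; the commented lines show the alternatives:

```sql
CREATE TABLE MyTable (
  ts_field TIMESTAMP(3),
  -- strictly ascending timestamps: emit a watermark of the maximum observed timestamp
  --   WATERMARK FOR ts_field AS ts_field
  -- ascending timestamps: maximum observed timestamp minus 1 millisecond
  --   WATERMARK FOR ts_field AS ts_field - INTERVAL '0.001' SECOND
  -- bounded out-of-orderness: maximum observed timestamp minus a fixed delay
  WATERMARK FOR ts_field AS ts_field - INTERVAL '2' SECOND
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/my_table',
  'format' = 'csv'
)
```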
Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
Please see the Data Types page about how to declare a type in SQL.