JDBC Connector #
This connector provides a sink that writes data to a JDBC database.
To use it, add the following dependency to your project (along with your JDBC driver):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.
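For example, if you write to PostgreSQL, the driver dependency could look like the following sketch (the version shown is only an example; use a current driver release):
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.14</version>
</dependency>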
JdbcSink.sink #
The JDBC sink provides an at-least-once guarantee. Effectively exactly-once semantics can, however, be achieved by crafting upsert SQL statements or idempotent SQL updates. Configuration goes as follows (see also the JdbcSink javadoc).
JdbcSink.sink(
        sqlDmlStatement,      // mandatory
        jdbcStatementBuilder, // mandatory
        jdbcExecutionOptions, // optional
        jdbcConnectionOptions // mandatory
);
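For example, an idempotent upsert means that replayed batches simply overwrite rows that were already written, which is how the at-least-once sink becomes effectively exactly-once. A minimal sketch, reusing the Book type from the full example further below and assuming a PostgreSQL books table with a primary key on id (table, columns, and connection settings are illustrative):
SinkFunction<Book> upsertSink = JdbcSink.sink(
        // a replayed row overwrites the existing one instead of creating a duplicate
        "INSERT INTO books (id, title) VALUES (?, ?) "
                + "ON CONFLICT (id) DO UPDATE SET title = EXCLUDED.title",
        (statement, book) -> {
            statement.setLong(1, book.id);
            statement.setString(2, book.title);
        },
        JdbcExecutionOptions.builder().build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                .withDriverName("org.postgresql.Driver")
                .build());
The ON CONFLICT clause is PostgreSQL syntax; other databases offer equivalents such as MERGE or INSERT ... ON DUPLICATE KEY UPDATE.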
SQL DML statement and JDBC statement builder #
The sink builds one JDBC prepared statement from a user-provided SQL string, e.g.:
INSERT INTO some_table (field1, field2) VALUES (?, ?)
It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:
(preparedStatement, someRecord) -> { ... update here the preparedStatement with values from someRecord ... }
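Concretely, the function can be given as a JdbcStatementBuilder. In the sketch below, SomeRecord and its fields field1 and field2 are hypothetical placeholders for your own record type:
// Hypothetical record type: field1 is a String, field2 is an int.
JdbcStatementBuilder<SomeRecord> statementBuilder =
        (preparedStatement, someRecord) -> {
            // placeholders are 1-indexed and must match the order in the SQL statement
            preparedStatement.setString(1, someRecord.field1);
            preparedStatement.setInt(2, someRecord.field2);
        };
The same lambda can also be passed directly to JdbcSink.sink, as in the full example below.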
JDBC execution options #
The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also the JdbcExecutionOptions javadoc):
JdbcExecutionOptions.builder()
        .withBatchIntervalMs(200)  // optional: default = 0, meaning no time-based execution is done
        .withBatchSize(1000)       // optional: default = 5000 values
        .withMaxRetries(5)         // optional: default = 3
        .build()
A JDBC batch is executed as soon as one of the following conditions is true:
- the configured batch interval time has elapsed
- the maximum batch size is reached
- a Flink checkpoint has started
JDBC connection parameters #
The connection to the database is configured with a JdbcConnectionOptions instance. Please see the JdbcConnectionOptions javadoc for details.
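A minimal sketch, assuming a PostgreSQL database (the URL, driver class, and credentials are placeholders):
// Illustrative connection options; replace the URL, driver, and credentials with your own.
JdbcConnectionOptions connectionOptions =
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                .withDriverName("org.postgresql.Driver")
                .withUsername("someUser")
                .withPassword("somePassword")
                .build();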
Full example #
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkExample {

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }

        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("someUser")
                                .withPassword("somePassword")
                                .build()
                ));
        env.execute();
    }
}
JdbcSink.exactlyOnceSink #
Since 1.13, the Flink JDBC sink supports exactly-once mode. The implementation relies on the JDBC driver's support for the XA standard.
Attention: In 1.13, the Flink JDBC sink does not support exactly-once mode with MySQL or other databases that do not support multiple XA transactions per connection. We will improve the support in FLINK-22239.
To use it, create a sink with the exactlyOnceSink() method as above, additionally providing execution options, exactly-once options, and a supplier for an XA DataSource:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.exactlyOnceSink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                JdbcExecutionOptions.builder().build(),
                JdbcExactlyOnceOptions.defaults(),
                () -> {
                    // create a driver-specific XA DataSource
                    EmbeddedXADataSource ds = new EmbeddedXADataSource();
                    ds.setDatabaseName("my_db");
                    return ds;
                }));
env.execute();
Please refer to the JdbcXaSinkFunction documentation for more details.