Flink’s RabbitMQ connector defines a Maven dependency on the “RabbitMQ AMQP Java Client”, licensed under the Mozilla Public License v1.1 (MPL 1.1).
Flink itself neither reuses source code from the “RabbitMQ AMQP Java Client” nor packages binaries from the “RabbitMQ AMQP Java Client”.
Users who create and publish derivative work based on Flink’s RabbitMQ connector (thereby re-distributing the “RabbitMQ AMQP Java Client”) must be aware that this may be subject to conditions declared in the Mozilla Public License v1.1 (MPL 1.1).
This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq_2.10</artifactId>
  <version>1.2.1</version>
</dependency>
Note that the streaming connectors are currently not part of the binary distribution. See the Flink documentation on linking with connectors for cluster execution.
Follow the instructions on the RabbitMQ download page. After installation, the server starts automatically, and the application connecting to RabbitMQ can be launched.
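This connector provides an RMQSource class to consume messages from a RabbitMQ queue. This source provides three different levels of guarantees, depending on how it is configured with Flink:
Exactly-once: Requires that checkpointing is enabled, correlation ids are used, and the source is non-parallel (parallelism of 1).
At-least-once: When checkpointing is enabled, but correlation ids are not used or the source is parallel, the source only provides at-least-once guarantees.
No guarantee: If checkpointing is not enabled, the source does not provide any strong delivery guarantees.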
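The relationship between configuration and guarantee can be written as a small decision rule. The sketch below is plain Java and not part of the Flink API: `GuaranteeLevels`, `Guarantee`, and `guaranteeFor` are hypothetical names used only for illustration. It assumes, as the comments in the example code in this section state, that checkpointing is required for any guarantee and that exactly-once additionally needs correlation ids and a non-parallel source.

```java
// Illustrative only: these names are hypothetical and not part of Flink.
class GuaranteeLevels {

    enum Guarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Maps a source configuration to the delivery guarantee it can offer. */
    static Guarantee guaranteeFor(boolean checkpointingEnabled,
                                  boolean usesCorrelationIds,
                                  int parallelism) {
        if (!checkpointingEnabled) {
            // Without checkpointing there is no strong guarantee at all.
            return Guarantee.NONE;
        }
        if (usesCorrelationIds && parallelism == 1) {
            // All three conditions hold: checkpointing, correlation ids, non-parallel.
            return Guarantee.EXACTLY_ONCE;
        }
        // Checkpointing is on, but correlation ids are missing or the source is parallel.
        return Guarantee.AT_LEAST_ONCE;
    }

    public static void main(String[] args) {
        System.out.println(guaranteeFor(true, true, 1));   // prints EXACTLY_ONCE
        System.out.println(guaranteeFor(true, false, 1));  // prints AT_LEAST_ONCE
        System.out.println(guaranteeFor(true, true, 4));   // prints AT_LEAST_ONCE
        System.out.println(guaranteeFor(false, true, 1));  // prints NONE
    }
}
```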
Below is a code example for setting up an exactly-once RabbitMQ source. Inline comments explain which parts of the configuration can be ignored for more relaxed guarantees.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
    .setParallelism(1);              // non-parallel source is only required for exactly-once

val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...)

val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build

val stream = env
    .addSource(new RMQSource[String](
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
    .setParallelism(1)               // non-parallel source is only required for exactly-once
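The `true` flag in the examples above turns on correlation ids, which let the source recognize messages that RabbitMQ delivers again after a recovery. The following is a simplified stand-alone sketch of that deduplication idea in plain Java. It is not Flink's actual RMQSource implementation, which also ties this bookkeeping to checkpoints; `CorrelationIdDeduplicator` and `accept` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: a simplified model, not Flink's RMQSource implementation.
class CorrelationIdDeduplicator {

    // Correlation ids of messages that were already emitted downstream.
    private final Set<String> seenIds = new HashSet<>();

    /** Returns true if the message is new and should be emitted. */
    boolean accept(String correlationId) {
        // Set.add returns false when the id was already present.
        return seenIds.add(correlationId);
    }

    public static void main(String[] args) {
        CorrelationIdDeduplicator dedup = new CorrelationIdDeduplicator();
        // "id-2" appears twice, as if it had been redelivered after a failure.
        String[] incoming = {"id-1", "id-2", "id-2", "id-3"};
        List<String> emitted = new ArrayList<>();
        for (String id : incoming) {
            if (dedup.accept(id)) {
                emitted.add(id);
            }
        }
        System.out.println(emitted); // prints [id-1, id-2, id-3]
    }
}
```

Deduplication of this kind only works if every message carries a unique correlation id, which the producing application has to set in the message properties when publishing to RabbitMQ.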
This connector provides an RMQSink class for sending messages to a RabbitMQ queue. Below is a code example for setting up a RabbitMQ sink.
final DataStream<String> stream = ...

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

stream.addSink(new RMQSink<String>(
    connectionConfig,            // config for the RabbitMQ connection
    "queueName",                 // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages

val stream: DataStream[String] = ...

val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build

stream.addSink(new RMQSink[String](
    connectionConfig,         // config for the RabbitMQ connection
    "queueName",              // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema))  // serialization schema to turn Java objects to messages
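The schema argument determines how each record is turned into the byte array that forms the message body, and how a message body is turned back into a record on the source side. As a rough stand-alone illustration of that conversion for strings, the sketch below uses plain Java with the hypothetical names `StringSchemaSketch`, `serialize`, and `deserialize`. The UTF-8 encoding is an assumption made for this example and is not specified by the text above.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: mimics what a string (de)serialization schema does.
// It does not implement any Flink interface.
class StringSchemaSketch {

    /** Turns a record into the bytes of a message body (UTF-8 assumed). */
    static byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    /** Turns the bytes of a message body back into a record (UTF-8 assumed). */
    static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = serialize("hello rabbit");
        System.out.println(body.length);        // prints 12
        System.out.println(deserialize(body));  // prints hello rabbit
    }
}
```

Producer and consumer need to agree on the encoding, otherwise the round trip shown here does not return the original string for non-ASCII input.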
More information about RabbitMQ is available on the RabbitMQ website.