This connector provides access to event streams served by Apache Kafka.
Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanism to provide exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka’s consumer group offset tracking, but tracks and checkpoints these offsets internally as well.
Please pick a package (maven artifact id) and class name for your use-case and environment.
For most users, the FlinkKafkaConsumer08
(part of flink-connector-kafka
) is appropriate.
Maven Dependency | Supported since | Consumer and Producer Class name |
Kafka version | Notes |
---|---|---|---|---|
flink-connector-kafka-0.8_2.10 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 |
0.8.x | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink. |
flink-connector-kafka-0.9_2.10 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 |
0.9.x | Uses the new Consumer API Kafka. |
flink-connector-kafka-0.10_2.10 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 |
0.10.x | This connector supports Kafka messages with timestamps both for producing and consuming. |
Then, import the connector in your maven project:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>1.3.2</version>
</dependency>
Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.
advertised.host.name
setting in the config/server.properties
file must be set to the machine’s IP address.Flink’s Kafka consumer is called FlinkKafkaConsumer08
(or 09
for Kafka 0.9.0.x versions, etc.). It provides access to one or more Kafka topics.
The constructor accepts the following arguments:
Example:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
.print()
DeserializationSchema
The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The
DeserializationSchema
allows users to specify such a schema. The T deserialize(byte[] message)
method gets called for each Kafka message, passing the value from Kafka.
It is usually helpful to start from the AbstractDeserializationSchema
, which takes care of describing the
produced Java/Scala type to Flink’s type system. Users that implement a vanilla DeserializationSchema
need
to implement the getProducedType(...)
method themselves.
For accessing both the key and value of the Kafka message, the KeyedDeserializationSchema
has
the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.
For convenience, Flink provides the following schemas:
TypeInformationSerializationSchema
(and TypeInformationKeyValueSerializationSchema
) which creates
a schema based on a Flink’s TypeInformation
. This is useful if the data is both written and read by Flink.
This schema is a performant Flink-specific alternative to other generic serialization approaches.
JsonDeserializationSchema
(and JSONKeyValueDeserializationSchema
) which turns the serialized JSON
into an ObjectNode object, from which fields can be accessed using objectNode.get(“field”).as(Int/String/…)().
The KeyValue objectNode contains a “key” and “value” field which contain all fields, as well as
an optional “metadata” field that exposes the offset/partition/topic for this message.
When encountering a corrupted message that cannot be deserialized for any reason, there
are two options - either throwing an exception from the deserialize(...)
method
which will cause the job to fail and be restarted, or returning null
to allow
the Flink Kafka consumer to silently skip the corrupted message. Note that
due to the consumer’s fault tolerance (see below sections for more details),
failing the job on the corrupted message will let the consumer attempt
to deserialize the message again. Therefore, if deserialization still fails, the
consumer will fall into a non-stop restart and fail loop on that corrupted
message.
The Flink Kafka Consumer allows configuring how the start position for Kafka partitions are determined.
Example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromGroupOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
...
All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position.
setStartFromGroupOffsets
(default behaviour): Start reading partitions from
the consumer group’s (group.id
setting in the consumer properties) committed
offsets in Kafka brokers (or Zookeeper for Kafka 0.8). If offsets could not be
found for a partition, the auto.offset.reset
setting in the properties will be used.setStartFromEarliest()
/ setStartFromLatest()
: Start from the earliest / latest
record. Under these modes, committed offsets in Kafka will be ignored and
not used as starting positions.You can also specify the exact offsets the consumer should start from for each partition:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
The above example configures the consumer to start from the specified offsets for
partitions 0, 1, and 2 of topic myTopic
. The offset values should be the
next record that the consumer should read for each partition. Note that
if the consumer needs to read a partition which does not have a specified
offset within the provided offsets map, it will fallback to the default
group offsets behaviour (i.e. setStartFromGroupOffsets()
) for that
particular partition.
Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint (please see the next section for information about checkpointing to enable fault tolerance for the consumer).
With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
The Flink Kafka Consumer allows configuring the behaviour of how offsets are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.
The way to configure offset commit behaviour is different, depending on whether or not checkpointing is enabled for the job.
Checkpointing disabled: if checkpointing is disabled, the Flink Kafka
Consumer relies on the automatic periodic offset committing capability
of the internally used Kafka clients. Therefore, to disable or enable offset
committing, simply set the enable.auto.commit
(or auto.commit.enable
for Kafka 0.8) / auto.commit.interval.ms
keys to appropriate values
in the provided Properties
configuration.
Checkpointing enabled: if checkpointing is enabled, the Flink Kafka
Consumer will commit the offsets stored in the checkpointed states when
the checkpoints are completed. This ensures that the committed offsets
in Kafka brokers is consistent with the offsets in the checkpointed states.
Users can choose to disable or enable offset committing by calling the
setCommitOffsetsOnCheckpoints(boolean)
method on the consumer (by default,
the behaviour is true
).
Note that in this scenario, the automatic periodic offset committing
settings in Properties
is completely ignored.
In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka
Consumer allows the specification of an AssignerWithPeriodicWatermarks
or an AssignerWithPunctuatedWatermarks
.
You can specify your custom timestamp extractor/watermark emitter as described here, or use one from the predefined ones. After doing so, you can pass it to your consumer in the following way:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env
.addSource(myConsumer)
.print();
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
.addSource(myConsumer)
.print()
Internally, an instance of the assigner is executed per Kafka partition.
When such an assigner is specified, for each record read from Kafka, the
extractTimestamp(T element, long previousElementTimestamp)
is called to assign a timestamp to the record and
the Watermark getCurrentWatermark()
(for periodic) or the
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)
(for punctuated) is called to determine
if a new watermark should be emitted and with which timestamp.
Flink’s Kafka Producer is called FlinkKafkaProducer08
(or 09
for Kafka 0.9.0.x versions, etc.).
It allows writing a stream of records to one or more Kafka topics.
Example:
DataStream<String> stream = ...;
FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema()); // serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false); // "false" by default
myProducer.setFlushOnCheckpoint(true); // "false" by default
stream.addSink(myProducer);
DataStream<String> stream = ...;
FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream, // input stream
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties); // custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false); // "false" by default
myProducerConfig.setFlushOnCheckpoint(true); // "false" by default
val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer08[String](
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema) // serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false) // "false" by default
myProducer.setFlushOnCheckpoint(true) // "false" by default
stream.addSink(myProducer)
val stream: DataStream[String] = ...
val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream, // input stream
"my-topic", // target topic
new SimpleStringSchema, // serialization schema
properties) // custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false) // "false" by default
myProducerConfig.setFlushOnCheckpoint(true) // "false" by default
The above examples demonstrate the basic usage of creating a Flink Kafka Producer to write streams to a single Kafka target topic. For more advanced usages, there are other constructor variants that allow providing the following:
KafkaProducer
.
Please refer to the Apache Kafka documentation for
details on how to configure Kafka Producers.FlinkKafkaPartitioner
to the
constructor. This partitioner will be called for each record in the stream
to determine which exact partition of the target topic the record should be sent to.KeyedSerializationSchema
,
which allows serializing the key and value separately. It also allows to override the target topic,
so that one producer instance can send data to multiple topics.With Flink’s checkpointing enabled, the Flink Kafka Producer can provide at-least-once delivery guarantees.
Besides enabling Flink’s checkpointing, you should also configure the setter
methods setLogFailuresOnly(boolean)
and setFlushOnCheckpoint(boolean)
appropriately,
as shown in the above examples in the previous section:
setLogFailuresOnly(boolean)
: enabling this will let the producer log failures only
instead of catching and rethrowing them. This essentially accounts the record
to have succeeded, even if it was never written to the target Kafka topic. This
must be disabled for at-least-once.setFlushOnCheckpoint(boolean)
: with this enabled, Flink’s checkpoints will wait for any
on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
succeeding the checkpoint. This ensures that all records before the checkpoint have
been written to Kafka. This must be enabled for at-least-once.Note: By default, the number of retries is set to “0”. This means that when setLogFailuresOnly
is set to false
,
the producer fails immediately on errors, including leader changes. The value is set to “0” by default to avoid
duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes,
we recommend setting the number of retries to a higher value.
Note: There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery into a Kafka topic.
Since Apache Kafka 0.10+, Kafka’s messages can carry timestamps, indicating the time the event has occurred (see “event time” in Apache Flink) or the time when the message has been written to the Kafka broker.
The FlinkKafkaConsumer010
will emit records with the timestamp attached, if the time characteristic in Flink is
set to TimeCharacteristic.EventTime
(StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
).
The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms as described above in
“Kafka Consumers and Timestamp Extraction/Watermark Emission” using the assignTimestampsAndWatermarks
method are applicable.
There is no need to define a timestamp extractor when using the timestamps from Kafka. The previousElementTimestamp
argument of
the extractTimestamp()
method contains the timestamp carried by the Kafka message.
A timestamp extractor for a Kafka consumer would look like this:
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementTimestamp;
}
The FlinkKafkaProducer010
only emits the record timestamp, if setWriteTimestampToKafka(true)
is set.
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
Flink’s Kafka connectors provide some metrics through Flink’s metrics system to analyze the behavior of the connector. The producers export Kafka’s internal metrics through Flink’s metric system for all supported versions. The consumers export all metrics starting from Kafka version 0.9. The Kafka documentation lists all exported metrics in its documentation.
In addition to these metrics, all consumers expose the current-offsets
and committed-offsets
for each topic partition.
The current-offsets
refers to the current offset in the partition. This refers to the offset of the last element that
we retrieved and emitted successfully. The committed-offsets
is the last committed offset.
The Kafka Consumers in Flink commit the offsets back to Zookeeper (Kafka 0.8) or the Kafka brokers (Kafka 0.9+). If checkpointing is disabled, offsets are committed periodically. With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they’ve created a checkpoint of their state. This provides users with at-least-once semantics for the offsets committed to Zookeeper or the broker. For offsets checkpointed to Flink, the system provides exactly once guarantees.
The offsets committed to ZK or the broker can also be used to track the read progress of the Kafka consumer. The difference between the committed offset and the most recent offset in each partition is called the consumer lag. If the Flink topology is consuming the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind. For large production deployments we recommend monitoring that metric to avoid increasing latency.
Flink provides first-class support through the Kafka connector to authenticate to a Kafka installation
configured for Kerberos. Simply configure Flink in flink-conf.yaml
to enable Kerberos authentication for Kafka like so:
security.kerberos.login.use-ticket-cache
: By default, this is true
and Flink will attempt to use Kerberos credentials in ticket caches managed by kinit
.
Note that when using the Kafka connector in Flink jobs deployed on YARN, Kerberos authorization using ticket caches will not work. This is also the case when deploying using Mesos, as authorization using ticket cache is not supported for Mesos deployments.security.kerberos.login.keytab
and security.kerberos.login.principal
: To use Kerberos keytabs instead, set values for both of these properties.KafkaClient
to security.kerberos.login.contexts
: This tells Flink to provide the configured Kerberos credentials to the Kafka login context to be used for Kafka authentication.Once Kerberos-based Flink security is enabled, you can authenticate to Kafka with either the Flink Kafka Consumer or Producer by simply including the following two settings in the provided properties configuration that is passed to the internal Kafka client:
security.protocol
to SASL_PLAINTEXT
(default NONE
): The protocol used to communicate to Kafka brokers.
When using standalone Flink deployment, you can also use SASL_SSL
; please see how to configure the Kafka client for SSL here.sasl.kerberos.service.name
to kafka
(default kafka
): The value for this should match the sasl.kerberos.service.name
used for Kafka broker configurations. A mismatch in service name between client and server configuration will cause the authentication to fail.For more information on Flink configuration for Kerberos security, please see here. You can also find here further details on how Flink internally setups Kerberos-based security.