This documentation is for an unreleased version of Apache Flink Stateful Functions. We recommend you use the latest stable version.
Apache Kafka #
Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics. It is based on Apache Flink’s universal Kafka connector and provides exactly-once processing semantics. Kafka is configured in the module specification of your application.
Kafka Ingress Spec #
A Kafka ingress defines an input point that reads records from one or more topics.
version: "3.0"
module:
  meta:
    type: remote
  spec:
    ingresses:
      - ingress:
          meta:
            type: io.statefun.kafka/ingress
            id: com.example/users
          spec:
            address: kafka-broker:9092
            consumerGroupId: my-consumer-group
            startupPosition:
              type: earliest
            topics:
              - topic: messages-1
                valueType: com.example/User
                targets:
                  - com.example.fns/greeter
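Because an ingress can read from more than one topic, the topics list may carry several entries. The fragment below is a minimal sketch of that case, assuming each entry follows the same topic / valueType / targets shape as the example above. The second topic name (messages-2) and the second target function (com.example.fns/auditor) are hypothetical and used only for illustration.

```yaml
# Sketch only: the spec portion of a Kafka ingress that reads two topics.
spec:
  address: kafka-broker:9092
  consumerGroupId: my-consumer-group
  topics:
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter
    # Hypothetical second topic, routed to two function types.
    - topic: messages-2
      valueType: com.example/User
      targets:
        - com.example.fns/greeter
        - com.example.fns/auditor   # hypothetical function type
```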
The ingress also accepts properties that configure the Kafka client directly, using ingress.spec.properties.
Please refer to the Kafka consumer configuration documentation for the full list of available properties.
Note that configuration passed using named paths, such as ingress.spec.address, takes precedence and overwrites the corresponding settings in the provided properties.
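As a rough illustration of the precedence rule, the fragment below sets Kafka consumer properties next to the named paths. It assumes that ingress properties use the same list-of-pairs form as the egress properties shown later on this page, and that consumerGroupId corresponds to the Kafka client's group.id setting. Both are assumptions rather than confirmed behavior.

```yaml
# Sketch only: the spec portion of a Kafka ingress (topics omitted for brevity).
spec:
  address: kafka-broker:9092
  consumerGroupId: my-consumer-group
  properties:
    # Standard Kafka consumer setting, passed through to the client.
    - max.poll.records: 200
    # Assumed to be overwritten by consumerGroupId above, since named
    # paths take precedence over entries in properties.
    - group.id: some-other-group
```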
Startup Position #
The ingress allows configuring the startup position to be one of the following:
From Group Offset (default) #
Starts from offsets that were committed to Kafka for the specified consumer group.
startupPosition:
  type: group-offsets
Earliest #
Starts from the earliest offset.
startupPosition:
  type: earliest
Latest #
Starts from the latest offset.
startupPosition:
  type: latest
Specific Offsets #
Starts from specific offsets, defined as a map of partitions to their target starting offset.
startupPosition:
  type: specific-offsets
  offsets:
    - user-topic/0: 91
    - user-topic/1: 11
    - user-topic/2: 8
Date #
Starts from offsets that have an ingestion time larger than or equal to a specified date.
startupPosition:
  type: date
  date: 2020-02-01 04:15:00.00 Z
On startup, if the specified startup offset for a partition is out of range or does not exist (which may be the case if the ingress is configured to start from group offsets, specific offsets, or from a date), the ingress falls back to the position configured using ingress.spec.autoOffsetResetPosition, which may be set to either latest or earliest.
By default, this is set to the latest position.
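To make the fallback concrete, here is a minimal sketch of an ingress spec that starts from committed group offsets and falls back to the earliest position when no valid offset exists. The placement of autoOffsetResetPosition directly under spec follows the ingress.spec.autoOffsetResetPosition path named above; the remaining values are reused from the earlier ingress example.

```yaml
# Sketch only: the spec portion of a Kafka ingress (topics omitted for brevity).
spec:
  address: kafka-broker:9092
  consumerGroupId: my-consumer-group
  startupPosition:
    type: group-offsets
  # Used when a partition has no committed or in-range offset.
  autoOffsetResetPosition: earliest
```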
Kafka Egress Spec #
A Kafka egress defines an output point where functions can write records to one or more topics.
version: "3.0"
module:
  meta:
    type: remote
  spec:
    egresses:
      - egress:
          meta:
            type: io.statefun.kafka/egress
            id: example/output-messages
          spec:
            address: kafka-broker:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeout: 15min
            properties:
              - foo.config: bar
Please refer to the Kafka producer configuration documentation for the full list of available properties.
Kafka Egress and Fault Tolerance #
With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees. You can choose between three modes of operation.
None #
Nothing is guaranteed; produced records can be lost or duplicated.
deliverySemantic:
  type: none
At Least Once #
Stateful Functions guarantees that no records are lost, but they can be duplicated.
deliverySemantic:
  type: at-least-once
Exactly Once #
Stateful Functions uses Kafka transactions to provide exactly-once semantics.
deliverySemantic:
  type: exactly-once
  transactionTimeout: 15min
Writing To Kafka #
Functions write directly to Kafka from their SDK context. See the SDK-specific documentation for more details.