Apache Kafka #
Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics. It is based on Apache Flink’s universal Kafka connector and provides exactly-once processing semantics. Kafka is configured in the module specification of your application.
Kafka Ingress Spec #
A Kafka ingress defines an input point that reads records from one or more topics.
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/users
  address: kafka-broker:9092
  consumerGroupId: my-consumer-group
  startupPosition:
    type: earliest
  topics:
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter
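As a sketch of how one ingress might read from more than one topic, the `topics` list can presumably carry several entries, each with its own value type and targets. The second topic `messages-2`, the type `com.example/Order`, and the function `com.example.fns/orders` are hypothetical names used only to show the shape:

```yaml
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/users
  address: kafka-broker:9092
  consumerGroupId: my-consumer-group
  startupPosition:
    type: earliest
  topics:
    # First topic, as in the example above.
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter
    # Hypothetical second topic, routed to a hypothetical function type.
    - topic: messages-2
      valueType: com.example/Order
      targets:
        - com.example.fns/orders
```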
The ingress also accepts properties that directly configure the Kafka client. Please refer to the Kafka consumer configuration documentation for the full list of available properties.
Note that configuration passed using named paths, such as ingress.spec.address, takes precedence and overwrites the respective settings in the provided properties.
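A minimal sketch of what this could look like, assuming the ingress takes a `properties` list in the same list-of-entries form as the egress example later on this page. `isolation.level` is a standard Kafka consumer setting; the value chosen here is only illustrative:

```yaml
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/users
  # Named path: takes precedence over any equivalent entry in properties.
  address: kafka-broker:9092
  consumerGroupId: my-consumer-group
  topics:
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter
  # Assumed key name and layout, mirroring the egress example.
  properties:
    - isolation.level: read_committed
```

Under this reading, a `bootstrap.servers` entry placed inside `properties` would be expected to have no effect, because `address` is a named path and overrides it.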
Startup Position #
The ingress allows configuring the startup position to be one of the following:
From Group Offset (default) #
Starts from offsets that were committed to Kafka for the specified consumer group.
startupPosition:
  type: group-offsets
Earliest #
Starts from the earliest offset.
startupPosition:
  type: earliest
Latest #
Starts from the latest offset.
startupPosition:
  type: latest
Specific Offsets #
Starts from specific offsets, defined as a map of partitions to their target starting offset.
startupPosition:
  type: specific-offsets
  offsets:
    - user-topic/0: 91
    - user-topic/1: 11
    - user-topic/2: 8
Date #
Starts from offsets that have an ingestion time greater than or equal to a specified date.
startupPosition:
  type: date
  date: 2020-02-01 04:15:00.00 Z
On startup, if the specified startup offset for a partition is out of range or does not exist (which may be the case if the ingress is configured to start from group offsets, specific offsets, or from a date), the ingress falls back to the position configured using ingress.spec.autoOffsetResetPosition, which may be set to either earliest or latest. By default, this is set to the latest position.
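For illustration, the fallback could be set alongside the startup position as in the sketch below. It assumes `earliest` is an accepted value for `autoOffsetResetPosition`; the remaining fields repeat the earlier ingress example:

```yaml
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/users
  address: kafka-broker:9092
  consumerGroupId: my-consumer-group
  startupPosition:
    type: group-offsets
  # Used only when a partition has no valid offset to resume from.
  autoOffsetResetPosition: earliest
  topics:
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter
```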
Kafka Egress Spec #
A Kafka egress defines an output point where functions can write records to one or more topics.
kind: io.statefun.kafka.v1/egress
spec:
  id: com.example/users
  address: kafka-broker:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeout: 15min
  properties:
    - foo.config: bar
Please refer to the Kafka producer configuration documentation for the full list of available properties.
Kafka Egress and Fault Tolerance #
With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees. You can choose among three modes of operation.
None #
Nothing is guaranteed; produced records can be lost or duplicated.
deliverySemantic:
  type: none
At Least Once #
Stateful Functions guarantees that no records are lost, but they can be duplicated.
deliverySemantic:
  type: at-least-once
Exactly Once #
Stateful Functions uses Kafka transactions to provide exactly-once semantics.
deliverySemantic:
  type: exactly-once
  transactionTimeout: 15min
Writing To Kafka #
Functions write directly to Kafka from their SDK context. See the SDK-specific documentation for more details.