@PublicEvolving public class FlinkKafkaProducer<K,V> extends Object implements org.apache.kafka.clients.producer.Producer<K,V>
For the happy path, usage is exactly the same as with KafkaProducer. The user is expected to call:

1. initTransactions()
2. beginTransaction()
3. send(org.apache.kafka.clients.producer.ProducerRecord)
4. flush()
5. commitTransaction()
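The happy-path sequence above can be sketched against the standard Producer interface. The sketch below uses kafka-clients' MockProducer, a test double that implements the same interface, so it runs without a broker; the topic name, keys, and values are made up for illustration, and a recent kafka-clients version (with transactional MockProducer support) is assumed to be on the classpath.

```java
import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalHappyPath {

    // Runs the happy-path call sequence from the docs above and
    // returns the number of records recorded by the mock producer.
    public static int runDemo() {
        // MockProducer implements the same Producer interface, so the
        // call sequence is identical to KafkaProducer's.
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("demo-topic", "k1", "v1"));
        producer.send(new ProducerRecord<>("demo-topic", "k2", "v2"));
        producer.flush();             // pre-commit: push all buffered records out
        producer.commitTransaction();

        if (!producer.transactionCommitted()) {
            throw new IllegalStateException("transaction was not committed");
        }
        List<ProducerRecord<String, String>> sent = producer.history();
        return sent.size();
    }

    public static void main(String[] args) {
        System.out.println("records committed: " + runDemo());
    }
}
```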
To actually implement two-phase commit, it must be possible to always commit a transaction after pre-committing it (here, the pre-commit is just a flush()). In case of a failure between flush() and commitTransaction(), this class allows resuming the interrupted transaction and committing it after a restart:
1. initTransactions()
2. beginTransaction()
3. send(org.apache.kafka.clients.producer.ProducerRecord)
4. flush()
5. getProducerId()
6. getEpoch()
7. resumeTransaction(long, short)
8. commitTransaction()
resumeTransaction(long, short) replaces initTransactions() as the way to obtain the producerId and epoch counters. This is necessary because initTransactions() would otherwise automatically abort all ongoing transactions.
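Wiring the two counters through a restart might look like the following sketch. It is illustrative only, not runnable as-is: it assumes a running Kafka broker, and the storeState/restoreState helpers are hypothetical stand-ins for whatever durable storage (for example, a Flink checkpoint) the caller uses to carry producerId and epoch across the restart.

```java
// --- Before the failure: pre-commit phase ---
FlinkKafkaProducer<String, String> producer = new FlinkKafkaProducer<>(properties);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("demo-topic", "value"));
producer.flush();                                 // pre-commit
long producerId = producer.getProducerId();       // persist these two counters
short epoch = producer.getEpoch();
storeState(producerId, epoch);                    // hypothetical durable store

// ... crash happens between flush() and commitTransaction() ...

// --- After restart: resume the interrupted transaction and finish the commit ---
FlinkKafkaProducer<String, String> recovered = new FlinkKafkaProducer<>(properties);
long savedProducerId = restoreState().producerId; // hypothetical
short savedEpoch = restoreState().epoch;          // hypothetical
recovered.resumeTransaction(savedProducerId, savedEpoch); // instead of initTransactions()
recovered.commitTransaction();                    // completes the two-phase commit
```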
The second way this implementation differs from the reference KafkaProducer is that it flushes new partitions on flush() instead of on commitTransaction().
The last, minor difference is that it allows obtaining the producerId and epoch counters via the getProducerId() and getEpoch() methods (which are unfortunately private fields of KafkaProducer).
Those changes are compatible with Kafka's 0.11.0 API, although it clearly was not the intention of Kafka's API authors to make them possible.
Internally, this implementation uses KafkaProducer and implements the required changes via the Java Reflection API. This might not be the prettiest solution; the alternative would be to re-implement the whole Kafka 0.11 client on our own.
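The reflection technique mentioned here, reading a private field that the public API does not expose, can be shown with plain JDK reflection. The Widget class below is a made-up stand-in for KafkaProducer, and secretCounter stands in for one of its private fields such as producerId; the real field names inside KafkaProducer are not part of this sketch.

```java
import java.lang.reflect.Field;

public class ReflectionPeek {

    // Stand-in for KafkaProducer: the interesting state is private
    // and has no public getter.
    static class Widget {
        private final long secretCounter = 42L;
    }

    // Mirrors the technique FlinkKafkaProducer uses for producerId/epoch:
    // locate the private field by name, make it accessible, read it.
    static long readPrivateLong(Object target, String fieldName) throws Exception {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true); // bypass the "private" modifier
        return (Long) field.get(target);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readPrivateLong(new Widget(), "secretCounter"));
    }
}
```

The obvious trade-off, which the text above acknowledges, is that reflection couples the caller to private implementation details that can change between Kafka versions without warning.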
| Constructor and Description |
|---|
| FlinkKafkaProducer(Properties properties) |
| Modifier and Type | Method and Description |
|---|---|
| void | abortTransaction() |
| void | beginTransaction() |
| void | close() |
| void | close(long timeout, TimeUnit unit) |
| void | commitTransaction() |
| void | flush() |
| short | getEpoch() |
| long | getProducerId() |
| String | getTransactionalId() |
| int | getTransactionCoordinatorId() |
| void | initTransactions() |
| Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> | metrics() |
| List<org.apache.kafka.common.PartitionInfo> | partitionsFor(String topic) |
| void | resumeTransaction(long producerId, short epoch) - Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones, so that we can resume the transaction after a restart. |
| Future<org.apache.kafka.clients.producer.RecordMetadata> | send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) |
| Future<org.apache.kafka.clients.producer.RecordMetadata> | send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, org.apache.kafka.clients.producer.Callback callback) |
| void | sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, String consumerGroupId) |
public FlinkKafkaProducer(Properties properties)
public void initTransactions()
public void beginTransaction() throws org.apache.kafka.common.errors.ProducerFencedException
public void commitTransaction() throws org.apache.kafka.common.errors.ProducerFencedException
public void abortTransaction() throws org.apache.kafka.common.errors.ProducerFencedException
public void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, String consumerGroupId) throws org.apache.kafka.common.errors.ProducerFencedException
public Future<org.apache.kafka.clients.producer.RecordMetadata> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
public Future<org.apache.kafka.clients.producer.RecordMetadata> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, org.apache.kafka.clients.producer.Callback callback)
public Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
public void close()
public void close(long timeout, TimeUnit unit)
public void flush()
public void resumeTransaction(long producerId, short epoch)
Implementation of this method is based on KafkaProducer.initTransactions().
public long getProducerId()
public short getEpoch()
@VisibleForTesting public int getTransactionCoordinatorId()
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.