@PublicEvolving public class FlinkKafkaProducer<K,V> extends Object implements org.apache.kafka.clients.producer.Producer<K,V>
In the happy path, usage is exactly the same as with KafkaProducer. The user is expected to call:
initTransactions()
beginTransaction()
send(org.apache.kafka.clients.producer.ProducerRecord)
flush()
commitTransaction()
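The happy-path sequence above can be sketched as follows. Since a real FlinkKafkaProducer needs a running Kafka broker, this sketch replaces it with a hypothetical stub that only records the order of calls; the point is the required call sequence, not real I/O.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for FlinkKafkaProducer: it records call order only.
public class HappyPathSketch {
    final List<String> calls = new ArrayList<>();

    void initTransactions()  { calls.add("initTransactions"); }
    void beginTransaction()  { calls.add("beginTransaction"); }
    void send(String record) { calls.add("send"); }
    void flush()             { calls.add("flush"); }
    void commitTransaction() { calls.add("commitTransaction"); }

    // Drive the documented happy-path sequence and return the observed order.
    static List<String> happyPath() {
        HappyPathSketch p = new HappyPathSketch();
        p.initTransactions();
        p.beginTransaction();
        p.send("value");
        p.flush();               // acts as the pre-commit step
        p.commitTransaction();
        return p.calls;
    }

    public static void main(String[] args) {
        System.out.println(happyPath());
    }
}
```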
To actually implement two-phase commit, it must be possible to always commit a transaction after pre-committing it (here, the pre-commit is just a flush()). In case of a failure between flush() and commitTransaction(), this class allows resuming the interrupted transaction and committing it after a restart:
initTransactions()
beginTransaction()
send(org.apache.kafka.clients.producer.ProducerRecord)
flush()
getProducerId()
getEpoch()
resumeTransaction(long, short)
commitTransaction()
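The resume protocol above can be sketched like this. Again, a hypothetical stub stands in for FlinkKafkaProducer (a real one needs a broker): the first producer pre-commits via flush() and exposes its producerId and epoch; after a "restart", a second producer adopts them via resumeTransaction(long, short) and commits.

```java
// Hypothetical stand-in for FlinkKafkaProducer illustrating the resume flow.
public class ResumeSketch {
    private long producerId = -1;
    private short epoch = -1;
    boolean committed = false;

    void initTransactions()  { producerId = 100; epoch = 1; } // coordinator assigns ids
    void beginTransaction()  { }
    void send(String record) { }
    void flush()             { } // the pre-commit step
    long getProducerId()     { return producerId; }
    short getEpoch()         { return epoch; }

    // Adopt previously obtained counters instead of calling initTransactions(),
    // which would abort the ongoing transaction.
    void resumeTransaction(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    void commitTransaction() { committed = true; }

    public static void main(String[] args) {
        ResumeSketch first = new ResumeSketch();
        first.initTransactions();
        first.beginTransaction();
        first.send("value");
        first.flush();
        long id = first.getProducerId();  // persist these two values somewhere durable...
        short ep = first.getEpoch();

        ResumeSketch restarted = new ResumeSketch(); // ...so that after a restart
        restarted.resumeTransaction(id, ep);         // the old transaction can be adopted
        restarted.commitTransaction();
        System.out.println(restarted.committed);
    }
}
```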
resumeTransaction(long, short) replaces initTransactions() as the way to obtain the producerId and epoch counters. This is necessary because initTransactions() would otherwise automatically abort all ongoing transactions.
The second way this implementation differs from the reference KafkaProducer is that it flushes new partitions on flush() instead of on commitTransaction().
The last, minor difference is that it allows obtaining the producerId and epoch counters via the getProducerId() and getEpoch() methods (which read fields that are unfortunately private).
Those changes are compatible with Kafka's 0.11.0 API, although it clearly was not the intention of Kafka's API authors to make them possible.
Internally this implementation uses KafkaProducer and implements the required changes via the Java Reflection API. It might not be the prettiest solution, but the alternative would be to re-implement the whole Kafka 0.11 client on our own.
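The reflection trick can be sketched as follows. The nested holder class is a hypothetical stand-in for KafkaProducer's internal transaction-manager state (the real field names are not shown in this document); the sketch only demonstrates the general technique of reading a private field via java.lang.reflect.

```java
import java.lang.reflect.Field;

public class ReflectionSketch {
    // Hypothetical holder standing in for the producer's internal state,
    // which keeps producerId and epoch in private fields.
    static class IdAndEpochHolder {
        private long producerId = 4242L;
        private short epoch = 7;
    }

    // Read a private long field by name, the way the reflective producer does.
    static long readPrivateLong(Object target, String fieldName) throws Exception {
        Field f = target.getClass().getDeclaredField(fieldName);
        f.setAccessible(true); // bypass the private modifier
        return f.getLong(target);
    }

    public static void main(String[] args) throws Exception {
        long id = readPrivateLong(new IdAndEpochHolder(), "producerId");
        System.out.println("producerId = " + id);
    }
}
```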
| Constructor and Description |
|---|
| FlinkKafkaProducer(Properties properties) |
| Modifier and Type | Method and Description |
|---|---|
| void | abortTransaction() |
| void | beginTransaction() |
| void | close() |
| void | close(long timeout, TimeUnit unit) |
| void | commitTransaction() |
| void | flush() |
| short | getEpoch() |
| long | getProducerId() |
| String | getTransactionalId() |
| int | getTransactionCoordinatorId() |
| void | initTransactions() |
| Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> | metrics() |
| List<org.apache.kafka.common.PartitionInfo> | partitionsFor(String topic) |
| void | resumeTransaction(long producerId, short epoch): Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones, so that we can resume the transaction after a restart. |
| Future<org.apache.kafka.clients.producer.RecordMetadata> | send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) |
| Future<org.apache.kafka.clients.producer.RecordMetadata> | send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, org.apache.kafka.clients.producer.Callback callback) |
| void | sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, String consumerGroupId) |
public FlinkKafkaProducer(Properties properties)
public void initTransactions()
public void beginTransaction() throws org.apache.kafka.common.errors.ProducerFencedException
Throws: org.apache.kafka.common.errors.ProducerFencedException
public void commitTransaction() throws org.apache.kafka.common.errors.ProducerFencedException
Throws: org.apache.kafka.common.errors.ProducerFencedException
public void abortTransaction() throws org.apache.kafka.common.errors.ProducerFencedException
Throws: org.apache.kafka.common.errors.ProducerFencedException
public void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, String consumerGroupId) throws org.apache.kafka.common.errors.ProducerFencedException
Throws: org.apache.kafka.common.errors.ProducerFencedException
public Future<org.apache.kafka.clients.producer.RecordMetadata> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
public Future<org.apache.kafka.clients.producer.RecordMetadata> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, org.apache.kafka.clients.producer.Callback callback)
public Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
public void close()
public void close(long timeout, TimeUnit unit)
public void flush()
public void resumeTransaction(long producerId, short epoch)
See Also: org.apache.kafka.clients.producer.KafkaProducer#initTransactions
public long getProducerId()
public short getEpoch()
@VisibleForTesting public int getTransactionCoordinatorId()
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.