@Internal
public abstract class FlinkKafkaProducerBase<IN>
extends RichSinkFunction<IN>
implements CheckpointedFunction

Type Parameters:
IN - Type of the messages to write into Kafka.

Please note that this producer provides at-least-once reliability guarantees when checkpoints are enabled and setFlushOnCheckpoint(true) is set. Otherwise, the producer doesn't provide any reliability guarantees.

Nested classes/interfaces inherited from interface org.apache.flink.streaming.api.functions.sink.SinkFunction:
SinkFunction.Context<T>
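A usage sketch of the at-least-once configuration noted above (checkpointing enabled plus setFlushOnCheckpoint(true)), assuming the flink-connector-kafka-0.10 module and its concrete subclass FlinkKafkaProducer010; any concrete subclass of FlinkKafkaProducerBase is configured the same way. Import paths vary between Flink versions, and the topic name and broker address are placeholders.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class AtLeastOnceKafkaSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5_000);                  // checkpoints are required for at-least-once

        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaProducer010<String> producer =
                new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props);
        producer.setFlushOnCheckpoint(true);             // wait for outstanding records on each checkpoint
        producer.setLogFailuresOnly(false);              // fail the job on send errors instead of only logging

        stream.addSink(producer);
        env.execute("at-least-once Kafka sink");
    }
}
```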
Modifier and Type | Field and Description
---|---
protected Exception | asyncException: Errors encountered in the async producer are stored here.
protected org.apache.kafka.clients.producer.Callback | callback: The callback that handles error propagation or logging.
protected String | defaultTopicId: The name of the default topic this producer is writing data to.
protected FlinkKafkaPartitioner<IN> | flinkKafkaPartitioner: User-provided partitioner for assigning an object to a Kafka partition for each topic.
protected boolean | flushOnCheckpoint: If true, the producer will wait until all outstanding records have been sent to the broker.
static String | KEY_DISABLE_METRICS: Configuration key for disabling the metrics reporting.
protected boolean | logFailuresOnly: Flag indicating whether to accept failures (and log them), or to fail on failures.
protected long | pendingRecords: Number of unacknowledged records.
protected SerializableObject | pendingRecordsLock: Lock for accessing the pending records.
protected org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> | producer: KafkaProducer instance.
protected Properties | producerConfig: User-defined properties for the Producer.
protected KeyedSerializationSchema<IN> | schema: (Serializable) SerializationSchema for turning objects used with Flink into a kafka-consumable byte[] supporting key/value messages.
protected Map<String,int[]> | topicPartitionsMap: Partitions of each topic.
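To make the interplay of callback, asyncException, pendingRecords, and pendingRecordsLock concrete, the following standalone sketch mirrors the acknowledgment-tracking pattern these fields imply. It is illustrative only, not the class's actual code; the class and method names are hypothetical.

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Illustrative pattern only; field names mirror FlinkKafkaProducerBase, but this is not its code.
public class PendingRecordsPattern {

    private final Object pendingRecordsLock = new Object(); // stands in for SerializableObject
    private long pendingRecords;                            // number of unacknowledged records
    private volatile Exception asyncException;              // first error seen by the async producer

    // Bookkeeping done right before each KafkaProducer#send(record, callback) call.
    void recordSent() {
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
    }

    // Passed to KafkaProducer#send(...) for every outgoing record.
    final Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && asyncException == null) {
                asyncException = exception;                 // surfaced later, e.g. by a checkErroneous()-style check
            }
            synchronized (pendingRecordsLock) {
                pendingRecords--;
                if (pendingRecords == 0) {
                    pendingRecordsLock.notifyAll();         // wake a checkpoint waiting for the flush
                }
            }
        }
    };

    // What flush-on-checkpoint amounts to: block until every record is acknowledged, then rethrow errors.
    void waitForPendingRecords() throws Exception {
        synchronized (pendingRecordsLock) {
            while (pendingRecords > 0) {
                pendingRecordsLock.wait();
            }
        }
        if (asyncException != null) {
            throw asyncException;
        }
    }
}
```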
Constructor and Description |
---|
FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner): The main constructor for creating a FlinkKafkaProducer. |
Modifier and Type | Method and Description
---|---
protected void | checkErroneous()
void | close(): Tear-down method for the user code.
protected abstract void | flush(): Flush pending records.
protected <K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> | getKafkaProducer(Properties props): Used for testing only.
protected static int[] | getPartitionsByTopic(String topic, org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> producer)
static Properties | getPropertiesFromBrokerList(String brokerList)
void | initializeState(FunctionInitializationContext context): This method is called when the parallel function instance is created during distributed execution.
void | invoke(IN next, SinkFunction.Context context): Called when new data arrives to the sink, and forwards it to Kafka.
protected long | numPendingRecords()
void | open(Configuration configuration): Initializes the connection to Kafka.
void | setFlushOnCheckpoint(boolean flush): If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
void | setLogFailuresOnly(boolean logFailuresOnly): Defines whether the producer should fail on errors, or only log them.
void | snapshotState(FunctionSnapshotContext ctx): This method is called when a snapshot for a checkpoint is requested.
Methods inherited from class org.apache.flink.api.common.functions.AbstractRichFunction:
getIterationRuntimeContext, getRuntimeContext, setRuntimeContext

Methods inherited from class java.lang.Object:
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface org.apache.flink.streaming.api.functions.sink.SinkFunction:
invoke
public static final String KEY_DISABLE_METRICS
protected final Properties producerConfig
protected final String defaultTopicId
protected final KeyedSerializationSchema<IN> schema
protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner
protected boolean logFailuresOnly
protected boolean flushOnCheckpoint
protected transient org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> producer
protected transient org.apache.kafka.clients.producer.Callback callback
protected transient volatile Exception asyncException
protected final SerializableObject pendingRecordsLock
protected long pendingRecords
public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)

The main constructor for creating a FlinkKafkaProducer.

Parameters:
defaultTopicId - The default topic to write data to
serializationSchema - A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
producerConfig - Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
customPartitioner - A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.

public void setLogFailuresOnly(boolean logFailuresOnly)

Defines whether the producer should fail on errors, or only log them.

Parameters:
logFailuresOnly - The flag to indicate logging-only on exceptions.

public void setFlushOnCheckpoint(boolean flush)

If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.

Parameters:
flush - Flag indicating the flushing mode (true = flush on checkpoint)

@VisibleForTesting
protected <K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> getKafkaProducer(Properties props)

Used for testing only.
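As an illustration of the serializationSchema parameter, here is a minimal sketch of a KeyedSerializationSchema for String records. The class name StringKeyedSchema is hypothetical, the package of KeyedSerializationSchema differs between Flink versions, and the null-return conventions (no key, and a null target topic falling back to defaultTopicId) are assumptions about the surrounding connector.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Hypothetical schema: UTF-8 encodes each String value, sends no key, and uses the default topic.
public class StringKeyedSchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        return null;                       // no key; partitioning is left to the configured partitioner
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null;                       // assumed to fall back to the constructor's defaultTopicId
    }
}
```

An instance of such a schema would then be passed as the serializationSchema argument of the constructor above.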
public void open(Configuration configuration) throws Exception

Initializes the connection to Kafka.

Specified by:
open in interface RichFunction
Overrides:
open in class AbstractRichFunction
Parameters:
configuration - The configuration containing the parameters attached to the contract.
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
See Also:
Configuration
public void invoke(IN next, SinkFunction.Context context) throws Exception

Called when new data arrives to the sink, and forwards it to Kafka.

Specified by:
invoke in interface SinkFunction<IN>
Parameters:
next - The incoming data
context - Additional context about the input record.
Throws:
Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.

public void close() throws Exception
Description copied from interface: RichFunction
Tear-down method for the user code. This method can be used for clean up work.

Specified by:
close in interface RichFunction
Overrides:
close in class AbstractRichFunction
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.

protected abstract void flush()

Flush pending records.
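Since flush() is the only abstract method, a concrete producer can be as small as the following sketch. MyKafkaProducer is a hypothetical class name and the import paths are assumptions; the override simply delegates to KafkaProducer#flush(), which blocks until every buffered record has either been acknowledged or failed.

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Hypothetical minimal subclass: everything except flush() is inherited from the base class.
public class MyKafkaProducer<IN> extends FlinkKafkaProducerBase<IN> {

    public MyKafkaProducer(String defaultTopicId, KeyedSerializationSchema<IN> schema,
                           Properties producerConfig, FlinkKafkaPartitioner<IN> partitioner) {
        super(defaultTopicId, schema, producerConfig, partitioner);
    }

    @Override
    protected void flush() {
        if (producer != null) {            // protected KafkaProducer field from the base class
            producer.flush();              // blocks until all buffered records are completed
        }
    }
}
```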
public void initializeState(FunctionInitializationContext context) throws Exception

Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution.

Specified by:
initializeState in interface CheckpointedFunction
Parameters:
context - the context for initializing the operator
Throws:
Exception - Thrown, if state could not be created or restored.

public void snapshotState(FunctionSnapshotContext ctx) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.

Specified by:
snapshotState in interface CheckpointedFunction
Parameters:
ctx - the context for drawing a snapshot of the operator
Throws:
Exception - Thrown, if state could not be created or restored.

public static Properties getPropertiesFromBrokerList(String brokerList)
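A one-line usage sketch of this helper, with placeholder broker addresses; the returned Properties are presumably suitable as the producerConfig argument, carrying the required bootstrap.servers entry.

```java
Properties producerConfig =
        FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
```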
protected static int[] getPartitionsByTopic(String topic, org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> producer)
@VisibleForTesting protected long numPendingRecords()
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.