@PublicEvolving public class FlinkKafkaProducer011<IN> extends TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Flink Sink to produce data into a Kafka topic. By default, the producer uses the FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE semantic. Before using FlinkKafkaProducer011.Semantic.EXACTLY_ONCE, please refer to Flink's Kafka connector documentation.

Modifier and Type | Class | Description |
---|---|---|
static class | FlinkKafkaProducer011.ContextStateSerializer | |
static class | FlinkKafkaProducer011.KafkaTransactionContext | Context associated with this instance of the FlinkKafkaProducer011. |
static class | FlinkKafkaProducer011.KafkaTransactionState | State for handling transactions. |
static class | FlinkKafkaProducer011.NextTransactionalIdHint | Keeps the information required to deduce the next safe-to-use transactional ID. |
static class | FlinkKafkaProducer011.NextTransactionalIdHintSerializer | |
static class | FlinkKafkaProducer011.Semantic | Semantics that can be chosen. |
static class | FlinkKafkaProducer011.TransactionStateSerializer | |

Nested classes/interfaces inherited from class TwoPhaseCommitSinkFunction: TwoPhaseCommitSinkFunction.State<TXN,CONTEXT>, TwoPhaseCommitSinkFunction.StateSerializer<TXN,CONTEXT>, TwoPhaseCommitSinkFunction.StateSerializerConfigSnapshot<TXN,CONTEXT>, TwoPhaseCommitSinkFunction.StateSerializerSnapshot<TXN,CONTEXT>, TwoPhaseCommitSinkFunction.TransactionHolder<TXN>
Nested classes/interfaces inherited from interface SinkFunction: SinkFunction.Context<T>

Modifier and Type | Field | Description |
---|---|---|
static int | DEFAULT_KAFKA_PRODUCERS_POOL_SIZE | Default number of KafkaProducers in the pool. |
static Time | DEFAULT_KAFKA_TRANSACTION_TIMEOUT | Default value for the Kafka transaction timeout. |
static String | KEY_DISABLE_METRICS | Configuration key for disabling the metrics reporting. |
static int | SAFE_SCALE_DOWN_FACTOR | This coefficient determines the safe scale-down factor. |

Fields inherited from class TwoPhaseCommitSinkFunction: pendingCommitTransactions, state, userContext

Constructor | Description |
---|---|
FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) | Creates a FlinkKafkaProducer for a given topic. |
FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer011.Semantic semantic) | Creates a FlinkKafkaProducer for a given topic. |
FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) | Creates a FlinkKafkaProducer for a given topic. |
FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, FlinkKafkaProducer011.Semantic semantic, int kafkaProducersPoolSize) | Creates a FlinkKafkaProducer for a given topic. |
FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) | Creates a FlinkKafkaProducer for a given topic. |
FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) | Creates a FlinkKafkaProducer for a given topic. |
FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) | Creates a FlinkKafkaProducer for a given topic. |
FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) | Creates a FlinkKafkaProducer for a given topic. |

Modifier and Type | Method | Description |
---|---|---|
protected void | abort(FlinkKafkaProducer011.KafkaTransactionState transaction) | Abort a transaction. |
protected FlinkKafkaProducer011.KafkaTransactionState | beginTransaction() | Method that starts a new transaction. |
void | close() | Tear-down method for the user code. |
protected void | commit(FlinkKafkaProducer011.KafkaTransactionState transaction) | Commit a pre-committed transaction. |
protected void | finishRecoveringContext(Collection<FlinkKafkaProducer011.KafkaTransactionState> handledTransactions) | Callback for subclasses which is called after restoring (each) user context. |
FlinkKafkaProducer011<IN> | ignoreFailuresAfterTransactionTimeout() | Disables the propagation of exceptions thrown when committing presumably timed-out Kafka transactions during recovery of the job. |
void | initializeState(FunctionInitializationContext context) | This method is called when the parallel function instance is created during distributed execution. |
protected Optional<FlinkKafkaProducer011.KafkaTransactionContext> | initializeUserContext() | |
void | invoke(FlinkKafkaProducer011.KafkaTransactionState transaction, IN next, SinkFunction.Context context) | Write a value within a transaction. |
void | open(Configuration configuration) | Initializes the connection to Kafka. |
protected void | preCommit(FlinkKafkaProducer011.KafkaTransactionState transaction) | Pre-commit the previously created transaction. |
protected void | recoverAndAbort(FlinkKafkaProducer011.KafkaTransactionState transaction) | Abort a transaction that was rejected by a coordinator after a failure. |
protected void | recoverAndCommit(FlinkKafkaProducer011.KafkaTransactionState transaction) | Invoked on recovered transactions after a failure. |
void | setLogFailuresOnly(boolean logFailuresOnly) | Defines whether the producer should fail on errors, or only log them. |
void | setWriteTimestampToKafka(boolean writeTimestampToKafka) | If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. |
void | snapshotState(FunctionSnapshotContext context) | This method is called when a snapshot for a checkpoint is requested. |

Methods inherited from class TwoPhaseCommitSinkFunction: currentTransaction, enableTransactionTimeoutWarnings, getUserContext, invoke, invoke, notifyCheckpointAborted, notifyCheckpointComplete, pendingTransactions, setTransactionTimeout
Methods inherited from class AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, setRuntimeContext

public static final int SAFE_SCALE_DOWN_FACTOR
This coefficient determines the safe scale-down factor.
If the Flink application previously failed before the first checkpoint completed, or we are starting a new batch of FlinkKafkaProducer011 instances from scratch without a clean shutdown of the previous one, FlinkKafkaProducer011 does not know which set of Kafka transactional IDs was used previously. In that case, it plays it safe and aborts all possible transactional IDs in the range [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR).
The range of transactional IDs available for use is [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize).
This means that if getNumberOfParallelSubtasks() is decreased by a factor larger than SAFE_SCALE_DOWN_FACTOR, some transactions may be left lingering.
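To make the ranges above concrete, the following self-contained Java sketch models transactional IDs as plain integers and checks whether a scale-down stays inside the range that gets aborted. It is a simplified model under stated assumptions: the value 5 for both SAFE_SCALE_DOWN_FACTOR and the pool size is assumed for illustration, and ScaleDownRangeSketch and its methods are hypothetical names, not part of the connector.

```java
/**
 * Hypothetical model of the transactional-id ranges described for SAFE_SCALE_DOWN_FACTOR.
 * Ids are plain longs in [0, bound); this mirrors the ranges in the text, not Flink's
 * actual transactional-id generation.
 */
public class ScaleDownRangeSketch {

    // Assumed values for illustration only; check the constants of your Flink version.
    static final int SAFE_SCALE_DOWN_FACTOR = 5;
    static final int POOL_SIZE = 5;

    /** Exclusive upper bound of the ids a job with this parallelism may use. */
    static long usableIdsBound(int parallelism) {
        return (long) parallelism * POOL_SIZE;
    }

    /** Exclusive upper bound of the ids aborted when no previous state is available. */
    static long abortedIdsBound(int parallelism) {
        return usableIdsBound(parallelism) * SAFE_SCALE_DOWN_FACTOR;
    }

    /** True if every id the old job could have used falls inside the new job's abort range. */
    static boolean scaleDownIsSafe(int oldParallelism, int newParallelism) {
        return usableIdsBound(oldParallelism) <= abortedIdsBound(newParallelism);
    }

    public static void main(String[] args) {
        System.out.println(scaleDownIsSafe(20, 4)); // scaled down by 5x:   true
        System.out.println(scaleDownIsSafe(30, 4)); // scaled down by 7.5x: false
    }
}
```

Under these assumptions, scaling from 20 to 4 subtasks (a factor of 5) is still covered, while scaling from 30 to 4 leaves IDs 100 to 149 outside the aborted range [0, 100), which is the lingering-transaction case described above.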
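
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
Default number of KafkaProducers in the pool. See FlinkKafkaProducer011.Semantic.EXACTLY_ONCE.

public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT
Default value for the Kafka transaction timeout.

public static final String KEY_DISABLE_METRICS
Configuration key for disabling the metrics reporting.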

public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema)
Creates a FlinkKafkaProducer for a given topic.
Parameters:
brokerList - Comma-separated addresses of the brokers.
topicId - ID of the Kafka topic.
serializationSchema - User-defined (keyless) serialization schema.

public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig)
Creates a FlinkKafkaProducer for a given topic. Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. This default partitioner maps each sink subtask to a single Kafka partition (i.e. all records received by a sink subtask will end up in the same Kafka partition).
To use a custom partitioner, please use FlinkKafkaProducer011(String, SerializationSchema, Properties, Optional) instead.
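The subtask-to-partition mapping of the default partitioner can be pictured with the following self-contained sketch. It assumes a simple modulo assignment of subtask index to partition; FixedPartitionerSketch and partitionFor are hypothetical names used for illustration, not FlinkFixedPartitioner's API.

```java
/**
 * Hypothetical sketch of a "one partition per sink subtask" mapping.
 * Assumes a modulo assignment; this is an illustration, not Flink's class.
 */
public class FixedPartitionerSketch {

    /** Picks the single partition that a given sink subtask writes all its records to. */
    static int partitionFor(int subtaskIndex, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("partitions must not be empty");
        }
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        // Four sink subtasks, three partitions: subtask 3 wraps around to partition 0.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> partition " + partitionFor(subtask, partitions));
        }
    }
}
```

One consequence of mapping each subtask to exactly one partition is that, when the topic has more partitions than the sink has subtasks, some partitions receive no records at all.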
Parameters:
topicId - ID of the Kafka topic.
serializationSchema - User-defined key-less serialization schema.
producerConfig - Properties with the producer configuration.

public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner)
Creates a FlinkKafkaProducer for a given topic. It accepts a key-less SerializationSchema and possibly a custom FlinkKafkaPartitioner.
Since a key-less SerializationSchema is used, no record sent to Kafka will have an attached key. Therefore, if no partitioner is provided either, records will be distributed to Kafka partitions in a round-robin fashion.
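Round-robin distribution simply sends each record to the next partition in turn, independent of its content. The sketch below is a hypothetical illustration of that idea (RoundRobinSketch is not a Flink or Kafka class); the Kafka client's own partitioning logic for key-less records may differ in detail.

```java
import java.util.Arrays;

/** Hypothetical sketch of round-robin partition assignment for key-less records. */
public class RoundRobinSketch {

    private int counter = 0;

    /** Returns the partition for the next record; floorMod keeps the result non-negative on overflow. */
    int nextPartition(int numPartitions) {
        return Math.floorMod(counter++, numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch assigner = new RoundRobinSketch();
        int[] recordsPerPartition = new int[3];
        for (int i = 0; i < 7; i++) {
            recordsPerPartition[assigner.nextPartition(3)]++;
        }
        // Seven records over three partitions: the counts differ by at most one.
        System.out.println(Arrays.toString(recordsPerPartition)); // [3, 2, 2]
    }
}
```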
Parameters:
topicId - The topic to write data to.
serializationSchema - A key-less serializable serialization schema for turning user objects into a Kafka-consumable byte[].
producerConfig - Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
customPartitioner - A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not provided, records will be distributed to Kafka partitions in a round-robin fashion.

public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema)
Creates a FlinkKafkaProducer for a given topic. Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. This default partitioner maps each sink subtask to a single Kafka partition (i.e. all records received by a sink subtask will end up in the same Kafka partition).
To use a custom partitioner, please use FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional) instead.
Parameters:
brokerList - Comma-separated addresses of the brokers.
topicId - ID of the Kafka topic.
serializationSchema - User-defined serialization schema supporting key/value messages.

public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig)
Creates a FlinkKafkaProducer for a given topic. Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. This default partitioner maps each sink subtask to a single Kafka partition (i.e. all records received by a sink subtask will end up in the same Kafka partition).
To use a custom partitioner, please use FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional) instead.
Parameters:
topicId - ID of the Kafka topic.
serializationSchema - User-defined serialization schema supporting key/value messages.
producerConfig - Properties with the producer configuration.

public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer011.Semantic semantic)
Creates a FlinkKafkaProducer for a given topic. Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. This default partitioner maps each sink subtask to a single Kafka partition (i.e. all records received by a sink subtask will end up in the same Kafka partition).
To use a custom partitioner, please use FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional, Semantic, int) instead.
Parameters:
topicId - ID of the Kafka topic.
serializationSchema - User-defined serialization schema supporting key/value messages.
producerConfig - Properties with the producer configuration.
semantic - Defines the semantic that will be used by this producer (see FlinkKafkaProducer011.Semantic).

public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner)
Creates a FlinkKafkaProducer for a given topic. It accepts a KeyedSerializationSchema and possibly a custom FlinkKafkaPartitioner.
If a partitioner is not provided, written records will be partitioned by the attached key of each record (as determined by KeyedSerializationSchema.serializeKey(Object)). If written records do not have a key (i.e., KeyedSerializationSchema.serializeKey(Object) returns null), they will be distributed to Kafka partitions in a round-robin fashion.
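A hedged sketch of this rule: records with the same serialized key always land on the same partition, and records whose key is null fall back to round-robin. Arrays.hashCode stands in for whatever hash the Kafka client actually applies to key bytes (an assumption, not Kafka's algorithm), and KeyedPartitioningSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch: key-based partitioning with a round-robin fallback for null keys. */
public class KeyedPartitioningSketch {

    private int roundRobinCounter = 0;

    /**
     * Same key -> same partition; null key -> next partition in round-robin order.
     * Arrays.hashCode is a stand-in hash, not the one used by the Kafka client.
     */
    int partitionFor(byte[] serializedKey, int numPartitions) {
        if (serializedKey == null) {
            return Math.floorMod(roundRobinCounter++, numPartitions);
        }
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }

    public static void main(String[] args) {
        KeyedPartitioningSketch partitioner = new KeyedPartitioningSketch();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // Keyed records are deterministic: the same key maps to the same partition every time.
        System.out.println(partitioner.partitionFor(key, 4) == partitioner.partitionFor(key, 4)); // true
        // Key-less records rotate through the partitions.
        System.out.println(partitioner.partitionFor(null, 4)); // 0
        System.out.println(partitioner.partitionFor(null, 4)); // 1
    }
}
```

Keeping records with equal keys on one partition is what preserves per-key ordering in Kafka; the round-robin fallback only spreads load for records that carry no key.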
Parameters:
defaultTopicId - The default topic to write data to.
serializationSchema - A serializable serialization schema for turning user objects into a Kafka-consumable byte[] supporting key/value messages.
producerConfig - Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
customPartitioner - A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not provided, records will be partitioned by the key of each record (determined by KeyedSerializationSchema.serializeKey(Object)). If the keys are null, then records will be distributed to Kafka partitions in a round-robin fashion.

public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, FlinkKafkaProducer011.Semantic semantic, int kafkaProducersPoolSize)
Creates a FlinkKafkaProducer for a given topic. It accepts a KeyedSerializationSchema and possibly a custom FlinkKafkaPartitioner.
If a partitioner is not provided, written records will be partitioned by the attached key of each record (as determined by KeyedSerializationSchema.serializeKey(Object)). If written records do not have a key (i.e., KeyedSerializationSchema.serializeKey(Object) returns null), they will be distributed to Kafka partitions in a round-robin fashion.
Parameters:
defaultTopicId - The default topic to write data to.
serializationSchema - A serializable serialization schema for turning user objects into a Kafka-consumable byte[] supporting key/value messages.
producerConfig - Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
customPartitioner - A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not provided, records will be partitioned by the key of each record (determined by KeyedSerializationSchema.serializeKey(Object)). If the keys are null, then records will be distributed to Kafka partitions in a round-robin fashion.
semantic - Defines the semantic that will be used by this producer (see FlinkKafkaProducer011.Semantic).
kafkaProducersPoolSize - Overrides the default KafkaProducers pool size (see FlinkKafkaProducer011.Semantic.EXACTLY_ONCE).

public void setWriteTimestampToKafka(boolean writeTimestampToKafka)
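If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
Parameters:
writeTimestampToKafka - Flag indicating if Flink's internal timestamps are written to Kafka.

public void setLogFailuresOnly(boolean logFailuresOnly)
Defines whether the producer should fail on errors, or only log them. If set to true, exceptions are only logged; if set to false, exceptions are eventually thrown and cause the streaming program to fail (and enter recovery).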
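The difference between the two modes of setLogFailuresOnly can be modelled as below: the completion callback of an asynchronous send either only logs the error, or records it so that the next write rethrows it and fails the job. This is a simplified, hypothetical model (FailureModeSketch, onSendCompleted and checkErroneous are illustrative names), not the connector's implementation.

```java
/**
 * Hypothetical model of fail-versus-log handling of asynchronous send errors.
 * Not the connector's code; names are illustrative.
 */
public class FailureModeSketch {

    private final boolean logFailuresOnly;
    // First failure reported by a send callback; volatile because callbacks may run on another thread.
    private volatile Exception asyncException;

    FailureModeSketch(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    /** Stands in for the completion callback of an asynchronous send. */
    void onSendCompleted(Exception exception) {
        if (exception == null) {
            return;
        }
        if (logFailuresOnly) {
            System.err.println("Error while sending record to Kafka: " + exception.getMessage());
        } else if (asyncException == null) {
            asyncException = exception;
        }
    }

    /** Called before the next write: rethrows a recorded failure so that the job fails. */
    void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data to Kafka", e);
        }
    }

    public static void main(String[] args) throws Exception {
        FailureModeSketch lenient = new FailureModeSketch(true);
        lenient.onSendCompleted(new RuntimeException("broker unavailable"));
        lenient.checkErroneous(); // nothing thrown: the failure was only logged

        FailureModeSketch strict = new FailureModeSketch(false);
        strict.onSendCompleted(new RuntimeException("broker unavailable"));
        try {
            strict.checkErroneous();
        } catch (Exception e) {
            System.out.println("job fails: " + e.getCause().getMessage());
        }
    }
}
```

In the logging-only mode a record whose send failed is neither retried nor reported to the job, so it can be lost silently; leaving the flag false is the safer choice when delivery guarantees matter.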
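Parameters:
logFailuresOnly - The flag to indicate logging-only on exceptions.

public FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout()
Disables the propagation of exceptions thrown when committing presumably timed-out Kafka transactions during recovery of the job.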
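A timed-out Kafka transaction can never be committed successfully, so propagating the exception would only keep the job in a recovery loop. Note that System.currentTimeMillis() is used to track the age of a transaction. Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will attempt at least one commit of the transaction before giving up.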
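The recovery decision described above can be sketched as follows: the commit of a recovered transaction is attempted once, and a failure is swallowed only when ignoring is enabled and the transaction is older than the transaction timeout. Passing the current time as a parameter, instead of calling System.currentTimeMillis() inside, is a choice made here to keep the sketch testable; TimeoutAwareRecoverySketch and its members are hypothetical names, and Flink's exact comparison may differ.

```java
/** Hypothetical sketch of ignoring commit failures for transactions older than the timeout. */
public class TimeoutAwareRecoverySketch {

    /** A commit action that may fail; stands in for committing a recovered transaction. */
    interface Commit {
        void run() throws Exception;
    }

    /**
     * Attempts the commit once. Returns true on success, false if a failure was ignored
     * because the transaction had presumably timed out; otherwise rethrows the failure.
     */
    static boolean recoverAndCommit(Commit commit, long startMillis, long nowMillis,
                                    long timeoutMillis, boolean ignoreAfterTimeout) throws Exception {
        try {
            commit.run();
            return true;
        } catch (Exception e) {
            long ageMillis = nowMillis - startMillis;
            if (ignoreAfterTimeout && ageMillis > timeoutMillis) {
                // Still logged, so the user learns that data may have been lost.
                System.err.println("Ignoring failed commit of transaction aged " + ageMillis + " ms: " + e);
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        Commit failing = () -> {
            throw new IllegalStateException("transaction has expired");
        };
        long oneHour = 60L * 60L * 1000L;
        // Started at t=0 and recovered two hours later: the failure is logged and ignored.
        System.out.println(recoverAndCommit(failing, 0L, 2 * oneHour, oneHour, true)); // false
        // The same failure on a young transaction is propagated to the caller.
        try {
            recoverAndCommit(failing, 0L, oneHour / 2, oneHour, true);
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```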

public void open(Configuration configuration) throws Exception
Initializes the connection to Kafka.
Specified by: open in interface RichFunction
Overrides: open in class AbstractRichFunction
Parameters:
configuration - The configuration containing the parameters attached to the contract.
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
See Also:
Configuration

public void invoke(FlinkKafkaProducer011.KafkaTransactionState transaction, IN next, SinkFunction.Context context) throws FlinkKafka011Exception
Description copied from class: TwoPhaseCommitSinkFunction
Write a value within a transaction.
Specified by: invoke in class TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Throws:
FlinkKafka011Exception

public void close() throws FlinkKafka011Exception
Description copied from interface: RichFunction
Tear-down method for the user code. This method can be used for clean-up work.
Specified by: close in interface RichFunction
Overrides: close in class TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Throws:
FlinkKafka011Exception

protected FlinkKafkaProducer011.KafkaTransactionState beginTransaction() throws FlinkKafka011Exception
Description copied from class: TwoPhaseCommitSinkFunction
Method that starts a new transaction.
Specified by: beginTransaction in class TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Throws:
FlinkKafka011Exception

protected void preCommit(FlinkKafkaProducer011.KafkaTransactionState transaction) throws FlinkKafka011Exception
Description copied from class: TwoPhaseCommitSinkFunction
Pre-commit the previously created transaction. Usually the implementation involves flushing the data.
Specified by: preCommit in class TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Throws:
FlinkKafka011Exception
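
protected void commit(FlinkKafkaProducer011.KafkaTransactionState transaction)
Description copied from class: TwoPhaseCommitSinkFunction
Commit a pre-committed transaction. If this method fails, the Flink application will be restarted and TwoPhaseCommitSinkFunction.recoverAndCommit(Object) will be called again for the same transaction.

protected void recoverAndCommit(FlinkKafkaProducer011.KafkaTransactionState transaction)
Description copied from class: TwoPhaseCommitSinkFunction
Invoked on recovered transactions after a failure.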
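
protected void abort(FlinkKafkaProducer011.KafkaTransactionState transaction)
Description copied from class: TwoPhaseCommitSinkFunction
Abort a transaction.

protected void recoverAndAbort(FlinkKafkaProducer011.KafkaTransactionState transaction)
Description copied from class: TwoPhaseCommitSinkFunction
Abort a transaction that was rejected by a coordinator after a failure.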
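The transaction methods above are the hooks of a checkpoint-driven two-phase commit. The toy model below, with an in-memory list standing in for Kafka, sketches the order in which such hooks are typically called: records are written into the open transaction, a snapshot pre-commits it and opens a new one, and only a completed checkpoint commits it. It is a hypothetical illustration with no Flink dependency (all names are illustrative), not TwoPhaseCommitSinkFunction itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a checkpoint-driven two-phase commit sink; all names are hypothetical. */
public class TwoPhaseCommitSketch {

    /** A transaction buffers records until it is committed. */
    static final class Txn {
        final List<String> records = new ArrayList<>();
    }

    final List<String> committed = new ArrayList<>();                 // what downstream readers can see
    final Map<Long, Txn> pendingByCheckpoint = new LinkedHashMap<>(); // pre-committed, awaiting commit
    Txn current;

    TwoPhaseCommitSketch() {
        current = beginTransaction();
    }

    Txn beginTransaction() {
        return new Txn();
    }

    void invoke(String value) {
        current.records.add(value); // write within the open transaction
    }

    void preCommit(Txn txn) {
        // A real sink would flush buffered data here; the toy keeps everything in memory.
    }

    void commit(Txn txn) {
        committed.addAll(txn.records); // make the records visible
    }

    void abort(Txn txn) {
        txn.records.clear(); // discard the records
    }

    /** Phase one: pre-commit the open transaction, park it under the checkpoint id, open a new one. */
    void snapshotState(long checkpointId) {
        preCommit(current);
        pendingByCheckpoint.put(checkpointId, current);
        current = beginTransaction();
    }

    /** Phase two: commit every pending transaction up to and including the completed checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Txn>> it = pendingByCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                commit(entry.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);
        sink.invoke("c");                   // belongs to the next transaction
        System.out.println(sink.committed); // []
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.committed); // [a, b]
        sink.abort(sink.current);           // e.g. on failure: "c" is never committed
        System.out.println(sink.committed); // [a, b]
    }
}
```

The key property is that nothing becomes visible until the checkpoint that covers it has completed, which is what lets EXACTLY_ONCE avoid exposing records that a restore would replay.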

public void snapshotState(FunctionSnapshotContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
Specified by: snapshotState in interface CheckpointedFunction
Overrides: snapshotState in class TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Parameters:
context - the context for drawing a snapshot of the operator
Throws:
Exception - Thrown if state could not be created or restored.

public void initializeState(FunctionInitializationContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution.
Specified by: initializeState in interface CheckpointedFunction
Overrides: initializeState in class TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Parameters:
context - the context for initializing the operator
Throws:
Exception - Thrown if state could not be created or restored.

protected Optional<FlinkKafkaProducer011.KafkaTransactionContext> initializeUserContext()

protected void finishRecoveringContext(Collection<FlinkKafkaProducer011.KafkaTransactionState> handledTransactions)
Description copied from class: TwoPhaseCommitSinkFunction
Callback for subclasses which is called after restoring (each) user context.
Overrides: finishRecoveringContext in class TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer011.KafkaTransactionState,FlinkKafkaProducer011.KafkaTransactionContext>
Parameters:
handledTransactions - transactions which were already committed or aborted and do not need further handling

Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.