@PublicEvolving public class KafkaSourceReaderMetrics extends Object
KafkaSourceReader
.
All metrics of Kafka source reader are registered under group "KafkaSourceReader", which is a
child group of OperatorMetricGroup
. Metrics related to a
specific topic partition will be registered in the group
"KafkaSourceReader.topic.{topic_name}.partition.{partition_id}".
For example, current consuming offset of topic "my-topic" and partition 1 will be reported in metric: "{some_parent_groups}.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset"
and number of successful commits will be reported in metric: "{some_parent_groups}.operator.KafkaSourceReader.commitsSucceeded"
All metrics of Kafka consumer are also registered under group "KafkaSourceReader.KafkaConsumer". For example, Kafka consumer metric "records-consumed-total" can be found at: {some_parent_groups}.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total"
Modifier and Type | Field and Description |
---|---|
static String |
BYTES_CONSUMED_TOTAL |
static String |
COMMITS_FAILED_METRIC_COUNTER |
static String |
COMMITS_SUCCEEDED_METRIC_COUNTER |
static String |
COMMITTED_OFFSET_METRIC_GAUGE |
static String |
CONSUMER_FETCH_MANAGER_GROUP |
static String |
CURRENT_OFFSET_METRIC_GAUGE |
static long |
INITIAL_OFFSET |
static String |
KAFKA_CONSUMER_METRIC_GROUP |
static String |
KAFKA_SOURCE_READER_METRIC_GROUP |
static String |
PARTITION_GROUP |
static String |
RECORDS_LAG |
static String |
TOPIC_GROUP |
Constructor and Description |
---|
KafkaSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) |
Modifier and Type | Method and Description |
---|---|
void |
maybeAddRecordsLagMetric(org.apache.kafka.clients.consumer.KafkaConsumer<?,?> consumer,
org.apache.kafka.common.TopicPartition tp)
Add a partition's records-lag metric to tracking list if this partition never appears before.
|
void |
recordCommittedOffset(org.apache.kafka.common.TopicPartition tp,
long offset)
Update the latest committed offset of the given
TopicPartition . |
void |
recordCurrentOffset(org.apache.kafka.common.TopicPartition tp,
long offset)
Update current consuming offset of the given
TopicPartition . |
void |
recordFailedCommit()
Mark a failure commit.
|
void |
recordSucceededCommit()
Mark a successful commit.
|
void |
registerKafkaConsumerMetrics(org.apache.kafka.clients.consumer.KafkaConsumer<?,?> kafkaConsumer)
Register metrics of KafkaConsumer in Kafka metric group.
|
void |
registerNumBytesIn(org.apache.kafka.clients.consumer.KafkaConsumer<?,?> consumer)
Register
MetricNames.IO_NUM_BYTES_IN . |
void |
registerTopicPartition(org.apache.kafka.common.TopicPartition tp)
Register metric groups for the given
TopicPartition . |
void |
removeRecordsLagMetric(org.apache.kafka.common.TopicPartition tp)
Remove a partition's records-lag metric from tracking list.
|
void |
updateNumBytesInCounter()
Update
MetricNames.IO_NUM_BYTES_IN . |
public static final String KAFKA_SOURCE_READER_METRIC_GROUP
public static final String TOPIC_GROUP
public static final String PARTITION_GROUP
public static final String CURRENT_OFFSET_METRIC_GAUGE
public static final String COMMITTED_OFFSET_METRIC_GAUGE
public static final String COMMITS_SUCCEEDED_METRIC_COUNTER
public static final String COMMITS_FAILED_METRIC_COUNTER
public static final String KAFKA_CONSUMER_METRIC_GROUP
public static final String CONSUMER_FETCH_MANAGER_GROUP
public static final String BYTES_CONSUMED_TOTAL
public static final String RECORDS_LAG
public static final long INITIAL_OFFSET
public KafkaSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup)
public void registerKafkaConsumerMetrics(org.apache.kafka.clients.consumer.KafkaConsumer<?,?> kafkaConsumer)
kafkaConsumer
- Kafka consumer used by partition split reader.public void registerTopicPartition(org.apache.kafka.common.TopicPartition tp)
TopicPartition
.tp
- Registering topic partitionpublic void recordCurrentOffset(org.apache.kafka.common.TopicPartition tp, long offset)
TopicPartition
.tp
- Updating topic partitionoffset
- Current consuming offsetpublic void recordCommittedOffset(org.apache.kafka.common.TopicPartition tp, long offset)
TopicPartition
.tp
- Updating topic partitionoffset
- Committing offsetpublic void recordSucceededCommit()
public void recordFailedCommit()
public void registerNumBytesIn(org.apache.kafka.clients.consumer.KafkaConsumer<?,?> consumer)
MetricNames.IO_NUM_BYTES_IN
.consumer
- Kafka consumerpublic void maybeAddRecordsLagMetric(org.apache.kafka.clients.consumer.KafkaConsumer<?,?> consumer, org.apache.kafka.common.TopicPartition tp)
This method also lazily register MetricNames.PENDING_RECORDS
in SourceReaderMetricGroup
consumer
- Kafka consumertp
- Topic partitionpublic void removeRecordsLagMetric(org.apache.kafka.common.TopicPartition tp)
tp
- Unassigned topic partitionpublic void updateNumBytesInCounter()
MetricNames.IO_NUM_BYTES_IN
.
Instead of simply setting OperatorIOMetricGroup.getNumBytesInCounter()
to the same
value as bytes-consumed-total from Kafka consumer, which will screw TaskIOMetricGroup.getNumBytesInCounter()
if chained sources exist, we track the increment of
bytes-consumed-total and count it towards the counter.
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.