Skip to content

Commit c39e2af

Browse files
authored
KAFKA-20408 Remove ConsumerMetrics wrapper (#21974)
https://issues.apache.org/jira/browse/KAFKA-20408 ConsumerMetrics which is redundant was referenced in ClassicKafkaConsumer and AsyncKafkaConsumer Updated those referenced classes, tests and build.gradle. Tried running the script `./gradlew genConsumerMetricsDocs` and generated consumer_metrics.html looks good. Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 1ca017a commit c39e2af

5 files changed

Lines changed: 14 additions & 56 deletions

File tree

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1232,7 +1232,7 @@ project(':core') {
12321232

12331233
task genConsumerMetricsDocs(type: JavaExec) {
12341234
classpath = sourceSets.test.runtimeClasspath
1235-
mainClass = 'org.apache.kafka.clients.consumer.internals.ConsumerMetrics'
1235+
mainClass = 'org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry'
12361236
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
12371237
standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream()
12381238
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
149149
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
150150
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
151+
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
151152
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
152153
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
153154
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
@@ -680,8 +681,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
680681
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
681682
this.clientTelemetryReporter = Optional.empty();
682683

683-
ConsumerMetrics metricsRegistry = new ConsumerMetrics();
684-
this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
684+
FetchMetricsRegistry fetchMetricsRegistry = new FetchMetricsRegistry(CONSUMER_METRIC_GROUP_PREFIX);
685+
this.fetchMetricsManager = new FetchMetricsManager(metrics, fetchMetricsRegistry);
685686
this.fetchCollector = new FetchCollector<>(logContext,
686687
metadata,
687688
subscriptions,

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
378378
int maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
379379
boolean checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
380380

381-
ConsumerMetrics metricsRegistry = new ConsumerMetrics();
382-
this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
381+
FetchMetricsRegistry fetchMetricsRegistry = new FetchMetricsRegistry(CONSUMER_METRIC_GROUP_PREFIX);
382+
this.fetchMetricsManager = new FetchMetricsManager(metrics, fetchMetricsRegistry);
383383
ApiVersions apiVersions = new ApiVersions();
384384
FetchConfig fetchConfig = new FetchConfig(
385385
minBytes,

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.clients.consumer.internals;
1818

1919
import org.apache.kafka.common.MetricNameTemplate;
20+
import org.apache.kafka.common.metrics.Metrics;
2021

2122
import java.util.Arrays;
2223
import java.util.HashSet;
@@ -148,6 +149,13 @@ public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
148149
"The current read replica for the partition, or -1 if reading from leader. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
149150
}
150151

152+
public static void main(String[] args) {
153+
Set<String> tags = new HashSet<>();
154+
tags.add("client-id");
155+
FetchMetricsRegistry metrics = new FetchMetricsRegistry(tags, "consumer");
156+
System.out.println(Metrics.toHtmlTable("kafka.consumer", metrics.getAllTemplates()));
157+
}
158+
151159
public List<MetricNameTemplate> getAllTemplates() {
152160
return Arrays.asList(
153161
fetchSizeAvg,

0 commit comments

Comments
 (0)