Skip to content

Commit 20d975a

Browse files
authored
MINOR: Replace Map conversions with MetricsUtils.getTags for metric tags (#21948)
Replace HashMap with LinkedHashMap for metrics tag maps so that JMX MBean names always reflect a deterministic, insertion-order tag sequence. Also migrate Scala core module tag construction to use the existing MetricsUtils.getTags(...) helper for consistency. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Yi Chiu <cychiu8@gmail.com>
1 parent c39e2af commit 20d975a

10 files changed

Lines changed: 33 additions & 28 deletions

File tree

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
4141
import org.apache.kafka.server.log.remote.TopicPartitionLog
4242
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
4343
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogManager, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
44+
import org.apache.kafka.common.metrics.internals.MetricsUtils
4445
import org.apache.kafka.server.metrics.KafkaMetricsGroup
4546
import org.apache.kafka.server.partition.{AlterPartitionListener, AssignmentState, CommittedPartitionState, OngoingReassignmentState, PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, PendingShrinkIsr, SimpleAssignmentState}
4647
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
@@ -131,7 +132,7 @@ object Partition {
131132
}
132133

133134
def removeMetrics(topicPartition: TopicPartition): Unit = {
134-
val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString).asJava
135+
val tags = MetricsUtils.getTags("topic", topicPartition.topic, "partition", topicPartition.partition.toString)
135136
metricsGroup.removeMetric("UnderReplicated", tags)
136137
metricsGroup.removeMetric("UnderMinIsr", tags)
137138
metricsGroup.removeMetric("InSyncReplicasCount", tags)
@@ -215,7 +216,7 @@ class Partition(val topicPartition: TopicPartition,
215216

216217
this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
217218

218-
private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava
219+
private val tags = MetricsUtils.getTags("topic", topic, "partition", partitionId.toString)
219220

220221
metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)
221222
metricsGroup.newGauge("InSyncReplicasCount", () => if (isLeader) partitionState.isr.size else 0, tags)

core/src/main/scala/kafka/network/RequestChannel.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests._
3232
import org.apache.kafka.common.utils.Time
3333
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
3434
import org.apache.kafka.server.common.RequestLocal
35+
import org.apache.kafka.common.metrics.internals.MetricsUtils
3536
import org.apache.kafka.server.metrics.KafkaMetricsGroup
3637
import org.apache.kafka.network.{RequestConvertToJson, Session}
3738
import org.apache.kafka.server.config.AbstractKafkaConfig
@@ -367,12 +368,13 @@ class RequestChannel(val queueSize: Int,
367368
warn(s"Unexpected processor with processorId ${processor.id}")
368369

369370
metricsGroup.newGauge(ResponseQueueSizeMetric, () => processor.responseQueueSize,
370-
Map(ProcessorMetricTag -> processor.id.toString).asJava)
371+
MetricsUtils.getTags(ProcessorMetricTag, processor.id.toString))
371372
}
372373

373374
def removeProcessor(processorId: Int): Unit = {
374375
processors.remove(processorId)
375-
metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString).asJava)
376+
metricsGroup.removeMetric(ResponseQueueSizeMetric,
377+
MetricsUtils.getTags(ProcessorMetricTag, processorId.toString))
376378
}
377379

378380
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledExcep
4646
import org.apache.kafka.security.CredentialProvider
4747
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
4848
import org.apache.kafka.server.config.QuotaConfig
49+
import org.apache.kafka.common.metrics.internals.MetricsUtils
4950
import org.apache.kafka.server.metrics.KafkaMetricsGroup
5051
import org.apache.kafka.server.network.ConnectionDisconnectListener
5152
import org.apache.kafka.server.quota.QuotaUtils
@@ -496,7 +497,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
496497
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
497498
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
498499
"AcceptorBlockedPercent",
499-
Map(ListenerMetricTag -> endPoint.listener).asJava)
500+
MetricsUtils.getTags(ListenerMetricTag, endPoint.listener))
500501
private val blockedPercentMeter = backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
501502
private var currentProcessorIndex = 0
502503
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
@@ -839,7 +840,7 @@ private[kafka] class Processor(
839840
},
840841
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
841842
// also includes the listener name)
842-
Map(NetworkProcessorMetricTag -> id.toString).asJava
843+
MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString)
843844
)
844845

845846
private val expiredConnectionsKilledCount = new CumulativeSum()
@@ -1195,7 +1196,8 @@ private[kafka] class Processor(
11951196
close(channel.id)
11961197
}
11971198
selector.close()
1198-
metricsGroup.removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString).asJava)
1199+
metricsGroup.removeMetric(IdlePercentMetricName,
1200+
MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString))
11991201
}
12001202

12011203
// 'protected` to allow override for testing
@@ -1266,7 +1268,8 @@ private[kafka] class Processor(
12661268
Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
12671269
}
12681270
} finally {
1269-
metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" -> id.toString).asJava)
1271+
metricsGroup.removeMetric("IdlePercent",
1272+
MetricsUtils.getTags("networkProcessor", id.toString))
12701273
metrics.removeMetric(expiredConnectionsKilledCountMetricName)
12711274
}
12721275
}
@@ -1686,7 +1689,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
16861689
val metricName = metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
16871690
JSocketServer.METRICS_GROUP,
16881691
"Tracking average throttle-time, out of non-zero throttle times, per listener",
1689-
Map(ListenerMetricTag -> listener.value).asJava)
1692+
MetricsUtils.getTags(ListenerMetricTag, listener.value))
16901693
sensor.add(metricName, new Avg)
16911694
sensor
16921695
}

core/src/main/scala/kafka/server/AbstractFetcherManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ package kafka.server
2020
import kafka.utils.Logging
2121
import org.apache.kafka.common.{TopicPartition, Uuid}
2222
import org.apache.kafka.common.utils.Utils
23+
import org.apache.kafka.common.metrics.internals.MetricsUtils
2324
import org.apache.kafka.server.metrics.KafkaMetricsGroup
2425
import org.apache.kafka.server.network.BrokerEndPoint
2526
import org.apache.kafka.server.PartitionFetchState
2627

2728
import scala.collection.{Map, Set, mutable}
28-
import scala.jdk.CollectionConverters._
2929
import scala.jdk.OptionConverters._
3030

3131
abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: String, clientId: String, numFetchers: Int)
@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
4343
val failedPartitions = new FailedPartitions
4444
this.logIdent = "[" + name + "] "
4545

46-
private val tags = Map("clientId" -> clientId).asJava
46+
private val tags = MetricsUtils.getTags("clientId", clientId)
4747

4848
metricsGroup.newGauge("MaxLag", () => {
4949
// current max lag across all fetchers/topics/partitions

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.kafka.common.{ClientIdAndBroker, InvalidRecordException, Topic
3232
import org.apache.kafka.server.common.OffsetAndEpoch
3333
import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState, ReplicaState, ResultWithPartitions}
3434
import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
35+
import org.apache.kafka.common.metrics.internals.MetricsUtils
3536
import org.apache.kafka.server.metrics.KafkaMetricsGroup
3637
import org.apache.kafka.server.util.{LockUtils, ShutdownableThread}
3738
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -930,10 +931,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
930931
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
931932

932933
private[this] val lagVal = new AtomicLong(-1L)
933-
private[this] val tags = Map(
934-
"clientId" -> metricId.clientId,
935-
"topic" -> metricId.topicPartition.topic,
936-
"partition" -> metricId.topicPartition.partition.toString).asJava
934+
private[this] val tags = MetricsUtils.getTags("clientId", metricId.clientId, "topic", metricId.topicPartition.topic, "partition", metricId.topicPartition.partition.toString)
937935

938936
metricsGroup.newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)
939937

@@ -971,9 +969,7 @@ class FetcherStats(metricId: ClientIdAndBroker) {
971969
private val metricsClassName = "FetcherStats"
972970
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
973971

974-
val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
975-
"brokerHost" -> metricId.brokerHost,
976-
"brokerPort" -> metricId.brokerPort.toString).asJava
972+
val tags: util.Map[String, String] = MetricsUtils.getTags("clientId", metricId.clientId, "brokerHost", metricId.brokerHost, "brokerPort", metricId.brokerPort.toString)
977973

978974
val requestRate: Meter = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
979975

core/src/main/scala/kafka/server/BrokerBlockingSender.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ import org.apache.kafka.common.utils.{LogContext, Time}
2626
import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
2727
import org.apache.kafka.common.{Node, Reconfigurable}
2828
import org.apache.kafka.common.requests.AbstractRequest.Builder
29+
import org.apache.kafka.common.metrics.internals.MetricsUtils
2930
import org.apache.kafka.server.network.BrokerEndPoint
3031

31-
import scala.jdk.CollectionConverters._
32-
3332
trait BlockingSend {
3433

3534
def brokerEndPoint(): BrokerEndPoint
@@ -74,7 +73,7 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
7473
metrics,
7574
time,
7675
"replica-fetcher",
77-
Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
76+
MetricsUtils.getTags("broker-id", sourceBroker.id.toString, "fetcher-id", fetcherId.toString),
7877
false,
7978
channelBuilder,
8079
logContext

core/src/main/scala/kafka/server/DelayedFetch.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition
2525
import org.apache.kafka.common.errors._
2626
import org.apache.kafka.common.protocol.Errors
2727
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
28+
import org.apache.kafka.common.metrics.internals.MetricsUtils
2829
import org.apache.kafka.server.metrics.KafkaMetricsGroup
2930
import org.apache.kafka.server.purgatory.DelayedOperation
3031
import org.apache.kafka.server.quota.ReplicaQuota
@@ -183,7 +184,7 @@ object DelayedFetchMetrics {
183184
private val metricsClassName = "DelayedFetchMetrics"
184185
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
185186
private val FetcherTypeKey = "fetcherType"
186-
val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
187-
val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
187+
val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, MetricsUtils.getTags(FetcherTypeKey, "follower"))
188+
val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, MetricsUtils.getTags(FetcherTypeKey, "consumer"))
188189
}
189190

server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.TopicIdPartition;
2020
import org.apache.kafka.common.TopicPartition;
21+
import org.apache.kafka.common.metrics.internals.MetricsUtils;
2122
import org.apache.kafka.common.protocol.Errors;
2223
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
2324
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
@@ -200,14 +201,14 @@ public static void recordExpiration(TopicPartition partition) {
200201
key -> METRICS_GROUP.newMeter("ExpiresPerSec",
201202
"requests",
202203
TimeUnit.SECONDS,
203-
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition())))
204+
MetricsUtils.getTags("topic", key.topic(), "partition", String.valueOf(key.partition())))
204205
).mark();
205206
}
206207

207208
public static void removePartitionMetrics(TopicPartition partition) {
208209
if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
209210
METRICS_GROUP.removeMetric("ExpiresPerSec",
210-
Map.of("topic", partition.topic(),
211+
MetricsUtils.getTags("topic", partition.topic(),
211212
"partition", String.valueOf(partition.partition())));
212213
}
213214
}

storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.common.TopicPartition;
2020
import org.apache.kafka.common.errors.ApiException;
2121
import org.apache.kafka.common.message.ListOffsetsResponseData;
22+
import org.apache.kafka.common.metrics.internals.MetricsUtils;
2223
import org.apache.kafka.common.protocol.Errors;
2324
import org.apache.kafka.common.record.internal.FileRecords;
2425
import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -186,13 +187,13 @@ private static void recordExpiration(TopicPartition partition) {
186187
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp -> METRICS_GROUP.newMeter("ExpiresPerSec",
187188
"requests",
188189
TimeUnit.SECONDS,
189-
Map.of("topic", tp.topic(), "partition", String.valueOf(tp.partition())))).mark();
190+
MetricsUtils.getTags("topic", tp.topic(), "partition", String.valueOf(tp.partition())))).mark();
190191
}
191192

192193
public static void removePartitionMetrics(TopicPartition partition) {
193194
if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
194195
METRICS_GROUP.removeMetric("ExpiresPerSec",
195-
Map.of("topic", partition.topic(), "partition", String.valueOf(partition.partition())));
196+
MetricsUtils.getTags("topic", partition.topic(), "partition", String.valueOf(partition.partition())));
196197
}
197198
}
198199
}

storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.TopicPartition;
2222
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
2323
import org.apache.kafka.common.message.ListOffsetsResponseData;
24+
import org.apache.kafka.common.metrics.internals.MetricsUtils;
2425
import org.apache.kafka.common.protocol.Errors;
2526
import org.apache.kafka.common.record.internal.FileRecords;
2627
import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -277,7 +278,7 @@ public void testRemovePartitionMetrics() {
277278
DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp ->
278279
DelayedRemoteListOffsets.METRICS_GROUP.newMeter("ExpiresPerSec",
279280
"requests", TimeUnit.SECONDS,
280-
Map.of("topic", tp.topic(), "partition", String.valueOf(tp.partition()))));
281+
MetricsUtils.getTags("topic", tp.topic(), "partition", String.valueOf(tp.partition()))));
281282

282283
// Verify the partition metric exists in the map
283284
assertTrue(DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.containsKey(partition),

0 commit comments

Comments (0)