5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -41,6 +41,7 @@ import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogManager, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.partition.{AlterPartitionListener, AssignmentState, CommittedPartitionState, OngoingReassignmentState, PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, PendingShrinkIsr, SimpleAssignmentState}
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
@@ -131,7 +132,7 @@ object Partition {
}

def removeMetrics(topicPartition: TopicPartition): Unit = {
-val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString).asJava
+val tags = MetricsUtils.getTags("topic", topicPartition.topic, "partition", topicPartition.partition.toString)
metricsGroup.removeMetric("UnderReplicated", tags)
metricsGroup.removeMetric("UnderMinIsr", tags)
metricsGroup.removeMetric("InSyncReplicasCount", tags)
@@ -215,7 +216,7 @@ class Partition(val topicPartition: TopicPartition,

this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "

-private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava
+private val tags = MetricsUtils.getTags("topic", topic, "partition", partitionId.toString)

metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)
metricsGroup.newGauge("InSyncReplicasCount", () => if (isLeader) partitionState.isr.size else 0, tags)
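Every file in this PR applies the same substitution: tag maps built with Scala's Map(...).asJava (or Java's Map.of(...)) become a single MetricsUtils.getTags varargs call, which also lets two of the Scala files drop their scala.jdk.CollectionConverters._ import. A minimal sketch of the pattern, assuming only that getTags accepts alternating key/value strings and returns a java.util.Map[String, String] (the topic name and partition id below are placeholder values):

import org.apache.kafka.common.metrics.internals.MetricsUtils

object TagsSketch extends App {
  val topic = "demo-topic" // placeholder
  val partitionId = 0      // placeholder

  // Before: build a Scala Map, then convert it for the Java metrics API.
  // import scala.jdk.CollectionConverters._
  // val oldTags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava

  // After: one call; arguments are consumed in pairs as key1, value1, key2, value2, ...
  val tags: java.util.Map[String, String] =
    MetricsUtils.getTags("topic", topic, "partition", partitionId.toString)

  println(tags) // expected: {topic=demo-topic, partition=0}
}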
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/network/RequestChannel.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.network.{RequestConvertToJson, Session}
import org.apache.kafka.server.config.AbstractKafkaConfig
@@ -367,12 +368,13 @@ class RequestChannel(val queueSize: Int,
warn(s"Unexpected processor with processorId ${processor.id}")

metricsGroup.newGauge(ResponseQueueSizeMetric, () => processor.responseQueueSize,
-Map(ProcessorMetricTag -> processor.id.toString).asJava)
+MetricsUtils.getTags(ProcessorMetricTag, processor.id.toString))
}

def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
-metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString).asJava)
+metricsGroup.removeMetric(ResponseQueueSizeMetric,
+MetricsUtils.getTags(ProcessorMetricTag, processorId.toString))
}

/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
13 changes: 8 additions & 5 deletions core/src/main/scala/kafka/network/SocketServer.scala
@@ -46,6 +46,7 @@ import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledExcep
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
import org.apache.kafka.server.config.QuotaConfig
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.ConnectionDisconnectListener
import org.apache.kafka.server.quota.QuotaUtils
@@ -496,7 +497,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
"AcceptorBlockedPercent",
-Map(ListenerMetricTag -> endPoint.listener).asJava)
+MetricsUtils.getTags(ListenerMetricTag, endPoint.listener))
private val blockedPercentMeter = backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName, "blocked time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
Expand Down Expand Up @@ -839,7 +840,7 @@ private[kafka] class Processor(
},
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
-Map(NetworkProcessorMetricTag -> id.toString).asJava
+MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString)
)

private val expiredConnectionsKilledCount = new CumulativeSum()
@@ -1195,7 +1196,8 @@ private[kafka] class Processor(
close(channel.id)
}
selector.close()
-metricsGroup.removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString).asJava)
+metricsGroup.removeMetric(IdlePercentMetricName,
+MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString))
}

// `protected` to allow override for testing
@@ -1266,7 +1268,8 @@ private[kafka] class Processor(
Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
}
} finally {
metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" -> id.toString).asJava)
metricsGroup.removeMetric("IdlePercent",
MetricsUtils.getTags("networkProcessor", id.toString))
metrics.removeMetric(expiredConnectionsKilledCountMetricName)
}
}
@@ -1686,7 +1689,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
val metricName = metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
JSocketServer.METRICS_GROUP,
"Tracking average throttle-time, out of non-zero throttle times, per listener",
-Map(ListenerMetricTag -> listener.value).asJava)
+MetricsUtils.getTags(ListenerMetricTag, listener.value))
sensor.add(metricName, new Avg)
sensor
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,12 +20,12 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.PartitionFetchState

import scala.collection.{Map, Set, mutable}
-import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._

abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: String, clientId: String, numFetchers: Int)
@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: String, clientId: String, numFetchers: Int)
val failedPartitions = new FailedPartitions
this.logIdent = "[" + name + "] "

-private val tags = Map("clientId" -> clientId).asJava
+private val tags = MetricsUtils.getTags("clientId", clientId)

metricsGroup.newGauge("MaxLag", () => {
// current max lag across all fetchers/topics/partitions
10 changes: 3 additions & 7 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.{ClientIdAndBroker, InvalidRecordException, Topic
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState, ReplicaState, ResultWithPartitions}
import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{LockUtils, ShutdownableThread}
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -930,10 +931,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

private[this] val lagVal = new AtomicLong(-1L)
-private[this] val tags = Map(
-  "clientId" -> metricId.clientId,
-  "topic" -> metricId.topicPartition.topic,
-  "partition" -> metricId.topicPartition.partition.toString).asJava
+private[this] val tags = MetricsUtils.getTags("clientId", metricId.clientId, "topic", metricId.topicPartition.topic, "partition", metricId.topicPartition.partition.toString)

metricsGroup.newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)

@@ -971,9 +969,7 @@ class FetcherStats(metricId: ClientIdAndBroker) {
private val metricsClassName = "FetcherStats"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

-val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
-  "brokerHost" -> metricId.brokerHost,
-  "brokerPort" -> metricId.brokerPort.toString).asJava
+val tags: util.Map[String, String] = MetricsUtils.getTags("clientId", metricId.clientId, "brokerHost", metricId.brokerHost, "brokerPort", metricId.brokerPort.toString)

val requestRate: Meter = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)

5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/BrokerBlockingSender.scala
@@ -26,10 +26,9 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.{Node, Reconfigurable}
import org.apache.kafka.common.requests.AbstractRequest.Builder
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.network.BrokerEndPoint

-import scala.jdk.CollectionConverters._
-
trait BlockingSend {

def brokerEndPoint(): BrokerEndPoint
@@ -74,7 +73,7 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
metrics,
time,
"replica-fetcher",
Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
MetricsUtils.getTags("broker-id", sourceBroker.id.toString, "fetcher-id", fetcherId.toString),
false,
channelBuilder,
logContext
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.quota.ReplicaQuota
@@ -183,7 +184,7 @@ object DelayedFetchMetrics {
private val metricsClassName = "DelayedFetchMetrics"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
private val FetcherTypeKey = "fetcherType"
-val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
-val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
+val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, MetricsUtils.getTags(FetcherTypeKey, "follower"))
+val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, MetricsUtils.getTags(FetcherTypeKey, "consumer"))
}

DelayedProduce.java
@@ -18,6 +18,7 @@

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
@@ -200,14 +201,14 @@ public static void recordExpiration(TopicPartition partition) {
key -> METRICS_GROUP.newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition())))
MetricsUtils.getTags("topic", key.topic(), "partition", String.valueOf(key.partition())))
).mark();
}

public static void removePartitionMetrics(TopicPartition partition) {
if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
METRICS_GROUP.removeMetric("ExpiresPerSec",
Map.of("topic", partition.topic(),
MetricsUtils.getTags("topic", partition.topic(),
"partition", String.valueOf(partition.partition())));
}
}
DelayedRemoteListOffsets.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -186,13 +187,13 @@ private static void recordExpiration(TopicPartition partition) {
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp -> METRICS_GROUP.newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
Map.of("topic", tp.topic(), "partition", String.valueOf(tp.partition())))).mark();
MetricsUtils.getTags("topic", tp.topic(), "partition", String.valueOf(tp.partition())))).mark();
}

public static void removePartitionMetrics(TopicPartition partition) {
if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
METRICS_GROUP.removeMetric("ExpiresPerSec",
Map.of("topic", partition.topic(), "partition", String.valueOf(partition.partition())));
MetricsUtils.getTags("topic", partition.topic(), "partition", String.valueOf(partition.partition())));
}
}
}
DelayedRemoteListOffsetsTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -277,7 +278,7 @@ public void testRemovePartitionMetrics() {
DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp ->
DelayedRemoteListOffsets.METRICS_GROUP.newMeter("ExpiresPerSec",
"requests", TimeUnit.SECONDS,
Map.of("topic", tp.topic(), "partition", String.valueOf(tp.partition()))));
MetricsUtils.getTags("topic", tp.topic(), "partition", String.valueOf(tp.partition()))));

// Verify the partition metric exists in the map
assertTrue(DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.containsKey(partition),
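For reference, a hedged sketch of what a getTags-style helper plausibly does: consume its varargs in pairs, reject an odd argument count, and return an unmodifiable map. This stand-in is written for illustration here and is not the actual MetricsUtils source:

import java.util.Collections

object MetricsUtilsSketch {
  // Hypothetical stand-in for MetricsUtils.getTags, for illustration only.
  def getTags(keyValue: String*): java.util.Map[String, String] = {
    require(keyValue.length % 2 == 0, "keyValue must come in key/value pairs")
    val tags = new java.util.LinkedHashMap[String, String](keyValue.length / 2)
    keyValue.grouped(2).foreach { case Seq(k, v) => tags.put(k, v) }
    Collections.unmodifiableMap(tags)
  }
}

The varargs form trades a little compile-time safety (an odd argument count fails only at runtime) for brevity and a single tag-building code path shared by the Scala and Java call sites above.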