From 35511575f398e5335361e6f16a4d4adf01b4c011 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 4 Apr 2026 11:38:22 +0800 Subject: [PATCH 1/4] KAFKA-19566: Deprecate ClientQuotaCallback#updateClusterMetadata --- .../apache/kafka/server/quota/CustomQuotaCallbackTest.java | 1 + .../org/apache/kafka/server/quota/ClientQuotaCallback.java | 4 +++- .../scala/integration/kafka/api/CustomQuotaCallbackTest.scala | 1 + .../test/scala/unit/kafka/server/ClientQuotaManagerTest.scala | 1 + .../metadata/publisher/DynamicTopicClusterQuotaPublisher.java | 1 + .../org/apache/kafka/server/quota/ClientQuotaManager.java | 1 + .../test/java/org/apache/kafka/server/KRaftClusterTest.java | 1 + 7 files changed, 9 insertions(+), 1 deletion(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java index de0948981b095..95169356a5945 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java @@ -149,6 +149,7 @@ public boolean quotaResetRequired(ClientQuotaType quotaType) { return true; } + @SuppressWarnings("removal") @Override public boolean updateClusterMetadata(Cluster cluster) { COUNTERS.computeIfAbsent(nodeId, k -> new AtomicInteger()).incrementAndGet(); diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java index 01a8181d86100..a56f8e6fb3fcd 100644 --- a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java +++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java @@ -96,14 +96,16 @@ public interface ClientQuotaCallback extends Configurable { boolean quotaResetRequired(ClientQuotaType quotaType); /** - * This callback is invoked whenever there are changes in the cluster metadata, such as + * This callback is invoked whenever there are changes in the cluster metadata, such as * brokers being added or removed, topics being created or deleted, or partition leadership updates. * This is useful if quota computation takes partitions into account. * Topics that are being deleted will not be included in `cluster`. * + * @deprecated since 4.4 and should not be used any longer. * @param cluster Cluster metadata including partitions and their leaders if known * @return true if quotas have changed and metric configs may need to be updated */ + @Deprecated(since = "4.4", forRemoval = true) boolean updateClusterMetadata(Cluster cluster); /** diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index e1bd97c93b044..9f2b7457a8c48 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -452,6 +452,7 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w if (group != null) quotaOrDefault(group, quotaType) else null } + @SuppressWarnings(Array("removal")) override def updateClusterMetadata(cluster: Cluster): Boolean = { val topicsByGroup = cluster.topics.asScala.groupBy(group) diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 142c00441eb3f..2424ee49cf758 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -571,6 +571,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): Map[String, String] = Collections.emptyMap() override def quotaLimit(quotaType: ClientQuotaType, metricTags: Map[String, String]): java.lang.Double = 1 + @SuppressWarnings(Array("removal")) override def updateClusterMetadata(cluster: Cluster): Boolean = false override def updateQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity, newValue: Double): Unit = { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java index 1ffc8f6078326..2f5f7b32d6adc 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java @@ -57,6 +57,7 @@ public String name() { return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" + nodeId; } + @SuppressWarnings("removal") @Override public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { try { diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java index ca61593e4858f..753b88e759f50 100644 --- a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java +++ b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java @@ -796,6 +796,7 @@ private Quota findClientQuota(ClientIdEntity clientIdEntity) { return overriddenQuotas.get(DEFAULT_CLIENT_ID_QUOTA_ENTITY); } + @SuppressWarnings("removal") @Override public boolean updateClusterMetadata(Cluster cluster) { // The default quota callback does not use any cluster metadata diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java index 02bf5d13655a8..9ade519e9dd35 100644 --- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java +++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java @@ -1779,6 +1779,7 @@ public boolean quotaResetRequired(ClientQuotaType quotaType) { return true; } + @SuppressWarnings("removal") @Override public boolean updateClusterMetadata(Cluster cluster) { return false; From ed2b1801238518a2d02f113b9897ca23b6f99fb7 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 4 Apr 2026 14:56:45 +0800 Subject: [PATCH 2/4] Provide default implementation --- .../org/apache/kafka/server/quota/ClientQuotaCallback.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java index a56f8e6fb3fcd..dbb4641f03fb9 100644 --- a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java +++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java @@ -106,7 +106,9 @@ public interface ClientQuotaCallback extends Configurable { * @return true if quotas have changed and metric configs may need to be updated */ @Deprecated(since = "4.4", forRemoval = true) - boolean updateClusterMetadata(Cluster cluster); + default boolean updateClusterMetadata(Cluster cluster) { + return false; + } /** * Closes this instance. From 9ed532707f60fe932b2920dadff864f4ca591678 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 4 Apr 2026 15:39:32 +0800 Subject: [PATCH 3/4] remove redundant default implementation --- .../scala/unit/kafka/server/ClientQuotaManagerTest.scala | 3 --- .../org/apache/kafka/server/quota/ClientQuotaManager.java | 8 -------- .../java/org/apache/kafka/server/KRaftClusterTest.java | 7 ------- 3 files changed, 18 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 2424ee49cf758..f0d86ba9fdff8 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -16,7 +16,6 @@ */ package kafka.server -import org.apache.kafka.common.Cluster import java.net.InetAddress import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.metrics.Quota @@ -571,8 +570,6 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): Map[String, String] = Collections.emptyMap() override def quotaLimit(quotaType: ClientQuotaType, metricTags: Map[String, String]): java.lang.Double = 1 - @SuppressWarnings(Array("removal")) - override def updateClusterMetadata(cluster: Cluster): Boolean = false override def updateQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity, newValue: Double): Unit = { quotas.put(entity.asInstanceOf[ClientQuotaManager.KafkaQuotaEntity], new Quota(newValue.toLong, true)) diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java index 753b88e759f50..a755003f26f59 100644 --- a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java +++ b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.server.quota; -import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.metrics.MetricConfig; @@ -796,13 +795,6 @@ private Quota findClientQuota(ClientIdEntity clientIdEntity) { return overriddenQuotas.get(DEFAULT_CLIENT_ID_QUOTA_ENTITY); } - @SuppressWarnings("removal") - @Override - public boolean updateClusterMetadata(Cluster cluster) { - // The default quota callback does not use any cluster metadata - return false; - } - @Override public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity entity, double newValue) { KafkaQuotaEntity quotaEntity = (KafkaQuotaEntity) entity; diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java index 9ade519e9dd35..cca9562d8cc83 100644 --- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java +++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.admin.QuorumInfo; import org.apache.kafka.clients.admin.SupportedVersionRange; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; @@ -1779,12 +1778,6 @@ public boolean quotaResetRequired(ClientQuotaType quotaType) { return true; } - @SuppressWarnings("removal") - @Override - public boolean updateClusterMetadata(Cluster cluster) { - return false; - } - @Override public void close() { } From c5752ee5c525598b87dd2f4855995bb07d4d11f3 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 4 Apr 2026 20:13:31 +0800 Subject: [PATCH 4/4] add doc --- docs/getting-started/upgrade.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md index 05721adc35619..f4b5a634e0203 100644 --- a/docs/getting-started/upgrade.md +++ b/docs/getting-started/upgrade.md @@ -26,6 +26,14 @@ type: docs --> +## Upgrading to 4.4.0 + +### Upgrading Servers to 4.4.0 from any version 3.3.x through 4.3.0 + +### Notable changes in 4.4.0 + + * The `ClientQuotaCallback#updateClusterMetadata` method is deprecated and will be removed in Kafka 5.0. Custom implementations of `ClientQuotaCallback` no longer need to override this method, as a default no-op implementation is now provided. For further details, please refer to [KIP-1200](https://cwiki.apache.org/confluence/x/axBJFg). + ## Upgrading to 4.3.0 ### Upgrading Servers to 4.3.0 from any version 3.3.x through 4.2.0