From 5cc2a86742e7abbd79afb2a5fbf88fa1a7f1bf0c Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Thu, 3 Apr 2025 09:56:37 +0800 Subject: [PATCH 1/7] feat: support clients to reset lmq consumption offset --- .../config/v2/ConsumerOffsetManagerV2.java | 27 +++++++++++++++++ .../broker/offset/ConsumerOffsetManager.java | 2 +- .../offset/LmqConsumerOffsetManager.java | 30 +++++++++++++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 1821c801cbc..6b722d039bb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.config.v2; +import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; import io.netty.util.internal.PlatformDependent; import java.nio.ByteBuffer; @@ -425,4 +426,30 @@ public long queryPullOffset(String group, String topic, int queueId) { } return -1; } + + @Override + public void assignResetOffset(String topic, String group, int queueId, long offset) { + if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { + LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", + topic, group, queueId, offset); + return; + } + if (!MixAll.isLmq(group) || !MixAll.isLmq(topic)) { + super.assignResetOffset(topic, group, queueId, offset); + return; + } + + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentMap map = resetOffsetTable.get(key); + if (null == map) { + map = new ConcurrentHashMap<>(); + ConcurrentMap previous = resetOffsetTable.putIfAbsent(key, map); + if (null != previous) { + map = previous; + } + } + map.put(queueId, offset); + + this.commitOffset(null, topic, group, queueId, offset); + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index eafb47a89da..5eca1460885 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -50,7 +50,7 @@ public class ConsumerOffsetManager extends ConfigManager { protected ConcurrentMap> offsetTable = new ConcurrentHashMap<>(512); - private final ConcurrentMap> resetOffsetTable = + protected final ConcurrentMap> resetOffsetTable = new ConcurrentHashMap<>(512); private final ConcurrentMap> pullOffsetTable = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index 53e9e2e0634..6a87d7c3bfd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -20,7 +20,9 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import com.google.common.base.Strings; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.MixAll; @@ -132,4 +134,32 @@ public void removeOffset(String group) { } } } + + @Override + public void assignResetOffset(String topic, String group, int queueId, long offset) { + if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { + LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", + topic, group, queueId, offset); + return; + } + if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { + super.assignResetOffset(topic, group, queueId, offset); + return; + } + + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentMap map = resetOffsetTable.get(key); + if (null == map) { + map = new ConcurrentHashMap<>(); + ConcurrentMap previous = resetOffsetTable.putIfAbsent(key, map); + if (null != previous) { + map = previous; + } + } + map.put(queueId, offset); + + if (lmqOffsetTable.get(key) != null) { + lmqOffsetTable.put(key, offset); + } + } } From f9ec1debe3eb4e43cb5415b5e8b42ee1a369c1c8 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Tue, 8 Apr 2025 11:38:58 +0800 Subject: [PATCH 2/7] fix --- .../config/v2/ConsumerOffsetManagerV2.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 6b722d039bb..02c5a79df02 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -436,19 +436,18 @@ public void assignResetOffset(String topic, String group, int queueId, long offs } if (!MixAll.isLmq(group) || !MixAll.isLmq(topic)) { super.assignResetOffset(topic, group, queueId, offset); - return; - } - - String key = topic + TOPIC_GROUP_SEPARATOR + group; - ConcurrentMap map = resetOffsetTable.get(key); - if (null == map) { - map = new ConcurrentHashMap<>(); - ConcurrentMap previous = resetOffsetTable.putIfAbsent(key, map); - if (null != previous) { - map = previous; + } else { + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentMap map = resetOffsetTable.get(key); + if (null == map) { + map = new ConcurrentHashMap<>(); + ConcurrentMap previous = resetOffsetTable.putIfAbsent(key, map); + if (null != previous) { + map = previous; + } } + map.put(queueId, offset); } - map.put(queueId, offset); this.commitOffset(null, topic, group, queueId, offset); } From c3e1e54d78161582c61f5811c0b5b8bd202a0f8e Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 25 Apr 2025 16:46:03 +0800 Subject: [PATCH 3/7] fix --- .../rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java | 2 +- .../apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 02c5a79df02..528dcc89ba3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -434,7 +434,7 @@ public void assignResetOffset(String topic, String group, int queueId, long offs topic, group, queueId, offset); return; } - if (!MixAll.isLmq(group) || !MixAll.isLmq(topic)) { + if (!MixAll.isLmq(topic)) { super.assignResetOffset(topic, group, queueId, offset); } else { String key = topic + TOPIC_GROUP_SEPARATOR + group; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index 6a87d7c3bfd..90c390dd4c4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -142,7 +142,7 @@ public void assignResetOffset(String topic, String group, int queueId, long offs topic, group, queueId, offset); return; } - if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { + if (!MixAll.isLmq(topic)) { super.assignResetOffset(topic, group, queueId, offset); return; } From d3131b79a074159ad947e783c6e70dcddaf03bbb Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 25 Apr 2025 17:16:54 +0800 Subject: [PATCH 4/7] fix --- .../rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java | 2 +- .../apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 528dcc89ba3..1f661a04c61 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -434,7 +434,7 @@ public void assignResetOffset(String topic, String group, int queueId, long offs topic, group, queueId, offset); return; } - if (!MixAll.isLmq(topic)) { + if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { super.assignResetOffset(topic, group, queueId, offset); } else { String key = topic + TOPIC_GROUP_SEPARATOR + group; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index 90c390dd4c4..6a87d7c3bfd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -142,7 +142,7 @@ public void assignResetOffset(String topic, String group, int queueId, long offs topic, group, queueId, offset); return; } - if (!MixAll.isLmq(topic)) { + if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { super.assignResetOffset(topic, group, queueId, offset); return; } From 4dac12210d811d081bb79bcba34a4a1a354dffcc Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Tue, 8 Jul 2025 19:11:39 +0800 Subject: [PATCH 5/7] fix: clean pull offset in #removeOffset --- .../rocketmq/broker/offset/LmqConsumerOffsetManager.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index 6a87d7c3bfd..97322a85f65 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -158,8 +158,6 @@ public void assignResetOffset(String topic, String group, int queueId, long offs } map.put(queueId, offset); - if (lmqOffsetTable.get(key) != null) { - lmqOffsetTable.put(key, offset); - } + lmqOffsetTable.put(key, offset); } } From 16b6261610f33bc72ff2ba6cb453fdb42a00e48d Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Tue, 8 Jul 2025 19:22:32 +0800 Subject: [PATCH 6/7] fix: clean pull offset in #removeOffset --- .../apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index 97322a85f65..a565ad07c3a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -158,6 +158,6 @@ public void assignResetOffset(String topic, String group, int queueId, long offs } map.put(queueId, offset); - lmqOffsetTable.put(key, offset); + lmqOffsetTable.computeIfPresent(key, (k, oldValue) -> offset); } } From c32306ddca854da8f8de3fbb3da7ec9e1a10597c Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Tue, 8 Jul 2025 22:38:29 +0800 Subject: [PATCH 7/7] rerun test