diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 1821c801cbc..1f661a04c61 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.config.v2; +import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; import io.netty.util.internal.PlatformDependent; import java.nio.ByteBuffer; @@ -425,4 +426,29 @@ public long queryPullOffset(String group, String topic, int queueId) { } return -1; } + + @Override + public void assignResetOffset(String topic, String group, int queueId, long offset) { + if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { + LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", + topic, group, queueId, offset); + return; + } + if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { + super.assignResetOffset(topic, group, queueId, offset); + } else { + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentMap map = resetOffsetTable.get(key); + if (null == map) { + map = new ConcurrentHashMap<>(); + ConcurrentMap previous = resetOffsetTable.putIfAbsent(key, map); + if (null != previous) { + map = previous; + } + } + map.put(queueId, offset); + } + + this.commitOffset(null, topic, group, queueId, offset); + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index eafb47a89da..5eca1460885 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -50,7 +50,7 @@ public class ConsumerOffsetManager extends ConfigManager { protected ConcurrentMap> offsetTable = new ConcurrentHashMap<>(512); - private final ConcurrentMap> resetOffsetTable = + protected final ConcurrentMap> resetOffsetTable = new ConcurrentHashMap<>(512); private final ConcurrentMap> pullOffsetTable = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index 53e9e2e0634..a565ad07c3a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -20,7 +20,9 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import com.google.common.base.Strings; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.MixAll; @@ -132,4 +134,30 @@ public void removeOffset(String group) { } } } + + @Override + public void assignResetOffset(String topic, String group, int queueId, long offset) { + if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { + LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", + topic, group, queueId, offset); + return; + } + if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { + super.assignResetOffset(topic, group, queueId, offset); + return; + } + + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentMap map = resetOffsetTable.get(key); + if (null == map) { + map = new ConcurrentHashMap<>(); + ConcurrentMap previous = resetOffsetTable.putIfAbsent(key, map); + if (null != previous) { + map = previous; + } + } + map.put(queueId, offset); + + lmqOffsetTable.computeIfPresent(key, (k, oldValue) -> offset); + } }