diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index da3ccdcdadb..9ab5eb651be 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -511,8 +511,10 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch // No need to generate new records when the group does not exist, // because these retry messages will not be consumed by anyone. - if (brokerConfig.isPopReviveSkipIfGroupAbsent() && - !brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)) { + boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() && + !brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId); + + if (skipWrite) { log.info("PopConsumerService change invisibility skip, time={}, " + "groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset); } else { @@ -525,7 +527,12 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch } } - this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); + // If the new CK has the same key as the old CK (same visibilityTimeout), + // the write already overwrites the old record in RocksDB, skip delete + // to avoid removing the newly written record. + if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) { + this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); + } } // Use broker escape bridge to support remote read