From 8001d6a51ddea9d0c4041f1492400588836a2706 Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 28 Apr 2026 13:54:39 +0800 Subject: [PATCH 1/4] [ISSUE #10276] Fix PopConsumerService changeInvisibilityDuration losing CK record when visibilityTimeout collision --- .../rocketmq/broker/pop/PopConsumerService.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index da3ccdcdadb..ca312a294c2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -511,8 +511,10 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch // No need to generate new records when the group does not exist, // because these retry messages will not be consumed by anyone. - if (brokerConfig.isPopReviveSkipIfGroupAbsent() && - !brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)) { + boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() && + !brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId); + + if (skipWrite) { log.info("PopConsumerService change invisibility skip, time={}, " + "groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset); } else { @@ -525,6 +527,13 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch } } + // If the new CK has the same key as the old CK (same visibilityTimeout), + // the write already overwrites the old record in RocksDB, skip delete + // to avoid removing the newly written record. + if (!skipWrite && ckRecord.getVisibilityTimeout() == ackRecord.getVisibilityTimeout()) { + return; + } + this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); } From 6152fc3d6840a196d62b42d7d9ac659e2b2791cc Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 28 Apr 2026 14:19:26 +0800 Subject: [PATCH 2/4] support batch change --- .../org/apache/rocketmq/broker/pop/PopConsumerService.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index ca312a294c2..9ab5eb651be 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -530,11 +530,9 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch // If the new CK has the same key as the old CK (same visibilityTimeout), // the write already overwrites the old record in RocksDB, skip delete // to avoid removing the newly written record. - if (!skipWrite && ckRecord.getVisibilityTimeout() == ackRecord.getVisibilityTimeout()) { - return; + if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) { + this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); } - - this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); } // Use broker escape bridge to support remote read From fbc7e760f60637f6bac7adb357ae0f4d57a2b33f Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 28 Apr 2026 15:14:35 +0800 Subject: [PATCH 3/4] ci: trigger CI re-run for flaky test DefaultMessageStoreTest.testCleanUnusedTopic From aa5542c427adf23b20295e40d2c0fb4487ce1604 Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 28 Apr 2026 15:35:47 +0800 Subject: [PATCH 4/4] ci: re-trigger CI for flaky test DefaultMQLitePullConsumerWithTraceTest on macOS