Skip to content

Commit c6fc39a

Browse files
authored
[ISSUE #10276] Fix PopConsumerService changeInvisibilityDuration losing CK record when visibilityTimeout collision (#10277)
1 parent 5ad6a3e commit c6fc39a

1 file changed

Lines changed: 10 additions & 3 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -511,8 +511,10 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
511511

512512
// No need to generate new records when the group does not exist,
513513
// because these retry messages will not be consumed by anyone.
514-
if (brokerConfig.isPopReviveSkipIfGroupAbsent() &&
515-
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)) {
514+
boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() &&
515+
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId);
516+
517+
if (skipWrite) {
516518
log.info("PopConsumerService change invisibility skip, time={}, " +
517519
"groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset);
518520
} else {
@@ -525,7 +527,12 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
525527
}
526528
}
527529

528-
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
530+
// If the new CK has the same key as the old CK (same visibilityTimeout),
531+
// the write already overwrites the old record in RocksDB, skip delete
532+
// to avoid removing the newly written record.
533+
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
534+
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
535+
}
529536
}
530537

531538
// Use broker escape bridge to support remote read

0 commit comments

Comments
 (0)