Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,10 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch

// No need to generate new records when the group does not exist,
// because these retry messages will not be consumed by anyone.
if (brokerConfig.isPopReviveSkipIfGroupAbsent() &&
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)) {
boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() &&
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId);

if (skipWrite) {
log.info("PopConsumerService change invisibility skip, time={}, " +
"groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset);
} else {
Expand All @@ -525,7 +527,12 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
}
}

this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
// If the new CK has the same key as the old CK (same visibilityTimeout),
// the write already overwrites the old record in RocksDB, skip delete
// to avoid removing the newly written record.
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
}
}

// Use broker escape bridge to support remote read
Expand Down
Loading