Skip to content

Commit 5168fcb

Browse files
authored
[fix][broker] Decrement unacked counter when removeAllUpTo removes pending acks (apache#25581)
1 parent ad114ad commit 5168fcb

6 files changed

Lines changed: 243 additions & 12 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,6 +1121,35 @@ public PendingAcksMap getPendingAcks() {
11211121
return pendingAcks;
11221122
}
11231123

1124+
/**
1125+
* Remove all pending acks up to the given mark-delete position and decrement the consumer's unacked message
1126+
* counter by the remaining unacked count for each removed entry.
1127+
*
1128+
* <p>This is used when the cursor's mark-delete position advances past entries that are still in the consumer's
1129+
* pending acks. The remaining unacked count accounts for batch index level acknowledgments — only the truly
1130+
* unacked batch indexes are decremented.
1131+
*
1132+
* @param markDeleteLedgerId the ledger ID up to which to remove pending acks
1133+
* @param markDeleteEntryId the entry ID up to which to remove pending acks
1134+
*/
1135+
public void removePendingAcksUpToPositionAndDecrementUnacked(long markDeleteLedgerId, long markDeleteEntryId) {
1136+
if (pendingAcks == null) {
1137+
return;
1138+
}
1139+
1140+
MutableInt mutableTotalUnacked = new MutableInt(0);
1141+
pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId,
1142+
(ledgerId, entryId, batchSize, stickyKeyHash) -> {
1143+
mutableTotalUnacked.add((int) getUnAckedCountForBatchIndexLevelEnabled(
1144+
PositionFactory.create(ledgerId, entryId), batchSize));
1145+
});
1146+
int totalUnacked = mutableTotalUnacked.intValue();
1147+
if (totalUnacked > 0) {
1148+
addAndGetUnAckedMsgs(this, -totalUnacked);
1149+
updateBlockedConsumerOnUnackedMsgs(this);
1150+
}
1151+
}
1152+
11241153
public int getPriorityLevel() {
11251154
return priorityLevel;
11261155
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,16 @@ public boolean remove(long ledgerId, long entryId) {
324324
}
325325

326326
/**
327-
* Remove all pending acks up to the given ledger ID and entry ID.
327+
* Remove all pending acks up to the given ledger ID and entry ID, invoking a callback for each removed entry.
328328
*
329329
* @param markDeleteLedgerId the ledger ID up to which to remove pending acks
330330
* @param markDeleteEntryId the entry ID up to which to remove pending acks
331+
* @param removedEntryCallback optional callback invoked for each removed entry (within the write lock),
332+
* receiving ledgerId, entryId, batchSize, and stickyKeyHash
331333
*/
332-
public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
333-
internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false);
334+
public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId,
335+
PendingAcksConsumer removedEntryCallback) {
336+
internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false, removedEntryCallback);
334337
}
335338

336339
/**
@@ -343,8 +346,10 @@ public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
343346
* @param markDeleteLedgerId the ledger ID up to which to remove pending acks
344347
* @param markDeleteEntryId the entry ID up to which to remove pending acks
345348
* @param useWriteLock true if the method should use a write lock, false otherwise
349+
* @param removedEntryCallback optional callback invoked for each removed entry (within the write lock)
346350
*/
347-
private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntryId, boolean useWriteLock) {
351+
private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntryId, boolean useWriteLock,
352+
PendingAcksConsumer removedEntryCallback) {
348353
PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get();
349354
// track if the write lock was acquired
350355
boolean acquiredWriteLock = false;
@@ -380,14 +385,19 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry
380385
retryWithWriteLock = true;
381386
return;
382387
}
388+
IntIntPair value = intIntPairEntry.getValue();
389+
int batchSize = value.leftInt();
390+
int stickyKeyHash = value.rightInt();
383391
if (pendingAcksRemoveHandler != null) {
384392
if (!batchStarted) {
385393
pendingAcksRemoveHandler.startBatch();
386394
batchStarted = true;
387395
}
388-
int stickyKeyHash = intIntPairEntry.getValue().rightInt();
389396
pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, stickyKeyHash, closed);
390397
}
398+
if (removedEntryCallback != null) {
399+
removedEntryCallback.accept(ledgerId, entryId, batchSize, stickyKeyHash);
400+
}
391401
entryMapIterator.remove();
392402
// also remove from the original map if we're iterating a copy
393403
if (ledgerId == markDeleteLedgerId) {
@@ -411,7 +421,7 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry
411421
} else {
412422
readLock.unlock();
413423
if (retryWithWriteLock) {
414-
internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, true);
424+
internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, true, removedEntryCallback);
415425
}
416426
}
417427
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,8 @@ public synchronized void readMoreEntries() {
367367
if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) {
368368
redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
369369
for (Consumer consumer : consumerList) {
370-
consumer.getPendingAcks()
371-
.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
370+
consumer.removePendingAcksUpToPositionAndDecrementUnacked(
371+
markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
372372
}
373373
lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition;
374374
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public void removeAllUpTo_RemovesAllPendingAcksUpToSpecifiedEntry() {
117117
pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
118118
pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125);
119119

120-
pendingAcksMap.removeAllUpTo(1L, 2L);
120+
pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize, stickyKeyHash) -> {
121+
});
121122

122123
assertFalse(pendingAcksMap.contains(1L, 1L));
123124
assertFalse(pendingAcksMap.contains(1L, 2L));
@@ -134,7 +135,8 @@ public void removeAllUpTo_RemovesAllPendingAcksUpToSpecifiedEntryAcrossMultipleL
134135
pendingAcksMap.addPendingAckIfAllowed(2L, 2L, 1, 126);
135136
pendingAcksMap.addPendingAckIfAllowed(3L, 1L, 1, 127);
136137

137-
pendingAcksMap.removeAllUpTo(2L, 1L);
138+
pendingAcksMap.removeAllUpTo(2L, 1L, (ledgerId, entryId, batchSize, stickyKeyHash) -> {
139+
});
138140

139141
assertFalse(pendingAcksMap.contains(1L, 1L));
140142
assertFalse(pendingAcksMap.contains(1L, 2L));
@@ -176,13 +178,36 @@ public void removeAllUpTo_InvokesRemoveHandlerForEachEntry() {
176178
pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
177179
pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125);
178180

179-
pendingAcksMap.removeAllUpTo(1L, 2L);
181+
pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize, stickyKeyHash) -> {
182+
});
180183

181184
verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
182185
verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
183186
verify(removeHandler, never()).handleRemoving(consumer, 2L, 1L, 125, false);
184187
}
185188

189+
@Test
190+
public void removeAllUpToWithCallback_InvokesCallbackForEachRemovedEntry() {
191+
Consumer consumer = createMockConsumer("consumer1");
192+
PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null);
193+
pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 3, 123);
194+
pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 5, 124);
195+
pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 7, 125);
196+
197+
List<int[]> callbackInvocations = new ArrayList<>();
198+
pendingAcksMap.removeAllUpTo(1L, 2L,
199+
(ledgerId, entryId, batchSize, stickyKeyHash) -> {
200+
callbackInvocations.add(new int[]{(int) ledgerId, (int) entryId, batchSize, stickyKeyHash});
201+
});
202+
203+
assertEquals(callbackInvocations.size(), 2);
204+
assertEquals(callbackInvocations.get(0), new int[]{1, 1, 3, 123});
205+
assertEquals(callbackInvocations.get(1), new int[]{1, 2, 5, 124});
206+
assertFalse(pendingAcksMap.contains(1L, 1L));
207+
assertFalse(pendingAcksMap.contains(1L, 2L));
208+
assertTrue(pendingAcksMap.contains(2L, 1L));
209+
}
210+
186211
@Test
187212
public void size_ReturnsCorrectSize() {
188213
Consumer consumer = createMockConsumer("consumer1");

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.pulsar.broker.service.StickyKeyDispatcher;
6060
import org.apache.pulsar.broker.service.Subscription;
6161
import org.apache.pulsar.broker.service.Topic;
62+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
6263
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
6364
import org.apache.pulsar.broker.service.plugin.EntryFilter;
6465
import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
@@ -106,6 +107,7 @@ protected void setup() throws Exception {
106107
@Override
107108
protected ServiceConfiguration getDefaultConf() {
108109
ServiceConfiguration conf = super.getDefaultConf();
110+
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
109111
conf.setMaxUnackedMessagesPerConsumer(0);
110112
// wait for shutdown of the broker, this prevents flakiness which could be caused by metrics being
111113
// unregistered asynchronously. This impacts the execution of the next test method if this would be happening.
@@ -731,6 +733,170 @@ public void testKeySharedDrainingHashesConsumerStats() throws Exception {
731733

732734
}
733735

736+
@DataProvider(name = "subscriptionTypes")
737+
public Object[][] subscriptionTypes() {
738+
return new Object[][]{
739+
{SubscriptionType.Shared},
740+
{SubscriptionType.Key_Shared}
741+
};
742+
}
743+
744+
/**
745+
* Verify unacked count is correctly decremented when removeAllUpTo removes non-batch
746+
* entries from pendingAcks after mark-delete advances via message expiry.
747+
*/
748+
@Test(dataProvider = "subscriptionTypes")
749+
public void testUnackedCountNonBatchAfterExpire(SubscriptionType subType) throws Exception {
750+
String topic = newTopicName();
751+
String sub = "sub";
752+
int numMessages = 10;
753+
754+
@Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
755+
.topic(topic).enableBatching(false).create();
756+
@Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
757+
.topic(topic).subscriptionName(sub)
758+
.subscriptionType(subType)
759+
.subscribe();
760+
761+
for (int i = 0; i < numMessages; i++) {
762+
producer.send(("msg-" + i).getBytes());
763+
}
764+
765+
org.apache.pulsar.broker.service.Consumer svcConsumer =
766+
getTheUniqueServiceConsumer(topic, sub);
767+
for (int i = 0; i < numMessages; i++) {
768+
Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
769+
Assert.assertNotNull(msg, "Expected to receive message " + i);
770+
}
771+
772+
Awaitility.await().untilAsserted(() ->
773+
assertEquals(numMessages, svcConsumer.getUnackedMessages()));
774+
775+
expireAndVerifyUnackedDrained(topic, sub, producer, consumer, svcConsumer);
776+
}
777+
778+
/**
779+
* Verify unacked count is correctly decremented when removeAllUpTo removes batch
780+
* entries from pendingAcks after mark-delete advances via message expiry.
781+
*/
782+
@Test(dataProvider = "subscriptionTypes")
783+
public void testUnackedCountBatchAfterExpire(SubscriptionType subType) throws Exception {
784+
String topic = newTopicName();
785+
String sub = "sub";
786+
int numMessages = 10;
787+
788+
@Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
789+
.topic(topic)
790+
.batchingMaxMessages(20)
791+
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
792+
.enableBatching(true)
793+
.create();
794+
@Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
795+
.topic(topic).subscriptionName(sub)
796+
.subscriptionType(subType)
797+
.subscribe();
798+
799+
for (int i = 0; i < numMessages; i++) {
800+
producer.newMessage().value(("batch-" + i).getBytes()).sendAsync();
801+
}
802+
producer.flush();
803+
804+
for (int i = 0; i < numMessages; i++) {
805+
Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
806+
Assert.assertNotNull(msg, "Expected to receive message " + i);
807+
}
808+
809+
org.apache.pulsar.broker.service.Consumer svcConsumer =
810+
getTheUniqueServiceConsumer(topic, sub);
811+
812+
Awaitility.await().untilAsserted(() ->
813+
assertEquals(numMessages, svcConsumer.getUnackedMessages()));
814+
815+
expireAndVerifyUnackedDrained(topic, sub, producer, consumer, svcConsumer);
816+
}
817+
818+
/**
819+
* Verify unacked count is correctly decremented when removeAllUpTo removes a partially-acked
820+
* batch entry from pendingAcks after mark-delete advances via message expiry.
821+
*
822+
* <p>Flow: produce batch(batchSize=10) → consume all → ack 5 of 10 → expire → unacked should be 0.
823+
*/
824+
@Test(dataProvider = "subscriptionTypes")
825+
public void testUnackedCountBatchPartialAckAfterExpire(SubscriptionType subType) throws Exception {
826+
String topic = newTopicName();
827+
String sub = "sub";
828+
int numMessages = 10;
829+
int ackCount = 5;
830+
831+
@Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
832+
.topic(topic)
833+
.batchingMaxMessages(20)
834+
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
835+
.enableBatching(true)
836+
.create();
837+
@Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
838+
.topic(topic)
839+
.subscriptionName(sub)
840+
.enableBatchIndexAcknowledgment(true)
841+
.subscriptionType(subType)
842+
.subscribe();
843+
844+
for (int i = 0; i < numMessages; i++) {
845+
producer.newMessage().value(("batch-" + i).getBytes()).sendAsync();
846+
}
847+
producer.flush();
848+
849+
List<Message<byte[]>> messages = new ArrayList<>();
850+
for (int i = 0; i < numMessages; i++) {
851+
Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
852+
Assert.assertNotNull(msg, "Expected to receive message " + i);
853+
messages.add(msg);
854+
}
855+
856+
org.apache.pulsar.broker.service.Consumer svcConsumer =
857+
getTheUniqueServiceConsumer(topic, sub);
858+
859+
Awaitility.await().untilAsserted(() ->
860+
assertEquals(numMessages, svcConsumer.getUnackedMessages()));
861+
862+
// Partially ack — ack 5 of 10 batch indexes
863+
for (int i = 0; i < ackCount; i++) {
864+
consumer.acknowledge(messages.get(i));
865+
}
866+
Awaitility.await().untilAsserted(() ->
867+
assertEquals(numMessages - ackCount, svcConsumer.getUnackedMessages()));
868+
869+
expireAndVerifyUnackedDrained(topic, sub, producer, consumer, svcConsumer);
870+
}
871+
872+
private void expireAndVerifyUnackedDrained(String topic, String sub,
873+
Producer<byte[]> producer, Consumer<byte[]> consumer,
874+
org.apache.pulsar.broker.service.Consumer svcConsumer)
875+
throws Exception {
876+
PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService()
877+
.getTopicReference(topic).get();
878+
879+
Thread.sleep(1100);
880+
pTopic.getSubscription(sub).expireMessagesAsync(1).get();
881+
882+
// Trigger readMoreEntries to invoke removeAllUpTo
883+
producer.send("trigger".getBytes());
884+
Message<byte[]> triggerMsg = consumer.receive(2, TimeUnit.SECONDS);
885+
Assert.assertNotNull(triggerMsg);
886+
consumer.acknowledge(triggerMsg);
887+
888+
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
889+
assertEquals(0, svcConsumer.getUnackedMessages()));
890+
}
891+
892+
private org.apache.pulsar.broker.service.Consumer getTheUniqueServiceConsumer(String topic, String sub) {
893+
PersistentTopic persistentTopic =
894+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
895+
AbstractPersistentDispatcherMultipleConsumers dispatcher =
896+
(AbstractPersistentDispatcherMultipleConsumers) persistentTopic.getSubscription(sub).getDispatcher();
897+
return dispatcher.getConsumers().iterator().next();
898+
}
899+
734900
private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int hash) {
735901
return findConsumerForHash(subscriptionStats, hash).map(ConsumerStats::getConsumerName).orElse(null);
736902
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.function.BiFunction;
3131
import org.apache.commons.lang3.tuple.Pair;
3232
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
33+
import org.apache.pulsar.common.naming.TopicName;
3334
import org.apache.pulsar.common.policies.data.ClusterData;
3435
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
3536
import org.testng.Assert;
@@ -71,7 +72,7 @@ protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T rece
7172
private static final Random random = new Random();
7273

7374
protected String newTopicName() {
74-
return "my-property/my-ns/topic-" + Long.toHexString(random.nextLong());
75+
return TopicName.get("my-property/my-ns/topic-" + Long.toHexString(random.nextLong())).toString();
7576
}
7677

7778
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)