Skip to content

Commit 2f6345d

Browse files
[fix][broker] Restore the behavior to dispatch batch messages according to consumer permits (#24092)
1 parent 3e6f7de commit 2f6345d

2 files changed

Lines changed: 7 additions & 3 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -843,14 +843,14 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
843843
c, c.getAvailablePermits());
844844
}
845845

846-
int maxMessagesInThisBatch =
847-
Math.max(remainingMessages, serviceConfig.getDispatcherMaxRoundRobinBatchSize());
846+
int maxMessagesInThisBatch = Math.min(remainingMessages, availablePermits);
848847
if (c.getMaxUnackedMessages() > 0) {
849848
// Calculate the maximum number of additional unacked messages allowed
850849
int maxAdditionalUnackedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
851850
maxMessagesInThisBatch = Math.min(maxMessagesInThisBatch, maxAdditionalUnackedMessages);
852851
}
853-
int maxEntriesInThisBatch = Math.min(availablePermits,
852+
// TODO: add tests to verify dispatcherMaxRoundRobinBatchSize is respected
853+
int maxEntriesInThisBatch = Math.min(serviceConfig.getDispatcherMaxRoundRobinBatchSize(),
854854
// use the average batch size per message to calculate the number of entries to
855855
// dispatch. round up to the next integer without using floating point arithmetic.
856856
(maxMessagesInThisBatch + avgBatchSizePerMsg - 1) / avgBatchSizePerMsg);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,10 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception {
10121012
}
10131013
FutureUtil.waitForAll(sendFutureList).get();
10141014

1015+
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
1016+
assertTrue(consumer1.numMessagesInQueue() > 0);
1017+
assertTrue(consumer2.numMessagesInQueue() > 0);
1018+
});
10151019
assertEquals(consumer1.numMessagesInQueue(), batchMessages, batchMessages);
10161020
assertEquals(consumer2.numMessagesInQueue(), batchMessages, batchMessages);
10171021

0 commit comments

Comments
 (0)