Skip to content

Commit dae4e01

Browse files
hrzzzzTechnoboy-
authored andcommitted
[fix][txn] fix the consumer stuck due to deduplicated messages in pending ack state (#21177)
1 parent 44db9a0 commit dae4e01

2 files changed

Lines changed: 65 additions & 5 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,12 +213,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
213213
this.filterAcceptedMsgs.add(entryMsgCnt);
214214
}
215215

216-
totalEntries++;
217216
int batchSize = msgMetadata.getNumMessagesInBatch();
218-
totalMessages += batchSize;
219-
totalBytes += metadataAndPayload.readableBytes();
220-
totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
221-
batchSizes.setBatchSize(i, batchSize);
222217
long[] ackSet = null;
223218
if (indexesAcks != null && cursor != null) {
224219
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
@@ -262,6 +257,12 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
262257
}
263258
}
264259

260+
totalEntries++;
261+
totalMessages += batchSize;
262+
totalBytes += metadataAndPayload.readableBytes();
263+
totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
264+
batchSizes.setBatchSize(i, batchSize);
265+
265266
BrokerInterceptor interceptor = subscription.interceptor();
266267
if (null != interceptor) {
267268
// keep for compatibility if users has implemented the old interface

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,65 @@ private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enabl
254254
Assert.assertEquals(receiveCounter, count / 2);
255255
}
256256

257+
@Test
258+
private void testMsgsInPendingAckStateWouldNotGetTheConsumerStuck() throws Exception {
259+
final String topicName = NAMESPACE1 + "/testMsgsInPendingAckStateWouldNotGetTheConsumerStuck";
260+
final String subscription = "test";
261+
262+
@Cleanup
263+
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
264+
.topic(topicName)
265+
.create();
266+
@Cleanup
267+
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
268+
.topic(topicName)
269+
.subscriptionName(subscription)
270+
.subscriptionType(SubscriptionType.Shared)
271+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
272+
.subscribe();
273+
274+
int numStep1Receive = 2, numStep2Receive = 2, numStep3Receive = 2;
275+
int numTotalMessage = numStep1Receive + numStep2Receive + numStep3Receive;
276+
277+
for (int i = 0; i < numTotalMessage; i++) {
278+
producer.send(i);
279+
}
280+
281+
Transaction step1Txn = getTxn();
282+
Transaction step2Txn = getTxn();
283+
284+
// Step 1, try to consume some messages but do not commit the transaction
285+
for (int i = 0; i < numStep1Receive; i++) {
286+
consumer.acknowledgeAsync(consumer.receive().getMessageId(), step1Txn).get();
287+
}
288+
289+
// Step 2, try to consume some messages and commit the transaction
290+
for (int i = 0; i < numStep2Receive; i++) {
291+
consumer.acknowledgeAsync(consumer.receive().getMessageId(), step2Txn).get();
292+
}
293+
294+
// commit step2Txn
295+
step2Txn.commit().get();
296+
297+
// close and re-create consumer
298+
consumer.close();
299+
@Cleanup
300+
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
301+
.topic(topicName)
302+
.receiverQueueSize(numStep3Receive)
303+
.subscriptionName(subscription)
304+
.subscriptionType(SubscriptionType.Shared)
305+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
306+
.subscribe();
307+
308+
// Step 3, try to consume the rest messages and should receive all of them
309+
for (int i = 0; i < numStep3Receive; i++) {
310+
// should get the message instead of timeout
311+
Message<Integer> msg = consumer2.receive(3, TimeUnit.SECONDS);
312+
Assert.assertEquals(msg.getValue(), numStep1Receive + numStep2Receive + i);
313+
}
314+
}
315+
257316
@Test(dataProvider="enableBatch")
258317
private void produceCommitTest(boolean enableBatch) throws Exception {
259318
@Cleanup

0 commit comments

Comments
 (0)