From 011a684d6ee339f0ac2a1a09dbd85c82e8e80791 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Mon, 23 Mar 2026 11:20:47 +0800 Subject: [PATCH 1/5] [fix][client] Fixed a stuck issue caused by an exception encountered while processing batch messages --- .../apache/pulsar/client/impl/ConsumerImpl.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 868c45b277e48..bd1a52bc9d039 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1786,6 +1786,7 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); int skippedMessages = 0; + int processedMessages = 0; try { for (int i = 0; i < batchSize; ++i) { final MessageImpl message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata, @@ -1815,13 +1816,14 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, continue; } executeNotifyCallback(message); + processedMessages++; } if (ackBitSet != null) { ackBitSet.recycle(); } - } catch (IllegalStateException e) { - log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e); - discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); + } catch (IllegalStateException | IllegalArgumentException e) { + discardCorruptedBatchMessage(messageId, cnx, + (batchSize - skippedMessages - processedMessages), ValidationError.BatchDeSerializeError); } if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) { @@ -2153,6 +2155,14 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC discardMessage(messageId, currentCnx, validationError, 1); } + private void discardCorruptedBatchMessage(MessageIdData messageId, ClientCnx currentCnx, + int unreadMessages, ValidationError validationError) { + log.error("[{}] [{}] Discarding corrupted batch message at {}:{}, unread count={}, exception={}", + subscription, consumerName, messageId.getLedgerId(), messageId.getEntryId(), + unreadMessages, validationError); + discardMessage(messageId, currentCnx, validationError, unreadMessages); + } + private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError, int batchMessages) { ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, From 9bc90c83fac34e6fa48dc7dcce458c59c469b0a5 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Mon, 23 Mar 2026 14:25:21 +0800 Subject: [PATCH 2/5] add test cases --- .../impl/BatchMessageDecodeFailureTest.java | 251 ++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java new file mode 100644 index 0000000000000..11313094595f8 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.api.proto.MessageIdData; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests for decode failure scenarios in + * {@link ConsumerImpl#receiveIndividualMessagesFromBatch}. + * + */ +public class BatchMessageDecodeFailureTest { + + private static final String TOPIC = "persistent://tenant/ns1/test-decode-failure"; + + private ExecutorProvider executorProvider; + private ExecutorService internalExecutor; + private ClientCnx mockCnx; + private ConsumerImpl consumer; + private ConsumerStatsRecorderImpl statsRecorder; + + @BeforeMethod(alwaysRun = true) + public void setUp() { + executorProvider = new ExecutorProvider(1, "BatchDecodeFailureTest"); + internalExecutor = Executors.newSingleThreadScheduledExecutor(); + + mockCnx = ClientTestFixtures.mockClientCnx(); + + PulsarClientImpl client = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx( + executorProvider, internalExecutor, mockCnx); + ClientConfigurationData clientConf = client.getConfiguration(); + clientConf.setOperationTimeoutMs(100); + // Set StatsIntervalSeconds > 0 to enable real stats recording + clientConf.setStatsIntervalSeconds(1); + + ConsumerConfigurationData consumerConf = new ConsumerConfigurationData<>(); + consumerConf.setSubscriptionName("test-sub"); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + consumer = ConsumerImpl.newConsumerImpl(client, TOPIC, consumerConf, + executorProvider, -1, false, subscribeFuture, null, null, null, + true); + consumer.setState(HandlerState.State.Ready); + consumer.setClientCnx(mockCnx); + + // Replace the stats field with a spy to verify incrementNumReceiveFailed calls. + // Use FieldUtils.writeField to inject the spy into the final field. + statsRecorder = spy(new ConsumerStatsRecorderImpl(consumer)); + try { + FieldUtils.writeField(consumer, "stats", statsRecorder, true); + } catch (Exception e) { + throw new RuntimeException("Failed to inject spy stats recorder", e); + } + } + + @AfterMethod(alwaysRun = true) + public void cleanup() { + if (executorProvider != null) { + executorProvider.shutdownNow(); + executorProvider = null; + } + if (internalExecutor != null) { + internalExecutor.shutdownNow(); + internalExecutor = null; + } + } + + /** + * All messages in the batch are corrupted (payload is random garbage bytes). + * Expected: no messages enqueued, discardCorruptedBatchMessage called, + * incrementNumReceiveFailed invoked once. + */ + @Test + public void testAllMessagesCorruptedInBatch() { + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(3); + + // Construct corrupted payload: invalid metadata size triggers parse failure + // in deSerializeSingleMessageInBatch -> readUnsignedInt returns huge value + ByteBuf corruptedPayload = Unpooled.buffer(100); + corruptedPayload.writeInt(Integer.MAX_VALUE); + corruptedPayload.writeBytes(new byte[50]); + + consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, + corruptedPayload, new MessageIdData().setLedgerId(1000).setEntryId(1), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // No valid messages should be enqueued + assertEquals(consumer.numMessagesInQueue(), 0, + "Corrupted batch should not produce any messages in queue"); + + // Verify discardCorruptedBatchMessage incremented the failed counter + verify(statsRecorder, times(1)).incrementNumReceiveFailed(); + } + + /** + * Partial decode failure: the first message in the batch is valid, + * but the second message is corrupted. + * Expected: the first valid message is enqueued; remaining corrupted + * messages are discarded. + */ + @Test + public void testPartialDecodeFailureInBatch() throws Exception { + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(3); + + ByteBuf batchBuffer = Unpooled.buffer(1000); + + // First message: valid, properly serialized using Commands utility + Commands.serializeSingleMessageInBatchWithPayload( + new SingleMessageMetadata().setPartitionKey("key1"), + Unpooled.wrappedBuffer("hello".getBytes()), batchBuffer); + + // Second message: corrupted payload (invalid metadata size) + batchBuffer.writeInt(Integer.MAX_VALUE); + batchBuffer.writeBytes(new byte[20]); + + consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, + batchBuffer, new MessageIdData().setLedgerId(2000).setEntryId(2), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // The first valid message should be enqueued via executeNotifyCallback, + // which uses internalPinnedExecutor.execute(). Since it's async, + // we check the stats synchronously — discardCorruptedBatchMessage + // is called inline in the catch block. + verify(statsRecorder, times(1)).incrementNumReceiveFailed(); + } + + /** + * Empty payload with non-zero batchSize: the buffer has no readable bytes + * but batchSize declares 2 messages. + * Expected: IndexOutOfBoundsException during readUnsignedInt, entire batch discarded. + */ + @Test + public void testEmptyPayloadWithNonZeroBatchSize() { + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(2); + + // Empty buffer: readUnsignedInt() will throw IndexOutOfBoundsException + ByteBuf emptyPayload = Unpooled.buffer(0); + + consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, + emptyPayload, new MessageIdData().setLedgerId(3000).setEntryId(3), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // No messages should be enqueued + assertEquals(consumer.numMessagesInQueue(), 0, + "Empty payload should not produce any messages in queue"); + + // The receive-failed counter should have been incremented + verify(statsRecorder, times(1)).incrementNumReceiveFailed(); + } + + /** + * Truncated single-message payload: metadata size is valid but payload + * data is truncated (fewer bytes than declared payloadSize). + * Expected: exception during retainedSlice, entire batch discarded. + */ + @Test + public void testTruncatedSingleMessagePayload() { + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(1); + + ByteBuf truncatedBuffer = Unpooled.buffer(100); + + // Write a valid SingleMessageMetadata with a large payloadSize, + // but provide fewer actual bytes than declared. + // This causes retainedSlice(readerIndex, 9999) to fail with + // IndexOutOfBoundsException. + SingleMessageMetadata smm = new SingleMessageMetadata() + .setPartitionKey("key-truncated") + .setPayloadSize(9999); // Claims 9999 bytes of payload + truncatedBuffer.writeInt(smm.getSerializedSize()); + smm.writeTo(truncatedBuffer); + // Only write 5 bytes instead of 9999 + truncatedBuffer.writeBytes(new byte[5]); + + consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, + truncatedBuffer, new MessageIdData().setLedgerId(4000).setEntryId(4), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // No messages should be enqueued + assertEquals(consumer.numMessagesInQueue(), 0, + "Truncated payload should not produce any messages in queue"); + + // The receive-failed counter should have been incremented + verify(statsRecorder, times(1)).incrementNumReceiveFailed(); + } +} From 9f670a41a745e3bb3a586869e94a2217e1271e23 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Mon, 23 Mar 2026 15:36:52 +0800 Subject: [PATCH 3/5] fix --- .../impl/BatchMessageDecodeFailureTest.java | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java index 11313094595f8..bb4a420493b77 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java @@ -176,37 +176,6 @@ batchBuffer, new MessageIdData().setLedgerId(2000).setEntryId(2), verify(statsRecorder, times(1)).incrementNumReceiveFailed(); } - /** - * Empty payload with non-zero batchSize: the buffer has no readable bytes - * but batchSize declares 2 messages. - * Expected: IndexOutOfBoundsException during readUnsignedInt, entire batch discarded. - */ - @Test - public void testEmptyPayloadWithNonZeroBatchSize() { - BrokerEntryMetadata brokerEntryMetadata = - new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); - - MessageMetadata metadata = new MessageMetadata() - .setProducerName("test-producer") - .setSequenceId(1) - .setPublishTime(1) - .setNumMessagesInBatch(2); - - // Empty buffer: readUnsignedInt() will throw IndexOutOfBoundsException - ByteBuf emptyPayload = Unpooled.buffer(0); - - consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, - emptyPayload, new MessageIdData().setLedgerId(3000).setEntryId(3), - mockCnx, DEFAULT_CONSUMER_EPOCH, false); - - // No messages should be enqueued - assertEquals(consumer.numMessagesInQueue(), 0, - "Empty payload should not produce any messages in queue"); - - // The receive-failed counter should have been incremented - verify(statsRecorder, times(1)).incrementNumReceiveFailed(); - } - /** * Truncated single-message payload: metadata size is valid but payload * data is truncated (fewer bytes than declared payloadSize). From 9a58823fb683bdeedcc403d9a4b2128ed40c957d Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Mon, 23 Mar 2026 17:03:09 +0800 Subject: [PATCH 4/5] replace times() to timeout() --- .../pulsar/client/impl/BatchMessageDecodeFailureTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java index bb4a420493b77..b146bd43fe0a9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java @@ -20,7 +20,7 @@ import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; @@ -134,7 +134,7 @@ corruptedPayload, new MessageIdData().setLedgerId(1000).setEntryId(1), "Corrupted batch should not produce any messages in queue"); // Verify discardCorruptedBatchMessage incremented the failed counter - verify(statsRecorder, times(1)).incrementNumReceiveFailed(); + verify(statsRecorder, timeout(2000)).incrementNumReceiveFailed(); } /** @@ -173,7 +173,7 @@ batchBuffer, new MessageIdData().setLedgerId(2000).setEntryId(2), // which uses internalPinnedExecutor.execute(). Since it's async, // we check the stats synchronously — discardCorruptedBatchMessage // is called inline in the catch block. - verify(statsRecorder, times(1)).incrementNumReceiveFailed(); + verify(statsRecorder, timeout(2000)).incrementNumReceiveFailed(); } /** @@ -215,6 +215,6 @@ truncatedBuffer, new MessageIdData().setLedgerId(4000).setEntryId(4), "Truncated payload should not produce any messages in queue"); // The receive-failed counter should have been incremented - verify(statsRecorder, times(1)).incrementNumReceiveFailed(); + verify(statsRecorder, timeout(2000)).incrementNumReceiveFailed(); } } From 36a0257747fed782b15d4861bbd3b6d72ad093e7 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Tue, 24 Mar 2026 10:32:30 +0800 Subject: [PATCH 5/5] Add ack set processing logic when batch index ack is enabled --- .../pulsar/client/impl/ConsumerImpl.java | 37 ++- .../impl/BatchMessageDecodeFailureTest.java | 279 ++++++++++++++++-- 2 files changed, 284 insertions(+), 32 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index bd1a52bc9d039..69b5ac6216b8f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1822,8 +1822,9 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, ackBitSet.recycle(); } } catch (IllegalStateException | IllegalArgumentException e) { - discardCorruptedBatchMessage(messageId, cnx, - (batchSize - skippedMessages - processedMessages), ValidationError.BatchDeSerializeError); + // For IllegalArgumentException see PR: https://github.com/apache/pulsar/pull/24061 + discardCorruptedBatchMessage(messageId, cnx, batchSize, + skippedMessages, processedMessages, ValidationError.BatchDeSerializeError); } if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) { @@ -2155,12 +2156,36 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC discardMessage(messageId, currentCnx, validationError, 1); } + /** + * When batch index ack is enabled, ack the messages that failed to deserialize by their index, + * while keeping successfully enqueued messages unacknowledged to avoid message loss. + */ private void discardCorruptedBatchMessage(MessageIdData messageId, ClientCnx currentCnx, - int unreadMessages, ValidationError validationError) { - log.error("[{}] [{}] Discarding corrupted batch message at {}:{}, unread count={}, exception={}", + int batchSize, int skipped, int processed, ValidationError validationError) { + log.error("[{}] [{}] Discarding corrupted batch messages with batch index ack at {}:{}, " + + "batchSize={}, skipped={}, processed={}, exception={}", subscription, consumerName, messageId.getLedgerId(), messageId.getEntryId(), - unreadMessages, validationError); - discardMessage(messageId, currentCnx, validationError, unreadMessages); + batchSize, skipped, processed, validationError); + BitSetRecyclable ackBitSet = null; + int corruptedStartIndex = skipped + processed; + if (conf.isBatchIndexAckEnabled()) { + // When batch index ack is enabled, only ack the messages that failed to deserialize. + // Messages that have been successfully enqueued remain unacknowledged, + // waiting for the user to consume and acknowledge them normally. + ackBitSet = BitSetRecyclable.create(); + ackBitSet.set(0, batchSize); + for (int i = corruptedStartIndex; i < batchSize; i++) { + ackBitSet.clear(i); + } + } + ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), + ackBitSet, AckType.Individual, validationError, Collections.emptyMap(), -1); + currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise()); + if (ackBitSet != null) { + ackBitSet.recycle(); + } + increaseAvailablePermits(currentCnx, batchSize - corruptedStartIndex); + stats.incrementNumReceiveFailed(); } private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError, diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java index b146bd43fe0a9..b3214fe467375 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageDecodeFailureTest.java @@ -19,17 +19,26 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; @@ -38,14 +47,24 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** - * Tests for decode failure scenarios in - * {@link ConsumerImpl#receiveIndividualMessagesFromBatch}. + * Tests for {@link ConsumerImpl#receiveIndividualMessagesFromBatch}. * + *

Covers the following scenarios: + *

    + *
  • Decode failure: corrupted, partial-corrupt, empty, truncated payloads
  • + *
  • Normal path: all valid messages enqueued successfully
  • + *
  • Duplicate detection: messages flagged as duplicate are skipped
  • + *
  • Dead letter policy: messages exceeding max redelivery count
  • + *
+ * + *

Uses the same client-side mocking infrastructure as {@link ConsumerImplTest} + * (via {@link ClientTestFixtures}) without requiring a running broker. */ public class BatchMessageDecodeFailureTest { @@ -176,45 +195,253 @@ batchBuffer, new MessageIdData().setLedgerId(2000).setEntryId(2), verify(statsRecorder, timeout(2000)).incrementNumReceiveFailed(); } + // ==================== Normal path tests ==================== + + /** + * All messages in the batch are valid and properly serialized. + * Expected: all messages are enqueued via executeNotifyCallback, + * no receive-failed counter is incremented, and skippedMessages == 0 + * so increaseAvailablePermits is NOT called with extra skipped count. + */ + @Test + public void testAllValidMessagesEnqueued() throws Exception { + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + int batchSize = 3; + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(batchSize); + + ByteBuf batchBuffer = Unpooled.buffer(1024); + + // Serialize 3 valid messages + for (int i = 0; i < batchSize; i++) { + Commands.serializeSingleMessageInBatchWithPayload( + new SingleMessageMetadata().setPartitionKey("key-" + i), + Unpooled.wrappedBuffer(("payload-" + i).getBytes()), batchBuffer); + } + + consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, + batchBuffer, new MessageIdData().setLedgerId(5000).setEntryId(1), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // All 3 messages should eventually be enqueued through the async executor. + // Use Awaitility since executeNotifyCallback dispatches via internalPinnedExecutor. + Awaitility.await().untilAsserted(() -> + assertEquals(consumer.numMessagesInQueue(), batchSize, + "All valid messages should be enqueued")); + + // No receive-failed counter should be incremented + verify(statsRecorder, never()).incrementNumReceiveFailed(); + } + + // ==================== Duplicate detection tests ==================== + /** - * Truncated single-message payload: metadata size is valid but payload - * data is truncated (fewer bytes than declared payloadSize). - * Expected: exception during retainedSlice, entire batch discarded. + * When acknowledgmentsGroupingTracker.isDuplicate() returns true for a message, + * that message should be skipped (skippedMessages++) and not enqueued. */ @Test - public void testTruncatedSingleMessagePayload() { + public void testDuplicateMessagesAreSkipped() throws Exception { + // Inject a mock acknowledgmentsGroupingTracker that marks all messages as duplicates + AcknowledgmentsGroupingTracker mockTracker = mock(AcknowledgmentsGroupingTracker.class); + when(mockTracker.isDuplicate(any(MessageId.class))).thenReturn(true); + FieldUtils.writeField(consumer, "acknowledgmentsGroupingTracker", mockTracker, true); + BrokerEntryMetadata brokerEntryMetadata = new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + int batchSize = 3; MessageMetadata metadata = new MessageMetadata() .setProducerName("test-producer") .setSequenceId(1) .setPublishTime(1) - .setNumMessagesInBatch(1); - - ByteBuf truncatedBuffer = Unpooled.buffer(100); - - // Write a valid SingleMessageMetadata with a large payloadSize, - // but provide fewer actual bytes than declared. - // This causes retainedSlice(readerIndex, 9999) to fail with - // IndexOutOfBoundsException. - SingleMessageMetadata smm = new SingleMessageMetadata() - .setPartitionKey("key-truncated") - .setPayloadSize(9999); // Claims 9999 bytes of payload - truncatedBuffer.writeInt(smm.getSerializedSize()); - smm.writeTo(truncatedBuffer); - // Only write 5 bytes instead of 9999 - truncatedBuffer.writeBytes(new byte[5]); + .setNumMessagesInBatch(batchSize); + + ByteBuf batchBuffer = Unpooled.buffer(1024); + for (int i = 0; i < batchSize; i++) { + Commands.serializeSingleMessageInBatchWithPayload( + new SingleMessageMetadata().setPartitionKey("key-" + i), + Unpooled.wrappedBuffer(("payload-" + i).getBytes()), batchBuffer); + } consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, - truncatedBuffer, new MessageIdData().setLedgerId(4000).setEntryId(4), + batchBuffer, new MessageIdData().setLedgerId(5002).setEntryId(3), mockCnx, DEFAULT_CONSUMER_EPOCH, false); - // No messages should be enqueued + // All messages are duplicates, none should be enqueued assertEquals(consumer.numMessagesInQueue(), 0, - "Truncated payload should not produce any messages in queue"); + "Duplicate messages should not be enqueued"); - // The receive-failed counter should have been incremented - verify(statsRecorder, timeout(2000)).incrementNumReceiveFailed(); + // No receive-failed since there is no decode error + verify(statsRecorder, never()).incrementNumReceiveFailed(); + + // isDuplicate should have been called for each message + verify(mockTracker, times(batchSize)).isDuplicate(any(MessageId.class)); + } + + /** + * When only the second message out of 3 is a duplicate, messages at + * index 0 and 2 should be enqueued, message at index 1 is skipped. + */ + @Test + public void testPartialDuplicateMessages() throws Exception { + AcknowledgmentsGroupingTracker mockTracker = mock(AcknowledgmentsGroupingTracker.class); + // Only the second call returns true (message at index 1 is duplicate) + when(mockTracker.isDuplicate(any(MessageId.class))) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false); + FieldUtils.writeField(consumer, "acknowledgmentsGroupingTracker", mockTracker, true); + + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + int batchSize = 3; + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(batchSize); + + ByteBuf batchBuffer = Unpooled.buffer(1024); + for (int i = 0; i < batchSize; i++) { + Commands.serializeSingleMessageInBatchWithPayload( + new SingleMessageMetadata().setPartitionKey("key-" + i), + Unpooled.wrappedBuffer(("payload-" + i).getBytes()), batchBuffer); + } + + consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, + batchBuffer, new MessageIdData().setLedgerId(5003).setEntryId(4), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // 2 non-duplicate messages should be enqueued + Awaitility.await().untilAsserted(() -> + assertEquals(consumer.numMessagesInQueue(), 2, + "Only non-duplicate messages should be enqueued")); + + verify(statsRecorder, never()).incrementNumReceiveFailed(); + } + + // ==================== Dead letter policy tests ==================== + + /** + * Helper to create a consumer with dead letter policy enabled. + * The maxRedeliverCount is set so that we can test the dead letter branches. + */ + private ConsumerImpl createConsumerWithDeadLetterPolicy(int maxRedeliverCount) throws Exception { + ExecutorProvider dlqExecutorProvider = new ExecutorProvider(1, "DLQ-Test"); + ExecutorService dlqInternalExecutor = Executors.newSingleThreadScheduledExecutor(); + + PulsarClientImpl client = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx( + dlqExecutorProvider, dlqInternalExecutor, mockCnx); + ClientConfigurationData clientConf = client.getConfiguration(); + clientConf.setOperationTimeoutMs(100); + clientConf.setStatsIntervalSeconds(1); + + ConsumerConfigurationData consumerConf = new ConsumerConfigurationData<>(); + consumerConf.setSubscriptionName("test-sub"); + consumerConf.setDeadLetterPolicy( + DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliverCount) + .build()); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + ConsumerImpl dlqConsumer = ConsumerImpl.newConsumerImpl(client, TOPIC, consumerConf, + dlqExecutorProvider, -1, false, subscribeFuture, null, null, null, + true); + dlqConsumer.setState(HandlerState.State.Ready); + dlqConsumer.setClientCnx(mockCnx); + + // Inject stats spy + ConsumerStatsRecorderImpl dlqStats = spy(new ConsumerStatsRecorderImpl(dlqConsumer)); + FieldUtils.writeField(dlqConsumer, "stats", dlqStats, true); + + return dlqConsumer; + } + + /** + * When redeliveryCount == maxRedeliverCount, messages should be added to + * possibleSendToDeadLetterTopicMessages but still enqueued (not skipped). + */ + @Test + @SuppressWarnings("unchecked") + public void testDeadLetterPolicyAtMaxRedeliverCount() throws Exception { + int maxRedeliverCount = 3; + ConsumerImpl dlqConsumer = createConsumerWithDeadLetterPolicy(maxRedeliverCount); + + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + int batchSize = 2; + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(batchSize); + + ByteBuf batchBuffer = Unpooled.buffer(1024); + for (int i = 0; i < batchSize; i++) { + Commands.serializeSingleMessageInBatchWithPayload( + new SingleMessageMetadata().setPartitionKey("key-" + i), + Unpooled.wrappedBuffer(("payload-" + i).getBytes()), batchBuffer); + } + + // redeliveryCount == maxRedeliverCount (3) + dlqConsumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, + maxRedeliverCount, null, batchBuffer, + new MessageIdData().setLedgerId(6000).setEntryId(1), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // Messages should be enqueued (redeliveryCount == max, not > max) + Awaitility.await().untilAsserted(() -> + assertEquals(dlqConsumer.numMessagesInQueue(), batchSize, + "Messages at maxRedeliverCount should still be enqueued")); + + // Verify possibleSendToDeadLetterTopicMessages is populated + Map deadLetterMap = (Map) FieldUtils.readField( + dlqConsumer, "possibleSendToDeadLetterTopicMessages", true); + assertFalse(deadLetterMap.isEmpty(), + "possibleSendToDeadLetterTopicMessages should be populated at maxRedeliverCount"); + } + + /** + * When redeliveryCount > maxRedeliverCount, messages should be skipped + * (not enqueued) and redeliverUnacknowledgedMessages should be triggered. + */ + @Test + public void testDeadLetterPolicyExceedsMaxRedeliverCount() throws Exception { + int maxRedeliverCount = 3; + ConsumerImpl dlqConsumer = createConsumerWithDeadLetterPolicy(maxRedeliverCount); + + BrokerEntryMetadata brokerEntryMetadata = + new BrokerEntryMetadata().setBrokerTimestamp(1).setIndex(1); + + int batchSize = 2; + MessageMetadata metadata = new MessageMetadata() + .setProducerName("test-producer") + .setSequenceId(1) + .setPublishTime(1) + .setNumMessagesInBatch(batchSize); + + ByteBuf batchBuffer = Unpooled.buffer(1024); + for (int i = 0; i < batchSize; i++) { + Commands.serializeSingleMessageInBatchWithPayload( + new SingleMessageMetadata().setPartitionKey("key-" + i), + Unpooled.wrappedBuffer(("payload-" + i).getBytes()), batchBuffer); + } + + // redeliveryCount > maxRedeliverCount (4 > 3) + dlqConsumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, + maxRedeliverCount + 1, null, batchBuffer, + new MessageIdData().setLedgerId(6001).setEntryId(2), + mockCnx, DEFAULT_CONSUMER_EPOCH, false); + + // Messages should be skipped (redeliveryCount > max) + assertEquals(dlqConsumer.numMessagesInQueue(), 0, + "Messages exceeding maxRedeliverCount should be skipped"); } }