Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,7 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,

SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
int skippedMessages = 0;
int processedMessages = 0;
try {
for (int i = 0; i < batchSize; ++i) {
final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
Expand Down Expand Up @@ -1815,13 +1816,15 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
continue;
}
executeNotifyCallback(message);
processedMessages++;
}
if (ackBitSet != null) {
ackBitSet.recycle();
}
} catch (IllegalStateException e) {
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
Comment thread
gosonzhang marked this conversation as resolved.
} catch (IllegalStateException | IllegalArgumentException e) {
Comment thread
gosonzhang marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commands.deSerializeSingleMessageInBatch() may also throw IndexOutOfBoundsException for a truncated payload (triggered by readUnsignedInt() or retainedSlice()), and newSingleMessage() currently does not wrap that exception in an IllegalStateException. As a result, a corrupted batch with a truncated payload or a length mismatch can still bypass discardCorruptedBatchMessage(), so the permit leak and the stuck-consumer problem would persist for those cases.

// For IllegalArgumentException see PR: https://github.com/apache/pulsar/pull/24061
discardCorruptedBatchMessage(messageId, cnx, batchSize,
skippedMessages, processedMessages, ValidationError.BatchDeSerializeError);
}

if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
Expand Down Expand Up @@ -2153,6 +2156,38 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC
discardMessage(messageId, currentCnx, validationError, 1);
}

/**
* When batch index ack is enabled, ack the messages that failed to deserialize by their index,
* while keeping successfully enqueued messages unacknowledged to avoid message loss.
*/
private void discardCorruptedBatchMessage(MessageIdData messageId, ClientCnx currentCnx,
int batchSize, int skipped, int processed, ValidationError validationError) {
log.error("[{}] [{}] Discarding corrupted batch messages with batch index ack at {}:{}, "
+ "batchSize={}, skipped={}, processed={}, exception={}",
subscription, consumerName, messageId.getLedgerId(), messageId.getEntryId(),
batchSize, skipped, processed, validationError);
BitSetRecyclable ackBitSet = null;
int corruptedStartIndex = skipped + processed;
if (conf.isBatchIndexAckEnabled()) {
// When batch index ack is enabled, only ack the messages that failed to deserialize.
// Messages that have been successfully enqueued remain unacknowledged,
// waiting for the user to consume and acknowledge them normally.
ackBitSet = BitSetRecyclable.create();
ackBitSet.set(0, batchSize);
for (int i = corruptedStartIndex; i < batchSize; i++) {
ackBitSet.clear(i);
}
}
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(),
ackBitSet, AckType.Individual, validationError, Collections.emptyMap(), -1);
currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
if (ackBitSet != null) {
ackBitSet.recycle();
}
increaseAvailablePermits(currentCnx, batchSize - corruptedStartIndex);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not advisable to directly use batchSize - (skipped + processed) to replenish permits. The receiveIndividualMessagesFromBatch() method will filter out already-acknowledged batch indices at the beginning based on the inbound ackSet; these indices have not consumed broker permits. If this is a partial batch redelivery, the current algorithm would include the undelivered indices, potentially leading to permit over-credit.

It might be better to pass in the current inbound ackBitSet, count only the bits after the failing index that are still deliverable, and construct a new ack set based on that.

stats.incrementNumReceiveFailed();
}

private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError,
int batchMessages) {
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null,
Expand Down
Loading
Loading