[fix][client] Fixed stuck issues caused by an exception encountered in the receiveIndividualMessagesFromBatch() #25386
gosonzhang wants to merge 5 commits into apache:master
…while processing batch messages
/pulsarbot rerun-failure-checks
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #25386 +/- ##
============================================
+ Coverage 72.72% 72.77% +0.04%
- Complexity 34277 34296 +19
============================================
Files 1954 1954
Lines 154857 154897 +40
Branches 17739 17745 +6
============================================
+ Hits 112627 112724 +97
+ Misses 33197 33132 -65
- Partials 9033 9041 +8
@lhotari Could you please take another look? I've made the modifications according to your suggestions.
if (ackBitSet != null) {
    ackBitSet.recycle();
}
increaseAvailablePermits(currentCnx, batchSize - corruptedStartIndex);
It is not advisable to directly use batchSize - (skipped + processed) to replenish permits. The receiveIndividualMessagesFromBatch() method will filter out already-acknowledged batch indices at the beginning based on the inbound ackSet; these indices have not consumed broker permits. If this is a partial batch redelivery, the current algorithm would include the undelivered indices, potentially leading to permit over-credit.
It might be better to pass in the current inbound ackBitSet, count only the bits after the failing index that are still deliverable, and construct a new ack set based on that.
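A minimal sketch of the counting suggested above, written against `java.util.BitSet` rather than the client's own bit set type. It assumes that a set bit in the inbound ack set marks an index that is still deliverable, and that a null ack set means the whole batch was delivered. The class `PermitCredit`, the method `deliverableFrom`, and the parameter `fromIndex` are hypothetical names, not part of the Pulsar client; whether the failing index itself should be counted depends on how the discard path already credits it.

```java
import java.util.BitSet;

class PermitCredit {
    /**
     * Counts batch indices in [fromIndex, batchSize) that the broker actually
     * delivered. Assumption: a set bit means "not yet acknowledged, still
     * deliverable"; a null ackSet means every index in the batch was delivered.
     */
    static int deliverableFrom(BitSet ackSet, int fromIndex, int batchSize) {
        if (ackSet == null) {
            return batchSize - fromIndex;
        }
        int count = 0;
        // Walk only the set bits, stopping at the end of the batch.
        for (int i = ackSet.nextSetBit(fromIndex);
             i >= 0 && i < batchSize;
             i = ackSet.nextSetBit(i + 1)) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Partial redelivery of a 6-message batch: only indices 1, 3 and 4 are deliverable.
        BitSet ackSet = new BitSet(6);
        ackSet.set(1);
        ackSet.set(3);
        ackSet.set(4);
        int fromIndex = 2;
        System.out.println("naive credit:     " + (6 - fromIndex));
        System.out.println("ack-aware credit: " + deliverableFrom(ackSet, fromIndex, 6));
    }
}
```

In this example the naive `batchSize - fromIndex` would credit 4 permits, while only indices 3 and 4 (2 permits) were actually delivered at or after the failure point.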
} catch (IllegalStateException e) {
    log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
    discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
} catch (IllegalStateException | IllegalArgumentException e) {
Commands.deSerializeSingleMessageInBatch() may also throw IndexOutOfBoundsException for truncated payload (triggered by readUnsignedInt() or retainedSlice()), whereas newSingleMessage() currently does not wrap it into IllegalStateException. As a result, encountering a corrupted batch with truncation or length mismatch can still bypass discardCorruptedBatchMessage, and permit leak or stuck issues would persist.
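The gap described above can be illustrated with a self-contained sketch. The parser here is a hypothetical stand-in for `Commands.deSerializeSingleMessageInBatch()`: it reads a 4-byte length prefix and then that many payload bytes from a plain `byte[]`, and a truncated input surfaces as `IndexOutOfBoundsException`. The names `BatchEntryParser`, `readEntry`, and `tryParse` are assumptions for illustration only, not Pulsar APIs.

```java
import java.util.Objects;

class BatchEntryParser {
    /** Reads a 4-byte big-endian length at offset, then returns that many payload bytes. */
    static byte[] readEntry(byte[] data, int offset) {
        // Throws IndexOutOfBoundsException if the length prefix itself is truncated.
        Objects.checkFromIndexSize(offset, 4, data.length);
        int len = ((data[offset] & 0xFF) << 24)
                | ((data[offset + 1] & 0xFF) << 16)
                | ((data[offset + 2] & 0xFF) << 8)
                | (data[offset + 3] & 0xFF);
        if (len < 0) {
            throw new IllegalArgumentException("negative length: " + len);
        }
        // Throws IndexOutOfBoundsException if the declared length exceeds the remaining bytes.
        Objects.checkFromIndexSize(offset + 4, len, data.length);
        byte[] payload = new byte[len];
        System.arraycopy(data, offset + 4, payload, 0, len);
        return payload;
    }

    /** Returns true if the entry parsed, false if it was treated as corrupted. */
    static boolean tryParse(byte[] data) {
        try {
            readEntry(data, 0);
            return true;
        } catch (IllegalStateException | IllegalArgumentException | IndexOutOfBoundsException e) {
            // Without IndexOutOfBoundsException in this list, a truncated entry would
            // escape the handler and skip the corrupted-batch cleanup path.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryParse(new byte[] {0, 0, 0, 2, 'a', 'b'})); // well-formed
        System.out.println(tryParse(new byte[] {0, 0, 0, 5, 'a'}));      // length mismatch
        System.out.println(tryParse(new byte[] {0, 0}));                 // truncated prefix
    }
}
```

An alternative with the same effect would be to wrap the out-of-bounds failure into `IllegalStateException` at the parsing call site, so that existing handlers continue to work unchanged.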
Motivation
Fixes three issues with receiveIndividualMessagesFromBatch() parsing failures:
On the consumer side, corrupted batch messages that fail to parse, whether because of the format described in [a] or for other reasons, should be discarded so that consumption does not get stuck.
When an exception is thrown partway through parsing a batch, the permits (minus 1) of the remaining unprocessed messages are all leaked, which causes consumption to get stuck.
When batch index ack is enabled, a parsing exception causes the messages still being processed to be acknowledged prematurely.
[a]. #24061
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
none