@@ -488,19 +488,70 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
488488 LOG_DEBUG (" Process message chunk (chunkId: " << chunkId << " , uuid: " << uuid
489489 << " , messageId: " << messageId << " ) of "
490490 << payload.readableBytes () << " bytes" );
491-
492491 Lock lock (chunkProcessMutex_);
493-
492+ // For non-last chunks, increase available permits immediately since they don't occupy the receiver queue.
493+ if (chunkId != metadata.num_chunks_from_msg () - 1 ) {
494+ increaseAvailablePermits (cnx);
495+ }
494496 // Lazy task scheduling to expire incomplete chunk message
495497 bool expected = false ;
496498 if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
497499 expireChunkMessageTaskScheduled_.compare_exchange_strong (expected, true )) {
498500 triggerCheckExpiredChunkedTimer ();
499501 }
500502
503+ // Part 1: chunkId == 0, this is the first chunk of a chunked message.
504+ // If a previous incomplete context with the same uuid exists, it means either:
505+ // a) Message redelivery: the first chunk's messageId matches one of the cached chunk messageIds.
506+ // In this case, the old context is simply removed and a new context is created to restart
507+ // assembling from scratch. No ack is needed since the old chunks will be redelivered.
508+ // b) Corrupted chunk message: the first chunk's messageId does NOT match any cached chunk messageId,
509+ // meaning a new producer sent a message with the same uuid. In this case, ack the old cached
510+ // chunks to avoid ack holes, then remove the old context and create a new one.
511+ // After handling the old context, check maxPendingChunkedMessage limit and create a new context.
501512 auto it = chunkedMessageCache_.find (uuid);
502-
503- if (chunkId == 0 && it == chunkedMessageCache_.end ()) {
513+ if (chunkId == 0 ) {
514+ // Handle ack hole when receiving duplicated first chunk.
515+ // For example (message redeliver):
516+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
517+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
518+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
519+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
520+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
521+ // For example (corrupted chunk message):
522+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
523+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
524+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
525+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
526+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
527+ if (it != chunkedMessageCache_.end ()) {
528+ auto & existingCtx = it->second ;
529+ bool isCorruptedChunkMessage = true ;
530+ for (const MessageId& cachedMsgId : existingCtx.getChunkedMessageIds ()) {
531+ if (cachedMsgId.ledgerId () == messageId.ledgerId () &&
532+ cachedMsgId.entryId () == messageId.entryId ()) {
533+ isCorruptedChunkMessage = false ;
534+ break ;
535+ }
536+ }
537+ if (isCorruptedChunkMessage) {
538+ for (const MessageId& cachedMsgId : existingCtx.getChunkedMessageIds ()) {
539+ LOG_INFO (" Acking corrupted chunk message to avoid ack hole, uuid: "
540+ << uuid << " , messageId: " << cachedMsgId);
541+ acknowledgeAsync (cachedMsgId, [uuid, cachedMsgId](Result result) {
542+ if (result != ResultOk) {
543+ LOG_ERROR (" Failed to acknowledge corrupted chunk, uuid: "
544+ << uuid << " , messageId: " << cachedMsgId);
545+ }
546+ });
547+ }
548+ }
549+ LOG_WARN (" Received a duplicated first chunk (uuid: "
550+ << uuid << " , messageId: " << messageId
551+ << " ). Remove previous chunk context and restart assembling" );
552+ chunkedMessageCache_.remove (uuid);
553+ it = chunkedMessageCache_.end ();
554+ }
504555 if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size () >= maxPendingChunkedMessage_) {
505556 chunkedMessageCache_.removeOldestValues (
506557 chunkedMessageCache_.size () - maxPendingChunkedMessage_ + 1 ,
@@ -512,45 +563,104 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
512563 }
513564 it = chunkedMessageCache_.putIfAbsent (
514565 uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg (), metadata.total_chunk_msg_size ()});
566+ it->second .appendChunk (messageId, payload);
567+ lock.unlock ();
568+ return {};
515569 }
516570
517- auto & chunkedMsgCtx = it->second ;
518- if (it == chunkedMessageCache_.end () || !chunkedMsgCtx.validateChunkId (chunkId)) {
571+ // Part 2: chunkId != 0 but chunk context not found in cache.
572+ // This happens when the first chunk was not received (e.g., consumer used seek() or started
573+ // consuming from a specific message position that falls in the middle of a chunked message,
574+ // or the context was evicted due to maxPendingChunkedMessage limit).
575+ // In this case, the chunk message cannot be assembled, so just discard it.
576+ if (it == chunkedMessageCache_.end ()) {
519577 auto startMessageId = getStartMessageId ();
520578 if (!config_.isStartMessageIdInclusive () && startMessageId &&
521579 startMessageId->ledgerId () == messageId.ledgerId () &&
522580 startMessageId->entryId () == messageId.entryId ()) {
523- // When the start message id is not inclusive, the last chunk of the previous chunked message will
524- // be delivered, which is expected and we only need to filter it out.
525- chunkedMessageCache_.remove (uuid);
526581 LOG_INFO (" Filtered the chunked message before the start message id (uuid: "
527582 << uuid << " chunkId: " << chunkId << " , messageId: " << messageId << " )" );
528- } else if (it == chunkedMessageCache_. end ()) {
583+ } else {
529584 LOG_ERROR (" Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
530585 << " , messageId: " << messageId << " )" );
531- } else {
532- LOG_ERROR (" Received a chunk whose chunk id is invalid (uuid: "
533- << uuid << " chunkId: " << chunkId << " , messageId: " << messageId << " )" );
534- chunkedMessageCache_.remove (uuid);
535586 }
536587 lock.unlock ();
537- increaseAvailablePermits (cnx);
538- trackMessage (messageId);
539588 return {};
540589 }
541590
591+ // Part 3: chunkId does not match the expected next chunk ID (out-of-order).
592+ // Two sub-cases:
593+ // a) chunkId <= lastChunkedMessageId: duplicated chunk caused by redelivery or corruption.
594+ // Ack the duplicated chunk only if it is corrupted (its messageId matches no cached chunk messageId), then discard it.
595+ // b) chunkId > lastChunkedMessageId + 1: gap detected, the chunked message is corrupted.
596+ // Remove the context and ack the current chunk if it has expired.
597+ auto & chunkedMsgCtx = it->second ;
598+ if (!chunkedMsgCtx.validateChunkId (chunkId)) {
599+ const int lastChunkedMessageId = static_cast <int >(chunkedMsgCtx.getChunkedMessageIds ().size ()) - 1 ;
600+ // For example (duplicated chunk):
601+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
602+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
603+ // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
604+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
605+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
606+ // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
607+ if (chunkId <= lastChunkedMessageId) {
608+ bool isCorruptedChunk = true ;
609+ for (const MessageId& cachedMsgId : chunkedMsgCtx.getChunkedMessageIds ()) {
610+ if (cachedMsgId.ledgerId () == messageId.ledgerId () &&
611+ cachedMsgId.entryId () == messageId.entryId ()) {
612+ isCorruptedChunk = false ;
613+ break ;
614+ }
615+ }
616+ LOG_WARN (" Received a duplicated chunk message (uuid: "
617+ << uuid << " chunkId: " << chunkId << " , lastChunkedMessageId: " << lastChunkedMessageId
618+ << " , messageId: " << messageId << " )" );
619+ if (isCorruptedChunk) {
620+ LOG_INFO (" Acking corrupted duplicated chunk to avoid ack hole, uuid: "
621+ << uuid << " , messageId: " << messageId);
622+ acknowledgeAsync (messageId, [uuid, messageId](Result result) {
623+ if (result != ResultOk) {
624+ LOG_WARN (" Failed to acknowledge duplicated chunk, uuid: "
625+ << uuid << " , messageId: " << messageId);
626+ }
627+ });
628+ }
629+ lock.unlock ();
630+ return {};
631+ }
632+ // chunkId > lastChunkedMessageId + 1, the chunked message is corrupted.
633+ LOG_WARN (" Received unexpected chunk (uuid: "
634+ << uuid << " chunkId: " << chunkId << " , lastChunkedMessageId: " << lastChunkedMessageId
635+ << " , messageId: " << messageId << " )" );
636+ chunkedMessageCache_.remove (uuid);
637+ lock.unlock ();
638+ if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
639+ TimeUtils::currentTimeMillis () >
640+ static_cast <long >(metadata.publish_time ()) + expireTimeOfIncompleteChunkedMessageMs_) {
641+ LOG_INFO (" Acking corrupted gap chunk to avoid ack hole, uuid: "
642+ << uuid << " , messageId: " << messageId);
643+ acknowledgeAsync (messageId, [uuid, messageId](Result result) {
644+ if (result != ResultOk) {
645+ LOG_WARN (" Failed to acknowledge gap chunk, uuid: " << uuid
646+ << " , messageId: " << messageId);
647+ }
648+ });
649+ }
650+ return {};
651+ }
652+
653+ // Part 4: chunkId matches the expected next chunk ID, append chunk payload.
654+ // If all chunks have been received, assemble the full message, build a ChunkMessageId
655+ // containing all individual chunk message IDs, and return the uncompressed payload.
542656 chunkedMsgCtx.appendChunk (messageId, payload);
543657 if (!chunkedMsgCtx.isCompleted ()) {
544658 lock.unlock ();
545- increaseAvailablePermits (cnx);
546659 return {};
547660 }
548-
549661 messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds ())->build ();
550-
551662 LOG_DEBUG (" Chunked message completed chunkId: " << chunkId << " , ChunkedMessageCtx: " << chunkedMsgCtx
552663 << " , sequenceId: " << metadata.sequence_id ());
553-
554664 auto wholePayload = chunkedMsgCtx.getBuffer ();
555665 chunkedMessageCache_.remove (uuid);
556666 if (uncompressMessageIfNeeded (cnx, messageIdData, metadata, wholePayload, false )) {
0 commit comments