Skip to content

Commit 015c93f

Browse files
committed
[fix][client] Added increaseAvailablePermits(cnx) for the last chunk case to prevent permit leak
1 parent 33e6aa9 commit 015c93f

1 file changed

Lines changed: 12 additions & 1 deletion

File tree

lib/ConsumerImpl.cc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,11 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
584584
LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
585585
<< ", messageId: " << messageId << ")");
586586
}
587+
// If this is the last chunk, its permit was not returned at the entry of processMessageChunk,
588+
// so we need to return it here to avoid permit leak.
589+
if (chunkId == metadata.num_chunks_from_msg() - 1) {
590+
increaseAvailablePermits(cnx);
591+
}
587592
lock.unlock();
588593
return {};
589594
}
@@ -616,6 +621,7 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
616621
LOG_WARN("Received a duplicated chunk message (uuid: "
617622
<< uuid << " chunkId: " << chunkId << ", lastChunkedMessageId: " << lastChunkedMessageId
618623
<< ", messageId: " << messageId << ")");
624+
lock.unlock();
619625
if (isCorruptedChunk) {
620626
LOG_INFO("Acking corrupted duplicated chunk to avoid ack hole, uuid: "
621627
<< uuid << ", messageId: " << messageId);
@@ -626,14 +632,18 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
626632
}
627633
});
628634
}
629-
lock.unlock();
630635
return {};
631636
}
632637
// chunkId > lastChunkedMessageId + 1, the chunked message is corrupted.
633638
LOG_WARN("Received unexpected chunk (uuid: "
634639
<< uuid << " chunkId: " << chunkId << ", lastChunkedMessageId: " << lastChunkedMessageId
635640
<< ", messageId: " << messageId << ")");
636641
chunkedMessageCache_.remove(uuid);
642+
// If this is the last chunk, its permit was not returned at the entry of processMessageChunk,
643+
// so we need to return it here to avoid permit leak.
644+
if (chunkId == metadata.num_chunks_from_msg() - 1) {
645+
increaseAvailablePermits(cnx);
646+
}
637647
lock.unlock();
638648
if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
639649
TimeUtils::currentTimeMillis() >
@@ -663,6 +673,7 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
663673
<< ", sequenceId: " << metadata.sequence_id());
664674
auto wholePayload = chunkedMsgCtx.getBuffer();
665675
chunkedMessageCache_.remove(uuid);
676+
lock.unlock();
666677
if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
667678
return wholePayload;
668679
} else {

0 commit comments

Comments
 (0)