Skip to content

Commit 2c2d3a6

Browse files
committed
[fix] Fix consumer doesn't acknowledge all chunk message Ids
1 parent b242e1a commit 2c2d3a6

5 files changed

Lines changed: 63 additions & 7 deletions

File tree

lib/AckGroupingTracker.cc

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
#include <atomic>
2323
#include <limits>
24+
#include <set>
2425

2526
#include "BitSet.h"
27+
#include "ChunkMessageIdImpl.h"
2628
#include "ClientConnection.h"
2729
#include "Commands.h"
2830
#include "LogUtils.h"
@@ -42,6 +44,14 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c
4244
}
4345
return;
4446
}
47+
if (auto chunkMessageId =
48+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
49+
auto msgIdList = chunkMessageId->moveChunkedMessageIds();
50+
doImmediateAck(std::set<MessageId>(std::make_move_iterator(msgIdList.begin()),
51+
std::make_move_iterator(msgIdList.end())),
52+
callback);
53+
return;
54+
}
4555
const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
4656
if (waitResponse_) {
4757
const auto requestId = requestIdSupplier_();
@@ -84,29 +94,43 @@ void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, Resul
8494
return;
8595
}
8696

97+
std::set<MessageId> ackMsgIds;
98+
99+
for (const auto& msgId : msgIds) {
100+
if (auto chunkMessageId =
101+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
102+
auto msgIdList = chunkMessageId->moveChunkedMessageIds();
103+
doImmediateAck(std::set<MessageId>(std::make_move_iterator(msgIdList.begin()),
104+
std::make_move_iterator(msgIdList.end())),
105+
callback);
106+
} else {
107+
ackMsgIds.insert(msgId);
108+
}
109+
}
110+
87111
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
88112
if (waitResponse_) {
89113
const auto requestId = requestIdSupplier_();
90-
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
114+
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
91115
.addListener([callback](Result result, const ResponseData&) {
92116
if (callback) {
93117
callback(result);
94118
}
95119
});
96120
} else {
97-
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
121+
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
98122
if (callback) {
99123
callback(ResultOk);
100124
}
101125
}
102126
} else {
103-
auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
127+
auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
104128
auto wrappedCallback = [callback, count](Result result) {
105129
if (--*count == 0 && callback) {
106130
callback(result);
107131
}
108132
};
109-
for (auto&& msgId : msgIds) {
133+
for (auto&& msgId : ackMsgIds) {
110134
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
111135
}
112136
}

lib/ChunkMessageIdImpl.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,22 @@ class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_
3838
this->partition_ = msgId.partition();
3939
}
4040

41+
void setChunkedMessageIds(std::vector<MessageId>&& chunkedMessageIds) {
42+
chunkedMessageIds_ = std::move(chunkedMessageIds);
43+
setFirstChunkMessageId(chunkedMessageIds_.front());
44+
setLastChunkMessageId(chunkedMessageIds_.back());
45+
}
46+
4147
std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
4248

49+
std::vector<MessageId> moveChunkedMessageIds() const noexcept {
50+
return std::move(chunkedMessageIds_);
51+
}
52+
4353
MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
4454

4555
private:
4656
std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
57+
std::vector<MessageId> chunkedMessageIds_;
4758
};
4859
} // namespace pulsar

lib/ConsumerImpl.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,7 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
479479
}
480480

481481
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
482-
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
483-
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
482+
chunkMsgId->setChunkedMessageIds(chunkedMsgCtx.moveChunkedMessageIds());
484483
messageId = chunkMsgId->build();
485484

486485
LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
@@ -1165,6 +1164,9 @@ bool ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType consumerType)
11651164

11661165
std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& messageId) {
11671166
auto messageIdImpl = Commands::getMessageIdImpl(messageId);
1167+
if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageIdImpl)) {
1168+
return std::make_pair(messageId, true);
1169+
}
11681170
auto batchedMessageIdImpl = std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl);
11691171

11701172
auto batchSize = messageId.batchSize();

lib/ConsumerImpl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ class ConsumerImpl : public ConsumerImplBase {
270270

271271
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
272272

273+
std::vector<MessageId> moveChunkedMessageIds() const noexcept {
274+
return std::move(chunkedMessageIds_);
275+
}
276+
273277
long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
274278

275279
friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {

tests/MessageChunkingTest.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
8181
}
8282

8383
void createConsumer(const std::string& topic, Consumer& consumer) {
84-
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
84+
ConsumerConfiguration conf;
85+
conf.setBrokerConsumerStatsCacheTimeInMs(1000);
86+
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
8587
}
8688

8789
void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
@@ -138,6 +140,7 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
138140
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
139141
ASSERT_TRUE(chunkMsgId);
140142
receivedMessageIds.emplace_back(messageId);
143+
consumer.acknowledge(messageId);
141144
}
142145
ASSERT_EQ(receivedMessageIds, sendMessageIds);
143146
ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
@@ -147,6 +150,18 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
147150
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
148151
ASSERT_EQ(chunkedMessageCache.size(), 0);
149152

153+
BrokerConsumerStats consumerStats;
154+
waitUntil(
155+
std::chrono::seconds(10),
156+
[&] {
157+
if (consumer.getBrokerConsumerStats(consumerStats) != ResultOk) {
158+
return false;
159+
}
160+
return consumerStats.getMsgBacklog() == 0;
161+
},
162+
1000);
163+
ASSERT_EQ(consumerStats.getMsgBacklog(), 0);
164+
150165
producer.close();
151166
consumer.close();
152167
}

0 commit comments

Comments
 (0)