Skip to content

Commit abba991

Browse files
committed
Refactor and refine chunked message id
1 parent 2c2d3a6 commit abba991

6 files changed

Lines changed: 18 additions & 26 deletions

File tree

lib/ChunkMessageIdImpl.h

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,17 @@ class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_
3030
public:
3131
ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}
3232

33-
void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }
34-
35-
void setLastChunkMessageId(const MessageId& msgId) {
36-
this->ledgerId_ = msgId.ledgerId();
37-
this->entryId_ = msgId.entryId();
38-
this->partition_ = msgId.partition();
39-
}
40-
4133
void setChunkedMessageIds(std::vector<MessageId>&& chunkedMessageIds) {
4234
chunkedMessageIds_ = std::move(chunkedMessageIds);
43-
setFirstChunkMessageId(chunkedMessageIds_.front());
44-
setLastChunkMessageId(chunkedMessageIds_.back());
35+
auto lastChunkMsgId = chunkedMessageIds_.back();
36+
this->ledgerId_ = lastChunkMsgId.ledgerId();
37+
this->entryId_ = lastChunkMsgId.entryId();
38+
this->partition_ = lastChunkMsgId.partition();
4539
}
4640

47-
std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
41+
std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return chunkedMessageIds_.front().impl_; }
4842

49-
std::vector<MessageId> moveChunkedMessageIds() const noexcept {
43+
std::vector<MessageId> moveChunkedMessageIds() noexcept {
5044
return std::move(chunkedMessageIds_);
5145
}
5246

lib/ConsumerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ class ConsumerImpl : public ConsumerImplBase {
270270

271271
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
272272

273-
std::vector<MessageId> moveChunkedMessageIds() const noexcept {
273+
std::vector<MessageId> moveChunkedMessageIds() noexcept {
274274
return std::move(chunkedMessageIds_);
275275
}
276276

lib/MessageId.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
100100

101101
if (idData.has_first_chunk_message_id()) {
102102
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
103-
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
104-
chunkMsgId->setLastChunkMessageId(msgId);
103+
chunkMsgId->setChunkedMessageIds(
104+
{MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId});
105105
return chunkMsgId->build();
106106
}
107107

lib/OpSendMsg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct OpSendMsg {
5555
const SendCallback sendCallback;
5656
std::vector<std::function<void(Result)>> trackerCallbacks;
5757
ChunkMessageIdImplPtr chunkedMessageId;
58+
std::vector<MessageId> chunkMessageIdList;
5859
// Use shared_ptr here because producer might resend the message with the same arguments
5960
const std::shared_ptr<SendArguments> sendArgs;
6061

lib/ProducerImpl.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
886886
return true;
887887
}
888888

889-
const auto& op = *pendingMessagesQueue_.front();
889+
auto& op = *pendingMessagesQueue_.front();
890890
if (op.result != ResultOk) {
891891
LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
892892
<< rawMessageId);
@@ -912,10 +912,9 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
912912

913913
if (op.chunkedMessageId) {
914914
// Handling the chunk message id.
915-
if (op.chunkId == 0) {
916-
op.chunkedMessageId->setFirstChunkMessageId(messageId);
917-
} else if (op.chunkId == op.numChunks - 1) {
918-
op.chunkedMessageId->setLastChunkMessageId(messageId);
915+
op.chunkMessageIdList.push_back(messageId);
916+
if (op.chunkId == op.numChunks - 1) {
917+
op.chunkedMessageId->setChunkedMessageIds(std::move(op.chunkMessageIdList));
919918
messageId = op.chunkedMessageId->build();
920919
}
921920
}

tests/MessageChunkingTest.cc

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
154154
waitUntil(
155155
std::chrono::seconds(10),
156156
[&] {
157-
if (consumer.getBrokerConsumerStats(consumerStats) != ResultOk) {
158-
return false;
159-
}
160-
return consumerStats.getMsgBacklog() == 0;
157+
return consumer.getBrokerConsumerStats(consumerStats) == ResultOk &&
158+
consumerStats.getMsgBacklog() == 0;
161159
},
162160
1000);
163161
ASSERT_EQ(consumerStats.getMsgBacklog(), 0);
@@ -333,8 +331,8 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) {
333331
MessageId msgId;
334332
{
335333
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
336-
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
337-
chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
334+
chunkMsgId->setChunkedMessageIds({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),
335+
MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()});
338336
msgId = chunkMsgId->build();
339337
// Test the destructor of the underlying message id should also work for the generated messageId.
340338
}

0 commit comments

Comments
 (0)