Skip to content

Commit d6e9b17

Browse files
committed
Avoid calling serializeSingleMessageInBatchWithPayload each time a message is added
### Motivation Currently, each time a message is added to the batch message container, `serializeSingleMessageInBatchWithPayload` will be called. In this method, if the payload buffer's size is not enough, it will grow twice. After batch is cleared, the payload buffer will be reset. For example, here is a typical buffer size increament during a period of a batch: ``` increase buffer size from 0 to 1033 increase buffer size from 1033 to 2066 increase buffer size from 2066 to 4132 increase buffer size from 3099 to 6198 increase buffer size from 5165 to 10330 increase buffer size from 9297 to 18594 increase buffer size from 17561 to 35122 increase buffer size from 34089 to 68178 increase buffer size from 67145 to 134290 ``` ### Modifications Refactor the `MessageAndCallbackBatch` design, in `add` method, only store the message and callback. Provide a `createOpSendMsg` method to serialize the messages and callbacks into a `OpSendMsg`. (cherry picked from commit 2723e1e)
1 parent cac5e1d commit d6e9b17

8 files changed

Lines changed: 111 additions & 110 deletions

lib/BatchMessageContainer.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ void BatchMessageContainer::clear() {
5454
}
5555

5656
std::unique_ptr<OpSendMsg> BatchMessageContainer::createOpSendMsg(const FlushCallback& flushCallback) {
57-
auto op = createOpSendMsgHelper(flushCallback, batch_);
57+
auto op = createOpSendMsgHelper(batch_);
58+
if (flushCallback) {
59+
op->addTrackerCallback(flushCallback);
60+
}
5861
clear();
5962
return op;
6063
}

lib/BatchMessageContainerBase.cc

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@
1818
*/
1919
#include "BatchMessageContainerBase.h"
2020

21-
#include "ClientConnection.h"
22-
#include "CompressionCodec.h"
2321
#include "MessageAndCallbackBatch.h"
2422
#include "MessageCrypto.h"
25-
#include "MessageImpl.h"
2623
#include "OpSendMsg.h"
2724
#include "ProducerImpl.h"
28-
#include "PulsarApi.pb.h"
2925
#include "SharedBuffer.h"
3026

3127
namespace pulsar {
@@ -40,38 +36,9 @@ BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& produce
4036
BatchMessageContainerBase::~BatchMessageContainerBase() {}
4137

4238
std::unique_ptr<OpSendMsg> BatchMessageContainerBase::createOpSendMsgHelper(
43-
const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch) const {
44-
auto sendCallback = batch.createSendCallback(flushCallback);
45-
if (batch.empty()) {
46-
return OpSendMsg::create(ResultOperationNotSupported, std::move(sendCallback));
47-
}
48-
49-
MessageImplPtr impl = batch.msgImpl();
50-
impl->metadata.set_num_messages_in_batch(batch.size());
51-
auto compressionType = producerConfig_.getCompressionType();
52-
if (compressionType != CompressionNone) {
53-
impl->metadata.set_compression(static_cast<proto::CompressionType>(compressionType));
54-
impl->metadata.set_uncompressed_size(impl->payload.readableBytes());
55-
}
56-
impl->payload = CompressionCodecProvider::getCodec(compressionType).encode(impl->payload);
57-
58-
auto msgCrypto = msgCryptoWeakPtr_.lock();
59-
if (msgCrypto && producerConfig_.isEncryptionEnabled()) {
60-
SharedBuffer encryptedPayload;
61-
if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(),
62-
impl->metadata, impl->payload, encryptedPayload)) {
63-
return OpSendMsg::create(ResultCryptoError, std::move(sendCallback));
64-
}
65-
impl->payload = encryptedPayload;
66-
}
67-
68-
if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
69-
return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback));
70-
}
71-
72-
return OpSendMsg::create(impl->metadata, batch.messagesCount(), batch.messagesSize(),
73-
producerConfig_.getSendTimeout(), batch.createSendCallback(flushCallback),
74-
nullptr, producerId_, impl->payload);
39+
MessageAndCallbackBatch& batch) const {
40+
auto crypto = msgCryptoWeakPtr_.lock();
41+
return batch.createOpSendMsg(producerId_, producerConfig_, crypto.get());
7542
}
7643

7744
} // namespace pulsar

lib/BatchMessageContainerBase.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ class BatchMessageContainerBase : public boost::noncopyable {
109109
void updateStats(const Message& msg);
110110
void resetStats();
111111

112-
std::unique_ptr<OpSendMsg> createOpSendMsgHelper(const FlushCallback& flushCallback,
113-
const MessageAndCallbackBatch& batch) const;
112+
std::unique_ptr<OpSendMsg> createOpSendMsgHelper(MessageAndCallbackBatch& flushCallback) const;
114113

115114
virtual void clear() = 0;
116115
};

lib/BatchMessageKeyBasedContainer.cc

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,19 @@ void BatchMessageKeyBasedContainer::clear() {
7474

7575
std::vector<std::unique_ptr<OpSendMsg>> BatchMessageKeyBasedContainer::createOpSendMsgs(
7676
const FlushCallback& flushCallback) {
77-
// Sorted the batches by sequence id
78-
std::vector<const MessageAndCallbackBatch*> sortedBatches;
79-
for (const auto& kv : batches_) {
80-
sortedBatches.emplace_back(&kv.second);
77+
// Store raw pointers to use std::sort
78+
std::vector<OpSendMsg*> rawOpSendMsgs;
79+
for (auto& kv : batches_) {
80+
rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
8181
}
82-
std::sort(sortedBatches.begin(), sortedBatches.end(),
83-
[](const MessageAndCallbackBatch* lhs, const MessageAndCallbackBatch* rhs) {
84-
return lhs->sequenceId() < rhs->sequenceId();
85-
});
82+
std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg* lhs, const OpSendMsg* rhs) {
83+
return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId;
84+
});
85+
rawOpSendMsgs.back()->addTrackerCallback(flushCallback);
8686

87-
std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{sortedBatches.size()};
88-
for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) {
89-
opSendMsgs[i].reset(createOpSendMsgHelper(nullptr, *sortedBatches[i]).release());
90-
}
91-
if (!opSendMsgs.empty()) {
92-
opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback, *sortedBatches.back()).release());
87+
std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{rawOpSendMsgs.size()};
88+
for (size_t i = 0; i < opSendMsgs.size(); i++) {
89+
opSendMsgs[i].reset(rawOpSendMsgs[i]);
9390
}
9491
clear();
9592
return opSendMsgs;

lib/Commands.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,8 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa
839839
uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
840840
unsigned long maxMessageSizeInBytes) {
841841
const auto& msgMetadata = msg.impl_->metadata;
842-
SingleMessageMetadata metadata;
842+
thread_local SingleMessageMetadata metadata;
843+
metadata.Clear();
843844
if (msgMetadata.has_partition_key()) {
844845
metadata.set_partition_key(msgMetadata.partition_key());
845846
}
@@ -868,7 +869,7 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
868869
int payloadSize = msg.impl_->payload.readableBytes();
869870
metadata.set_payload_size(payloadSize);
870871

871-
int msgMetadataSize = metadata.ByteSize();
872+
auto msgMetadataSize = metadata.ByteSizeLong();
872873

873874
unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize;
874875
if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize + payloadSize) {

lib/MessageAndCallbackBatch.cc

Lines changed: 66 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,59 +22,95 @@
2222

2323
#include "ClientConnection.h"
2424
#include "Commands.h"
25-
#include "LogUtils.h"
26-
#include "MessageImpl.h"
27-
28-
DECLARE_LOG_OBJECT()
25+
#include "CompressionCodec.h"
26+
#include "MessageCrypto.h"
27+
#include "OpSendMsg.h"
28+
#include "PulsarApi.pb.h"
2929

3030
namespace pulsar {
3131

32+
MessageAndCallbackBatch::MessageAndCallbackBatch() {}
33+
34+
MessageAndCallbackBatch::~MessageAndCallbackBatch() {}
35+
3236
void MessageAndCallbackBatch::add(const Message& msg, const SendCallback& callback) {
33-
if (empty()) {
34-
msgImpl_.reset(new MessageImpl);
35-
Commands::initBatchMessageMetadata(msg, msgImpl_->metadata);
37+
if (callbacks_.empty()) {
38+
metadata_.reset(new proto::MessageMetadata);
39+
Commands::initBatchMessageMetadata(msg, *metadata_);
40+
sequenceId_ = metadata_->sequence_id();
3641
}
37-
LOG_DEBUG(" Before serialization payload size in bytes = " << msgImpl_->payload.readableBytes());
38-
sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, msgImpl_->payload,
39-
ClientConnection::getMaxMessageSize());
40-
LOG_DEBUG(" After serialization payload size in bytes = " << msgImpl_->payload.readableBytes());
42+
messages_.emplace_back(msg);
4143
callbacks_.emplace_back(callback);
42-
43-
++messagesCount_;
4444
messagesSize_ += msg.getLength();
4545
}
4646

47+
std::unique_ptr<OpSendMsg> MessageAndCallbackBatch::createOpSendMsg(
48+
uint64_t producerId, const ProducerConfiguration& producerConfig, MessageCrypto* crypto) {
49+
auto callback = createSendCallback();
50+
if (empty()) {
51+
return OpSendMsg::create(ResultOperationNotSupported, std::move(callback));
52+
}
53+
54+
// The magic number 64 is just an estimated size increment after setting some fields of the
55+
// SingleMessageMetadata. It does not have to be accurate because it's only used to reduce the
56+
// reallocation of the payload buffer.
57+
static const size_t kEstimatedHeaderSize =
58+
sizeof(uint32_t) + proto::MessageMetadata{}.ByteSizeLong() + 64;
59+
const auto maxMessageSize = ClientConnection::getMaxMessageSize();
60+
// Estimate the buffer size just to avoid resizing the buffer
61+
size_t maxBufferSize = kEstimatedHeaderSize * messages_.size();
62+
for (const auto& msg : messages_) {
63+
maxBufferSize += msg.getLength();
64+
}
65+
auto payload = SharedBuffer::allocate(maxBufferSize);
66+
for (const auto& msg : messages_) {
67+
sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, payload, maxMessageSize);
68+
}
69+
metadata_->set_sequence_id(sequenceId_);
70+
metadata_->set_num_messages_in_batch(messages_.size());
71+
auto compressionType = producerConfig.getCompressionType();
72+
if (compressionType != CompressionNone) {
73+
metadata_->set_compression(static_cast<proto::CompressionType>(compressionType));
74+
metadata_->set_uncompressed_size(payload.readableBytes());
75+
}
76+
payload = CompressionCodecProvider::getCodec(compressionType).encode(payload);
77+
78+
if (producerConfig.isEncryptionEnabled() && crypto) {
79+
SharedBuffer encryptedPayload;
80+
if (!crypto->encrypt(producerConfig.getEncryptionKeys(), producerConfig.getCryptoKeyReader(),
81+
*metadata_, payload, encryptedPayload)) {
82+
return OpSendMsg::create(ResultCryptoError, std::move(callback));
83+
}
84+
payload = encryptedPayload;
85+
}
86+
87+
if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
88+
return OpSendMsg::create(ResultMessageTooBig, std::move(callback));
89+
}
90+
91+
auto op = OpSendMsg::create(*metadata_, callbacks_.size(), messagesSize_, producerConfig.getSendTimeout(),
92+
std::move(callback), nullptr, producerId, payload);
93+
clear();
94+
return op;
95+
}
96+
4797
void MessageAndCallbackBatch::clear() {
48-
msgImpl_.reset();
98+
messages_.clear();
4999
callbacks_.clear();
50-
messagesCount_ = 0;
51100
messagesSize_ = 0;
52101
}
53102

54103
static void completeSendCallbacks(const std::vector<SendCallback>& callbacks, Result result,
55104
const MessageId& id) {
56105
int32_t numOfMessages = static_cast<int32_t>(callbacks.size());
57-
LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]");
58106
for (int32_t i = 0; i < numOfMessages; i++) {
59107
callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build());
60108
}
61109
}
62110

63-
void MessageAndCallbackBatch::complete(Result result, const MessageId& id) const {
64-
completeSendCallbacks(callbacks_, result, id);
65-
}
66-
67-
SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback& flushCallback) const {
111+
SendCallback MessageAndCallbackBatch::createSendCallback() const {
68112
const auto& callbacks = callbacks_;
69-
if (flushCallback) {
70-
return [callbacks, flushCallback](Result result, const MessageId& id) {
71-
completeSendCallbacks(callbacks, result, id);
72-
flushCallback(result);
73-
};
74-
} else {
75-
return [callbacks] // save a copy of `callbacks_`
76-
(Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); };
77-
}
113+
return [callbacks](Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); };
78114
}
79115

80116
} // namespace pulsar

lib/MessageAndCallbackBatch.h

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,24 @@
2424

2525
#include <atomic>
2626
#include <boost/noncopyable.hpp>
27+
#include <memory>
2728
#include <vector>
2829

2930
namespace pulsar {
3031

31-
class MessageImpl;
32-
using MessageImplPtr = std::shared_ptr<MessageImpl>;
32+
struct OpSendMsg;
33+
class MessageCrypto;
3334
using FlushCallback = std::function<void(Result)>;
3435

35-
class MessageAndCallbackBatch : public boost::noncopyable {
36+
namespace proto {
37+
class MessageMetadata;
38+
}
39+
40+
class MessageAndCallbackBatch final : public boost::noncopyable {
3641
public:
42+
MessageAndCallbackBatch();
43+
~MessageAndCallbackBatch();
44+
3745
// Wrapper methods of STL container
3846
bool empty() const noexcept { return callbacks_.empty(); }
3947
size_t size() const noexcept { return callbacks_.size(); }
@@ -46,34 +54,22 @@ class MessageAndCallbackBatch : public boost::noncopyable {
4654
*/
4755
void add(const Message& msg, const SendCallback& callback);
4856

49-
/**
50-
* Clear the internal stats
51-
*/
52-
void clear();
57+
std::unique_ptr<OpSendMsg> createOpSendMsg(uint64_t producerId,
58+
const ProducerConfiguration& producerConfig,
59+
MessageCrypto* crypto);
5360

54-
/**
55-
* Complete all the callbacks with given parameters
56-
*
57-
* @param result this batch's send result
58-
* @param id this batch's message id
59-
*/
60-
void complete(Result result, const MessageId& id) const;
61-
62-
SendCallback createSendCallback(const FlushCallback& flushCallback) const;
63-
64-
const MessageImplPtr& msgImpl() const { return msgImpl_; }
6561
uint64_t sequenceId() const noexcept { return sequenceId_; }
6662

67-
uint32_t messagesCount() const { return messagesCount_; }
68-
uint64_t messagesSize() const { return messagesSize_; }
63+
void clear();
6964

7065
private:
71-
MessageImplPtr msgImpl_;
66+
std::unique_ptr<proto::MessageMetadata> metadata_;
67+
std::vector<Message> messages_;
7268
std::vector<SendCallback> callbacks_;
7369
std::atomic<uint64_t> sequenceId_{static_cast<uint64_t>(-1L)};
74-
75-
uint32_t messagesCount_{0};
7670
uint64_t messagesSize_{0ull};
71+
72+
SendCallback createSendCallback() const;
7773
};
7874

7975
} // namespace pulsar

lib/OpSendMsg.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ struct SendArguments {
3636
const uint64_t producerId;
3737
const uint64_t sequenceId;
3838
const proto::MessageMetadata metadata;
39-
const SharedBuffer payload;
39+
SharedBuffer payload;
4040

4141
SendArguments(uint64_t producerId, uint64_t sequenceId, const proto::MessageMetadata& metadata,
4242
const SharedBuffer& payload)
@@ -73,7 +73,9 @@ struct OpSendMsg {
7373
}
7474

7575
void addTrackerCallback(std::function<void(Result)> trackerCallback) {
76-
trackerCallbacks.emplace_back(trackerCallback);
76+
if (trackerCallback) {
77+
trackerCallbacks.emplace_back(trackerCallback);
78+
}
7779
}
7880

7981
private:

0 commit comments

Comments
 (0)