|
22 | 22 |
|
23 | 23 | #include "ClientConnection.h" |
24 | 24 | #include "Commands.h" |
25 | | -#include "LogUtils.h" |
26 | | -#include "MessageImpl.h" |
27 | | - |
28 | | -DECLARE_LOG_OBJECT() |
| 25 | +#include "CompressionCodec.h" |
| 26 | +#include "MessageCrypto.h" |
| 27 | +#include "OpSendMsg.h" |
| 28 | +#include "PulsarApi.pb.h" |
29 | 29 |
|
30 | 30 | namespace pulsar { |
31 | 31 |
|
| 32 | +MessageAndCallbackBatch::MessageAndCallbackBatch() {} |
| 33 | + |
| 34 | +MessageAndCallbackBatch::~MessageAndCallbackBatch() {} |
| 35 | + |
32 | 36 | void MessageAndCallbackBatch::add(const Message& msg, const SendCallback& callback) { |
33 | | - if (empty()) { |
34 | | - msgImpl_.reset(new MessageImpl); |
35 | | - Commands::initBatchMessageMetadata(msg, msgImpl_->metadata); |
| 37 | + if (callbacks_.empty()) { |
| 38 | + metadata_.reset(new proto::MessageMetadata); |
| 39 | + Commands::initBatchMessageMetadata(msg, *metadata_); |
| 40 | + sequenceId_ = metadata_->sequence_id(); |
36 | 41 | } |
37 | | - LOG_DEBUG(" Before serialization payload size in bytes = " << msgImpl_->payload.readableBytes()); |
38 | | - sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, msgImpl_->payload, |
39 | | - ClientConnection::getMaxMessageSize()); |
40 | | - LOG_DEBUG(" After serialization payload size in bytes = " << msgImpl_->payload.readableBytes()); |
| 42 | + messages_.emplace_back(msg); |
41 | 43 | callbacks_.emplace_back(callback); |
42 | | - |
43 | | - ++messagesCount_; |
44 | 44 | messagesSize_ += msg.getLength(); |
45 | 45 | } |
46 | 46 |
|
| 47 | +std::unique_ptr<OpSendMsg> MessageAndCallbackBatch::createOpSendMsg( |
| 48 | + uint64_t producerId, const ProducerConfiguration& producerConfig, MessageCrypto* crypto) { |
| 49 | + auto callback = createSendCallback(); |
| 50 | + if (empty()) { |
| 51 | + return OpSendMsg::create(ResultOperationNotSupported, std::move(callback)); |
| 52 | + } |
| 53 | + |
| 54 | + // The magic number 64 is just an estimated size increment after setting some fields of the |
| 55 | + // SingleMessageMetadata. It does not have to be accurate because it's only used to reduce the |
| 56 | + // reallocation of the payload buffer. |
| 57 | + static const size_t kEstimatedHeaderSize = |
| 58 | + sizeof(uint32_t) + proto::MessageMetadata{}.ByteSizeLong() + 64; |
| 59 | + const auto maxMessageSize = ClientConnection::getMaxMessageSize(); |
| 60 | + // Estimate the buffer size just to avoid resizing the buffer |
| 61 | + size_t maxBufferSize = kEstimatedHeaderSize * messages_.size(); |
| 62 | + for (const auto& msg : messages_) { |
| 63 | + maxBufferSize += msg.getLength(); |
| 64 | + } |
| 65 | + auto payload = SharedBuffer::allocate(maxBufferSize); |
| 66 | + for (const auto& msg : messages_) { |
| 67 | + sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, payload, maxMessageSize); |
| 68 | + } |
| 69 | + metadata_->set_sequence_id(sequenceId_); |
| 70 | + metadata_->set_num_messages_in_batch(messages_.size()); |
| 71 | + auto compressionType = producerConfig.getCompressionType(); |
| 72 | + if (compressionType != CompressionNone) { |
| 73 | + metadata_->set_compression(static_cast<proto::CompressionType>(compressionType)); |
| 74 | + metadata_->set_uncompressed_size(payload.readableBytes()); |
| 75 | + } |
| 76 | + payload = CompressionCodecProvider::getCodec(compressionType).encode(payload); |
| 77 | + |
| 78 | + if (producerConfig.isEncryptionEnabled() && crypto) { |
| 79 | + SharedBuffer encryptedPayload; |
| 80 | + if (!crypto->encrypt(producerConfig.getEncryptionKeys(), producerConfig.getCryptoKeyReader(), |
| 81 | + *metadata_, payload, encryptedPayload)) { |
| 82 | + return OpSendMsg::create(ResultCryptoError, std::move(callback)); |
| 83 | + } |
| 84 | + payload = encryptedPayload; |
| 85 | + } |
| 86 | + |
| 87 | + if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) { |
| 88 | + return OpSendMsg::create(ResultMessageTooBig, std::move(callback)); |
| 89 | + } |
| 90 | + |
| 91 | + auto op = OpSendMsg::create(*metadata_, callbacks_.size(), messagesSize_, producerConfig.getSendTimeout(), |
| 92 | + std::move(callback), nullptr, producerId, payload); |
| 93 | + clear(); |
| 94 | + return op; |
| 95 | +} |
| 96 | + |
47 | 97 | void MessageAndCallbackBatch::clear() { |
48 | | - msgImpl_.reset(); |
| 98 | + messages_.clear(); |
49 | 99 | callbacks_.clear(); |
50 | | - messagesCount_ = 0; |
51 | 100 | messagesSize_ = 0; |
52 | 101 | } |
53 | 102 |
|
54 | 103 | static void completeSendCallbacks(const std::vector<SendCallback>& callbacks, Result result, |
55 | 104 | const MessageId& id) { |
56 | 105 | int32_t numOfMessages = static_cast<int32_t>(callbacks.size()); |
57 | | - LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]"); |
58 | 106 | for (int32_t i = 0; i < numOfMessages; i++) { |
59 | 107 | callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build()); |
60 | 108 | } |
61 | 109 | } |
62 | 110 |
|
63 | | -void MessageAndCallbackBatch::complete(Result result, const MessageId& id) const { |
64 | | - completeSendCallbacks(callbacks_, result, id); |
65 | | -} |
66 | | - |
67 | | -SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback& flushCallback) const { |
| 111 | +SendCallback MessageAndCallbackBatch::createSendCallback() const { |
68 | 112 | const auto& callbacks = callbacks_; |
69 | | - if (flushCallback) { |
70 | | - return [callbacks, flushCallback](Result result, const MessageId& id) { |
71 | | - completeSendCallbacks(callbacks, result, id); |
72 | | - flushCallback(result); |
73 | | - }; |
74 | | - } else { |
75 | | - return [callbacks] // save a copy of `callbacks_` |
76 | | - (Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; |
77 | | - } |
| 113 | + return [callbacks](Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; |
78 | 114 | } |
79 | 115 |
|
80 | 116 | } // namespace pulsar |
0 commit comments