forked from apache/pulsar-client-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathProducerImpl.h
More file actions
209 lines (159 loc) · 7.05 KB
/
Copy pathProducerImpl.h
File metadata and controls
209 lines (159 loc) · 7.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_
#include <boost/optional.hpp>
#include <memory>
#include "Future.h"
#include "HandlerBase.h"
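// With MSVC and on macOS, the value type of an STL container cannot be a forward-declared
// (incomplete) type, so the full definition of OpSendMsg is included on those platforms.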
#if defined(_MSC_VER) || defined(__APPLE__)
#include "OpSendMsg.h"
#endif
#include "PendingFailures.h"
#include "PeriodicTask.h"
#include "ProducerImplBase.h"
namespace pulsar {
// Forward declarations. Each name is declared exactly once; the list is sorted alphabetically,
// classes first, then structs.
class BatchMessageContainerBase;
class ClientImpl;
class MemoryLimitController;
class MessageCrypto;
class Producer;
class ProducerImpl;
class ProducerStatsBase;
class PulsarFriend;
class Semaphore;
class TopicName;
struct OpSendMsg;
struct ResponseData;

namespace proto {
class MessageMetadata;
}  // namespace proto

// Smart-pointer aliases for the types declared above (and for the boost deadline timer).
using ClientImplPtr = std::shared_ptr<ClientImpl>;
using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
using MessageCryptoPtr = std::shared_ptr<MessageCrypto>;
using ProducerImplPtr = std::shared_ptr<ProducerImpl>;
using ProducerImplWeakPtr = std::weak_ptr<ProducerImpl>;
using ProducerStatsBasePtr = std::shared_ptr<ProducerStatsBase>;
/**
 * Producer bound to one topic, or to one partition of a partitioned topic (see partition()).
 *
 * The class implements the ProducerImplBase interface and overrides the connection callbacks of
 * HandlerBase. Apart from a few trivial inline accessors, this header only declares the members.
 *
 * NOTE: the relative order of the data members is significant because it fixes the order in which
 * they are initialized and destroyed. Do not reorder them without checking the constructor.
 */
class ProducerImpl : public HandlerBase, public ProducerImplBase {
   public:
    /**
     * Creates a producer for `topic`.
     *
     * @param partition index of the partition this producer is bound to; the default, -1, means the
     *        topic is non-partitioned
     */
    ProducerImpl(ClientImplPtr client, const TopicName& topic,
                 const ProducerConfiguration& producerConfiguration,
                 const ProducerInterceptorsPtr& interceptors, int32_t partition = -1);
    ~ProducerImpl();

    // Overridden methods from ProducerImplBase
    const std::string& getProducerName() const override;
    int64_t getLastSequenceId() const override;
    const std::string& getSchemaVersion() const override;
    void sendAsync(const Message& msg, SendCallback callback) override;
    void closeAsync(CloseCallback callback) override;
    void start() override;
    void shutdown() override;
    bool isClosed() override;
    const std::string& getTopic() const override;
    Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override;
    void triggerFlush() override;
    void flushAsync(FlushCallback callback) override;
    bool isConnected() const override;
    uint64_t getNumberOfConnectedProducer() override;

    // Members declared by ProducerImpl itself (none of them is marked `override`)
    bool isStarted() const;

    // NOTE(review): judging by the names, these two react to a message identified by `sequenceId`
    // being reported as corrupt / acknowledged -- confirm the meaning of the bool result against the
    // definitions.
    bool removeCorruptMessage(uint64_t sequenceId);
    bool ackReceived(uint64_t sequenceId, MessageId& messageId);

    // Virtual and not an override, so subclasses may replace it.
    virtual void disconnectProducer();

    uint64_t getProducerId() const;

    /** @return the partition index passed at construction, or -1 if the topic is non-partitioned */
    int32_t partition() const noexcept { return partition_; }

    // NOTE(review): presumably the number of chunks needed for a payload of `size` bytes when a single
    // message may carry at most `maxMessageSize` bytes -- confirm against the definition.
    static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);

    /**
     * Returns HandlerBase::shared_from_this() downcast to a ProducerImplPtr.
     *
     * NOTE(review): this function is `noexcept`. If HandlerBase::shared_from_this() can throw (the
     * std::enable_shared_from_this version throws std::bad_weak_ptr when the object is not owned by a
     * shared_ptr -- confirm which one HandlerBase provides), the exception would end in std::terminate.
     */
    ProducerImplPtr shared_from_this() noexcept {
        return std::dynamic_pointer_cast<ProducerImpl>(HandlerBase::shared_from_this());
    }

    /** Returns a weak pointer built from shared_from_this(); the same caveat about `noexcept` applies. */
    ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); }

   protected:
    ProducerStatsBasePtr producerStatsBasePtr_;

    void setMessageMetadata(const Message& msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize);

    // Takes ownership of the operation (it is passed as a std::unique_ptr by value).
    void sendMessage(std::unique_ptr<OpSendMsg> opSendMsg);

    void startSendTimeoutTimer();

    // These classes get access to the protected and private members.
    friend class PulsarFriend;
    friend class Producer;
    friend class BatchMessageContainerBase;
    friend class BatchMessageContainer;

    // Overridden methods from HandlerBase
    void beforeConnectionChange(ClientConnection& connection) override;
    void connectionOpened(const ClientConnectionPtr& connection) override;
    void connectionFailed(Result result) override;
    const std::string& getName() const override { return producerStr_; }

   private:
    void printStats();

    void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                              const ResponseData& responseData);

    void resendMessages(ClientConnectionPtr cnx);

    void refreshEncryptionKey(const boost::system::error_code& ec);
    bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
                        SharedBuffer& encryptedPayload);

    void sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback);

    /**
     * Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. When the queue is
     * full, this call blocks until a spot is available if blockIfQueueIsFull is true. Otherwise, it
     * returns ResultProducerQueueIsFull immediately.
     *
     * It also checks whether the memory limit would be reached after `payloadSize` is added. If so,
     * this call blocks until enough memory can be reserved.
     */
    Result canEnqueueRequest(uint32_t payloadSize);

    void releaseSemaphore(uint32_t payloadSize);
    void releaseSemaphoreForSendOp(const OpSendMsg& op);

    void cancelTimers() noexcept;

    bool isValidProducerState(const SendCallback& callback) const;
    bool canAddToBatch(const Message& msg) const;

    // Lock type used with std::mutex instances.
    using Lock = std::unique_lock<std::mutex>;

    ProducerConfiguration conf_;

    std::unique_ptr<Semaphore> semaphore_;
    std::list<std::unique_ptr<OpSendMsg>> pendingMessagesQueue_;

    const int32_t partition_;  // -1 if topic is non-partitioned
    std::string producerName_;
    bool userProvidedProducerName_;
    std::string producerStr_;  // returned by getName()
    uint64_t producerId_;
    int64_t msgSequenceGenerator_;

    std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
    DeadlineTimerPtr batchTimer_;
    PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);

    // NOTE(review): `volatile` provides neither atomicity nor inter-thread ordering. If this field is
    // read and written from different threads without holding a lock, std::atomic<int64_t> would be
    // the appropriate type -- check the accesses in the definitions before changing it.
    volatile int64_t lastSequenceIdPublished_;
    std::string schemaVersion_;

    DeadlineTimerPtr sendTimer_;
    void handleSendTimeout(const boost::system::error_code& err);
    using DurationType = boost::asio::deadline_timer::duration_type;
    void asyncWaitSendTimeout(DurationType expiryTime);

    Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;

    struct PendingCallbacks;
    // Both return a container of the same type as pendingMessagesQueue_. These declarations must stay
    // below pendingMessagesQueue_, because decltype needs the member to be declared already.
    decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed();
    decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailedWithLock();

    void failPendingMessages(Result result, bool withLock);

    MessageCryptoPtr msgCrypto_;
    PeriodicTask dataKeyRefreshTask_;

    // Reference member: the referenced controller must outlive this producer.
    MemoryLimitController& memoryLimitController_;
    const bool chunkingEnabled_;
    // NOTE(review): unlike the other data members this one has no trailing underscore; renaming it
    // also requires touching every use in the definitions.
    boost::optional<uint64_t> topicEpoch;

    ProducerInterceptorsPtr interceptors_;
};
// Function object taking two ProducerImplPtr values and returning bool; only declared here.
// NOTE(review): the name suggests it is a comparator for ordering producers (e.g. in an ordered
// container) -- confirm the comparison criterion against the definition.
struct ProducerImplCmp {
bool operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const;
};
} /* namespace pulsar */
#endif /* LIB_PRODUCERIMPL_H_ */