Skip to content

Commit 2ca2eac

Browse files
feat: introduce a v2 createProducer API to carry error message when fail (#579)
1 parent aedb925 commit 2ca2eac

40 files changed

Lines changed: 633 additions & 425 deletions

include/pulsar/Client.h

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
#include <memory>
3838
#include <string>
39+
#include <variant>
3940

4041
namespace pulsar {
4142
typedef std::function<void(Result, Producer)> CreateProducerCallback;
@@ -45,6 +46,8 @@ typedef std::function<void(Result, TableView)> TableViewCallback;
4546
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
4647
typedef std::function<void(Result)> CloseCallback;
4748

49+
using CreateProducerV2Callback = std::function<void(std::variant<Error, Producer>)>;
50+
4851
class ClientImpl;
4952
class PulsarFriend;
5053
class PulsarWrapper;
@@ -108,7 +111,9 @@ class PULSAR_PUBLIC Client {
108111
* @return ResultOk if the producer has been successfully created
109112
* @return ResultError if there was an error
110113
*/
111-
Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer);
114+
[[deprecated("use createProducerV2")]] Result createProducer(const std::string& topic,
115+
const ProducerConfiguration& conf,
116+
Producer& producer);
112117

113118
/**
114119
* Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific
@@ -118,7 +123,8 @@ class PULSAR_PUBLIC Client {
118123
* @param callback the callback that is triggered when the producer is created successfully or not
119124
* @param callback Callback function that is invoked when the operation is completed
120125
*/
121-
void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback);
126+
[[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
127+
const std::string& topic, const CreateProducerCallback& callback);
122128

123129
/**
124130
* Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific
@@ -127,8 +133,14 @@ class PULSAR_PUBLIC Client {
127133
* @param topic the name of the topic where to produce
128134
* @param conf the customized ProducerConfiguration
129135
*/
130-
void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
131-
const CreateProducerCallback& callback);
136+
[[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
137+
const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback);
138+
139+
void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
140+
CreateProducerV2Callback callback);
141+
142+
std::variant<Error, Producer> createProducerV2(const std::string& topic,
143+
const ProducerConfiguration& conf);
132144

133145
/**
134146
* Subscribe to a given topic and subscription combination with the default ConsumerConfiguration

include/pulsar/Result.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
#include <cstdint>
2525
#include <iosfwd>
26+
#include <ostream>
27+
#include <string>
2628

2729
namespace pulsar {
2830

@@ -101,6 +103,20 @@ enum Result : int8_t
101103
PULSAR_PUBLIC const char* strResult(Result result);
102104

103105
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);
106+
107+
struct PULSAR_PUBLIC Error {
108+
Result result;
109+
std::string message;
110+
};
111+
112+
inline std::ostream& operator<<(std::ostream& os, const Error& error) {
113+
os << error.result;
114+
if (!error.message.empty()) {
115+
os << " " << error.message;
116+
}
117+
return os;
118+
}
119+
104120
} // namespace pulsar
105121

106122
#endif /* ERROR_HPP_ */

lib/BinaryProtoLookupService.cc

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
4747

4848
// NOTE: we can use move capture for topic since C++14
4949
cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
50-
redirectCount](Result result,
50+
redirectCount](const auto& error,
5151
const ClientConnectionWeakPtr& weakCnx) {
52-
if (result != ResultOk) {
53-
promise->setFailed(result);
52+
if (error.result != ResultOk) {
53+
promise->setFailed(error.result);
5454
return;
5555
}
5656
auto cnx = weakCnx.lock();
@@ -62,10 +62,10 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
6262
auto lookupPromise = std::make_shared<LookupDataResultPromise>();
6363
cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise);
6464
lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount](
65-
Result result, const LookupDataResultPtr& data) {
66-
if (result != ResultOk || !data) {
67-
LOG_ERROR("Lookup failed for " << topic << ", result " << result);
68-
promise->setFailed(result);
65+
const Error& error, const LookupDataResultPtr& data) {
66+
if (error.result != ResultOk || !data) {
67+
LOG_ERROR("Lookup failed for " << topic << ", result " << error);
68+
promise->setFailed(error.result);
6969
return;
7070
}
7171

@@ -96,15 +96,11 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
9696
return promise->getFuture();
9797
}
9898

99-
/*
100-
* @param topicName topic to get number of partitions.
101-
*
102-
*/
103-
Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
99+
Future<Error, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
104100
const TopicNamePtr& topicName) {
105101
LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
106102
if (!topicName) {
107-
promise->setFailed(ResultInvalidTopicName);
103+
promise->setFailed(Error{ResultInvalidTopicName, ""});
108104
return promise->getFuture();
109105
}
110106
std::string lookupName = topicName->toString();
@@ -115,16 +111,17 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
115111
return promise->getFuture();
116112
}
117113

118-
void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
114+
void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName,
115+
const Error& error,
119116
const ClientConnectionWeakPtr& clientCnx,
120117
const LookupDataResultPromisePtr& promise) {
121-
if (result != ResultOk) {
122-
promise->setFailed(result);
118+
if (error.result != ResultOk) {
119+
promise->setFailed(error);
123120
return;
124121
}
125122
auto conn = clientCnx.lock();
126123
if (!conn) {
127-
promise->setFailed(ResultConnectError);
124+
promise->setFailed(Error{ResultConnectError, ""});
128125
return;
129126
}
130127
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
@@ -135,7 +132,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
135132
std::placeholders::_2, clientCnx, promise));
136133
}
137134

138-
void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Result result,
135+
void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, const Error& error,
139136
const LookupDataResultPtr& data,
140137
const ClientConnectionWeakPtr& clientCnx,
141138
const LookupDataResultPromisePtr& promise) {
@@ -144,8 +141,8 @@ void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string&
144141
<< data->getBrokerUrl());
145142
promise->setValue(data);
146143
} else {
147-
LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", result " << result);
148-
promise->setFailed(result);
144+
LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", result " << error);
145+
promise->setFailed(error);
149146
}
150147
}
151148

@@ -168,38 +165,39 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
168165
return promise->getFuture();
169166
}
170167

171-
Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
172-
const std::string& version) {
173-
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, SchemaInfo>>();
174-
168+
Future<Error, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
169+
const std::string& version) {
170+
GetSchemaPromisePtr promise = std::make_shared<Promise<Error, SchemaInfo>>();
175171
if (!topicName) {
176-
promise->setFailed(ResultInvalidTopicName);
172+
promise->setFailed(Error{ResultInvalidTopicName, ""});
177173
return promise->getFuture();
178174
}
179-
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
175+
176+
const auto topic = topicName->toString();
177+
const auto address = serviceNameResolver_.resolveHost();
178+
cnxPool_.getConnectionAsync(address, address)
180179
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(),
181180
version, std::placeholders::_1, std::placeholders::_2, promise));
182-
183181
return promise->getFuture();
184182
}
185183

186184
void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, const std::string& version,
187-
Result result, const ClientConnectionWeakPtr& clientCnx,
185+
const Error& error,
186+
const ClientConnectionWeakPtr& clientCnx,
188187
const GetSchemaPromisePtr& promise) {
189-
if (result != ResultOk) {
190-
promise->setFailed(result);
188+
if (error.result != ResultOk) {
189+
promise->setFailed(error);
191190
return;
192191
}
193192

194193
ClientConnectionPtr conn = clientCnx.lock();
195194
uint64_t requestId = newRequestId();
196195
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName
197196
<< " version: " << version);
198-
199197
conn->newGetSchema(topicName, version, requestId)
200-
.addListener([promise](Result result, const SchemaInfo& schemaInfo) {
201-
if (result != ResultOk) {
202-
promise->setFailed(result);
198+
.addListener([promise](const auto& error, const SchemaInfo& schemaInfo) {
199+
if (error.result != ResultOk) {
200+
promise->setFailed(error);
203201
return;
204202
}
205203
promise->setValue(schemaInfo);
@@ -208,11 +206,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName
208206

209207
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName,
210208
CommandGetTopicsOfNamespace_Mode mode,
211-
Result result,
209+
const Error& error,
212210
const ClientConnectionWeakPtr& clientCnx,
213211
const NamespaceTopicsPromisePtr& promise) {
214-
if (result != ResultOk) {
215-
promise->setFailed(result);
212+
if (error.result != ResultOk) {
213+
promise->setFailed(error.result);
216214
return;
217215
}
218216

lib/BinaryProtoLookupService.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class ConnectionPool;
3535
class LookupDataResult;
3636
class ServiceNameResolver;
3737
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
38-
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
38+
using GetSchemaPromisePtr = std::shared_ptr<Promise<Error, SchemaInfo>>;
3939

4040
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4141
public:
@@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4848

4949
LookupResultFuture getBroker(const TopicName& topicName) override;
5050

51-
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
51+
Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
5252

5353
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
5454
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
5555

56-
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
56+
Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
5757

5858
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
5959

@@ -71,20 +71,20 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
7171
std::string listenerName_;
7272
const int32_t maxLookupRedirects_;
7373

74-
void sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
74+
void sendPartitionMetadataLookupRequest(const std::string& topicName, const Error& error,
7575
const ClientConnectionWeakPtr& clientCnx,
7676
const LookupDataResultPromisePtr& promise);
7777

78-
void handlePartitionMetadataLookup(const std::string& topicName, Result result,
78+
void handlePartitionMetadataLookup(const std::string& topicName, const Error& error,
7979
const LookupDataResultPtr& data,
8080
const ClientConnectionWeakPtr& clientCnx,
8181
const LookupDataResultPromisePtr& promise);
8282

8383
void sendGetTopicsOfNamespaceRequest(const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode,
84-
Result result, const ClientConnectionWeakPtr& clientCnx,
84+
const Error& error, const ClientConnectionWeakPtr& clientCnx,
8585
const NamespaceTopicsPromisePtr& promise);
8686

87-
void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result,
87+
void sendGetSchemaRequest(const std::string& topicName, const std::string& version, const Error& error,
8888
const ClientConnectionWeakPtr& clientCnx, const GetSchemaPromisePtr& promise);
8989

9090
void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr& topicsPtr,

lib/Client.cc

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <pulsar/Client.h>
2020
#include <pulsar/ServiceInfoProvider.h>
2121

22+
#include <future>
2223
#include <iostream>
2324
#include <memory>
2425
#include <utility>
@@ -65,7 +66,31 @@ void Client::createProducerAsync(const std::string& topic, const CreateProducerC
6566

6667
void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
6768
const CreateProducerCallback& callback) {
68-
impl_->createProducerAsync(topic, conf, callback);
69+
impl_->createProducerAsync(topic, conf, [callback](const auto& v) {
70+
if (const auto* error = std::get_if<Error>(&v)) {
71+
callback(error->result, Producer());
72+
} else {
73+
callback(ResultOk, std::get<Producer>(v));
74+
}
75+
});
76+
}
77+
78+
void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
79+
CreateProducerV2Callback callback) {
80+
impl_->createProducerAsync(topic, conf, std::move(callback));
81+
}
82+
83+
std::variant<Error, Producer> Client::createProducerV2(const std::string& topic,
84+
const ProducerConfiguration& conf) {
85+
std::promise<std::variant<Error, Producer>> promise;
86+
createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable {
87+
if (const auto* error = std::get_if<Error>(&v)) {
88+
promise.set_value(*error);
89+
} else {
90+
promise.set_value(std::get<Producer>(v));
91+
}
92+
});
93+
return promise.get_future().get();
6994
}
7095

7196
Result Client::subscribe(const std::string& topic, const std::string& subscriptionName, Consumer& consumer) {
@@ -169,9 +194,9 @@ void Client::createTableViewAsync(const std::string& topic, const TableViewConfi
169194
}
170195

171196
Result Client::getPartitionsForTopic(const std::string& topic, std::vector<std::string>& partitions) {
172-
Promise<Result, std::vector<std::string> > promise;
173-
getPartitionsForTopicAsync(topic, WaitForCallbackValue<std::vector<std::string> >(promise));
174-
Future<Result, std::vector<std::string> > future = promise.getFuture();
197+
Promise<Result, std::vector<std::string>> promise;
198+
getPartitionsForTopicAsync(topic, WaitForCallbackValue<std::vector<std::string>>(promise));
199+
Future<Result, std::vector<std::string>> future = promise.getFuture();
175200

176201
return future.get(partitions);
177202
}
@@ -199,7 +224,9 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers();
199224
void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
200225
std::function<void(Result, const SchemaInfo&)> callback) {
201226
impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
202-
.addListener(std::move(callback));
227+
.addListener([callback{std::move(callback)}](const Error& error, const SchemaInfo& schemaInfo) {
228+
callback(error.result, schemaInfo);
229+
});
203230
}
204231

205232
ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }

0 commit comments

Comments
 (0)