Skip to content

Commit ce3dfa8

Browse files
feat: add v2 APIs to create Consumer, Reader or TableView (#581)
* feat: add v2 APIs to create Consumer, Reader or TableView * improve tests for other v2 methods * fix lint
1 parent 2ca2eac commit ce3dfa8

7 files changed

Lines changed: 296 additions & 101 deletions

File tree

include/pulsar/Client.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636

3737
#include <memory>
3838
#include <string>
39+
#include <utility>
3940
#include <variant>
41+
#include <vector>
4042

4143
namespace pulsar {
4244
typedef std::function<void(Result, Producer)> CreateProducerCallback;
@@ -47,6 +49,21 @@ typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitio
4749
typedef std::function<void(Result)> CloseCallback;
4850

4951
using CreateProducerV2Callback = std::function<void(std::variant<Error, Producer>)>;
52+
using CreateConsumerV2Callback = std::function<void(std::variant<Error, Consumer>)>;
53+
using SubscribeV2Callback = CreateConsumerV2Callback;
54+
using ReaderV2Callback = std::function<void(std::variant<Error, Reader>)>;
55+
using TableViewV2Callback = std::function<void(std::variant<Error, TableView>)>;
56+
57+
/**
58+
* Use TopicRegex with subscribeV2/subscribeAsyncV2 to distinguish a regex pattern from a single topic name.
59+
*/
60+
struct TopicRegex {
61+
explicit TopicRegex(std::string pattern) : pattern(std::move(pattern)) {}
62+
63+
std::string pattern;
64+
};
65+
66+
using SubscribeTopics = std::variant<std::string, std::vector<std::string>, TopicRegex>;
5067

5168
class ClientImpl;
5269
class PulsarFriend;
@@ -188,6 +205,13 @@ class PULSAR_PUBLIC Client {
188205
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
189206
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
190207

208+
void subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName,
209+
const ConsumerConfiguration& conf, SubscribeV2Callback callback);
210+
211+
std::variant<Error, Consumer> subscribeV2(const SubscribeTopics& topics,
212+
const std::string& subscriptionName,
213+
const ConsumerConfiguration& conf);
214+
191215
/**
192216
* Subscribe to multiple topics under the same namespace.
193217
*
@@ -332,6 +356,12 @@ class PULSAR_PUBLIC Client {
332356
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
333357
const ReaderConfiguration& conf, const ReaderCallback& callback);
334358

359+
void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
360+
const ReaderConfiguration& conf, ReaderV2Callback callback);
361+
362+
std::variant<Error, Reader> createReaderV2(const std::string& topic, const MessageId& startMessageId,
363+
const ReaderConfiguration& conf);
364+
335365
/**
336366
* Create a table view with given {@code TableViewConfiguration} for specified topic.
337367
*
@@ -362,6 +392,12 @@ class PULSAR_PUBLIC Client {
362392
void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
363393
const TableViewCallback& callBack);
364394

395+
void createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf,
396+
TableViewV2Callback callback);
397+
398+
std::variant<Error, TableView> createTableViewV2(const std::string& topic,
399+
const TableViewConfiguration& conf);
400+
365401
/**
366402
* Get the list of partitions for a given topic.
367403
*

lib/Client.cc

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,35 @@
2626

2727
#include "ClientImpl.h"
2828
#include "Int64SerDes.h"
29-
#include "LogUtils.h"
3029
#include "LookupService.h"
3130
#include "TopicName.h"
3231
#include "Utils.h"
3332

34-
DECLARE_LOG_OBJECT()
35-
3633
namespace pulsar {
3734

35+
namespace {
36+
37+
template <typename T>
38+
void setPromiseValue(std::promise<std::variant<Error, T>>& promise, const std::variant<Error, T>& value) {
39+
if (const auto* error = std::get_if<Error>(&value)) {
40+
promise.set_value(*error);
41+
} else {
42+
promise.set_value(std::get<T>(value));
43+
}
44+
}
45+
46+
template <typename T>
47+
void invokeLegacyCallback(const std::function<void(Result, T)>& callback,
48+
const std::variant<Error, T>& value) {
49+
if (const auto* error = std::get_if<Error>(&value)) {
50+
callback(error->result, T());
51+
} else {
52+
callback(ResultOk, std::get<T>(value));
53+
}
54+
}
55+
56+
} // namespace
57+
3858
Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) { impl_->initialize(); }
3959

4060
Client::Client(const std::string& serviceUrl) : Client(serviceUrl, ClientConfiguration()) {}
@@ -83,13 +103,8 @@ void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfi
83103
std::variant<Error, Producer> Client::createProducerV2(const std::string& topic,
84104
const ProducerConfiguration& conf) {
85105
std::promise<std::variant<Error, Producer>> promise;
86-
createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable {
87-
if (const auto* error = std::get_if<Error>(&v)) {
88-
promise.set_value(*error);
89-
} else {
90-
promise.set_value(std::get<Producer>(v));
91-
}
92-
});
106+
createProducerAsyncV2(topic, conf,
107+
[&promise](const auto& v) mutable { setPromiseValue<Producer>(promise, v); });
93108
return promise.get_future().get();
94109
}
95110

@@ -113,8 +128,22 @@ void Client::subscribeAsync(const std::string& topic, const std::string& subscri
113128

114129
void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
115130
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
116-
LOG_INFO("Subscribing on Topic :" << topic);
117-
impl_->subscribeAsync(topic, subscriptionName, conf, callback);
131+
subscribeAsyncV2(topic, subscriptionName, conf,
132+
[callback](const auto& value) { invokeLegacyCallback<Consumer>(callback, value); });
133+
}
134+
135+
void Client::subscribeAsyncV2(const SubscribeTopics& topics, const std::string& subscriptionName,
136+
const ConsumerConfiguration& conf, SubscribeV2Callback callback) {
137+
impl_->subscribeAsyncV2(topics, subscriptionName, conf, std::move(callback));
138+
}
139+
140+
std::variant<Error, Consumer> Client::subscribeV2(const SubscribeTopics& topics,
141+
const std::string& subscriptionName,
142+
const ConsumerConfiguration& conf) {
143+
std::promise<std::variant<Error, Consumer>> promise;
144+
subscribeAsyncV2(topics, subscriptionName, conf,
145+
[&promise](const auto& v) mutable { setPromiseValue<Consumer>(promise, v); });
146+
return promise.get_future().get();
118147
}
119148

120149
Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
@@ -138,7 +167,8 @@ void Client::subscribeAsync(const std::vector<std::string>& topics, const std::s
138167

139168
void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
140169
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
141-
impl_->subscribeAsync(topics, subscriptionName, conf, callback);
170+
subscribeAsyncV2(topics, subscriptionName, conf,
171+
[callback](const auto& value) { invokeLegacyCallback<Consumer>(callback, value); });
142172
}
143173

144174
Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
@@ -162,7 +192,8 @@ void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std:
162192

163193
void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
164194
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
165-
impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback);
195+
subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf,
196+
[callback](const auto& value) { invokeLegacyCallback<Consumer>(callback, value); });
166197
}
167198

168199
Result Client::createReader(const std::string& topic, const MessageId& startMessageId,
@@ -176,7 +207,21 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess
176207

177208
void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
178209
const ReaderConfiguration& conf, const ReaderCallback& callback) {
179-
impl_->createReaderAsync(topic, startMessageId, conf, callback);
210+
createReaderAsyncV2(topic, startMessageId, conf,
211+
[callback](const auto& value) { invokeLegacyCallback<Reader>(callback, value); });
212+
}
213+
214+
void Client::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
215+
const ReaderConfiguration& conf, ReaderV2Callback callback) {
216+
impl_->createReaderAsyncV2(topic, startMessageId, conf, std::move(callback));
217+
}
218+
219+
std::variant<Error, Reader> Client::createReaderV2(const std::string& topic, const MessageId& startMessageId,
220+
const ReaderConfiguration& conf) {
221+
std::promise<std::variant<Error, Reader>> promise;
222+
createReaderAsyncV2(topic, startMessageId, conf,
223+
[&promise](const auto& v) mutable { setPromiseValue<Reader>(promise, v); });
224+
return promise.get_future().get();
180225
}
181226

182227
Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf,
@@ -190,7 +235,21 @@ Result Client::createTableView(const std::string& topic, const TableViewConfigur
190235

191236
void Client::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
192237
const TableViewCallback& callback) {
193-
impl_->createTableViewAsync(topic, conf, callback);
238+
createTableViewAsyncV2(
239+
topic, conf, [callback](const auto& value) { invokeLegacyCallback<TableView>(callback, value); });
240+
}
241+
242+
void Client::createTableViewAsyncV2(const std::string& topic, const TableViewConfiguration& conf,
243+
TableViewV2Callback callback) {
244+
impl_->createTableViewAsyncV2(topic, conf, std::move(callback));
245+
}
246+
247+
std::variant<Error, TableView> Client::createTableViewV2(const std::string& topic,
248+
const TableViewConfiguration& conf) {
249+
std::promise<std::variant<Error, TableView>> promise;
250+
createTableViewAsyncV2(topic, conf,
251+
[&promise](const auto& v) mutable { setPromiseValue<TableView>(promise, v); });
252+
return promise.get_future().get();
194253
}
195254

196255
Result Client::getPartitionsForTopic(const std::string& topic, std::vector<std::string>& partitions) {

0 commit comments

Comments
 (0)