Skip to content

Commit b2bc7d7

Browse files
committed
fix other tests
1 parent cdb2128 commit b2bc7d7

4 files changed

Lines changed: 21 additions & 6 deletions

File tree

include/pulsar/ServiceInfoProvider.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ class PULSAR_PUBLIC ServiceInfoProvider {
3333
*/
3434
virtual ~ServiceInfoProvider() = default;
3535

36+
/**
37+
* Get the current `ServiceInfo` connection for the client.
38+
* This method is called **only once** internally when the `Client` is being initialized, and the client
39+
* will use the returned `ServiceInfo` to establish the initial connection to the Pulsar cluster.
40+
*/
41+
virtual ServiceInfo getServiceInfo() const = 0;
42+
3643
/**
3744
* Initialize the ServiceInfoProvider.
3845
*

lib/ClientImpl.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,20 @@ ClientImpl::ClientImpl(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
129129
if (loggerFactory) {
130130
LogUtils::setLoggerFactory(std::move(loggerFactory));
131131
}
132+
133+
serviceInfo_.store(std::make_shared<const ServiceInfo>(serviceInfoProvider_->getServiceInfo()));
134+
lookupServicePtr_ = createLookup(*serviceInfo_.load());
132135
}
133136

134137
ClientImpl::~ClientImpl() { shutdown(); }
135138

136139
void ClientImpl::initialize() {
137-
serviceInfoProvider_->initialize(
138-
[this](ServiceInfo serviceInfo) { updateServiceInfo(std::move(serviceInfo)); });
140+
auto weakSelf = weak_from_this();
141+
serviceInfoProvider_->initialize([weakSelf](ServiceInfo serviceInfo) {
142+
if (auto self = weakSelf.lock()) {
143+
self->updateServiceInfo(std::move(serviceInfo));
144+
}
145+
});
139146
}
140147

141148
LookupServicePtr ClientImpl::createLookup(ServiceInfo serviceInfo) {

lib/DefaultServiceUrlProvider.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ class DefaultServiceUrlProvider : public ServiceInfoProvider {
3131
DefaultServiceUrlProvider(const std::string& serviceUrl, const ClientConfigurationImpl& config)
3232
: serviceInfo_(config.toServiceInfo(serviceUrl)) {}
3333

34-
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {
35-
onServiceInfoUpdate(serviceInfo_);
36-
}
34+
ServiceInfo getServiceInfo() const override { return serviceInfo_; }
35+
36+
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {}
3737

3838
private:
3939
ServiceInfo serviceInfo_;

tests/ServiceInfoProviderTest.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ class TestServiceInfoProvider : public ServiceInfoProvider {
7474
public:
7575
TestServiceInfoProvider(ServiceInfoQueue &queue) : queue_(queue) {}
7676

77+
ServiceInfo getServiceInfo() const override { return queue_.pop(); }
78+
7779
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {
7880
onServiceInfoUpdate(queue_.pop());
7981
thread_ = std::thread([this, onServiceInfoUpdate] {
@@ -99,7 +101,6 @@ class TestServiceInfoProvider : public ServiceInfoProvider {
99101

100102
std::thread thread_;
101103
mutable std::mutex mutex_;
102-
std::queue<ServiceInfo> newServiceInfo_;
103104
};
104105

105106
TEST(ServiceInfoProviderTest, testSwitchCluster) {

0 commit comments

Comments
 (0)