Skip to content

Commit 1144c27

Browse files
committed
clean up implementation
1 parent b91860a commit 1144c27

7 files changed

Lines changed: 46 additions & 56 deletions

File tree

include/pulsar/ServiceInfoProvider.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919
#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_
2020
#define PULSAR_SERVICE_INFO_PROVIDER_H_
2121

22+
#include <pulsar/ClientConfiguration.h>
2223
#include <pulsar/ServiceInfo.h>
2324

24-
#include <functional>
25-
2625
namespace pulsar {
2726

2827
class PULSAR_PUBLIC ServiceInfoProvider {
@@ -34,11 +33,12 @@ class PULSAR_PUBLIC ServiceInfoProvider {
3433
virtual ~ServiceInfoProvider() = default;
3534

3635
/**
37-
* Get the current `ServiceInfo` connection for the client.
38-
* This method is called **only once** internally when the `Client` is being initialized, and the client
39-
* will use the returned `ServiceInfo` to establish the initial connection to the Pulsar cluster.
36+
* Get the initial `ServiceInfo` connection for the client.
37+
* This method is called **only once** internally in `Client::create()` to get the initial `ServiceInfo`
38+
* for the client to connect to the Pulsar service. Since it's only called once, it's legal to return a
39+
* moved `ServiceInfo` object to avoid unnecessary copying.
4040
*/
41-
virtual ServiceInfo getServiceInfo() const = 0;
41+
virtual ServiceInfo initialServiceInfo() = 0;
4242

4343
/**
4444
* Initialize the ServiceInfoProvider.

lib/AtomicSharedPtr.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ class AtomicSharedPtr {
2727
public:
2828
using Pointer = std::shared_ptr<const T>;
2929

30+
AtomicSharedPtr() = default;
31+
explicit AtomicSharedPtr(T value) : ptr_(std::make_shared<const T>(std::move(value))) {}
32+
3033
auto load() const { return std::atomic_load(&ptr_); }
3134

3235
void store(Pointer&& newPtr) { std::atomic_store(&ptr_, std::move(newPtr)); }

lib/ClientConnection.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,6 @@ static bool file_exists(const std::string& path) {
182182

183183
std::atomic<int32_t> ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize};
184184

185-
// NOTE: we should get the connection info from `serviceInfo` rather than `clientConfiguration` because it can
186-
// be modified dynamically.
187185
ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
188186
const ServiceInfo& serviceInfo, const ExecutorServicePtr& executor,
189187
const ClientConfiguration& clientConfiguration,

lib/ClientImpl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ ClientImpl::ClientImpl(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
111111
: serviceInfoProvider_(std::move(serviceInfoProvider)),
112112
state_(Open),
113113
clientConfiguration_(clientConfiguration),
114+
serviceInfo_(serviceInfoProvider_->initialServiceInfo()),
114115
memoryLimitController_(clientConfiguration.getMemoryLimit()),
115116
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
116117
listenerExecutorProvider_(
@@ -130,7 +131,6 @@ ClientImpl::ClientImpl(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
130131
LogUtils::setLoggerFactory(std::move(loggerFactory));
131132
}
132133

133-
serviceInfo_.store(std::make_shared<const ServiceInfo>(serviceInfoProvider_->getServiceInfo()));
134134
lookupServicePtr_ = createLookup(*serviceInfo_.load());
135135
}
136136

@@ -768,8 +768,8 @@ void ClientImpl::handleClose(Result result, const SharedInt& numberOfOpenHandler
768768
}
769769

770770
LOG_DEBUG("Shutting down producers and consumers for client");
771-
// handleClose() is called in ExecutorService's event loop, while shutdown() tried to wait the
772-
// event loop exits. So here we use another thread to call shutdown().
771+
// handleClose() is called in ExecutorService's event loop, while shutdown() tried to wait the event
772+
// loop exits. So here we use another thread to call shutdown().
773773
auto self = shared_from_this();
774774
std::thread shutdownTask{[this, self, callback] {
775775
shutdown();

lib/DefaultServiceUrlProvider.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class DefaultServiceUrlProvider : public ServiceInfoProvider {
3131
DefaultServiceUrlProvider(const std::string& serviceUrl, const ClientConfigurationImpl& config)
3232
: serviceInfo_(config.toServiceInfo(serviceUrl)) {}
3333

34-
ServiceInfo getServiceInfo() const override { return serviceInfo_; }
34+
ServiceInfo initialServiceInfo() override { return std::move(serviceInfo_); }
3535

3636
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {}
3737

tests/ClientTest.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
* under the License.
1818
*/
1919
#include <gtest/gtest.h>
20-
#include <pulsar/Authentication.h>
2120
#include <pulsar/Client.h>
2221
#include <pulsar/Version.h>
2322

2423
#include <algorithm>
2524
#include <chrono>
2625
#include <future>
26+
#include <sstream>
2727

2828
#include "MockClientImpl.h"
2929
#include "PulsarAdminHelper.h"
@@ -32,6 +32,7 @@
3232
#include "lib/ClientConnection.h"
3333
#include "lib/LogUtils.h"
3434
#include "lib/checksum/ChecksumProvider.h"
35+
#include "lib/stats/ProducerStatsImpl.h"
3536

3637
DECLARE_LOG_OBJECT()
3738

tests/ServiceInfoProviderTest.cc

Lines changed: 31 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@
2020
#include <pulsar/Client.h>
2121

2222
#include <atomic>
23-
#include <condition_variable>
2423
#include <memory>
2524
#include <mutex>
26-
#include <queue>
27-
#include <stdexcept>
25+
#include <optional>
2826
#include <thread>
2927

3028
#include "PulsarFriend.h"
@@ -36,70 +34,62 @@ DECLARE_LOG_OBJECT()
3634
using namespace pulsar;
3735
using namespace std::chrono_literals;
3836

39-
class ServiceInfoQueue {
37+
class ServiceInfoHolder {
4038
public:
41-
void push(ServiceInfo info) {
42-
{
43-
std::lock_guard<std::mutex> lock(mutex_);
44-
queue_.push(std::move(info));
45-
}
46-
cond_.notify_all();
47-
}
39+
ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {}
4840

49-
ServiceInfo pop() {
50-
std::unique_lock<std::mutex> lock(mutex_);
51-
cond_.wait(lock, [this] { return !queue_.empty() || !running_; });
52-
if (queue_.empty()) {
53-
throw std::runtime_error("Queue is closed");
41+
std::optional<ServiceInfo> getUpdatedValue() {
42+
std::lock_guard lock(mutex_);
43+
if (!owned_) {
44+
return std::nullopt;
5445
}
55-
56-
ServiceInfo info = std::move(queue_.front());
57-
queue_.pop();
58-
return info;
46+
owned_ = false;
47+
return std::move(serviceInfo_);
5948
}
6049

61-
void close() {
62-
running_ = false;
63-
cond_.notify_all();
50+
void updateValue(ServiceInfo info) {
51+
std::lock_guard lock(mutex_);
52+
serviceInfo_ = std::move(info);
53+
owned_ = true;
6454
}
6555

6656
private:
57+
ServiceInfo serviceInfo_;
58+
bool owned_{true};
59+
6760
mutable std::mutex mutex_;
68-
mutable std::condition_variable cond_;
69-
std::queue<ServiceInfo> queue_;
70-
std::atomic_bool running_{true};
7161
};
7262

7363
class TestServiceInfoProvider : public ServiceInfoProvider {
7464
public:
75-
TestServiceInfoProvider(ServiceInfoQueue &queue) : queue_(queue) {}
65+
TestServiceInfoProvider(ServiceInfoHolder &serviceInfo) : serviceInfo_(serviceInfo) {}
7666

77-
ServiceInfo getServiceInfo() const override { return queue_.pop(); }
67+
ServiceInfo initialServiceInfo() override { return serviceInfo_.getUpdatedValue().value(); }
7868

7969
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {
80-
onServiceInfoUpdate(queue_.pop());
8170
thread_ = std::thread([this, onServiceInfoUpdate] {
82-
try {
83-
while (true) {
84-
ServiceInfo info = queue_.pop();
85-
onServiceInfoUpdate(std::move(info));
71+
while (running_) {
72+
auto updatedValue = serviceInfo_.getUpdatedValue();
73+
if (updatedValue) {
74+
onServiceInfoUpdate(std::move(*updatedValue));
8675
}
87-
} catch (const std::runtime_error &) {
76+
// Use a tight wait loop for tests
77+
std::this_thread::sleep_for(10ms);
8878
}
8979
});
9080
}
9181

9282
~TestServiceInfoProvider() override {
93-
queue_.close();
83+
running_ = false;
9484
if (thread_.joinable()) {
9585
thread_.join();
9686
}
9787
}
9888

9989
private:
100-
ServiceInfoQueue &queue_;
101-
10290
std::thread thread_;
91+
ServiceInfoHolder &serviceInfo_;
92+
std::atomic_bool running_{true};
10393
mutable std::mutex mutex_;
10494
};
10595

@@ -114,10 +104,8 @@ TEST(ServiceInfoProviderTest, testSwitchCluster) {
114104
// Access "public/default" namespace in cluster 1, which doesn't require authentication
115105
ServiceInfo info3{"pulsar://localhost:6650"};
116106

117-
ServiceInfoQueue queue;
118-
queue.push(info1);
119-
120-
auto client = Client::create(std::make_unique<TestServiceInfoProvider>(queue), {});
107+
ServiceInfoHolder serviceInfo{info1};
108+
auto client = Client::create(std::make_unique<TestServiceInfoProvider>(serviceInfo), {});
121109

122110
const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr));
123111
Producer producer;
@@ -141,7 +129,7 @@ TEST(ServiceInfoProviderTest, testSwitchCluster) {
141129

142130
// Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh)
143131
ASSERT_FALSE(PulsarFriend::getConnections(client).empty());
144-
queue.push(info2);
132+
serviceInfo.updateValue(info2);
145133
ASSERT_TRUE(waitUntil(1s, [&] {
146134
return PulsarFriend::getConnections(client).empty() && client.getServiceInfo() == info2;
147135
}));
@@ -152,7 +140,7 @@ TEST(ServiceInfoProviderTest, testSwitchCluster) {
152140
// Switch back to cluster 1 without any authentication, the previous authentication info configured for
153141
// cluster 2 will be cleared.
154142
ASSERT_FALSE(PulsarFriend::getConnections(client).empty());
155-
queue.push(info3);
143+
serviceInfo.updateValue(info3);
156144
ASSERT_TRUE(waitUntil(1s, [&] {
157145
return PulsarFriend::getConnections(client).empty() && client.getServiceInfo() == info3;
158146
}));

0 commit comments

Comments
 (0)