Skip to content

Commit 9114edf

Browse files
committed
fix stale lookup service might be refered
1 parent 477fb94 commit 9114edf

11 files changed

Lines changed: 69 additions & 65 deletions

lib/Client.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,7 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers();
193193

194194
void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
195195
std::function<void(Result, const SchemaInfo&)> callback) {
196-
impl_->getLookup()
197-
->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
196+
impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
198197
.addListener(std::move(callback));
199198
}
200199

lib/ClientImpl.cc

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -200,21 +200,20 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon
200200

201201
if (autoDownloadSchema) {
202202
auto self = shared_from_this();
203-
auto lookup = getLookup();
204-
lookup->getSchema(topicName).addListener(
203+
getSchema(topicName).addListener(
205204
[self, topicName, callback](Result res, const SchemaInfo& topicSchema) {
206205
if (res != ResultOk) {
207206
callback(res, Producer());
208207
return;
209208
}
210209
ProducerConfiguration conf;
211210
conf.setSchema(topicSchema);
212-
self->getLookup()->getPartitionMetadataAsync(topicName).addListener(
211+
self->getPartitionMetadataAsync(topicName).addListener(
213212
std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
214213
std::placeholders::_2, topicName, conf, callback));
215214
});
216215
} else {
217-
getLookup()->getPartitionMetadataAsync(topicName).addListener(
216+
getPartitionMetadataAsync(topicName).addListener(
218217
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
219218
std::placeholders::_2, topicName, conf, callback));
220219
}
@@ -287,7 +286,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
287286
}
288287

289288
MessageId msgId(startMessageId);
290-
getLookup()->getPartitionMetadataAsync(topicName).addListener(
289+
getPartitionMetadataAsync(topicName).addListener(
291290
std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1,
292291
std::placeholders::_2, topicName, msgId, conf, callback));
293292
}
@@ -400,8 +399,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const
400399
return;
401400
}
402401

403-
getLookup()
404-
->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
402+
getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
405403
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(),
406404
std::placeholders::_1, std::placeholders::_2, regexPattern, mode,
407405
subscriptionName, conf, callback));
@@ -423,9 +421,8 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace
423421

424422
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
425423

426-
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern, mode,
427-
*matchTopics, subscriptionName, conf,
428-
getLookup(), interceptors);
424+
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(
425+
shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, interceptors);
429426

430427
consumer->getConsumerCreatedFuture().addListener(
431428
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
@@ -472,7 +469,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
472469
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
473470

474471
ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
475-
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, getLookup(), interceptors);
472+
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors);
476473

477474
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
478475
shared_from_this(), std::placeholders::_1,
@@ -502,7 +499,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub
502499
}
503500
}
504501

505-
getLookup()->getPartitionMetadataAsync(topicName).addListener(
502+
getPartitionMetadataAsync(topicName).addListener(
506503
std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1,
507504
std::placeholders::_2, topicName, subscriptionName, conf, callback));
508505
}
@@ -525,9 +522,9 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti
525522
callback(ResultInvalidConfiguration, Consumer());
526523
return;
527524
}
528-
consumer = std::make_shared<MultiTopicsConsumerImpl>(
529-
shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf,
530-
getLookup(), interceptors);
525+
consumer = std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
526+
partitionMetadata->getPartitions(),
527+
subscriptionName, conf, interceptors);
531528
} else {
532529
auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
533530
subscriptionName, conf,
@@ -680,9 +677,9 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP
680677
return;
681678
}
682679
}
683-
getLookup()->getPartitionMetadataAsync(topicName).addListener(
684-
std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1,
685-
std::placeholders::_2, topicName, callback));
680+
getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions,
681+
shared_from_this(), std::placeholders::_1,
682+
std::placeholders::_2, topicName, callback));
686683
}
687684

688685
void ClientImpl::closeAsync(const CloseCallback& callback) {
@@ -802,7 +799,9 @@ void ClientImpl::shutdown() {
802799
<< " consumers have been shutdown.");
803800
}
804801

805-
getLookup()->close();
802+
std::shared_lock lock(mutex_);
803+
lookupServicePtr_->close();
804+
lock.unlock();
806805
if (!pool_.close()) {
807806
// pool_ has already been closed. It means shutdown() has been called before.
808807
return;

lib/ClientImpl.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "ConnectionPool.h"
3030
#include "Future.h"
3131
#include "LookupDataResult.h"
32+
#include "LookupService.h"
3233
#include "MemoryLimitController.h"
3334
#include "ProtoApiEnums.h"
3435
#include "SynchronizedHashMap.h"
@@ -53,8 +54,6 @@ typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
5354
class ClientConnection;
5455
using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
5556

56-
class LookupService;
57-
using LookupServicePtr = std::shared_ptr<LookupService>;
5857
using LookupServiceFactory = std::function<LookupServicePtr(const std::string&, const ClientConfiguration&,
5958
ConnectionPool& pool, const AuthenticationPtr&)>;
6059

@@ -129,7 +128,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
129128
ExecutorServiceProviderPtr getIOExecutorProvider();
130129
ExecutorServiceProviderPtr getListenerExecutorProvider();
131130
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
132-
LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");
133131

134132
void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
135133

@@ -143,6 +141,23 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
143141
void updateServiceInfo(ServiceInfo&& serviceInfo);
144142
ServiceInfo getServiceInfo() const;
145143

144+
// Since the underlying `lookupServicePtr_` can be modified by `updateServiceInfo`, we should not expose
145+
// it to other classes, otherwise the update might not be visible.
146+
auto getPartitionMetadataAsync(const TopicNamePtr& topicName) {
147+
std::shared_lock lock(mutex_);
148+
return lookupServicePtr_->getPartitionMetadataAsync(topicName);
149+
}
150+
151+
auto getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) {
152+
std::shared_lock lock(mutex_);
153+
return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode);
154+
}
155+
156+
auto getSchema(const TopicNamePtr& topicName, const std::string& version = "") {
157+
std::shared_lock lock(mutex_);
158+
return lookupServicePtr_->getSchema(topicName, version);
159+
}
160+
146161
static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration);
147162

148163
friend class PulsarFriend;
@@ -182,6 +197,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
182197
const std::string& logicalAddress);
183198

184199
LookupServicePtr createLookup(const std::string& serviceUrl);
200+
LookupServicePtr getLookup(const std::string& redirectedClusterURI);
185201

186202
static std::string getClientVersion(const ClientConfiguration& clientConfiguration);
187203

lib/MultiTopicsConsumerImpl.cc

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,19 @@ using std::chrono::seconds;
4444
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName,
4545
int numPartitions, const std::string& subscriptionName,
4646
const ConsumerConfiguration& conf,
47-
const LookupServicePtr& lookupServicePtr,
4847
const ConsumerInterceptorsPtr& interceptors,
4948
Commands::SubscriptionMode subscriptionMode,
5049
const optional<MessageId>& startMessageId)
5150
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
52-
lookupServicePtr, interceptors, subscriptionMode, startMessageId) {
51+
interceptors, subscriptionMode, startMessageId) {
5352
topicsPartitions_[topicName->toString()] = numPartitions;
5453
}
5554

5655
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
5756
const ClientImplPtr& client, const std::vector<std::string>& topics, const std::string& subscriptionName,
5857
const TopicNamePtr& topicName, const ConsumerConfiguration& conf,
59-
const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors,
60-
Commands::SubscriptionMode subscriptionMode, const optional<MessageId>& startMessageId)
58+
const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode,
59+
const optional<MessageId>& startMessageId)
6160
: ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
6261
Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
6362
client->getListenerExecutorProvider()->get()),
@@ -66,7 +65,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
6665
conf_(conf),
6766
incomingMessages_(conf.getReceiverQueueSize()),
6867
messageListener_(conf.getMessageListener()),
69-
lookupServicePtr_(lookupServicePtr),
7068
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
7169
topics_(topics),
7270
subscriptionMode_(subscriptionMode),
@@ -93,7 +91,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
9391
if (partitionsUpdateInterval > 0) {
9492
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
9593
partitionsUpdateInterval_ = seconds(partitionsUpdateInterval);
96-
lookupServicePtr_ = client->getLookup();
9794
}
9895

9996
state_ = Pending;
@@ -185,7 +182,12 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
185182
auto entry = topicsPartitions_.find(topic);
186183
if (entry == topicsPartitions_.end()) {
187184
lock.unlock();
188-
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
185+
auto client = client_.lock();
186+
if (!client) {
187+
topicPromise->setFailed(ResultAlreadyClosed);
188+
return topicPromise->getFuture();
189+
}
190+
client->getPartitionMetadataAsync(topicName).addListener(
189191
[this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) {
190192
if (result != ResultOk) {
191193
LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- "
@@ -1003,7 +1005,11 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
10031005
auto topicName = TopicName::get(item.first);
10041006
auto currentNumPartitions = item.second;
10051007
auto weakSelf = weak_from_this();
1006-
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
1008+
auto client = client_.lock();
1009+
if (!client) {
1010+
return;
1011+
}
1012+
client->getPartitionMetadataAsync(topicName).addListener(
10071013
[this, weakSelf, topicName, currentNumPartitions](Result result,
10081014
const LookupDataResultPtr& lookupDataResult) {
10091015
auto self = weakSelf.lock();

lib/MultiTopicsConsumerImpl.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,19 @@ class MultiTopicsBrokerConsumerStatsImpl;
4646
using MultiTopicsBrokerConsumerStatsPtr = std::shared_ptr<MultiTopicsBrokerConsumerStatsImpl>;
4747
class UnAckedMessageTrackerInterface;
4848
using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>;
49-
class LookupService;
50-
using LookupServicePtr = std::shared_ptr<LookupService>;
5149

5250
class MultiTopicsConsumerImpl;
5351
class MultiTopicsConsumerImpl : public ConsumerImplBase {
5452
public:
5553
MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& topicName, int numPartitions,
5654
const std::string& subscriptionName, const ConsumerConfiguration& conf,
57-
const LookupServicePtr& lookupServicePtr,
5855
const ConsumerInterceptorsPtr& interceptors,
5956
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
6057
const optional<MessageId>& startMessageId = optional<MessageId>{});
6158

6259
MultiTopicsConsumerImpl(const ClientImplPtr& client, const std::vector<std::string>& topics,
6360
const std::string& subscriptionName, const TopicNamePtr& topicName,
64-
const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_,
65-
const ConsumerInterceptorsPtr& interceptors,
61+
const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors,
6662
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
6763
const optional<MessageId>& startMessageId = optional<MessageId>{});
6864

@@ -119,7 +115,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
119115
MessageListener messageListener_;
120116
DeadlineTimerPtr partitionsUpdateTimer_;
121117
TimeDuration partitionsUpdateInterval_;
122-
LookupServicePtr lookupServicePtr_;
123118
std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
124119
std::atomic<Result> failedResult{ResultOk};
125120
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;

lib/PartitionedProducerImpl.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
#include "ClientImpl.h"
2424
#include "ExecutorService.h"
2525
#include "LogUtils.h"
26-
#include "LookupService.h"
2726
#include "ProducerImpl.h"
2827
#include "RoundRobinMessageRouter.h"
2928
#include "SinglePartitionMessageRouter.h"
@@ -59,7 +58,6 @@ PartitionedProducerImpl::PartitionedProducerImpl(const ClientImplPtr& client, co
5958
listenerExecutor_ = client->getListenerExecutorProvider()->get();
6059
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
6160
partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval);
62-
lookupServicePtr_ = client->getLookup();
6361
}
6462
}
6563

@@ -433,7 +431,11 @@ void PartitionedProducerImpl::runPartitionUpdateTask() {
433431
void PartitionedProducerImpl::getPartitionMetadata() {
434432
using namespace std::placeholders;
435433
auto weakSelf = weak_from_this();
436-
lookupServicePtr_->getPartitionMetadataAsync(topicName_)
434+
auto client = client_.lock();
435+
if (!client) {
436+
return;
437+
}
438+
client->getPartitionMetadataAsync(topicName_)
437439
.addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) {
438440
auto self = weakSelf.lock();
439441
if (self) {

lib/PartitionedProducerImpl.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ using ClientImplPtr = std::shared_ptr<ClientImpl>;
3838
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
3939
class ExecutorService;
4040
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
41-
class LookupService;
42-
using LookupServicePtr = std::shared_ptr<LookupService>;
4341
class ProducerImpl;
4442
using ProducerImplPtr = std::shared_ptr<ProducerImpl>;
4543
class TopicName;
@@ -133,7 +131,6 @@ class PartitionedProducerImpl : public ProducerImplBase,
133131
ExecutorServicePtr listenerExecutor_;
134132
DeadlineTimerPtr partitionsUpdateTimer_;
135133
TimeDuration partitionsUpdateInterval_;
136-
LookupServicePtr lookupServicePtr_;
137134

138135
ProducerInterceptorsPtr interceptors_;
139136

lib/PatternMultiTopicsConsumerImpl.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include "ClientImpl.h"
2222
#include "ExecutorService.h"
2323
#include "LogUtils.h"
24-
#include "LookupService.h"
2524

2625
DECLARE_LOG_OBJECT()
2726

@@ -32,10 +31,8 @@ using std::chrono::seconds;
3231
PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
3332
const ClientImplPtr& client, const std::string& pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode,
3433
const std::vector<std::string>& topics, const std::string& subscriptionName,
35-
const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_,
36-
const ConsumerInterceptorsPtr& interceptors)
37-
: MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf,
38-
lookupServicePtr_, interceptors),
34+
const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& interceptors)
35+
: MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, interceptors),
3936
patternString_(pattern),
4037
pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))),
4138
getTopicsMode_(getTopicsMode),
@@ -84,7 +81,11 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er
8481
// already get namespace from pattern.
8582
assert(namespaceName_);
8683

87-
lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_)
84+
auto client = client_.lock();
85+
if (!client) {
86+
return;
87+
}
88+
client->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_)
8889
.addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this,
8990
std::placeholders::_1, std::placeholders::_2));
9091
}

lib/PatternMultiTopicsConsumerImpl.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
5252
CommandGetTopicsOfNamespace_Mode getTopicsMode,
5353
const std::vector<std::string>& topics,
5454
const std::string& subscriptionName, const ConsumerConfiguration& conf,
55-
const LookupServicePtr& lookupServicePtr_,
5655
const ConsumerInterceptorsPtr& interceptors);
5756
~PatternMultiTopicsConsumerImpl() override;
5857

lib/ReaderImpl.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ void ReaderImpl::start(const MessageId& startMessageId,
9090
if (partitions_ > 0) {
9191
auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
9292
client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
93-
client_.lock()->getLookup(),
9493
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
9594
Commands::SubscriptionModeNonDurable, startMessageId);
9695
consumer_ = consumerImpl;

0 commit comments

Comments
 (0)