Skip to content

Commit bfb48a0

Browse files
committed
Revert "Use the same connection when producer or consumer reconnect"
This reverts commit 768c75a.
1 parent 768c75a commit bfb48a0

10 files changed

Lines changed: 37 additions & 61 deletions

lib/BinaryProtoLookupService.cc

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,9 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
4747
}
4848

4949
// NOTE: we can use move capture for topic since C++14
50-
cnxPool_.getRandomConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
51-
redirectCount](
52-
Result result,
53-
const ClientConnectionWeakPtr& weakCnx) {
50+
cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
51+
redirectCount](Result result,
52+
const ClientConnectionWeakPtr& weakCnx) {
5453
if (result != ResultOk) {
5554
promise->setFailed(result);
5655
return;
@@ -110,7 +109,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
110109
}
111110
std::string lookupName = topicName->toString();
112111
const auto address = serviceNameResolver_.resolveHost();
113-
cnxPool_.getRandomConnectionAsync(address, address)
112+
cnxPool_.getConnectionAsync(address, address)
114113
.addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
115114
lookupName, std::placeholders::_1, std::placeholders::_2, promise));
116115
return promise->getFuture();
@@ -163,7 +162,7 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
163162
return promise->getFuture();
164163
}
165164
std::string namespaceName = nsName->toString();
166-
cnxPool_.getRandomConnectionAsync(serviceNameResolver_.resolveHost())
165+
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
167166
.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
168167
namespaceName, mode, std::placeholders::_1, std::placeholders::_2, promise));
169168
return promise->getFuture();
@@ -177,7 +176,7 @@ Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePt
177176
promise->setFailed(ResultInvalidTopicName);
178177
return promise->getFuture();
179178
}
180-
cnxPool_.getRandomConnectionAsync(serviceNameResolver_.resolveHost())
179+
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
181180
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(),
182181
version, std::placeholders::_1, std::placeholders::_2, promise));
183182

lib/ClientImpl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
516516
}
517517
}
518518

519-
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, int key) {
519+
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic) {
520520
Promise<Result, ClientConnectionPtr> promise;
521521

522522
const auto topicNamePtr = TopicName::get(topic);
@@ -528,12 +528,12 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&
528528

529529
auto self = shared_from_this();
530530
lookupServicePtr_->getBroker(*topicNamePtr)
531-
.addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
531+
.addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
532532
if (result != ResultOk) {
533533
promise.setFailed(result);
534534
return;
535535
}
536-
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key)
536+
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
537537
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
538538
if (result == ResultOk) {
539539
auto cnx = weakCnx.lock();

lib/ClientImpl.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9595

9696
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9797

98-
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, int key);
98+
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic);
9999

100100
void closeAsync(CloseCallback callback);
101101
void shutdown();
@@ -123,8 +123,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
123123

124124
std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }
125125

126-
ConnectionPool& getConnectionPool() noexcept { return pool_; }
127-
128126
friend class PulsarFriend;
129127

130128
private:

lib/ConnectionPool.cc

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,8 @@ bool ConnectionPool::close() {
6161
return true;
6262
}
6363

64-
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
65-
const std::string& physicalAddress,
66-
int keySuffix) {
64+
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
65+
const std::string& logicalAddress, const std::string& physicalAddress) {
6766
if (closed_) {
6867
Promise<Result, ClientConnectionWeakPtr> promise;
6968
promise.setFailed(ResultAlreadyClosed);
@@ -73,7 +72,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
7372
std::unique_lock<std::recursive_mutex> lock(mutex_);
7473

7574
std::stringstream ss;
76-
ss << logicalAddress << '-' << keySuffix;
75+
ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
7776
const std::string key = ss.str();
7877

7978
PoolMap::iterator cnxIt = pool_.find(key);
@@ -96,8 +95,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
9695
// No valid or pending connection found in the pool, creating a new one
9796
ClientConnectionPtr cnx;
9897
try {
99-
size_t index = executorIndex_++;
100-
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(index),
98+
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
10199
clientConfiguration_, authentication_, clientVersion_, *this));
102100
} catch (const std::runtime_error& e) {
103101
lock.unlock();

lib/ConnectionPool.h

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -65,27 +65,15 @@ class PULSAR_PUBLIC ConnectionPool {
6565
* a proxy layer. Essentially, the pool is using the logical address as a way to
6666
* decide whether to reuse a particular connection.
6767
*
68-
* There could be many connections to the same broker, so this pool uses an integer key as the suffix of
69-
* the key that represents the connection.
70-
*
7168
* @param logicalAddress the address to use as the broker tag
7269
* @param physicalAddress the real address where the TCP connection should be made
73-
* @param keySuffix the key suffix to choose which connection on the same broker
7470
* @return a future that will produce the ClientCnx object
7571
*/
7672
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
77-
const std::string& physicalAddress,
78-
int keySuffix);
79-
80-
int generateRandomIndex() { return randomDistribution_(randomEngine_); }
81-
82-
Future<Result, ClientConnectionWeakPtr> getRandomConnectionAsync(const std::string& logicalAddress,
83-
const std::string& physicalAddress) {
84-
return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex());
85-
}
73+
const std::string& physicalAddress);
8674

87-
Future<Result, ClientConnectionWeakPtr> getRandomConnectionAsync(const std::string& address) {
88-
return getRandomConnectionAsync(address, address);
75+
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
76+
return getConnectionAsync(address, address);
8977
}
9078

9179
private:
@@ -97,8 +85,6 @@ class PULSAR_PUBLIC ConnectionPool {
9785
const std::string clientVersion_;
9886
mutable std::recursive_mutex mutex_;
9987
std::atomic_bool closed_{false};
100-
// Use a separated index so that connections will be distributed uniformly across the executors
101-
std::atomic_size_t executorIndex_{0};
10288

10389
std::uniform_int_distribution<> randomDistribution_;
10490
std::mt19937 randomEngine_;

lib/ExecutorService.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,10 @@ void ExecutorService::postWork(std::function<void(void)> task) { io_service_.pos
133133
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
134134
: executors_(nthreads), executorIdx_(0), mutex_() {}
135135

136-
ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
137-
idx %= executors_.size();
136+
ExecutorServicePtr ExecutorServiceProvider::get() {
138137
Lock lock(mutex_);
139138

139+
int idx = executorIdx_++ % executors_.size();
140140
if (!executors_[idx]) {
141141
executors_[idx] = ExecutorService::create();
142142
}

lib/ExecutorService.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,17 +88,15 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
8888
public:
8989
explicit ExecutorServiceProvider(int nthreads);
9090

91-
ExecutorServicePtr get() { return get(executorIdx_++); }
92-
93-
ExecutorServicePtr get(size_t index);
91+
ExecutorServicePtr get();
9492

9593
// See TimeoutProcessor for the semantics of the parameter.
9694
void close(long timeoutMs = 3000);
9795

9896
private:
9997
typedef std::vector<ExecutorServicePtr> ExecutorList;
10098
ExecutorList executors_;
101-
std::atomic_size_t executorIdx_;
99+
int executorIdx_;
102100
std::mutex mutex_;
103101
typedef std::unique_lock<std::mutex> Lock;
104102
};

lib/HandlerBase.cc

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ namespace pulsar {
3232
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
3333
: topic_(std::make_shared<std::string>(topic)),
3434
client_(client),
35-
connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()),
3635
executor_(client->getIOExecutorProvider()->get()),
3736
mutex_(),
3837
creationTimestamp_(TimeUtils::now()),
@@ -89,23 +88,22 @@ void HandlerBase::grabCnx() {
8988
return;
9089
}
9190
auto self = shared_from_this();
92-
client->getConnection(topic(), connectionKeySuffix_)
93-
.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
94-
if (result == ResultOk) {
95-
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
96-
connectionOpened(cnx).addListener([this, self](Result result, bool) {
97-
// Do not use bool, only Result.
98-
reconnectionPending_ = false;
99-
if (isResultRetryable(result)) {
100-
scheduleReconnection();
101-
}
102-
});
103-
} else {
104-
connectionFailed(result);
91+
client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
92+
if (result == ResultOk) {
93+
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
94+
connectionOpened(cnx).addListener([this, self](Result result, bool) {
95+
// Do not use bool, only Result.
10596
reconnectionPending_ = false;
106-
scheduleReconnection();
107-
}
108-
});
97+
if (isResultRetryable(result)) {
98+
scheduleReconnection();
99+
}
100+
});
101+
} else {
102+
connectionFailed(result);
103+
reconnectionPending_ = false;
104+
scheduleReconnection();
105+
}
106+
});
109107
}
110108

111109
void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& cnx) {

lib/HandlerBase.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
9999

100100
protected:
101101
ClientImplWeakPtr client_;
102-
const int connectionKeySuffix_;
103102
ExecutorServicePtr executor_;
104103
mutable std::mutex mutex_;
105104
std::mutex pendingReceiveMutex_;

lib/ProducerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
966966
}
967967

968968
void ProducerImpl::disconnectProducer() {
969-
LOG_INFO("Broker notification of Closed producer: " << producerId_);
969+
LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
970970
resetCnx();
971971
scheduleReconnection();
972972
}

0 commit comments

Comments
 (0)