Skip to content

Commit 2bfef36

Browse files
committed
Use the same connection when a producer or consumer reconnects
(cherry picked from commit 768c75a)
1 parent 0baf526 commit 2bfef36

9 files changed

Lines changed: 60 additions & 36 deletions

lib/BinaryProtoLookupService.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
4747
}
4848

4949
// NOTE: we can use move capture for topic since C++14
50-
cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
51-
redirectCount](Result result,
52-
const ClientConnectionWeakPtr& weakCnx) {
50+
cnxPool_.getRandomConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
51+
redirectCount](
52+
Result result,
53+
const ClientConnectionWeakPtr& weakCnx) {
5354
if (result != ResultOk) {
5455
promise->setFailed(result);
5556
return;
@@ -109,7 +110,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
109110
}
110111
std::string lookupName = topicName->toString();
111112
const auto address = serviceNameResolver_.resolveHost();
112-
cnxPool_.getConnectionAsync(address, address)
113+
cnxPool_.getRandomConnectionAsync(address, address)
113114
.addListener(std::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
114115
lookupName, std::placeholders::_1, std::placeholders::_2, promise));
115116
return promise->getFuture();
@@ -162,7 +163,7 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
162163
return promise->getFuture();
163164
}
164165
std::string namespaceName = nsName->toString();
165-
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
166+
cnxPool_.getRandomConnectionAsync(serviceNameResolver_.resolveHost())
166167
.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
167168
namespaceName, mode, std::placeholders::_1, std::placeholders::_2, promise));
168169
return promise->getFuture();
@@ -176,7 +177,7 @@ Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePt
176177
promise->setFailed(ResultInvalidTopicName);
177178
return promise->getFuture();
178179
}
179-
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
180+
cnxPool_.getRandomConnectionAsync(serviceNameResolver_.resolveHost())
180181
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(),
181182
version, std::placeholders::_1, std::placeholders::_2, promise));
182183

lib/ClientImpl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
516516
}
517517
}
518518

519-
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic) {
519+
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, int key) {
520520
Promise<Result, ClientConnectionPtr> promise;
521521

522522
const auto topicNamePtr = TopicName::get(topic);
@@ -528,12 +528,12 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&
528528

529529
auto self = shared_from_this();
530530
lookupServicePtr_->getBroker(*topicNamePtr)
531-
.addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
531+
.addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
532532
if (result != ResultOk) {
533533
promise.setFailed(result);
534534
return;
535535
}
536-
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
536+
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key)
537537
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
538538
if (result == ResultOk) {
539539
auto cnx = weakCnx.lock();

lib/ClientImpl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9595

9696
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9797

98-
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic);
98+
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, int key);
9999

100100
void closeAsync(CloseCallback callback);
101101
void shutdown();
@@ -123,6 +123,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
123123

124124
std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }
125125

126+
ConnectionPool& getConnectionPool() noexcept { return pool_; }
127+
126128
friend class PulsarFriend;
127129

128130
private:

lib/ConnectionPool.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ bool ConnectionPool::close() {
6161
return true;
6262
}
6363

64-
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
65-
const std::string& logicalAddress, const std::string& physicalAddress) {
64+
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
65+
const std::string& physicalAddress,
66+
int keySuffix) {
6667
if (closed_) {
6768
Promise<Result, ClientConnectionWeakPtr> promise;
6869
promise.setFailed(ResultAlreadyClosed);
@@ -72,7 +73,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
7273
std::unique_lock<std::recursive_mutex> lock(mutex_);
7374

7475
std::stringstream ss;
75-
ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
76+
ss << logicalAddress << '-' << keySuffix;
7677
const std::string key = ss.str();
7778

7879
PoolMap::iterator cnxIt = pool_.find(key);
@@ -95,7 +96,8 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
9596
// No valid or pending connection found in the pool, creating a new one
9697
ClientConnectionPtr cnx;
9798
try {
98-
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
99+
size_t index = executorIndex_++;
100+
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(index),
99101
clientConfiguration_, authentication_, clientVersion_, *this));
100102
} catch (const std::runtime_error& e) {
101103
lock.unlock();

lib/ConnectionPool.h

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,27 @@ class PULSAR_PUBLIC ConnectionPool {
6565
* a proxy layer. Essentially, the pool is using the logical address as a way to
6666
* decide whether to reuse a particular connection.
6767
*
68+
* There could be many connections to the same broker, so this pool uses an integer key as the suffix of
69+
* the key that represents the connection.
70+
*
6871
* @param logicalAddress the address to use as the broker tag
6972
* @param physicalAddress the real address where the TCP connection should be made
73+
* @param keySuffix the key suffix that selects which of the connections to the same broker is used
7074
* @return a future that will produce the ClientCnx object
7175
*/
7276
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
73-
const std::string& physicalAddress);
77+
const std::string& physicalAddress,
78+
int keySuffix);
79+
80+
int generateRandomIndex() { return randomDistribution_(randomEngine_); }
81+
82+
Future<Result, ClientConnectionWeakPtr> getRandomConnectionAsync(const std::string& logicalAddress,
83+
const std::string& physicalAddress) {
84+
return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex());
85+
}
7486

75-
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
76-
return getConnectionAsync(address, address);
87+
Future<Result, ClientConnectionWeakPtr> getRandomConnectionAsync(const std::string& address) {
88+
return getRandomConnectionAsync(address, address);
7789
}
7890

7991
private:
@@ -85,6 +97,8 @@ class PULSAR_PUBLIC ConnectionPool {
8597
const std::string clientVersion_;
8698
mutable std::recursive_mutex mutex_;
8799
std::atomic_bool closed_{false};
100+
// Use a separate index so that connections will be distributed uniformly across the executors
101+
std::atomic_size_t executorIndex_{0};
88102

89103
std::uniform_int_distribution<> randomDistribution_;
90104
std::mt19937 randomEngine_;

lib/ExecutorService.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,10 @@ void ExecutorService::postWork(std::function<void(void)> task) { io_service_.pos
133133
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
134134
: executors_(nthreads), executorIdx_(0), mutex_() {}
135135

136-
ExecutorServicePtr ExecutorServiceProvider::get() {
136+
ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
137+
idx %= executors_.size();
137138
Lock lock(mutex_);
138139

139-
int idx = executorIdx_++ % executors_.size();
140140
if (!executors_[idx]) {
141141
executors_[idx] = ExecutorService::create();
142142
}

lib/ExecutorService.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,17 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
8888
public:
8989
explicit ExecutorServiceProvider(int nthreads);
9090

91-
ExecutorServicePtr get();
91+
ExecutorServicePtr get() { return get(executorIdx_++); }
92+
93+
ExecutorServicePtr get(size_t index);
9294

9395
// See TimeoutProcessor for the semantics of the parameter.
9496
void close(long timeoutMs = 3000);
9597

9698
private:
9799
typedef std::vector<ExecutorServicePtr> ExecutorList;
98100
ExecutorList executors_;
99-
int executorIdx_;
101+
std::atomic_size_t executorIdx_;
100102
std::mutex mutex_;
101103
typedef std::unique_lock<std::mutex> Lock;
102104
};

lib/HandlerBase.cc

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace pulsar {
3232
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
3333
: topic_(std::make_shared<std::string>(topic)),
3434
client_(client),
35+
connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()),
3536
executor_(client->getIOExecutorProvider()->get()),
3637
mutex_(),
3738
creationTimestamp_(TimeUtils::now()),
@@ -88,22 +89,23 @@ void HandlerBase::grabCnx() {
8889
return;
8990
}
9091
auto self = shared_from_this();
91-
client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
92-
if (result == ResultOk) {
93-
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
94-
connectionOpened(cnx).addListener([this, self](Result result, bool) {
95-
// Do not use bool, only Result.
92+
client->getConnection(topic(), connectionKeySuffix_)
93+
.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
94+
if (result == ResultOk) {
95+
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
96+
connectionOpened(cnx).addListener([this, self](Result result, bool) {
97+
// Do not use bool, only Result.
98+
reconnectionPending_ = false;
99+
if (isResultRetryable(result)) {
100+
scheduleReconnection();
101+
}
102+
});
103+
} else {
104+
connectionFailed(result);
96105
reconnectionPending_ = false;
97-
if (isResultRetryable(result)) {
98-
scheduleReconnection();
99-
}
100-
});
101-
} else {
102-
connectionFailed(result);
103-
reconnectionPending_ = false;
104-
scheduleReconnection();
105-
}
106-
});
106+
scheduleReconnection();
107+
}
108+
});
107109
}
108110

109111
void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& cnx) {

lib/HandlerBase.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
9999

100100
protected:
101101
ClientImplWeakPtr client_;
102+
const int connectionKeySuffix_;
102103
ExecutorServicePtr executor_;
103104
mutable std::mutex mutex_;
104105
std::mutex pendingReceiveMutex_;

0 commit comments

Comments
 (0)