Skip to content

Commit 7424d56

Browse files
merlimatBewareMyPower
authored andcommitted
Avoid double attempt at reconnecting (apache#310)
### Motivation There is a sequence of conditions that can trigger a client to schedule multiple reconnections to the broker. This is due to fact that we're not checking whether such an attempt is already in progress. example: 1. Receive `CloseConsumer` command from broker 1a. Schedule for reconnection in 100ms 2. Connection is closed (eg: broker shutdown has initiated) 2a. Schedule for reconnection in 200ms (since the backoff was already incremented) Result is that we're going to call `grabCnx()` twice ### Modifications Use atomic flag to ignore the 2nd attempt, just waiting for the 1st attempt to finish. (cherry picked from commit 2e2f90b)
1 parent 6d14fee commit 7424d56

2 files changed

Lines changed: 14 additions & 1 deletion

File tree

lib/HandlerBase.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
3838
state_(NotStarted),
3939
backoff_(backoff),
4040
epoch_(0),
41-
timer_(executor_->createDeadlineTimer()) {}
41+
timer_(executor_->createDeadlineTimer()),
42+
reconnectionPending_(false) {}
4243

4344
HandlerBase::~HandlerBase() { timer_->cancel(); }
4445

@@ -69,6 +70,13 @@ void HandlerBase::grabCnx() {
6970
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
7071
return;
7172
}
73+
74+
bool expectedState = false;
75+
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
76+
LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
77+
return;
78+
}
79+
7280
LOG_INFO(getName() << "Getting connection from pool");
7381
ClientImplPtr client = client_.lock();
7482
Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_);
@@ -83,6 +91,9 @@ void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr con
8391
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
8492
return;
8593
}
94+
95+
handler->reconnectionPending_ = false;
96+
8697
if (result == ResultOk) {
8798
ClientConnectionPtr conn = connection.lock();
8899
if (conn) {
@@ -142,6 +153,7 @@ bool HandlerBase::isRetriableError(Result result) { return result == ResultRetry
142153

143154
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
144155
const auto state = handler->state_.load();
156+
145157
if (state == Pending || state == Ready) {
146158
TimeDuration delay = handler->backoff_.next();
147159

lib/HandlerBase.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class HandlerBase {
130130
DeadlineTimerPtr timer_;
131131

132132
mutable std::mutex connectionMutex_;
133+
std::atomic<bool> reconnectionPending_;
133134
ClientConnectionWeakPtr connection_;
134135
friend class ClientConnection;
135136
friend class PulsarFriend;

0 commit comments

Comments
 (0)