Skip to content

Commit 92a7c87

Browse files
committed
Add more logs for reconnection
1 parent b35ae1a commit 92a7c87

4 files changed

Lines changed: 35 additions & 28 deletions

File tree

lib/ClientConnection.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1329,7 +1329,7 @@ void ClientConnection::close(Result result, bool detach) {
13291329
if (result != ResultDisconnected && result != ResultRetryable) {
13301330
LOG_ERROR(cnxString_ << "Connection closed with " << result);
13311331
} else {
1332-
LOG_INFO(cnxString_ << "Connection disconnected");
1332+
LOG_WARN(cnxString_ << "Connection disconnected: " << result);
13331333
}
13341334
// Remove the connection from the pool before completing any promise
13351335
if (detach) {

lib/ConsumerImpl.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) {
222222

223223
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
224224
if (state_ == Closed) {
225-
LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
225+
LOG_WARN(getName() << "connectionOpened : Consumer is already closed");
226226
return;
227227
}
228228

@@ -246,6 +246,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
246246

247247
ClientImplPtr client = client_.lock();
248248
uint64_t requestId = client->newRequestId();
249+
LOG_WARN(getName() << "start subscribing the topic");
249250
SharedBuffer cmd = Commands::newSubscribe(
250251
*topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
251252
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
@@ -274,6 +275,7 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n
274275
}
275276

276277
void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
278+
LOG_WARN(getName() << "created consumer: " << result << ", " << cnx->cnxString());
277279
static bool firstTime = true;
278280
if (result == ResultOk) {
279281
if (firstTime) {
@@ -294,7 +296,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
294296
availablePermits_ = 0;
295297
}
296298

297-
LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
299+
LOG_WARN(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
298300
if (consumerTopicType_ == NonPartitioned || !firstTime) {
299301
if (config_.getReceiverQueueSize() != 0) {
300302
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
@@ -1204,7 +1206,7 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
12041206
}
12051207

12061208
void ConsumerImpl::disconnectConsumer() {
1207-
LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
1209+
LOG_WARN("Broker notification of Closed consumer: " << consumerId_);
12081210
resetCnx();
12091211
scheduleReconnection();
12101212
}

lib/HandlerBase.cc

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,36 +67,37 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
6767

6868
void HandlerBase::grabCnx() {
6969
if (getCnx().lock()) {
70-
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
70+
LOG_WARN(getName() << "Ignoring reconnection request since we're already connected");
7171
return;
7272
}
7373

7474
bool expectedState = false;
7575
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
76-
LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
76+
LOG_WARN(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
7777
return;
7878
}
7979

80-
LOG_INFO(getName() << "Getting connection from pool");
80+
LOG_WARN(getName() << "Getting connection from pool");
8181
ClientImplPtr client = client_.lock();
8282
if (!client) {
8383
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
8484
connectionFailed(ResultConnectError);
8585
return;
8686
}
8787
auto weakSelf = get_weak_from_this();
88+
auto name = getName();
8889
client->getConnection(*topic_).addListener(
89-
[this, weakSelf](Result result, const ClientConnectionPtr& cnx) {
90+
[this, weakSelf, name](Result result, const ClientConnectionPtr& cnx) {
9091
auto self = weakSelf.lock();
9192
if (!self) {
92-
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
93+
LOG_WARN(name << " HandlerBase Weak reference is not valid anymore: " << result);
9394
return;
9495
}
9596

9697
reconnectionPending_ = false;
9798

9899
if (result == ResultOk) {
99-
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
100+
LOG_WARN(getName() << "Connected to broker: " << cnx->cnxString());
100101
connectionOpened(cnx);
101102
} else {
102103
connectionFailed(result);
@@ -116,6 +117,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&
116117
}
117118

118119
resetCnx();
120+
LOG_WARN(getName() << " disconnected (result: " << result << ", state: " << state << ")");
119121

120122
if (result == ResultRetryable) {
121123
scheduleReconnection();
@@ -144,7 +146,7 @@ void HandlerBase::scheduleReconnection() {
144146
if (state == Pending || state == Ready) {
145147
TimeDuration delay = backoff_.next();
146148

147-
LOG_INFO(getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) << " s");
149+
LOG_WARN(getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) << " s");
148150
timer_->expires_from_now(delay);
149151
// passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
150152
// so we will not run into the case where grabCnx is invoked on out of scope handler

lib/RetryableOperation.h

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -108,27 +108,30 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
108108
timer_->expires_from_now(delay);
109109

110110
auto nextRemainingTime = remainingTime - delay;
111-
LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds()
111+
LOG_WARN("Reschedule " << name_ << " for " << delay.total_milliseconds()
112112
<< " ms, remaining time: " << nextRemainingTime.total_milliseconds()
113113
<< " ms");
114-
timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) {
115-
auto self = weakSelf.lock();
116-
if (!self) {
117-
return;
118-
}
119-
if (ec) {
120-
if (ec == boost::asio::error::operation_aborted) {
121-
LOG_DEBUG("Timer for " << name_ << " is cancelled");
122-
promise_.setFailed(ResultTimeout);
114+
auto name = name_;
115+
timer_->async_wait(
116+
[this, weakSelf, nextRemainingTime, name](const boost::system::error_code& ec) {
117+
auto self = weakSelf.lock();
118+
if (!self) {
119+
LOG_WARN(name << " is expired in the callback");
120+
return;
121+
}
122+
if (ec) {
123+
if (ec == boost::asio::error::operation_aborted) {
124+
LOG_WARN("Timer for " << name_ << " is cancelled");
125+
promise_.setFailed(ResultTimeout);
126+
} else {
127+
LOG_WARN("Timer for " << name_ << " failed: " << ec.message());
128+
}
123129
} else {
124-
LOG_WARN("Timer for " << name_ << " failed: " << ec.message());
130+
LOG_DEBUG("Run operation " << name_ << ", remaining time: "
131+
<< nextRemainingTime.total_milliseconds() << " ms");
132+
runImpl(nextRemainingTime);
125133
}
126-
} else {
127-
LOG_DEBUG("Run operation " << name_ << ", remaining time: "
128-
<< nextRemainingTime.total_milliseconds() << " ms");
129-
runImpl(nextRemainingTime);
130-
}
131-
});
134+
});
132135
});
133136
return promise_.getFuture();
134137
}

0 commit comments

Comments
 (0)