Skip to content

Commit 38406e4

Browse files
committed
[improve][client] Enhance connection and timeout logging
- Add endpoint IP:port details to connection failure messages
- Log resolved endpoints for DNS lookup debugging
- Include physical address in connection timeout errors
- Add broker remote address to network request timeout warnings
- Log producer queue sizes during send timeouts for backpressure debugging

These improvements provide better context for troubleshooting network connectivity issues, DNS resolution problems, and producer backpressure scenarios.
1 parent 2606df9 commit 38406e4

2 files changed

Lines changed: 28 additions & 2 deletions

File tree

lib/ClientConnection.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp
486486
handleHandshake(ASIO_SUCCESS);
487487
}
488488
} else {
489-
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
489+
LOG_ERROR(cnxString_ << "Failed to establish connection to " << endpoint.address().to_string() << ":" << endpoint.port() << ": " << err.message());
490490
if (err == ASIO::error::operation_aborted) {
491491
close();
492492
} else {
@@ -603,15 +603,29 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::result
603603
return;
604604
}
605605

606+
tcp::endpoint endpointToLog;
607+
if (!results.empty()) {
608+
endpointToLog = *results.begin();
609+
LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
610+
for (const auto& endpoint : results) {
611+
LOG_DEBUG(cnxString_ << " " << endpoint.endpoint().address().to_string() << ":" << endpoint.endpoint().port());
612+
}
613+
}
614+
606615
auto weakSelf = weak_from_this();
607-
connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
616+
connectTimeoutTask_->setCallback([weakSelf, endpointToLog](const PeriodicTask::ErrorCode& ec) {
608617
ClientConnectionPtr ptr = weakSelf.lock();
609618
if (!ptr) {
610619
// Connection was already destroyed
611620
return;
612621
}
613622

614623
if (ptr->state_ != Ready) {
624+
if (endpointToLog.port() != 0) {
625+
LOG_ERROR(ptr->cnxString_ << "Connection timeout to " << endpointToLog.address().to_string() << ":" << endpointToLog.port());
626+
} else {
627+
LOG_ERROR(ptr->cnxString_ << "Connection timeout to physical address " << ptr->physicalAddress_);
628+
}
615629
LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
616630
<< ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
617631
PeriodicTask::ErrorCode err;
@@ -1212,20 +1226,23 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
12121226
void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
12131227
const PendingRequestData& pendingRequestData) {
12141228
if (!ec && !pendingRequestData.hasGotResponse->load()) {
1229+
LOG_WARN(cnxString_ << "Network request timeout to broker, remote: " << physicalAddress_);
12151230
pendingRequestData.promise.setFailed(ResultTimeout);
12161231
}
12171232
}
12181233

12191234
void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
12201235
const LookupRequestData& pendingRequestData) {
12211236
if (!ec) {
1237+
LOG_WARN(cnxString_ << "Lookup request timeout to broker, remote: " << physicalAddress_);
12221238
pendingRequestData.promise->setFailed(ResultTimeout);
12231239
}
12241240
}
12251241

12261242
void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
12271243
const ClientConnection::LastMessageIdRequestData& data) {
12281244
if (!ec) {
1245+
LOG_WARN(cnxString_ << "GetLastMessageId request timeout to broker, remote: " << physicalAddress_);
12291246
data.promise->setFailed(ResultTimeout);
12301247
}
12311248
}

lib/ProducerImpl.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,15 @@ void ProducerImpl::handleSendTimeout(const ASIO_ERROR& err) {
868868
}
869869

870870
lock.unlock();
871+
auto cnx = getCnx().lock();
872+
if (cnx) {
873+
LOG_WARN(getName() << "Send timeout due to queueing delay, connection: " << cnx->cnxString()
874+
<< ", pending messages: " << pendingMessages.size()
875+
<< ", queue size: " << pendingMessagesQueue_.size());
876+
} else {
877+
LOG_WARN(getName() << "Send timeout due to queueing delay, no connection, pending messages: " << pendingMessages.size()
878+
<< ", queue size: " << pendingMessagesQueue_.size());
879+
}
871880
for (const auto& op : pendingMessages) {
872881
op->complete(ResultTimeout, {});
873882
}

0 commit comments

Comments
 (0)