Skip to content

Commit 54bd885

Browse files
committed
add log on timeout in consumer side
1 parent baf997a commit 54bd885

3 files changed

Lines changed: 28 additions & 3 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,14 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
11041104
if (state_ != Ready) {
11051105
return ResultAlreadyClosed;
11061106
}
1107+
auto cnx = getCnx().lock();
1108+
if (cnx) {
1109+
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, connection: " << cnx->cnxString()
1110+
<< ", queue size: " << incomingMessages_.size());
1111+
} else {
1112+
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, no connection, queue size: "
1113+
<< incomingMessages_.size());
1114+
}
11071115
return ResultTimeout;
11081116
}
11091117
}

lib/MultiTopicsConsumerImpl.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <chrono>
2222
#include <stdexcept>
2323

24+
#include "ClientConnection.h"
2425
#include "ClientImpl.h"
2526
#include "ConsumerImpl.h"
2627
#include "ExecutorService.h"
@@ -600,6 +601,14 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
600601
if (state_ != Ready) {
601602
return ResultAlreadyClosed;
602603
}
604+
auto cnx = getCnx().lock();
605+
if (cnx) {
606+
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, connection: " << cnx->cnxString()
607+
<< ", queue size: " << incomingMessages_.size());
608+
} else {
609+
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, no connection, queue size: "
610+
<< incomingMessages_.size());
611+
}
603612
return ResultTimeout;
604613
}
605614
}

lib/UnAckedMessageTrackerEnabled.cc

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <functional>
2222

23+
#include "ClientConnection.h"
2324
#include "ClientImpl.h"
2425
#include "ConsumerImplBase.h"
2526
#include "ExecutorService.h"
@@ -57,9 +58,16 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
5758

5859
std::set<MessageId> msgIdsToRedeliver;
5960
if (!headPartition.empty()) {
60-
LOG_INFO(consumerReference_.getName().c_str()
61-
<< ": " << headPartition.size() << " Messages were not acked within "
62-
<< timePartitions.size() * tickDurationInMs_ << " time");
61+
auto cnx = consumerReference_.getCnx().lock();
62+
if (cnx) {
63+
LOG_WARN(consumerReference_.getName() << " Unacked messages timeout: " << headPartition.size()
64+
<< " messages not acked within " << timeoutMs_
65+
<< " ms, connection: " << cnx->cnxString());
66+
} else {
67+
LOG_WARN(consumerReference_.getName() << " Unacked messages timeout: " << headPartition.size()
68+
<< " messages not acked within " << timeoutMs_
69+
<< " ms, no connection");
70+
}
6371
for (auto it = headPartition.begin(); it != headPartition.end(); it++) {
6472
msgIdsToRedeliver.insert(*it);
6573
messageIdPartitionMap.erase(*it);

0 commit comments

Comments
 (0)