Skip to content

Commit 30d27f1

Browse files
committed
abstract a common method to insert a request and add tests
1 parent 86e864b commit 30d27f1

4 files changed

Lines changed: 102 additions & 119 deletions

File tree

lib/ClientConnection.cc

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "ConnectionPool.h"
3434
#include "ConsumerImpl.h"
3535
#include "ExecutorService.h"
36+
#include "Future.h"
3637
#include "LogUtils.h"
3738
#include "MockServer.h"
3839
#include "OpSendMsg.h"
@@ -1005,7 +1006,6 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
10051006
}
10061007
pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise));
10071008
lock.unlock();
1008-
10091009
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
10101010
mockServer_->sendRequest("CONSUMER_STATS", requestId)) {
10111011
return promise.getFuture();
@@ -1040,14 +1040,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co
10401040
return;
10411041
}
10421042

1043-
auto request = std::make_shared<LookupRequest>(
1044-
executor_->createTimer(operationsTimeout_),
1045-
makePendingRequestTimeoutHandler(
1046-
pendingLookupRequests_, requestId,
1047-
[cnxString = cnxString(), requestId, requestType]() {
1048-
LOG_WARN(cnxString << requestType << " request timeout to broker, req_id: " << requestId);
1049-
},
1050-
[](ClientConnection& connection) { connection.numOfPendingLookupRequest_--; }));
1043+
auto request = insertRequest(
1044+
pendingLookupRequests_, requestId, [weakSelf{weak_from_this()}, requestId, requestType]() {
1045+
if (auto self = weakSelf.lock()) {
1046+
LOG_WARN(self->cnxString()
1047+
<< requestType << " request timeout to broker, req_id: " << requestId);
1048+
self->numOfPendingLookupRequest_--;
1049+
}
1050+
});
10511051
request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) {
10521052
if (result == ResultOk) {
10531053
promise->setValue(lookupDataResult);
@@ -1056,9 +1056,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co
10561056
}
10571057
});
10581058

1059-
pendingLookupRequests_.emplace(requestId, request);
10601059
numOfPendingLookupRequest_++;
1061-
request->initialize();
10621060
lock.unlock();
10631061
LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")");
10641062
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
@@ -1174,22 +1172,17 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
11741172
lock.unlock();
11751173
LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId
11761174
<< ") to a closed connection");
1177-
auto request =
1178-
std::make_shared<Request>(executor_->createTimer(operationsTimeout_), [] { return false; });
1179-
request->fail(ResultNotConnected);
1180-
return request->getFuture();
1175+
Promise<Result, ResponseData> promise;
1176+
promise.setFailed(ResultNotConnected);
1177+
return promise.getFuture();
11811178
}
11821179

1183-
auto request = std::make_shared<Request>(
1184-
executor_->createTimer(operationsTimeout_),
1185-
makePendingRequestTimeoutHandler(
1186-
pendingRequests_, requestId,
1187-
[cnxString = cnxString(), physicalAddress = physicalAddress_, requestId, requestType]() {
1188-
LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress
1189-
<< ", req_id: " << requestId << ", request: " << requestType);
1190-
}));
1191-
pendingRequests_.emplace(requestId, request);
1192-
request->initialize();
1180+
auto request = insertRequest(
1181+
pendingRequests_, requestId,
1182+
[cnxString{cnxString()}, physicalAddress{physicalAddress_}, requestId, requestType]() {
1183+
LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress
1184+
<< ", req_id: " << requestId << ", request: " << requestType);
1185+
});
11931186
lock.unlock();
11941187

11951188
LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id: " << requestId << ")");
@@ -1278,6 +1271,11 @@ const std::future<void>& ClientConnection::close(Result result, bool switchClust
12781271
keepAliveTimer_.reset();
12791272
}
12801273

1274+
if (consumerStatsRequestTimer_) {
1275+
cancelTimer(*consumerStatsRequestTimer_);
1276+
consumerStatsRequestTimer_.reset();
1277+
}
1278+
12811279
cancelTimer(*connectTimer_);
12821280
lock.unlock();
12831281
int refCount = weak_from_this().use_count();
@@ -1401,15 +1399,12 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
14011399
return promise->getFuture();
14021400
}
14031401

1404-
auto request = std::make_shared<GetLastMessageId>(
1405-
executor_->createTimer(operationsTimeout_),
1406-
makePendingRequestTimeoutHandler(
1407-
pendingGetLastMessageIdRequests_, requestId, [cnxString = cnxString(), requestId]() {
1408-
LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId);
1409-
}));
1410-
pendingGetLastMessageIdRequests_.emplace(requestId, request);
1411-
request->initialize();
1402+
auto request =
1403+
insertRequest(pendingGetLastMessageIdRequests_, requestId, [cnxString = cnxString(), requestId]() {
1404+
LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId);
1405+
});
14121406
lock.unlock();
1407+
14131408
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
14141409
mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) {
14151410
return request->getFuture();
@@ -1424,21 +1419,16 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
14241419
if (isClosed()) {
14251420
lock.unlock();
14261421
LOG_ERROR(cnxString() << "Client is not connected to the broker");
1427-
auto request = std::make_shared<GetTopicsOfNamespace>(executor_->createTimer(operationsTimeout_),
1428-
[] { return false; });
1429-
request->fail(ResultNotConnected);
1430-
return request->getFuture();
1422+
Promise<Result, NamespaceTopicsPtr> promise;
1423+
promise.setFailed(ResultNotConnected);
1424+
return promise.getFuture();
14311425
}
14321426

1433-
auto request = std::make_shared<GetTopicsOfNamespace>(
1434-
executor_->createTimer(operationsTimeout_),
1435-
makePendingRequestTimeoutHandler(
1436-
pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() {
1437-
LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: "
1438-
<< requestId);
1439-
}));
1427+
auto request =
1428+
insertRequest(pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() {
1429+
LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId);
1430+
});
14401431
pendingGetNamespaceTopicsRequests_.emplace(requestId, request);
1441-
request->initialize();
14421432
lock.unlock();
14431433
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
14441434
mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) {
@@ -1455,20 +1445,15 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
14551445
if (isClosed()) {
14561446
lock.unlock();
14571447
LOG_ERROR(cnxString() << "Client is not connected to the broker");
1458-
auto request =
1459-
std::make_shared<GetSchema>(executor_->createTimer(operationsTimeout_), [] { return false; });
1460-
request->fail(ResultNotConnected);
1461-
return request->getFuture();
1448+
Promise<Result, SchemaInfo> promise;
1449+
promise.setFailed(ResultNotConnected);
1450+
return promise.getFuture();
14621451
}
14631452

1464-
auto request = std::make_shared<GetSchema>(
1465-
executor_->createTimer(operationsTimeout_),
1466-
makePendingRequestTimeoutHandler(
1467-
pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() {
1468-
LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId);
1469-
}));
1470-
pendingGetSchemaRequests_.emplace(requestId, request);
1471-
request->initialize();
1453+
auto request =
1454+
insertRequest(pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() {
1455+
LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId);
1456+
});
14721457
lock.unlock();
14731458

14741459
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
@@ -1737,8 +1722,7 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17371722

17381723
request->fail(result);
17391724
} else {
1740-
PendingGetNamespaceTopicsMap::iterator it =
1741-
pendingGetNamespaceTopicsRequests_.find(error.request_id());
1725+
auto it = pendingGetNamespaceTopicsRequests_.find(error.request_id());
17421726
if (it != pendingGetNamespaceTopicsRequests_.end()) {
17431727
auto request = std::move(it->second);
17441728
pendingGetNamespaceTopicsRequests_.erase(it);

lib/ClientConnection.h

Lines changed: 28 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
#include "AsioTimer.h"
5555
#include "Commands.h"
56+
#include "ExecutorService.h"
5657
#include "GetLastMessageIdResponse.h"
5758
#include "LookupDataResult.h"
5859
#include "PendingRequest.h"
@@ -68,9 +69,6 @@ class PulsarFriend;
6869

6970
using TcpResolverPtr = std::shared_ptr<ASIO::ip::tcp::resolver>;
7071

71-
class ExecutorService;
72-
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
73-
7472
class ConnectionPool;
7573
class ClientConnection;
7674
typedef std::shared_ptr<ClientConnection> ClientConnectionPtr;
@@ -340,13 +338,14 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
340338
const std::chrono::milliseconds connectTimeout_;
341339
const DeadlineTimerPtr connectTimer_;
342340

343-
using Request = PendingRequest<ResponseData>;
344-
typedef std::unordered_map<long, PendingRequestPtr<ResponseData>> PendingRequestsMap;
345-
PendingRequestsMap pendingRequests_;
341+
template <typename T>
342+
using RequestMap = std::unordered_map<uint64_t, PendingRequestPtr<T>>;
346343

347-
using LookupRequest = PendingRequest<LookupDataResultPtr>;
348-
typedef std::unordered_map<long, PendingRequestPtr<LookupDataResultPtr>> PendingLookupRequestsMap;
349-
PendingLookupRequestsMap pendingLookupRequests_;
344+
RequestMap<ResponseData> pendingRequests_;
345+
RequestMap<LookupDataResultPtr> pendingLookupRequests_;
346+
RequestMap<GetLastMessageIdResponse> pendingGetLastMessageIdRequests_;
347+
RequestMap<NamespaceTopicsPtr> pendingGetNamespaceTopicsRequests_;
348+
RequestMap<SchemaInfo> pendingGetSchemaRequests_;
350349

351350
typedef std::unordered_map<long, ProducerImplWeakPtr> ProducersMap;
352351
ProducersMap producers_;
@@ -357,54 +356,29 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
357356
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
358357
PendingConsumerStatsMap pendingConsumerStatsMap_;
359358

360-
using GetLastMessageId = PendingRequest<GetLastMessageIdResponse>;
361-
using PendingGetLastMessageIdMap = std::unordered_map<long, PendingRequestPtr<GetLastMessageIdResponse>>;
362-
PendingGetLastMessageIdMap pendingGetLastMessageIdRequests_;
363-
364-
using GetTopicsOfNamespace = PendingRequest<NamespaceTopicsPtr>;
365-
typedef std::unordered_map<long, PendingRequestPtr<NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
366-
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
367-
368-
using GetSchema = PendingRequest<SchemaInfo>;
369-
typedef std::unordered_map<uint64_t, PendingRequestPtr<SchemaInfo>> PendingGetSchemaMap;
370-
PendingGetSchemaMap pendingGetSchemaRequests_;
371-
372359
mutable std::mutex mutex_;
373360
typedef std::unique_lock<std::mutex> Lock;
374361

375-
template <typename RequestMap, typename OnTimeout, typename OnCleanup>
376-
std::function<bool()> makePendingRequestTimeoutHandler(RequestMap& pendingRequests,
377-
const typename RequestMap::key_type& requestId,
378-
OnTimeout onTimeout, OnCleanup onCleanup) {
379-
auto weakSelf = weak_from_this();
380-
return [weakSelf, pendingRequestsPtr = &pendingRequests, requestId, onTimeout = std::move(onTimeout),
381-
onCleanup = std::move(onCleanup)]() mutable {
382-
auto self = weakSelf.lock();
383-
if (!self) {
384-
return false;
385-
}
386-
387-
{
388-
Lock lock(self->mutex_);
389-
auto it = pendingRequestsPtr->find(requestId);
390-
if (it == pendingRequestsPtr->end()) {
391-
return false;
362+
// Note: this method must be called when holding `mutex_`
363+
template <typename T, typename OnTimeout>
364+
auto insertRequest(RequestMap<T>& pendingRequests, uint64_t requestId, OnTimeout onTimeout) {
365+
auto request = std::make_shared<PendingRequest<T>>(
366+
executor_->createTimer(operationsTimeout_),
367+
[this, self{shared_from_this()}, requestId, onTimeout{std::move(onTimeout)},
368+
&pendingRequests]() mutable {
369+
{
370+
std::lock_guard lock{mutex_};
371+
if (auto it = pendingRequests.find(requestId); it != pendingRequests.end()) {
372+
pendingRequests.erase(it);
373+
}
392374
}
393-
pendingRequestsPtr->erase(it);
394-
onCleanup(*self);
395-
}
396-
397-
onTimeout();
398-
return true;
399-
};
400-
}
401-
402-
template <typename RequestMap, typename OnTimeout>
403-
std::function<bool()> makePendingRequestTimeoutHandler(RequestMap& pendingRequests,
404-
const typename RequestMap::key_type& requestId,
405-
OnTimeout onTimeout) {
406-
return makePendingRequestTimeoutHandler(pendingRequests, requestId, std::move(onTimeout),
407-
[](ClientConnection&) {});
375+
onTimeout();
376+
});
377+
auto [iterator, inserted] = pendingRequests.emplace(requestId, request);
378+
if (inserted) {
379+
request->initialize();
380+
} // else: the request id is duplicated
381+
return iterator->second;
408382
}
409383

410384
// Pending buffers to write on the socket
@@ -430,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
430404

431405
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
432406
uint32_t maxPendingLookupRequest_;
433-
uint32_t numOfPendingLookupRequest_ = 0;
407+
std::atomic_uint32_t numOfPendingLookupRequest_{0};
434408

435409
bool isTlsAllowInsecureConnection_ = false;
436410

lib/PendingRequest.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ namespace pulsar {
3333
template <typename T>
3434
class PendingRequest : public std::enable_shared_from_this<PendingRequest<T>> {
3535
public:
36-
PendingRequest(ASIO::steady_timer timer, std::function<bool()> timeoutCallback)
36+
PendingRequest(ASIO::steady_timer timer, std::function<void()> timeoutCallback)
3737
: timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {}
3838

3939
void initialize() {
@@ -42,9 +42,7 @@ class PendingRequest : public std::enable_shared_from_this<PendingRequest<T>> {
4242
if (!self || error || timeoutDisabled_.load(std::memory_order_acquire)) {
4343
return;
4444
}
45-
if (!timeoutCallback_()) {
46-
return;
47-
}
45+
timeoutCallback_();
4846
promise_.setFailed(ResultTimeout);
4947
});
5048
}
@@ -68,7 +66,7 @@ class PendingRequest : public std::enable_shared_from_this<PendingRequest<T>> {
6866
private:
6967
ASIO::steady_timer timer_;
7068
Promise<Result, T> promise_;
71-
std::function<bool()> timeoutCallback_;
69+
std::function<void()> timeoutCallback_;
7270
std::atomic_bool timeoutDisabled_{false};
7371
};
7472

tests/ClientTest.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535
#include "WaitUtils.h"
3636
#include "lib/AsioDefines.h"
3737
#include "lib/AtomicSharedPtr.h"
38-
#include "lib/BrokerConsumerStatsImpl.h"
3938
#include "lib/ClientConnection.h"
4039
#include "lib/ConnectionPool.h"
4140
#include "lib/ExecutorService.h"
4241
#include "lib/LogUtils.h"
4342
#include "lib/MockServer.h"
43+
#include "lib/TimeUtils.h"
4444
#include "lib/checksum/ChecksumProvider.h"
4545
#include "lib/stats/ProducerStatsImpl.h"
4646

@@ -237,6 +237,33 @@ TEST(ClientTest, testConnectTimeoutAfterTcpConnected) {
237237
server->stop();
238238
}
239239

240+
TEST(ClientTest, testConnectionNotReferredAfterClose) {
241+
Client client(lookupUrl);
242+
auto topic = "test-connection-not-referred-after-close-" + std::to_string(time(nullptr));
243+
Producer producer;
244+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
245+
246+
Reader reader;
247+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
248+
249+
bool available;
250+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(available));
251+
ASSERT_FALSE(available);
252+
253+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("test").build()));
254+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(available));
255+
ASSERT_TRUE(available);
256+
257+
Message msg;
258+
ASSERT_EQ(ResultOk, reader.readNext(msg));
259+
ASSERT_EQ("test", msg.getDataAsString());
260+
261+
auto start = TimeUtils::currentTimeMillis();
262+
ASSERT_EQ(ResultOk, client.close());
263+
auto closeTimeMs = TimeUtils::currentTimeMillis() - start;
264+
ASSERT_LT(closeTimeMs, 3000) << "close time: " << closeTimeMs << " ms";
265+
}
266+
240267
TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) {
241268
const auto suffix = std::to_string(std::chrono::steady_clock::now().time_since_epoch().count());
242269
ClientConfiguration conf;

0 commit comments

Comments
 (0)