Skip to content

Commit b030d2c

Browse files
committed
track incomplete requests
1 parent 1303c05 commit b030d2c

2 files changed

Lines changed: 39 additions & 8 deletions

File tree

lib/MockServer.h

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#pragma once
2020

2121
#include <initializer_list>
22+
#include <memory>
2223
#include <mutex>
2324
#include <unordered_map>
2425
#include <vector>
@@ -31,7 +32,7 @@
3132

3233
namespace pulsar {
3334

34-
class MockServer {
35+
class MockServer : public std::enable_shared_from_this<MockServer> {
3536
public:
3637
using RequestDelayType = std::unordered_map<std::string, long /* delay in milliseconds */>;
3738

@@ -72,18 +73,25 @@ class MockServer {
7273
}
7374
long delayMs = iter->second;
7475
auto timer = connection->executor_->createDeadlineTimer();
76+
auto key = request + std::to_string(requestId);
77+
pendingTimers_[key] = timer;
7578
timer->expires_from_now(std::chrono::milliseconds(delayMs));
76-
timer->async_wait([connection, requestId, request, timer](const auto& ec) {
79+
80+
auto self = shared_from_this();
81+
timer->async_wait([this, self, key, connection, requestId, timer](const auto& ec) {
82+
{
83+
std::lock_guard<std::mutex> lock(mutex_);
84+
pendingTimers_.erase(key);
85+
}
7786
if (ec) {
78-
LOG_INFO("Timer cancelled for request " << request << " with id " << requestId);
87+
LOG_INFO("Timer cancelled for request " << key);
7988
return;
8089
}
8190
if (connection->isClosed()) {
82-
LOG_INFO("Connection is closed, not completing request " << request << " with id "
83-
<< requestId);
91+
LOG_INFO("Connection is closed, not completing request " << key);
8492
return;
8593
}
86-
LOG_INFO("Completing delayed request " << request << " with id " << requestId);
94+
LOG_INFO("Completing delayed request " << key);
8795
proto::CommandSuccess success;
8896
success.set_request_id(requestId);
8997
connection->handleSuccess(success);
@@ -94,9 +102,25 @@ class MockServer {
94102
}
95103
}
96104

105+
// Return the number of pending timers cancelled
106+
auto close() {
107+
std::lock_guard<std::mutex> lock(mutex_);
108+
auto result = pendingTimers_.size();
109+
for (auto&& kv : pendingTimers_) {
110+
try {
111+
LOG_INFO("Cancelling timer for " << kv.first);
112+
kv.second->cancel();
113+
} catch (...) {
114+
LOG_WARN("Failed to cancel timer for " << kv.first);
115+
}
116+
}
117+
return result;
118+
}
119+
97120
private:
98121
mutable std::mutex mutex_;
99122
std::unordered_map<std::string, long> requestDelays_;
123+
std::unordered_map<std::string, DeadlineTimerPtr> pendingTimers_;
100124
ClientConnectionWeakPtr connection_;
101125

102126
DECLARE_LOG_OBJECT()

tests/ConsumerSeekTest.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,12 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) {
209209
static void assertSeekWithTimeout(Consumer& consumer) {
210210
using namespace std::chrono_literals;
211211
auto promise = std::make_shared<std::promise<Result>>();
212-
consumer.seekAsync(0L, [promise](Result result) { promise->set_value(result); });
212+
std::weak_ptr<std::promise<Result>> weakPromise = promise;
213+
consumer.seekAsync(0L, [weakPromise](Result result) {
214+
if (auto promise = weakPromise.lock()) {
215+
promise->set_value(result);
216+
}
217+
});
213218
auto future = promise->get_future();
214219
ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
215220
ASSERT_EQ(future.get(), ResultOk);
@@ -231,13 +236,14 @@ TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) {
231236
mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}});
232237
assertSeekWithTimeout(consumer);
233238

239+
ASSERT_EQ(mockServer->close(), 0);
234240
client.close();
235241
}
236242

237243
TEST_F(ConsumerSeekTest, testReconnectionSlow) {
238244
Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500));
239245
Consumer consumer;
240-
ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer));
246+
ASSERT_EQ(ResultOk, client.subscribe("testReconnectionSlow", "sub", consumer));
241247

242248
auto connection = *PulsarFriend::getConnections(client).begin();
243249
auto mockServer = std::make_shared<MockServer>(connection);
@@ -247,6 +253,7 @@ TEST_F(ConsumerSeekTest, testReconnectionSlow) {
247253
mockServer->setRequestDelay({{"SEEK", 100}});
248254
assertSeekWithTimeout(consumer);
249255

256+
ASSERT_EQ(mockServer->close(), 0);
250257
client.close();
251258
}
252259

0 commit comments

Comments
 (0)