Skip to content

Commit 038c683

Browse files
committed
fix timer for CLOSE_CONSUMER not removed
1 parent 9159bb5 commit 038c683

1 file changed

Lines changed: 44 additions & 46 deletions

File tree

lib/MockServer.h

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <initializer_list>
2222
#include <memory>
2323
#include <mutex>
24+
#include <string>
2425
#include <unordered_map>
2526
#include <vector>
2627

@@ -56,53 +57,25 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
5657
if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) {
5758
// Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers
5859
if (request == "SEEK") {
59-
auto closeConsumerDelayMs = requestDelays_["CLOSE_CONSUMER"];
60-
auto timer = connection->executor_->createDeadlineTimer();
61-
pendingTimers_["CLOSE_CONSUMER" + std::to_string(requestId)] = timer;
62-
timer->expires_from_now(std::chrono::milliseconds(closeConsumerDelayMs));
63-
timer->async_wait([connection](const auto& ec) {
64-
if (ec) {
65-
LOG_INFO("Timer cancelled for CLOSE_CONSUMER");
66-
return;
67-
}
68-
std::vector<uint64_t> consumerIds;
69-
{
70-
std::lock_guard<std::mutex> lock{connection->mutex_};
71-
for (auto&& kv : connection->consumers_) {
72-
if (auto consumer = kv.second.lock()) {
73-
consumerIds.push_back(consumer->getConsumerId());
74-
}
75-
}
76-
}
77-
for (auto consumerId : consumerIds) {
78-
proto::CommandCloseConsumer closeConsumerCmd;
79-
closeConsumerCmd.set_consumer_id(consumerId);
80-
connection->handleCloseConsumer(closeConsumerCmd);
81-
}
82-
});
60+
schedule(connection, "CLOSE_CONSUMER" + std::to_string(requestId),
61+
requestDelays_["CLOSE_CONSUMER"], [connection] {
62+
std::vector<uint64_t> consumerIds;
63+
{
64+
std::lock_guard<std::mutex> lock{connection->mutex_};
65+
for (auto&& kv : connection->consumers_) {
66+
if (auto consumer = kv.second.lock()) {
67+
consumerIds.push_back(consumer->getConsumerId());
68+
}
69+
}
70+
}
71+
for (auto consumerId : consumerIds) {
72+
proto::CommandCloseConsumer closeConsumerCmd;
73+
closeConsumerCmd.set_consumer_id(consumerId);
74+
connection->handleCloseConsumer(closeConsumerCmd);
75+
}
76+
});
8377
}
84-
long delayMs = iter->second;
85-
auto timer = connection->executor_->createDeadlineTimer();
86-
auto key = request + std::to_string(requestId);
87-
pendingTimers_[key] = timer;
88-
timer->expires_from_now(std::chrono::milliseconds(delayMs));
89-
90-
LOG_INFO("Mock sending request " << key << " with delay " << delayMs << " ms");
91-
auto self = shared_from_this();
92-
timer->async_wait([this, self, key, connection, requestId, timer](const auto& ec) {
93-
{
94-
std::lock_guard<std::mutex> lock(mutex_);
95-
pendingTimers_.erase(key);
96-
}
97-
if (ec) {
98-
LOG_INFO("Timer cancelled for request " << key);
99-
return;
100-
}
101-
if (connection->isClosed()) {
102-
LOG_INFO("Connection is closed, not completing request " << key);
103-
return;
104-
}
105-
LOG_INFO("Completing delayed request " << key);
78+
schedule(connection, request + std::to_string(requestId), iter->second, [connection, requestId] {
10679
proto::CommandSuccess success;
10780
success.set_request_id(requestId);
10881
connection->handleSuccess(success);
@@ -135,6 +108,31 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
135108
std::unordered_map<std::string, DeadlineTimerPtr> pendingTimers_;
136109
ClientConnectionWeakPtr connection_;
137110

111+
void schedule(ClientConnectionPtr& connection, const std::string& key, long delayMs,
112+
std::function<void()>&& task) {
113+
auto timer = connection->executor_->createDeadlineTimer();
114+
pendingTimers_[key] = timer;
115+
timer->expires_from_now(std::chrono::milliseconds(delayMs));
116+
LOG_INFO("Mock scheduling " << key << " with delay " << delayMs << " ms");
117+
auto self = shared_from_this();
118+
timer->async_wait([this, self, key, connection, task{std::move(task)}](const auto& ec) {
119+
{
120+
std::lock_guard<std::mutex> lock(mutex_);
121+
pendingTimers_.erase(key);
122+
}
123+
if (ec) {
124+
LOG_INFO("Timer cancelled for " << key);
125+
return;
126+
}
127+
if (connection->isClosed()) {
128+
LOG_INFO("Connection is closed, not completing request " << key);
129+
return;
130+
}
131+
LOG_INFO("Completing delayed request " << key);
132+
task();
133+
});
134+
}
135+
138136
DECLARE_LOG_OBJECT()
139137
};
140138

0 commit comments

Comments
 (0)