Skip to content

Commit 9159bb5

Browse files
committed
improve testReconnectionSlow
1 parent 98b06ec commit 9159bb5

2 files changed

Lines changed: 15 additions & 4 deletions

File tree

lib/MockServer.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
3636
public:
3737
using RequestDelayType = std::unordered_map<std::string, long /* delay in milliseconds */>;
3838

39-
MockServer(const ClientConnectionPtr& connection) : connection_(connection) {}
39+
MockServer(const ClientConnectionPtr& connection) : connection_(connection) {
40+
requestDelays_["CLOSE_CONSUMER"] = 1;
41+
}
4042

4143
void setRequestDelay(std::initializer_list<typename RequestDelayType::value_type> delays) {
4244
std::lock_guard<std::mutex> lock(mutex_);
@@ -54,7 +56,15 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
5456
if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) {
5557
// Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers
5658
if (request == "SEEK") {
57-
connection->executor_->postWork([connection] {
59+
auto closeConsumerDelayMs = requestDelays_["CLOSE_CONSUMER"];
60+
auto timer = connection->executor_->createDeadlineTimer();
61+
pendingTimers_["CLOSE_CONSUMER" + std::to_string(requestId)] = timer;
62+
timer->expires_from_now(std::chrono::milliseconds(closeConsumerDelayMs));
63+
timer->async_wait([connection](const auto& ec) {
64+
if (ec) {
65+
LOG_INFO("Timer cancelled for CLOSE_CONSUMER");
66+
return;
67+
}
5868
std::vector<uint64_t> consumerIds;
5969
{
6070
std::lock_guard<std::mutex> lock{connection->mutex_};

tests/ConsumerSeekTest.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,11 @@ TEST_F(ConsumerSeekTest, testReconnectionSlow) {
250250
connection->attachMockServer(mockServer);
251251

252252
// Make seek response received before `connectionOpened` is called
253-
mockServer->setRequestDelay({{"SEEK", 100}});
253+
mockServer->setRequestDelay({{"SEEK", 500}, {"CLOSE_CONSUMER", 1000}});
254254
assertSeekWithTimeout(consumer);
255255

256-
ASSERT_EQ(mockServer->close(), 0);
256+
// The CLOSE_CONSUMER request is in still flight
257+
ASSERT_EQ(mockServer->close(), 1);
257258
client.close();
258259
}
259260

0 commit comments

Comments
 (0)