Skip to content

Commit 9d32866

Browse files
committed
Add tests for the race
1 parent 2f6ab2b commit 9d32866

3 files changed

Lines changed: 73 additions & 1 deletion

File tree

lib/ClientConnection.cc

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <openssl/x509.h>
2222
#include <pulsar/MessageIdBuilder.h>
2323

24+
#include <chrono>
2425
#include <fstream>
2526

2627
#include "AsioDefines.h"
@@ -1189,7 +1190,44 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
11891190
lock.unlock();
11901191

11911192
LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")");
1192-
sendCommand(cmd);
1193+
if (mockingRequests_) {
1194+
if (auto iter = mockRequestDelays_.find(requestType); iter != mockRequestDelays_.end()) {
1195+
auto self = shared_from_this();
1196+
if (strcmp(requestType, "SEEK") == 0) {
1197+
// Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers
1198+
executor_->postWork([this, self] {
1199+
std::vector<uint64_t> consumerIds;
1200+
{
1201+
Lock lock(mutex_);
1202+
for (const auto& entry : consumers_) {
1203+
if (auto consumer = entry.second.lock()) {
1204+
consumerIds.push_back(consumer->getConsumerId());
1205+
}
1206+
}
1207+
}
1208+
for (auto consumerId : consumerIds) {
1209+
proto::CommandCloseConsumer closeConsumerCmd;
1210+
closeConsumerCmd.set_consumer_id(consumerId);
1211+
self->handleCloseConsumer(closeConsumerCmd);
1212+
}
1213+
});
1214+
}
1215+
auto timer = executor_->createDeadlineTimer();
1216+
timer->expires_after(std::chrono::milliseconds(iter->second));
1217+
LOG_INFO("Request " << requestType << " (req_id: " << requestId << ") is being delayed for "
1218+
<< iter->second << " ms");
1219+
timer->async_wait([self, cmd, requestId, timer](const ASIO_ERROR& ec) {
1220+
LOG_INFO("Complete request id: " << requestId);
1221+
proto::CommandSuccess success;
1222+
success.set_request_id(requestId);
1223+
self->handleSuccess(success);
1224+
});
1225+
} else {
1226+
sendCommand(cmd);
1227+
}
1228+
} else {
1229+
sendCommand(cmd);
1230+
}
11931231
return requestData.promise.getFuture();
11941232
}
11951233

lib/ClientConnection.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
123123
Ready,
124124
Disconnected
125125
};
126+
using RequestDelayType =
127+
std::unordered_map<std::string /* request type */, long /* delay in milliseconds */>;
126128

127129
public:
128130
typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
@@ -209,6 +211,14 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
209211
Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version,
210212
uint64_t requestId);
211213

214+
void mockRequestDelay(RequestDelayType requestDelays) {
215+
if (mockingRequests_) {
216+
throw new std::runtime_error("Already mocking requests");
217+
}
218+
mockRequestDelays_.swap(requestDelays);
219+
mockingRequests_ = true;
220+
}
221+
212222
private:
213223
struct PendingRequestData {
214224
Promise<Result, ResponseData> promise;
@@ -393,6 +403,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
393403
DeadlineTimerPtr keepAliveTimer_;
394404
DeadlineTimerPtr consumerStatsRequestTimer_;
395405

406+
// This map should only be modified before modifying `mockingRequests_` to true for thread safety, when
407+
// `mockingRequests_` is false, this field will never be accessed
408+
RequestDelayType mockRequestDelays_;
409+
std::atomic_bool mockingRequests_{false};
410+
396411
void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector<uint64_t>& consumerStatsRequests);
397412

398413
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);

tests/ConsumerSeekTest.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
#include <string>
2525

2626
#include "HttpHelper.h"
27+
#include "lib/Latch.h"
2728
#include "lib/LogUtils.h"
29+
#include "tests/PulsarFriend.h"
2830

2931
DECLARE_LOG_OBJECT()
3032

@@ -200,6 +202,23 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) {
200202
ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest()));
201203
}
202204

205+
// Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response
206+
TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) {
207+
Client client(lookupUrl);
208+
Consumer consumer;
209+
ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer));
210+
211+
for (auto&& connection : PulsarFriend::getConnections(client)) {
212+
connection->mockRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 600}});
213+
}
214+
215+
Latch latch(1);
216+
consumer.seekAsync(0L, [&latch](Result result) { latch.countdown(); });
217+
ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
218+
219+
client.close();
220+
}
221+
203222
INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
204223

205224
} // namespace pulsar

0 commit comments

Comments
 (0)