Skip to content

Commit 1303c05

Browse files
committed
fix tests
1 parent 814afb1 commit 1303c05

6 files changed

Lines changed: 165 additions & 53 deletions

File tree

lib/ClientConnection.cc

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "ConsumerImpl.h"
3333
#include "ExecutorService.h"
3434
#include "LogUtils.h"
35+
#include "MockServer.h"
3536
#include "OpSendMsg.h"
3637
#include "ProducerImpl.h"
3738
#include "PulsarApi.pb.h"
@@ -1190,39 +1191,11 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
11901191
lock.unlock();
11911192

11921193
LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")");
1193-
if (mockingRequests_) {
1194-
if (auto iter = mockRequestDelays_.find(requestType); iter != mockRequestDelays_.end()) {
1195-
auto self = shared_from_this();
1196-
if (strcmp(requestType, "SEEK") == 0) {
1197-
// Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers
1198-
executor_->postWork([this, self] {
1199-
std::vector<uint64_t> consumerIds;
1200-
{
1201-
Lock lock(mutex_);
1202-
for (const auto& entry : consumers_) {
1203-
if (auto consumer = entry.second.lock()) {
1204-
consumerIds.push_back(consumer->getConsumerId());
1205-
}
1206-
}
1207-
}
1208-
for (auto consumerId : consumerIds) {
1209-
proto::CommandCloseConsumer closeConsumerCmd;
1210-
closeConsumerCmd.set_consumer_id(consumerId);
1211-
self->handleCloseConsumer(closeConsumerCmd);
1212-
}
1213-
});
1214-
}
1215-
auto timer = executor_->createDeadlineTimer();
1216-
timer->expires_after(std::chrono::milliseconds(iter->second));
1217-
LOG_INFO("Request " << requestType << " (req_id: " << requestId << ") is being delayed for "
1218-
<< iter->second << " ms");
1219-
timer->async_wait([self, cmd, requestId, timer](const ASIO_ERROR& ec) {
1220-
LOG_INFO("Complete request id: " << requestId);
1221-
proto::CommandSuccess success;
1222-
success.set_request_id(requestId);
1223-
self->handleSuccess(success);
1224-
});
1225-
} else {
1194+
if (mockingRequests_.load(std::memory_order_acquire)) {
1195+
if (mockServer_ == nullptr) {
1196+
LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType);
1197+
sendCommand(cmd);
1198+
} else if (!mockServer_->sendRequest(requestType, requestId)) {
12261199
sendCommand(cmd);
12271200
}
12281201
} else {

lib/ClientConnection.h

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ struct ResponseData {
115115

116116
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
117117

118+
class MockServer;
118119
class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<ClientConnection> {
119120
enum State : uint8_t
120121
{
@@ -211,12 +212,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
211212
Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version,
212213
uint64_t requestId);
213214

214-
void mockRequestDelay(RequestDelayType requestDelays) {
215-
if (mockingRequests_) {
216-
throw new std::runtime_error("Already mocking requests");
217-
}
218-
mockRequestDelays_.swap(requestDelays);
219-
mockingRequests_ = true;
215+
void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
216+
mockServer_ = mockServer;
217+
// Mark that requests will first go through the mock server, if the mock server cannot process it,
218+
// fall back to the normal logic
219+
mockingRequests_.store(true, std::memory_order_release);
220220
}
221221

222222
private:
@@ -320,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
320320
}
321321
}
322322

323+
void mockSendCommand(const char* requestType, uint64_t requestId, const SharedBuffer& cmd);
324+
323325
std::atomic<State> state_{Pending};
324326
TimeDuration operationsTimeout_;
325327
AuthenticationPtr authentication_;
@@ -403,10 +405,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
403405
DeadlineTimerPtr keepAliveTimer_;
404406
DeadlineTimerPtr consumerStatsRequestTimer_;
405407

406-
// This map should only be modified before modifying `mockingRequests_` to true for thread safety, when
407-
// `mockingRequests_` is false, this field will never be accessed
408-
RequestDelayType mockRequestDelays_;
409408
std::atomic_bool mockingRequests_{false};
409+
std::shared_ptr<MockServer> mockServer_;
410410

411411
void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector<uint64_t>& consumerStatsRequests);
412412

@@ -422,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
422422

423423
friend class PulsarFriend;
424424
friend class ConsumerTest;
425+
friend class MockServer;
425426

426427
void checkServerError(ServerError error, const std::string& message);
427428

lib/ConsumerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1735,7 +1735,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17351735
}
17361736

17371737
auto expected = false;
1738-
if (hasPendingSeek_.compare_exchange_strong(expected, true)) {
1738+
if (!hasPendingSeek_.compare_exchange_strong(expected, true)) {
17391739
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek");
17401740
callback(ResultNotAllowedError);
17411741
return;

lib/ConsumerImpl.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,12 @@ class ConsumerImpl : public ConsumerImplBase {
226226
void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
227227
ResultCallback&& callback);
228228
void completeSeekCallback(Result result) {
229-
if (auto callback = seekCallback_.release()) {
230-
callback(result);
229+
bool expected = true;
230+
if (hasPendingSeek_.compare_exchange_strong(expected, false)) {
231+
if (auto callback = seekCallback_.release()) {
232+
callback(result);
233+
}
231234
}
232-
hasPendingSeek_.store(false, std::memory_order_release);
233235
}
234236
void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb);
235237

lib/MockServer.h

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <initializer_list>
22+
#include <mutex>
23+
#include <unordered_map>
24+
#include <vector>
25+
26+
#include "ClientConnection.h"
27+
#include "ConsumerImpl.h"
28+
#include "ExecutorService.h"
29+
#include "LogUtils.h"
30+
#include "PulsarApi.pb.h"
31+
32+
namespace pulsar {
33+
34+
class MockServer {
35+
public:
36+
using RequestDelayType = std::unordered_map<std::string, long /* delay in milliseconds */>;
37+
38+
MockServer(const ClientConnectionPtr& connection) : connection_(connection) {}
39+
40+
void setRequestDelay(std::initializer_list<typename RequestDelayType::value_type> delays) {
41+
std::lock_guard<std::mutex> lock(mutex_);
42+
for (auto&& delay : delays) {
43+
requestDelays_[delay.first] = delay.second;
44+
}
45+
}
46+
47+
bool sendRequest(const std::string& request, uint64_t requestId) {
48+
auto connection = connection_.lock();
49+
if (!connection) {
50+
return false;
51+
}
52+
std::lock_guard<std::mutex> lock(mutex_);
53+
if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) {
54+
// Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers
55+
if (request == "SEEK") {
56+
connection->executor_->postWork([connection] {
57+
std::vector<uint64_t> consumerIds;
58+
{
59+
std::lock_guard<std::mutex> lock{connection->mutex_};
60+
for (auto&& kv : connection->consumers_) {
61+
if (auto consumer = kv.second.lock()) {
62+
consumerIds.push_back(consumer->getConsumerId());
63+
}
64+
}
65+
}
66+
for (auto consumerId : consumerIds) {
67+
proto::CommandCloseConsumer closeConsumerCmd;
68+
closeConsumerCmd.set_consumer_id(consumerId);
69+
connection->handleCloseConsumer(closeConsumerCmd);
70+
}
71+
});
72+
}
73+
long delayMs = iter->second;
74+
auto timer = connection->executor_->createDeadlineTimer();
75+
timer->expires_from_now(std::chrono::milliseconds(delayMs));
76+
timer->async_wait([connection, requestId, request, timer](const auto& ec) {
77+
if (ec) {
78+
LOG_INFO("Timer cancelled for request " << request << " with id " << requestId);
79+
return;
80+
}
81+
if (connection->isClosed()) {
82+
LOG_INFO("Connection is closed, not completing request " << request << " with id "
83+
<< requestId);
84+
return;
85+
}
86+
LOG_INFO("Completing delayed request " << request << " with id " << requestId);
87+
proto::CommandSuccess success;
88+
success.set_request_id(requestId);
89+
connection->handleSuccess(success);
90+
});
91+
return true;
92+
} else {
93+
return false;
94+
}
95+
}
96+
97+
private:
98+
mutable std::mutex mutex_;
99+
std::unordered_map<std::string, long> requestDelays_;
100+
ClientConnectionWeakPtr connection_;
101+
102+
DECLARE_LOG_OBJECT()
103+
};
104+
105+
} // namespace pulsar

tests/ConsumerSeekTest.cc

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919
#include <gtest/gtest.h>
2020
#include <pulsar/Client.h>
2121

22+
#include <chrono>
23+
#include <future>
24+
#include <memory>
2225
#include <set>
2326
#include <stdexcept>
2427
#include <string>
2528

2629
#include "HttpHelper.h"
27-
#include "lib/Latch.h"
30+
#include "lib/ClientConnection.h"
2831
#include "lib/LogUtils.h"
32+
#include "lib/MockServer.h"
2933
#include "tests/PulsarFriend.h"
3034

3135
DECLARE_LOG_OBJECT()
@@ -202,19 +206,46 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) {
202206
ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest()));
203207
}
204208

209+
static void assertSeekWithTimeout(Consumer& consumer) {
210+
using namespace std::chrono_literals;
211+
auto promise = std::make_shared<std::promise<Result>>();
212+
consumer.seekAsync(0L, [promise](Result result) { promise->set_value(result); });
213+
auto future = promise->get_future();
214+
ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
215+
ASSERT_EQ(future.get(), ResultOk);
216+
}
217+
205218
// Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response
206219
TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) {
207220
Client client(lookupUrl);
208221
Consumer consumer;
209222
ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer));
210223

211-
for (auto&& connection : PulsarFriend::getConnections(client)) {
212-
connection->mockRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 600}});
213-
}
224+
auto connection = *PulsarFriend::getConnections(client).begin();
225+
auto mockServer = std::make_shared<MockServer>(connection);
226+
connection->attachMockServer(mockServer);
227+
228+
mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}});
229+
assertSeekWithTimeout(consumer);
230+
231+
mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}});
232+
assertSeekWithTimeout(consumer);
233+
234+
client.close();
235+
}
236+
237+
TEST_F(ConsumerSeekTest, testReconnectionSlow) {
238+
Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500));
239+
Consumer consumer;
240+
ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer));
241+
242+
auto connection = *PulsarFriend::getConnections(client).begin();
243+
auto mockServer = std::make_shared<MockServer>(connection);
244+
connection->attachMockServer(mockServer);
214245

215-
Latch latch(1);
216-
consumer.seekAsync(0L, [&latch](Result result) { latch.countdown(); });
217-
ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
246+
// Make seek response received before `connectionOpened` is called
247+
mockServer->setRequestDelay({{"SEEK", 100}});
248+
assertSeekWithTimeout(consumer);
218249

219250
client.close();
220251
}

0 commit comments

Comments
 (0)