Skip to content

Commit 3850259

Browse files
committed
fix unstable tests
1 parent 7fd7fed commit 3850259

3 files changed

Lines changed: 47 additions & 11 deletions

File tree

tests/BasicEndToEndTest.cc

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3188,7 +3188,17 @@ static void expectTimeoutOnRecv(Consumer &consumer) {
31883188
ASSERT_EQ(ResultTimeout, res);
31893189
}
31903190

3191-
void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
3191+
static std::vector<std::string> expectedNegativeAckMessages(size_t numMessages) {
3192+
std::vector<std::string> expected;
3193+
expected.reserve(numMessages);
3194+
for (size_t i = 0; i < numMessages; i++) {
3195+
expected.emplace_back("test-" + std::to_string(i));
3196+
}
3197+
return expected;
3198+
}
3199+
3200+
void testNegativeAcks(const std::string &topic, bool batchingEnabled, bool expectOrdered = true) {
3201+
constexpr size_t numMessages = 10;
31923202
Client client(lookupUrl);
31933203
Consumer consumer;
31943204
ConsumerConfiguration conf;
@@ -3202,22 +3212,32 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
32023212
result = client.createProducer(topic, producerConf, producer);
32033213
ASSERT_EQ(ResultOk, result);
32043214

3205-
for (int i = 0; i < 10; i++) {
3215+
for (size_t i = 0; i < numMessages; i++) {
32063216
Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
32073217
producer.sendAsync(msg, nullptr);
32083218
}
32093219

32103220
producer.flush();
32113221

3222+
std::vector<std::string> receivedMessages;
3223+
receivedMessages.reserve(numMessages);
32123224
std::vector<MessageId> toNeg;
3213-
for (int i = 0; i < 10; i++) {
3225+
for (size_t i = 0; i < numMessages; i++) {
32143226
Message msg;
32153227
consumer.receive(msg);
32163228

32173229
LOG_INFO("Received message " << msg.getDataAsString());
3218-
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
3230+
if (expectOrdered) {
3231+
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
3232+
}
3233+
receivedMessages.emplace_back(msg.getDataAsString());
32193234
toNeg.push_back(msg.getMessageId());
32203235
}
3236+
if (!expectOrdered) {
3237+
auto expectedMessages = expectedNegativeAckMessages(numMessages);
3238+
std::sort(receivedMessages.begin(), receivedMessages.end());
3239+
ASSERT_EQ(expectedMessages, receivedMessages);
3240+
}
32213241
// No more messages expected
32223242
expectTimeoutOnRecv(consumer);
32233243

@@ -3228,15 +3248,25 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
32283248
}
32293249
PulsarFriend::setNegativeAckEnabled(consumer, true);
32303250

3231-
for (int i = 0; i < 10; i++) {
3251+
std::vector<std::string> redeliveredMessages;
3252+
redeliveredMessages.reserve(numMessages);
3253+
for (size_t i = 0; i < numMessages; i++) {
32323254
Message msg;
32333255
consumer.receive(msg);
32343256
LOG_INFO("-- Redelivery -- Received message " << msg.getDataAsString());
32353257

3236-
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
3258+
if (expectOrdered) {
3259+
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
3260+
}
3261+
redeliveredMessages.emplace_back(msg.getDataAsString());
32373262

32383263
consumer.acknowledge(msg);
32393264
}
3265+
if (!expectOrdered) {
3266+
auto expectedMessages = expectedNegativeAckMessages(numMessages);
3267+
std::sort(redeliveredMessages.begin(), redeliveredMessages.end());
3268+
ASSERT_EQ(expectedMessages, redeliveredMessages);
3269+
}
32403270

32413271
// No more messages expected
32423272
expectTimeoutOnRecv(consumer);
@@ -3262,7 +3292,7 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
32623292
LOG_INFO("res = " << res);
32633293
ASSERT_FALSE(res != 204 && res != 409);
32643294

3265-
testNegativeAcks(topicName, true);
3295+
testNegativeAcks(topicName, true, false);
32663296
}
32673297

32683298
void testNegativeAckPrecisionBitCnt(const std::string &topic, int precisionBitCnt) {

tests/MultiTopicsConsumerTest.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,12 @@ TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
162162
BrokerConsumerStats stats;
163163
return consumer.getBrokerConsumerStats(stats);
164164
});
165-
// Trigger the `getBrokerConsumerStats` in a new thread
166-
future.wait_for(std::chrono::milliseconds(100));
167-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
165+
const auto expectedRequests = topics.size();
166+
ASSERT_TRUE(waitUntil(std::chrono::seconds(1), [connection, expectedRequests] {
167+
return PulsarFriend::getPendingConsumerStatsRequests(*connection) == expectedRequests;
168+
}));
168169

169-
connection->handleKeepAliveTimeout(ASIO_SUCCESS);
170+
connection->close(ResultDisconnected);
170171
ASSERT_EQ(ResultDisconnected, future.get());
171172

172173
mockServer->close();

tests/PulsarFriend.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ class PulsarFriend {
162162
return consumers;
163163
}
164164

165+
static size_t getPendingConsumerStatsRequests(const ClientConnection& cnx) {
166+
std::lock_guard<std::mutex> lock(cnx.mutex_);
167+
return cnx.pendingConsumerStatsMap_.size();
168+
}
169+
165170
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
166171
consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
167172
}

0 commit comments

Comments
 (0)