Skip to content

Commit 967529b

Browse files
authored
[fix][client-cpp] Fix Reader segfault when messageListenerThreads=0 (apache#553)
1 parent 40259cc commit 967529b

4 files changed

Lines changed: 80 additions & 5 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
622622
if (state == Closing || state == Closed) {
623623
return;
624624
}
625+
if (!listenerExecutor_) {
626+
LOG_ERROR(getName() << " listenerExecutor_ is null, discarding message to avoid null dereference");
627+
increaseAvailablePermits(cnx);
628+
return;
629+
}
625630
uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
626631
if (ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
627632
LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer.");
@@ -663,8 +668,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
663668
return;
664669
}
665670
// Trigger message listener callback in a separate thread
666-
while (numOfMessageReceived--) {
667-
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
671+
if (listenerExecutor_) {
672+
while (numOfMessageReceived--) {
673+
listenerExecutor_->postWork(
674+
std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
675+
}
668676
}
669677
}
670678
}
@@ -713,8 +721,12 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {
713721

714722
// has pending receive, direct callback.
715723
if (asyncReceivedWaiting) {
716-
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
717-
get_shared_this_ptr(), ResultOk, msg, callback));
724+
if (listenerExecutor_) {
725+
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
726+
get_shared_this_ptr(), ResultOk, msg, callback));
727+
} else {
728+
notifyPendingReceivedCallback(ResultOk, msg, callback);
729+
}
718730
return;
719731
}
720732

lib/ExecutorService.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#include "ExecutorService.h"
2020

21+
#include <algorithm>
22+
2123
#include "LogUtils.h"
2224
#include "TimeUtils.h"
2325
DECLARE_LOG_OBJECT()
@@ -128,9 +130,12 @@ void ExecutorService::close(long timeoutMs) {
128130
/////////////////////
129131

130132
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
131-
: executors_(nthreads), executorIdx_(0), mutex_() {}
133+
: executors_(std::max(1, nthreads)), executorIdx_(0), mutex_() {}
132134

133135
ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
136+
if (executors_.empty()) {
137+
return nullptr;
138+
}
134139
idx %= executors_.size();
135140
Lock lock(mutex_);
136141

tests/ReaderTest.cc

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
#include <gtest/gtest.h>
2020
#include <pulsar/Client.h>
21+
#include <pulsar/ClientConfiguration.h>
2122
#include <pulsar/Reader.h>
2223
#include <time.h>
2324

@@ -982,5 +983,49 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
982983
assertStartMessageId(false, secondMsgId);
983984
}
984985

986+
// Regression test for segfault when Reader is used with messageListenerThreads=0.
987+
// Verifies ExecutorServiceProvider(0) does not cause undefined behavior and
988+
// ConsumerImpl::messageReceived does not dereference null listenerExecutor_.
989+
TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
990+
ClientConfiguration clientConf;
991+
clientConf.setMessageListenerThreads(0);
992+
Client client(serviceUrl, clientConf);
993+
994+
const std::string topicName = "testReaderWithZeroMessageListenerThreads-" + std::to_string(time(nullptr));
995+
996+
Producer producer;
997+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
998+
999+
ReaderConfiguration readerConf;
1000+
Reader reader;
1001+
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
1002+
1003+
constexpr int numMessages = 5;
1004+
for (int i = 0; i < numMessages; i++) {
1005+
Message msg = MessageBuilder().setContent("msg-" + std::to_string(i)).build();
1006+
ASSERT_EQ(ResultOk, producer.send(msg));
1007+
}
1008+
1009+
int received = 0;
1010+
for (int i = 0; i < numMessages + 2; i++) {
1011+
bool hasMessageAvailable = false;
1012+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
1013+
if (!hasMessageAvailable) {
1014+
break;
1015+
}
1016+
Message msg;
1017+
Result res = reader.readNext(msg, 3000);
1018+
ASSERT_EQ(ResultOk, res) << "readNext failed at iteration " << i;
1019+
std::string content = msg.getDataAsString();
1020+
EXPECT_EQ("msg-" + std::to_string(received), content);
1021+
++received;
1022+
}
1023+
EXPECT_EQ(received, numMessages);
1024+
1025+
producer.close();
1026+
reader.close();
1027+
client.close();
1028+
}
1029+
9851030
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
9861031
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));

tests/RetryableOperationCacheTest.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <chrono>
2323
#include <stdexcept>
2424

25+
#include "lib/ExecutorService.h"
2526
#include "lib/RetryableOperationCache.h"
2627

2728
namespace pulsar {
@@ -82,6 +83,18 @@ class RetryableOperationCacheTest : public ::testing::Test {
8283

8384
using namespace pulsar;
8485

86+
// Regression test: ExecutorServiceProvider(0) must not cause undefined behavior (e.g. idx % 0).
87+
// After fix, nthreads is clamped to at least 1, so get() returns a valid executor.
88+
TEST(ExecutorServiceProviderTest, ZeroThreadsReturnsValidExecutor) {
89+
ExecutorServiceProviderPtr provider = std::make_shared<ExecutorServiceProvider>(0);
90+
for (int i = 0; i < 3; i++) {
91+
ExecutorServicePtr executor = provider->get();
92+
ASSERT_NE(executor, nullptr)
93+
<< "get() must not return null when created with 0 threads (clamped to 1)";
94+
}
95+
provider->close();
96+
}
97+
8598
TEST_F(RetryableOperationCacheTest, testRetry) {
8699
auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
87100
for (int i = 0; i < 10; i++) {

0 commit comments

Comments
 (0)