Skip to content

Commit bd72300

Browse files
committed
Avoid blocking the message listener threads
1 parent a0f2d32 commit bd72300

6 files changed

Lines changed: 83 additions & 10 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
10321032
return;
10331033
}
10341034

1035-
increaseAvailablePermits(currentCnx);
1035+
if (!hasParent_) {
1036+
increaseAvailablePermits(currentCnx);
1037+
}
10361038
if (track) {
10371039
trackMessage(msg.getMessageId());
10381040
}
@@ -1089,6 +1091,16 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn
10891091
}
10901092
}
10911093

1094+
void ConsumerImpl::increaseAvailablePermits(const Message& msg) {
1095+
ClientConnectionPtr currentCnx = getCnx().lock();
1096+
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
1097+
LOG_DEBUG(getName() << "Not adding permit since connection is different.");
1098+
return;
1099+
}
1100+
1101+
increaseAvailablePermits(currentCnx);
1102+
}
1103+
10921104
inline CommandSubscribe_SubType ConsumerImpl::getSubType() {
10931105
ConsumerType type = config_.getConsumerType();
10941106
switch (type) {

lib/ConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ class ConsumerImpl : public ConsumerImplBase {
168168
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
169169
CommandAck_ValidationError validationError);
170170
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
171+
void increaseAvailablePermits(const Message& msg);
171172
void drainIncomingMessageQueue(size_t count);
172173
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
173174
const BitSet& ackSet, int redeliveryCount);

lib/MessageImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class MessageImpl {
4747
int redeliveryCount_;
4848
bool hasSchemaVersion_;
4949
const std::string* schemaVersion_;
50+
std::shared_ptr<class ConsumerImpl> consumerPtr_;
5051

5152
const std::string& getPartitionKey() const;
5253
bool hasPartitionKey() const;

lib/MultiTopicsConsumerImpl.cc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
519519
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
520520
<< " message:" << msg.getDataAsString());
521521
msg.impl_->setTopicName(consumer.impl_->topic_);
522+
msg.impl_->consumerPtr_ = std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
522523

523524
Lock lock(pendingReceiveMutex_);
524525
if (!pendingReceives_.empty()) {
@@ -530,18 +531,12 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
530531
auto self = weakSelf.lock();
531532
if (self) {
532533
notifyPendingReceivedCallback(ResultOk, msg, callback);
534+
msg.impl_->consumerPtr_->increaseAvailablePermits(msg);
533535
}
534536
});
535537
return;
536538
}
537539

538-
if (incomingMessages_.full()) {
539-
lock.unlock();
540-
}
541-
542-
// add message to block queue.
543-
// when messages queue is full, will block listener thread on ConsumerImpl,
544-
// then will not send permits to broker, will broker stop push message.
545540
incomingMessages_.push(msg);
546541
incomingMessagesSize_.fetch_add(msg.getLength());
547542

@@ -1072,6 +1067,7 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
10721067
void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
10731068
incomingMessagesSize_.fetch_sub(msg.getLength());
10741069
unAckedMessageTrackerPtr_->add(msg.getMessageId());
1070+
msg.impl_->consumerPtr_->increaseAvailablePermits(msg);
10751071
}
10761072

10771073
std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {

lib/MultiTopicsConsumerImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include <memory>
2525
#include <vector>
2626

27-
#include "BlockingQueue.h"
2827
#include "Commands.h"
2928
#include "ConsumerImplBase.h"
3029
#include "ConsumerInterceptors.h"
@@ -33,6 +32,7 @@
3332
#include "LookupDataResult.h"
3433
#include "SynchronizedHashMap.h"
3534
#include "TestUtil.h"
35+
#include "UnboundedBlockingQueue.h"
3636

3737
namespace pulsar {
3838
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
@@ -115,7 +115,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
115115
std::map<std::string, int> topicsPartitions_;
116116
mutable std::mutex mutex_;
117117
std::mutex pendingReceiveMutex_;
118-
BlockingQueue<Message> incomingMessages_;
118+
UnboundedBlockingQueue<Message> incomingMessages_;
119119
std::atomic_int incomingMessagesSize_ = {0};
120120
MessageListener messageListener_;
121121
DeadlineTimerPtr partitionsUpdateTimer_;

tests/ConsumerTest.cc

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,4 +1335,67 @@ TEST(ConsumerTest, testRetrySubscribe) {
13351335
// milliseconds
13361336
}
13371337

1338+
TEST(ConsumerTest, testNoListenerThreadBlocking) {
1339+
Client client{lookupUrl};
1340+
1341+
const int numPartitions = 2;
1342+
const std::string partitionedTopic = "testNoListenerThreadBlocking-" + std::to_string(time(nullptr));
1343+
int res =
1344+
makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
1345+
std::to_string(numPartitions));
1346+
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
1347+
1348+
const int receiverQueueSize = 1;
1349+
const int receiverQueueSizeAcrossPartitions = receiverQueueSize * numPartitions;
1350+
1351+
Consumer consumer1, consumer2;
1352+
ConsumerConfiguration consumerConfig;
1353+
consumerConfig.setReceiverQueueSize(receiverQueueSize);
1354+
consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions);
1355+
Result consumerResult;
1356+
consumerResult = client.subscribe(partitionedTopic, "sub1", consumerConfig, consumer1);
1357+
ASSERT_EQ(consumerResult, ResultOk);
1358+
consumerResult = client.subscribe(partitionedTopic, "sub2", consumerConfig, consumer2);
1359+
ASSERT_EQ(consumerResult, ResultOk);
1360+
1361+
Producer producer;
1362+
ProducerConfiguration producerConfig;
1363+
producerConfig.setBatchingEnabled(false);
1364+
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
1365+
Result producerResult = client.createProducer(partitionedTopic, producerConfig, producer);
1366+
ASSERT_EQ(producerResult, ResultOk);
1367+
1368+
const int msgCount = receiverQueueSizeAcrossPartitions * 100;
1369+
1370+
for (int i = 0; i < msgCount; ++i) {
1371+
auto msg = MessageBuilder().setContent("test").build();
1372+
producer.sendAsync(msg, [](Result code, const MessageId& messageId) {});
1373+
}
1374+
producer.flush();
1375+
producer.close();
1376+
1377+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
1378+
1379+
// check consumer1 prefetch num
1380+
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
1381+
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
1382+
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
1383+
1384+
// read consumer2 while consumer1 reaches the prefech limit
1385+
for (int i = 0; i < msgCount; ++i) {
1386+
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer2);
1387+
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
1388+
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
1389+
1390+
Message msg;
1391+
Result ret = consumer2.receive(msg, 1000);
1392+
ASSERT_EQ(ret, ResultOk);
1393+
consumer2.acknowledge(msg);
1394+
}
1395+
1396+
consumer2.close();
1397+
consumer1.close();
1398+
client.close();
1399+
}
1400+
13381401
} // namespace pulsar

0 commit comments

Comments
 (0)