Skip to content

Commit 0baf526

Browse files
committed
Add tests for reconnection ProducerBusy
1 parent bfb48a0 commit 0baf526

3 files changed

Lines changed: 24 additions & 1 deletion

File tree

lib/ProducerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
966966
}
967967

968968
void ProducerImpl::disconnectProducer() {
969-
LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
969+
LOG_INFO("Broker notification of Closed producer: " << producerId_);
970970
resetCnx();
971971
scheduleReconnection();
972972
}

tests/ProducerTest.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,4 +618,19 @@ TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUp
618618
client.close();
619619
}
620620

621+
TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) {
622+
ClientConfiguration conf;
623+
conf.setConnectionsPerBroker(10);
624+
625+
Client client(serviceUrl, conf);
626+
Producer producer;
627+
ASSERT_EQ(ResultOk, client.createProducer("producer-test-reconnect-twice", producer));
628+
629+
for (int i = 0; i < 5; i++) {
630+
ASSERT_TRUE(PulsarFriend::reconnect(producer)) << "i: " << i;
631+
}
632+
633+
client.close();
634+
}
635+
621636
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

tests/PulsarFriend.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <string>
2323

24+
#include "WaitUtils.h"
2425
#include "lib/ClientConnection.h"
2526
#include "lib/ClientImpl.h"
2627
#include "lib/ConsumerConfigurationImpl.h"
@@ -197,6 +198,13 @@ class PulsarFriend {
197198
lookupData->setPartitions(newPartitions);
198199
partitionedProducer.handleGetPartitions(ResultOk, lookupData);
199200
}
201+
202+
static bool reconnect(Producer producer) {
203+
auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
204+
producerImpl->disconnectProducer();
205+
return waitUntil(std::chrono::seconds(3),
206+
[producerImpl] { return !producerImpl->getCnx().expired(); });
207+
}
200208
};
201209
} // namespace pulsar
202210

0 commit comments

Comments
 (0)