Skip to content

Commit 03661b0

Browse files
committed
Add regression tests for topic termination
1 parent 9d879bb commit 03661b0

2 files changed

Lines changed: 52 additions & 0 deletions

File tree

tests/ProducerTest.cc

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,53 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
215215
client.close();
216216
}
217217

218+
TEST(ProducerTest, testCreateProducerAfterTopicTermination) {
219+
const auto topicName =
220+
"testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr));
221+
const auto topic = "persistent://public/default/" + topicName;
222+
223+
Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));
224+
225+
Producer producer;
226+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
227+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("content").build()));
228+
ASSERT_EQ(ResultOk, producer.close());
229+
230+
const auto httpCode =
231+
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
232+
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
233+
234+
Producer terminatedProducer;
235+
ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic, terminatedProducer));
236+
237+
client.close();
238+
}
239+
240+
TEST(ProducerTest, testSendAfterTopicTerminationReconnect) {
241+
const auto topicName =
242+
"testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr));
243+
const auto topic = "persistent://public/default/" + topicName;
244+
245+
Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));
246+
247+
Producer producer;
248+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
249+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("before-terminate").build()));
250+
251+
const auto httpCode =
252+
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
253+
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
254+
255+
PulsarFriend::getProducerImpl(producer).disconnectProducer();
256+
ASSERT_TRUE(waitUntil(std::chrono::seconds(3),
257+
[&producer] { return PulsarFriend::isTerminated(producer); }));
258+
259+
ASSERT_EQ(ResultTopicTerminated,
260+
producer.send(MessageBuilder().setContent("after-terminate").build()));
261+
262+
client.close();
263+
}
264+
218265
class ProducerTest : public ::testing::TestWithParam<bool> {};
219266

220267
TEST_P(ProducerTest, testMaxMessageSize) {

tests/PulsarFriend.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ class PulsarFriend {
257257
return waitUntil(std::chrono::seconds(3),
258258
[producerImpl] { return !producerImpl->getCnx().expired(); });
259259
}
260+
261+
static bool isTerminated(Producer producer) {
262+
auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
263+
return producerImpl && producerImpl->state_ == HandlerBase::Terminated;
264+
}
260265
};
261266
} // namespace pulsar
262267

0 commit comments

Comments
 (0)