Skip to content

Commit 0a9e017

Browse files
authored
[fix][client-cpp] Fail producers immediately when topic is terminated (#567)
1 parent f9995da commit 0a9e017

6 files changed

Lines changed: 58 additions & 2 deletions

File tree

lib/HandlerBase.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&
171171
case Closing:
172172
case Closed:
173173
case Producer_Fenced:
174+
case Terminated:
174175
case Failed:
175176
LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore");
176177
break;

lib/HandlerBase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
138138
Failed, // Handler is failed, in Java client: HandlerState.State.Failed
139139
Producer_Fenced, // The producer has been fenced by the broker
140140
// in Java client: HandlerState.State.ProducerFenced
141+
Terminated, // The topic has been terminatedproducer has been fenced by the broker
142+
// in Java client: HandlerState.State.Terminated
141143
};
142144

143145
std::atomic<State> state_;

lib/ProducerImpl.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
273273
}
274274
}
275275

276-
if (result == ResultProducerFenced) {
277-
state_ = Producer_Fenced;
276+
if (result == ResultProducerFenced || result == ResultTopicTerminated) {
277+
state_ = result == ResultProducerFenced ? Producer_Fenced : Terminated;
278278
failPendingMessages(result, false);
279279
auto client = client_.lock();
280280
if (client) {
@@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
450450
case HandlerBase::Producer_Fenced:
451451
callback(ResultProducerFenced, {});
452452
return false;
453+
case HandlerBase::Terminated:
454+
callback(ResultTopicTerminated, {});
455+
return false;
453456
case HandlerBase::NotStarted:
454457
case HandlerBase::Failed:
455458
default:

lib/ResultUtils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) {
3939
ResultInvalidConfiguration,
4040
ResultIncompatibleSchema,
4141
ResultTopicNotFound,
42+
ResultTopicTerminated,
4243
ResultOperationNotSupported,
4344
ResultNotAllowedError,
4445
ResultChecksumError,

tests/ProducerTest.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,50 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
215215
client.close();
216216
}
217217

218+
TEST(ProducerTest, testCreateProducerAfterTopicTermination) {
219+
const auto topicName = "testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr));
220+
const auto topic = "persistent://public/default/" + topicName;
221+
222+
Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));
223+
224+
Producer producer;
225+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
226+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("content").build()));
227+
ASSERT_EQ(ResultOk, producer.close());
228+
229+
const auto httpCode =
230+
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
231+
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
232+
233+
Producer terminatedProducer;
234+
ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic, terminatedProducer));
235+
236+
client.close();
237+
}
238+
239+
TEST(ProducerTest, testSendAfterTopicTerminationReconnect) {
240+
const auto topicName = "testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr));
241+
const auto topic = "persistent://public/default/" + topicName;
242+
243+
Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));
244+
245+
Producer producer;
246+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
247+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("before-terminate").build()));
248+
249+
const auto httpCode =
250+
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
251+
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
252+
253+
PulsarFriend::getProducerImpl(producer).disconnectProducer();
254+
ASSERT_TRUE(
255+
waitUntil(std::chrono::seconds(3), [&producer] { return PulsarFriend::isTerminated(producer); }));
256+
257+
ASSERT_EQ(ResultTopicTerminated, producer.send(MessageBuilder().setContent("after-terminate").build()));
258+
259+
client.close();
260+
}
261+
218262
class ProducerTest : public ::testing::TestWithParam<bool> {};
219263

220264
TEST_P(ProducerTest, testMaxMessageSize) {

tests/PulsarFriend.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ class PulsarFriend {
257257
return waitUntil(std::chrono::seconds(3),
258258
[producerImpl] { return !producerImpl->getCnx().expired(); });
259259
}
260+
261+
static bool isTerminated(Producer producer) {
262+
auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
263+
return producerImpl && producerImpl->state_ == HandlerBase::Terminated;
264+
}
260265
};
261266
} // namespace pulsar
262267

0 commit comments

Comments
 (0)