Skip to content

Commit 9d879bb

Browse files
committed
Handle TopicTerminated for producers to fail immediately rather than retrying and timing out.
1 parent 69afa1a commit 9d879bb

4 files changed

Lines changed: 9 additions & 2 deletions

File tree

lib/HandlerBase.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&
171171
case Closing:
172172
case Closed:
173173
case Producer_Fenced:
174+
case Terminated:
174175
case Failed:
175176
LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore");
176177
break;

lib/HandlerBase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
138138
Failed, // Handler is failed, in Java client: HandlerState.State.Failed
139139
Producer_Fenced, // The producer has been fenced by the broker
140140
// in Java client: HandlerState.State.ProducerFenced
141+
Terminated, // The topic has been terminatedproducer has been fenced by the broker
142+
// in Java client: HandlerState.State.Terminated
141143
};
142144

143145
std::atomic<State> state_;

lib/ProducerImpl.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
273273
}
274274
}
275275

276-
if (result == ResultProducerFenced) {
277-
state_ = Producer_Fenced;
276+
if (result == ResultProducerFenced || result == ResultTopicTerminated) {
277+
state_ = result == ResultProducerFenced ? Producer_Fenced : Terminated;
278278
failPendingMessages(result, false);
279279
auto client = client_.lock();
280280
if (client) {
@@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
450450
case HandlerBase::Producer_Fenced:
451451
callback(ResultProducerFenced, {});
452452
return false;
453+
case HandlerBase::Terminated:
454+
callback(ResultTopicTerminated, {});
455+
return false;
453456
case HandlerBase::NotStarted:
454457
case HandlerBase::Failed:
455458
default:

lib/ResultUtils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) {
3939
ResultInvalidConfiguration,
4040
ResultIncompatibleSchema,
4141
ResultTopicNotFound,
42+
ResultTopicTerminated,
4243
ResultOperationNotSupported,
4344
ResultNotAllowedError,
4445
ResultChecksumError,

0 commit comments

Comments
 (0)