@@ -235,8 +235,6 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
235235 return promise.getFuture ();
236236 }
237237
238- // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
239- // sending the subscribe request.
240238 optional<MessageId> subscribeMessageId;
241239 {
242240 LockGuard lock{mutex_};
@@ -266,11 +264,11 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
266264 cnx->sendRequestWithId (cmd, requestId, " SUBSCRIBE" )
267265 .addListener ([this , self, cnx, promise](Result result, const ResponseData& responseData) {
268266 Result handleResult = handleCreateConsumer (cnx, result);
269- if (handleResult == ResultOk) {
270- promise.setSuccess ();
271- } else {
267+ if (handleResult != ResultOk) {
272268 promise.setFailed (handleResult);
269+ return ;
273270 }
271+ promise.setSuccess ();
274272 // Complete the seek callback after completing `promise`, otherwise `reconnectionPending_` will
275273 // still be true when the seek operation is done.
276274 LockGuard lock{mutex_};
@@ -1821,6 +1819,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
18211819 } else {
18221820 LOG_ERROR (getName () << " Failed to seek: " << result);
18231821 LockGuard lock{mutex_};
1822+ seekStatus_ = SeekStatus::NOT_STARTED ;
18241823 lastSeekArg_ = previousLastSeekArg;
18251824 executor_->postWork ([self, callback{std::exchange (seekCallback_, std::nullopt ).value ()}]() {
18261825 callback (ResultOk);
0 commit comments