Skip to content

Commit 6c2c9b4

Browse files
committed
fix
1 parent 277e8e8 commit 6c2c9b4

2 files changed

Lines changed: 31 additions & 5 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,11 +334,11 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
334334
return ResultAlreadyClosed;
335335
}
336336

337+
mutexLock.unlock();
338+
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
337339
incomingMessages_.clear();
338340
possibleSendToDeadLetterTopicMessages_.clear();
339341
backoff_.reset();
340-
mutexLock.unlock();
341-
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
342342
if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
343343
// Complicated logic since we don't have a isLocked() function for mutex
344344
if (waitingForZeroQueueSizeMessage) {
@@ -1846,9 +1846,10 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
18461846
LockGuard lock{mutex_};
18471847
seekStatus_ = SeekStatus::NOT_STARTED;
18481848
lastSeekArg_ = previousLastSeekArg;
1849-
executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
1850-
callback(ResultOk);
1851-
});
1849+
executor_->postWork(
1850+
[self, callback{std::exchange(seekCallback_, std::nullopt).value()}, result]() {
1851+
callback(result);
1852+
});
18521853
}
18531854
});
18541855
}

tests/ConsumerSeekTest.cc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,31 @@ TEST_F(ConsumerSeekTest, testReconnectionSlow) {
258258
client.close();
259259
}
260260

261+
TEST_F(ConsumerSeekTest, testSeekFailureIsPropagated) {
262+
using namespace std::chrono_literals;
263+
264+
Client client(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(1));
265+
Consumer consumer;
266+
ASSERT_EQ(ResultOk, client.subscribe("testSeekFailureIsPropagated", "sub", consumer));
267+
268+
auto connection = *PulsarFriend::getConnections(client).begin();
269+
auto mockServer = std::make_shared<MockServer>(connection);
270+
connection->attachMockServer(mockServer);
271+
mockServer->setRequestDelay({{"SEEK", 5000}});
272+
273+
std::promise<Result> promise;
274+
auto future = promise.get_future();
275+
consumer.seekAsync(MessageId::earliest(), [&promise](Result result) { promise.set_value(result); });
276+
277+
// Cancel the mocked SEEK success so request completes with timeout.
278+
ASSERT_GE(mockServer->close(), 1);
279+
280+
ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
281+
ASSERT_EQ(future.get(), ResultTimeout);
282+
283+
client.close();
284+
}
285+
261286
INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
262287

263288
} // namespace pulsar

0 commit comments

Comments
 (0)