2424
2525#include < algorithm>
2626#include < utility>
27+ #include < variant>
2728
2829#include " AckGroupingTracker.h"
2930#include " AckGroupingTrackerDisabled.h"
@@ -124,7 +125,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic
124125 negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this , conf)),
125126 ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)),
126127 readCompacted_(conf.isReadCompacted()),
127- startMessageId_(getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
128+ startMessageId_(pulsar:: getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
128129 maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
129130 autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
130131 expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -237,25 +238,16 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
237238 // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
238239 // sending the subscribe request.
239240 optional<MessageId> subscribeMessageId;
240- bool duringSeek = false ;
241241 {
242- std::lock_guard<std::mutex> lock ( mutex_) ;
242+ LockGuard lock{ mutex_} ;
243243 setCnx (cnx);
244244 cnx->registerConsumer (consumerId_, get_shared_this_ptr ());
245245 LOG_DEBUG (cnx->cnxString () << " Registered consumer " << consumerId_);
246246
247- {
248- std::lock_guard<std::mutex> lock (mutexForMessageId_);
249- clearReceiveQueue ();
250- subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
251- ? startMessageId_.get ()
252- : std::nullopt ;
253- }
254-
255- duringSeek = seekCallback_.has_value ();
256- }
257- if (duringSeek) {
258- ackGroupingTrackerPtr_->flushAndClean ();
247+ clearReceiveQueue ();
248+ subscribeMessageId =
249+ (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_ : std::nullopt ;
250+ lastDequedMessageId_ = MessageId::earliest ();
259251 }
260252
261253 unAckedMessageTrackerPtr_->clear ();
@@ -279,6 +271,15 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
279271 } else {
280272 promise.setFailed (handleResult);
281273 }
274+ // Complete the seek callback after completing `promise`, otherwise `reconnectionPending_` will
275+ // still be true when the seek operation is done.
276+ LockGuard lock{mutex_};
277+ if (seekStatus_ == SeekStatus::COMPLETED ) {
278+ executor_->postWork ([seekCallback{std::exchange (seekCallback_, std::nullopt ).value ()}]() {
279+ seekCallback (ResultOk);
280+ });
281+ seekStatus_ = SeekStatus::NOT_STARTED ;
282+ }
282283 });
283284
284285 return promise.getFuture ();
@@ -516,9 +517,10 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
516517
517518 auto & chunkedMsgCtx = it->second ;
518519 if (it == chunkedMessageCache_.end () || !chunkedMsgCtx.validateChunkId (chunkId)) {
519- auto startMessageId = startMessageId_.get ().value_or (MessageId::earliest ());
520- if (!config_.isStartMessageIdInclusive () && startMessageId.ledgerId () == messageId.ledgerId () &&
521- startMessageId.entryId () == messageId.entryId ()) {
520+ auto startMessageId = getStartMessageId ();
521+ if (!config_.isStartMessageIdInclusive () && startMessageId &&
522+ startMessageId->ledgerId () == messageId.ledgerId () &&
523+ startMessageId->entryId () == messageId.entryId ()) {
522524 // When the start message id is not inclusive, the last chunk of the previous chunked message will
523525 // be delivered, which is expected and we only need to filter it out.
524526 chunkedMessageCache_.remove (uuid);
@@ -635,17 +637,14 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
635637 words[i] = msg.ack_set (i);
636638 }
637639 BitSet ackSet{std::move (words)};
638- Lock lock (mutex_);
639640 numOfMessageReceived = receiveIndividualMessagesFromBatch (cnx, m, ackSet, msg.redelivery_count ());
640641 } else {
641642 // try convert key value data.
642643 m.impl_ ->convertPayloadToKeyValue (config_.getSchema ());
643644
644- const auto startMessageId = startMessageId_.get ();
645- if (isPersistent_ && startMessageId &&
646- m.getMessageId ().ledgerId () == startMessageId.value ().ledgerId () &&
647- m.getMessageId ().entryId () == startMessageId.value ().entryId () &&
648- isPriorEntryIndex (m.getMessageId ().entryId ())) {
645+ const auto startMessageId = getStartMessageId ();
646+ if (isPersistent_ && startMessageId && m.getMessageId ().ledgerId () == startMessageId->ledgerId () &&
647+ isPrior (m.getMessageId ().entryId (), startMessageId->entryId ())) {
649648 LOG_DEBUG (getName () << " Ignoring message from before the startMessageId: "
650649 << startMessageId.value ());
651650 return ;
@@ -767,7 +766,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
767766 auto batchSize = batchedMessage.impl_ ->metadata .num_messages_in_batch ();
768767 LOG_DEBUG (" Received Batch messages of size - " << batchSize
769768 << " -- msgId: " << batchedMessage.getMessageId ());
770- const auto startMessageId = startMessageId_. get ();
769+ const auto startMessageId = getStartMessageId ();
771770
772771 int skippedMessages = 0 ;
773772
@@ -797,9 +796,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
797796
798797 // If we are receiving a batch message, we need to discard messages that were prior
799798 // to the startMessageId
800- if (isPersistent_ && msgId.ledgerId () == startMessageId. value (). ledgerId () &&
801- msgId.entryId () == startMessageId. value (). entryId () &&
802- isPriorBatchIndex (msgId.batchIndex ())) {
799+ if (isPersistent_ && msgId.ledgerId () == startMessageId-> ledgerId () &&
800+ msgId.entryId () == startMessageId-> entryId () &&
801+ isPrior (msgId.batchIndex (), startMessageId-> batchIndex ())) {
803802 LOG_DEBUG (getName () << " Ignoring message from before the startMessageId"
804803 << msg.getMessageId ());
805804 ++skippedMessages;
@@ -939,7 +938,7 @@ void ConsumerImpl::internalListener() {
939938 trackMessage (msg.getMessageId ());
940939 try {
941940 consumerStatsBasePtr_->receivedMessage (msg, ResultOk);
942- lastDequedMessageId_ = msg.getMessageId ();
941+ setLastDequedMessageId ( msg.getMessageId () );
943942 Consumer consumer{get_shared_this_ptr ()};
944943 Message interceptMsg = interceptors_->beforeConsume (Consumer (shared_from_this ()), msg);
945944 messageListener_ (consumer, interceptMsg);
@@ -1112,10 +1111,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
11121111}
11131112
11141113void ConsumerImpl::messageProcessed (Message& msg, bool track) {
1115- Lock lock (mutexForMessageId_);
1116- lastDequedMessageId_ = msg.getMessageId ();
1117- lock.unlock ();
1118-
1114+ setLastDequedMessageId (msg.getMessageId ());
11191115 incomingMessagesSize_.fetch_sub (msg.getLength ());
11201116
11211117 ClientConnectionPtr currentCnx = getCnx ().lock ();
@@ -1137,19 +1133,18 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
11371133 * was
11381134 * not seen by the application
11391135 * `startMessageId_` is updated so that we can discard messages after delivery restarts.
1136+ * NOTE: `mutex_` must be locked before calling this method.
11401137 */
11411138void ConsumerImpl::clearReceiveQueue () {
1142- // NOTE: This method must be called with `mutex_` held for thread safety where
1143- if (seekCallback_.has_value ()) {
1144- executor_->postWork (
1145- [callback{std::exchange (seekCallback_, std::nullopt ).value ()}] { callback (ResultOk); });
1146-
1147- if (hasSoughtByTimestamp ()) {
1148- // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
1149- // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
1150- startMessageId_ = std::nullopt ;
1139+ if (seekStatus_ != SeekStatus::NOT_STARTED ) {
1140+ // Flush the pending ACKs in case newly arrived messages are filtered out by the previous pending ACKs
1141+ ackGroupingTrackerPtr_->flushAndClean ();
1142+ if (std::holds_alternative<MessageId>(lastSeekArg_)) {
1143+ startMessageId_ = std::get<MessageId>(lastSeekArg_);
11511144 } else {
1152- startMessageId_ = seekMessageId_.get ();
1145+ // Invalidate startMessageId_ so that `isPrior` checks will be skipped, and
1146+ // `hasMessageAvailableAsync` won't use `startMessageId_` in compare.
1147+ startMessageId_ = std::nullopt ;
11531148 }
11541149 return ;
11551150 } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
@@ -1568,7 +1563,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb
15681563 std::move (nonNullCallback));
15691564}
15701565
1571- void ConsumerImpl::seekAsync (uint64_t timestamp, const ResultCallback& callback) {
1566+ void ConsumerImpl::seekAsync (SeekTimestampType timestamp, const ResultCallback& callback) {
15721567 const auto state = state_.load ();
15731568 if (state == Closed || state == Closing) {
15741569 LOG_ERROR (getName () << " Client connection already closed." );
@@ -1593,16 +1588,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
15931588 }
15941589 bool compareMarkDeletePosition;
15951590 {
1596- std::lock_guard<std::mutex> lock{mutexForMessageId_ };
1591+ LockGuard lock{mutex_ };
15971592 compareMarkDeletePosition =
15981593 // there is no message received by consumer, so we cannot compare the last position with the last
15991594 // received position
16001595 lastDequedMessageId_ == MessageId::earliest () &&
16011596 // If the start message id is latest, we should seek to the actual last message first.
1602- (startMessageId_.get (). value_or (MessageId::earliest ()) == MessageId::latest () ||
1597+ (startMessageId_.value_or (MessageId::earliest ()) == MessageId::latest () ||
16031598 // If there is a previous seek operation by timestamp, the start message id will be incorrect, so
16041599 // we cannot compare the start position with the last position.
1605- hasSoughtByTimestamp ( ));
1600+ std::holds_alternative<SeekTimestampType>(lastSeekArg_ ));
16061601 }
16071602 if (compareMarkDeletePosition) {
16081603 auto self = get_shared_this_ptr ();
@@ -1623,7 +1618,12 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
16231618 callback (ResultOk, false );
16241619 }
16251620 };
1626- if (self->config_ .isStartMessageIdInclusive () && !self->hasSoughtByTimestamp ()) {
1621+ bool lastSeekIsByTimestamp = false ;
1622+ {
1623+ LockGuard lock{self->mutex_ };
1624+ lastSeekIsByTimestamp = std::holds_alternative<SeekTimestampType>(self->lastSeekArg_ );
1625+ }
1626+ if (self->config_ .isStartMessageIdInclusive () && !lastSeekIsByTimestamp) {
16271627 self->seekAsync (response.getLastMessageId (), [callback, handleResponse](Result result) {
16281628 if (result != ResultOk) {
16291629 callback (result, {});
@@ -1680,9 +1680,10 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
16801680 .addListener ([this , self, callback](Result result, const GetLastMessageIdResponse& response) {
16811681 if (result == ResultOk) {
16821682 LOG_DEBUG (getName () << " getLastMessageId: " << response);
1683- Lock lock (mutexForMessageId_);
1684- lastMessageIdInBroker_ = response.getLastMessageId ();
1685- lock.unlock ();
1683+ {
1684+ LockGuard lock{mutex_};
1685+ lastMessageIdInBroker_ = response.getLastMessageId ();
1686+ }
16861687 } else {
16871688 LOG_ERROR (getName () << " Failed to getLastMessageId: " << result);
16881689 }
@@ -1747,51 +1748,57 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17471748 return ;
17481749 }
17491750 bool hasPendingSeek = false ;
1751+ // Save the previous last seek arg in case seek failed
1752+ decltype (lastSeekArg_) previousLastSeekArg;
17501753 {
17511754 std::lock_guard<std::mutex> lock (mutex_);
1752- if (seekCallback_. has_value () ) {
1755+ if (seekStatus_ != SeekStatus:: NOT_STARTED ) {
17531756 hasPendingSeek = true ;
17541757 } else {
1758+ if (seekCallback_.has_value ()) {
1759+ // This should never happen
1760+ LOG_ERROR (getName () << " Previous seek callback is not triggered unexpectedly" );
1761+ executor_->postWork ([callback{std::exchange (seekCallback_, std::nullopt ).value ()}] {
1762+ callback (ResultTimeout);
1763+ });
1764+ }
17551765 seekCallback_ = std::move (callback);
1766+ previousLastSeekArg = lastSeekArg_;
1767+ lastSeekArg_ = seekArg;
17561768 }
17571769 }
17581770 if (hasPendingSeek) {
1759- LOG_ERROR (getName () << " attempted to seek " << seekArg << " when there is a pending seek" );
1771+ std::visit (
1772+ [this ](auto && arg) {
1773+ LOG_ERROR (getName () << " Attempted to seek " << arg << " when there is a pending seek" );
1774+ },
1775+ seekArg);
17601776 callback (ResultNotAllowedError);
17611777 return ;
17621778 }
17631779
1764- const auto originalSeekMessageId = seekMessageId_.get ();
1765- if (boost::get<uint64_t >(&seekArg)) {
1766- hasSoughtByTimestamp_.store (true , std::memory_order_release);
1767- } else {
1768- seekMessageId_ = *boost::get<MessageId>(&seekArg);
1769- hasSoughtByTimestamp_.store (false , std::memory_order_release);
1770- }
1771- LOG_INFO (getName () << " Seeking subscription to " << seekArg);
1780+ std::visit ([this ](auto && arg) { LOG_INFO (getName () << " Seeking subscription to " << arg); }, seekArg);
17721781
17731782 auto weakSelf = weak_from_this ();
17741783
17751784 cnx->sendRequestWithId (seek, requestId, " SEEK" )
1776- .addListener ([this , weakSelf, originalSeekMessageId](Result result,
1777- const ResponseData& responseData) {
1785+ .addListener ([this , weakSelf, previousLastSeekArg](Result result, const ResponseData& responseData) {
17781786 auto self = weakSelf.lock ();
17791787 if (!self) {
17801788 return ;
17811789 }
17821790 if (result == ResultOk) {
1783- LOG_INFO (getName () << " Seek successfully" );
1784- ackGroupingTrackerPtr_->flushAndClean ();
1785- incomingMessages_.clear ();
1786- {
1787- std::lock_guard<std::mutex> lock (mutexForMessageId_);
1788- lastDequedMessageId_ = MessageId::earliest ();
1789- }
1790-
1791- std::lock_guard<std::mutex> lock (mutex_);
1792- if (!getCnx ().expired ()) {
1793- if (!hasSoughtByTimestamp ()) {
1794- startMessageId_ = seekMessageId_.get ();
1791+ LockGuard lock (mutex_);
1792+ if (getCnx ().expired () || reconnectionPending_) {
1793+ // It's during reconnection, complete the seek future after connection is established
1794+ seekStatus_ = SeekStatus::COMPLETED ;
1795+ LOG_INFO (getName () << " Delay the seek future until the reconnection is done" );
1796+ } else {
1797+ LOG_INFO (getName () << " Seek successfully" );
1798+ ackGroupingTrackerPtr_->flushAndClean ();
1799+ incomingMessages_.clear ();
1800+ if (std::holds_alternative<MessageId>(lastSeekArg_)) {
1801+ startMessageId_ = std::get<MessageId>(lastSeekArg_);
17951802 }
17961803 if (!seekCallback_.has_value ()) {
17971804 LOG_ERROR (getName () << " Seek callback is not set" );
@@ -1801,27 +1808,19 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
18011808 [self, callback{std::exchange (seekCallback_, std::nullopt ).value ()}]() {
18021809 callback (ResultOk);
18031810 });
1811+ seekStatus_ = SeekStatus::NOT_STARTED ;
18041812 } // else: complete the seek future after connection is established
18051813 } else {
18061814 LOG_ERROR (getName () << " Failed to seek: " << result);
1807- seekMessageId_ = originalSeekMessageId;
1815+ LockGuard lock{mutex_};
1816+ lastSeekArg_ = previousLastSeekArg;
18081817 executor_->postWork ([self, callback{std::exchange (seekCallback_, std::nullopt ).value ()}]() {
18091818 callback (ResultOk);
18101819 });
18111820 }
18121821 });
18131822}
18141823
1815- bool ConsumerImpl::isPriorBatchIndex (int32_t idx) {
1816- return config_.isStartMessageIdInclusive () ? idx < startMessageId_.get ().value ().batchIndex ()
1817- : idx <= startMessageId_.get ().value ().batchIndex ();
1818- }
1819-
1820- bool ConsumerImpl::isPriorEntryIndex (int64_t idx) {
1821- return config_.isStartMessageIdInclusive () ? idx < startMessageId_.get ().value ().entryId ()
1822- : idx <= startMessageId_.get ().value ().entryId ();
1823- }
1824-
18251824bool ConsumerImpl::hasEnoughMessagesForBatchReceive () const {
18261825 if (batchReceivePolicy_.getMaxNumMessages () <= 0 && batchReceivePolicy_.getMaxNumBytes () <= 0 ) {
18271826 return false ;
0 commit comments