Skip to content

Commit 814afb1

Browse files
committed
fix testSubscribeSeekRaces
1 parent 9d32866 commit 814afb1

3 files changed

Lines changed: 33 additions & 34 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
238238
cnx->registerConsumer(consumerId_, get_shared_this_ptr());
239239
LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_);
240240

241-
if (duringSeek()) {
241+
if (hasPendingSeek_.load(std::memory_order_acquire)) {
242242
ackGroupingTrackerPtr_->flushAndClean();
243243
}
244244

@@ -269,6 +269,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
269269
} else {
270270
promise.setFailed(handleResult);
271271
}
272+
completeSeekCallback(ResultOk);
272273
});
273274

274275
return promise.getFuture();
@@ -1129,19 +1130,14 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
11291130
* `startMessageId_` is updated so that we can discard messages after delivery restarts.
11301131
*/
11311132
void ConsumerImpl::clearReceiveQueue() {
1132-
if (duringSeek()) {
1133+
if (hasPendingSeek_.load(std::memory_order_acquire)) {
11331134
if (hasSoughtByTimestamp()) {
11341135
// Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
11351136
// skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
11361137
startMessageId_ = std::nullopt;
11371138
} else {
11381139
startMessageId_ = seekMessageId_.get();
11391140
}
1140-
SeekStatus expected = SeekStatus::COMPLETED;
1141-
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
1142-
auto seekCallback = seekCallback_.release();
1143-
executor_->postWork([seekCallback] { seekCallback(ResultOk); });
1144-
}
11451141
return;
11461142
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
11471143
return;
@@ -1554,7 +1550,9 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb
15541550
}
15551551

15561552
const auto requestId = newRequestId();
1557-
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
1553+
auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
1554+
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId},
1555+
std::move(nonNullCallback));
15581556
}
15591557

15601558
void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) {
@@ -1568,8 +1566,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback)
15681566
}
15691567

15701568
const auto requestId = newRequestId();
1569+
auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
15711570
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
1572-
callback);
1571+
std::move(nonNullCallback));
15731572
}
15741573

15751574
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1727,18 +1726,17 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
17271726
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
17281727

17291728
void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
1730-
const ResultCallback& callback) {
1729+
ResultCallback&& callback) {
17311730
ClientConnectionPtr cnx = getCnx().lock();
17321731
if (!cnx) {
17331732
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
17341733
callback(ResultNotConnected);
17351734
return;
17361735
}
17371736

1738-
auto expected = SeekStatus::NOT_STARTED;
1739-
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
1740-
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
1741-
<< static_cast<int>(expected));
1737+
auto expected = false;
1738+
if (hasPendingSeek_.compare_exchange_strong(expected, true)) {
1739+
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek");
17421740
callback(ResultNotAllowedError);
17431741
return;
17441742
}
@@ -1750,8 +1748,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17501748
seekMessageId_ = *boost::get<MessageId>(&seekArg);
17511749
hasSoughtByTimestamp_.store(false, std::memory_order_release);
17521750
}
1753-
seekStatus_ = SeekStatus::IN_PROGRESS;
1754-
seekCallback_ = callback;
1751+
seekCallback_ = std::move(callback);
17551752
LOG_INFO(getName() << " Seeking subscription to " << seekArg);
17561753

17571754
auto weakSelf = weak_from_this();
@@ -1771,20 +1768,17 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17711768
Lock lock(mutexForMessageId_);
17721769
lastDequedMessageId_ = MessageId::earliest();
17731770
lock.unlock();
1774-
if (getCnx().expired()) {
1775-
// It's during reconnection, complete the seek future after connection is established
1776-
seekStatus_ = SeekStatus::COMPLETED;
1777-
} else {
1771+
1772+
if (!getCnx().expired()) {
17781773
if (!hasSoughtByTimestamp()) {
17791774
startMessageId_ = seekMessageId_.get();
17801775
}
1781-
seekCallback_.release()(result);
1782-
}
1776+
completeSeekCallback(result);
1777+
} // else: complete the seek future after connection is established
17831778
} else {
17841779
LOG_ERROR(getName() << "Failed to seek: " << result);
17851780
seekMessageId_ = originalSeekMessageId;
1786-
seekStatus_ = SeekStatus::NOT_STARTED;
1787-
seekCallback_.release()(result);
1781+
completeSeekCallback(result);
17881782
}
17891783
});
17901784
}

lib/ConsumerImpl.h

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <pulsar/Reader.h>
2323

24+
#include <atomic>
2425
#include <boost/variant.hpp>
2526
#include <cstdint>
2627
#include <functional>
@@ -77,13 +78,6 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
7778
const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
7879
const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
7980

80-
enum class SeekStatus : std::uint8_t
81-
{
82-
NOT_STARTED,
83-
IN_PROGRESS,
84-
COMPLETED
85-
};
86-
8781
class ConsumerImpl : public ConsumerImplBase {
8882
public:
8983
ConsumerImpl(const ClientImplPtr& client, const std::string& topic, const std::string& subscriptionName,
@@ -230,7 +224,13 @@ class ConsumerImpl : public ConsumerImplBase {
230224
}
231225

232226
void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
233-
const ResultCallback& callback);
227+
ResultCallback&& callback);
228+
void completeSeekCallback(Result result) {
229+
if (auto callback = seekCallback_.release()) {
230+
callback(result);
231+
}
232+
hasPendingSeek_.store(false, std::memory_order_release);
233+
}
234234
void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb);
235235

236236
std::mutex mutexForReceiveWithZeroQueueSize;
@@ -274,14 +274,13 @@ class ConsumerImpl : public ConsumerImplBase {
274274
MessageId lastDequedMessageId_{MessageId::earliest()};
275275
MessageId lastMessageIdInBroker_{MessageId::earliest()};
276276

277-
std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
278277
Synchronized<ResultCallback> seekCallback_{[](Result) {}};
279278
Synchronized<optional<MessageId>> startMessageId_;
279+
std::atomic_bool hasPendingSeek_{false};
280280
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
281281
std::atomic<bool> hasSoughtByTimestamp_{false};
282282

283283
bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); }
284-
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
285284

286285
class ChunkedMessageCtx {
287286
public:

lib/Synchronized.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ class Synchronized {
4141
return *this;
4242
}
4343

44+
Synchronized& operator=(T&& value) {
45+
std::lock_guard<std::mutex> lock(mutex_);
46+
value_ = value;
47+
return *this;
48+
}
49+
4450
private:
4551
T value_;
4652
mutable std::mutex mutex_;

0 commit comments

Comments
 (0)