Skip to content

Commit e081a31

Browse files
committed
address comments
1 parent a825dcf commit e081a31

1 file changed

Lines changed: 6 additions & 3 deletions

File tree

lib/MultiTopicsConsumerImpl.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -867,14 +867,17 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(const BrokerConsumerSt
867867
std::lock_guard<std::mutex> lock{mutex_};
868868
statsPtr->add(stats, index);
869869
} else {
870-
failedResult->store(result, std::memory_order_release);
870+
// Store the first failed result as the final failed result
871+
auto expected = ResultOk;
872+
failedResult->compare_exchange_strong(expected, result);
871873
}
872874
if (--*latchPtr == 0) {
873-
if (failedResult->load(std::memory_order_acquire) == ResultOk) {
875+
if (auto firstFailedResult = failedResult->load(std::memory_order_acquire);
876+
firstFailedResult == ResultOk) {
874877
callback(ResultOk, BrokerConsumerStats{statsPtr});
875878
} else {
876879
// Fail the whole operation if any of the consumers failed
877-
callback(result, {});
880+
callback(firstFailedResult, {});
878881
}
879882
}
880883
});

0 commit comments

Comments
 (0)