2020#define LIB_FUTURE_H_
2121
2222#include < atomic>
23- #include < chrono>
23+ #include < condition_variable>
24+ #include < forward_list>
2425#include < functional>
25- #include < future>
26- #include < list>
2726#include < memory>
2827#include < mutex>
29- #include < thread>
30- #include < utility>
3128
3229namespace pulsar {
3330
@@ -38,71 +35,70 @@ class InternalState {
3835 using Pair = std::pair<Result, Type>;
3936 using Lock = std::unique_lock<std::mutex>;
4037
38+ enum Status : uint8_t
39+ {
40+ INITIAL ,
41+ COMPLETING ,
42+ COMPLETED
43+ };
44+
4145 // NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
4246 InternalState () {}
4347
4448 void addListener (Listener listener) {
4549 Lock lock{mutex_};
46- listeners_.emplace_back (listener);
47- lock.unlock ();
48-
4950 if (completed ()) {
50- Type value;
51- Result result = get (value);
52- triggerListeners (result, value);
51+ auto result = result_;
52+ auto value = value_;
53+ lock.unlock ();
54+ listener (result, value);
55+ } else {
56+ tailListener_ = listeners_.emplace_after (tailListener_, std::move (listener));
5357 }
5458 }
5559
5660 bool complete (Result result, const Type &value) {
57- bool expected = false ;
58- if (!completed_ .compare_exchange_strong (expected, true )) {
61+ Status expected = Status:: INITIAL ;
62+ if (!status_ .compare_exchange_strong (expected, Status:: COMPLETING )) {
5963 return false ;
6064 }
61- triggerListeners (result, value);
62- promise_.set_value (std::make_pair (result, value));
63- return true ;
64- }
65-
66- bool completed () const noexcept { return completed_; }
6765
68- Result get (Type &result) {
69- const auto &pair = future_.get ();
70- result = pair.second ;
71- return pair.first ;
72- }
66+ // Ensure if another thread calls `addListener` at the same time, that thread can get the value by
67+ // `get` before the existing listeners are executed
68+ Lock lock{mutex_};
69+ result_ = result;
70+ value_ = value;
71+ status_ = COMPLETED ;
72+ cond_.notify_all ();
7373
74- // Only public for test
75- void triggerListeners (Result result, const Type &value) {
76- while (true ) {
77- Lock lock{mutex_};
78- if (listeners_.empty ()) {
79- return ;
74+ if (!listeners_.empty ()) {
75+ auto listeners = std::move (listeners_);
76+ lock.unlock ();
77+ for (auto &&listener : listeners) {
78+ listener (result, value);
8079 }
80+ }
8181
82- bool expected = false ;
83- if (!listenerRunning_.compare_exchange_strong (expected, true )) {
84- // There is another thread that polled a listener that is running, skip polling and release
85- // the lock. Here we wait for some time to avoid busy waiting.
86- std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
87- continue ;
88- }
89- auto listener = std::move (listeners_.front ());
90- listeners_.pop_front ();
91- lock.unlock ();
82+ return true ;
83+ }
9284
93- listener (result, value);
94- listenerRunning_ = false ;
95- }
85+ bool completed () const noexcept { return status_.load () == COMPLETED ; }
86+
87+ Result get (Type &value) const {
88+ Lock lock{mutex_};
89+ cond_.wait (lock, [this ] { return completed (); });
90+ value = value_;
91+ return result_;
9692 }
9793
9894 private:
99- std::atomic_bool completed_{false };
100- std::promise<Pair> promise_;
101- std::shared_future<Pair> future_{promise_.get_future ()};
102-
103- std::list<Listener> listeners_;
10495 mutable std::mutex mutex_;
105- std::atomic_bool listenerRunning_{false };
96+ mutable std::condition_variable cond_;
97+ std::forward_list<Listener> listeners_;
98+ decltype (listeners_.before_begin()) tailListener_{listeners_.before_begin ()};
99+ Result result_;
100+ Type value_;
101+ std::atomic<Status> status_;
106102};
107103
108104template <typename Result, typename Type>
0 commit comments