@@ -617,6 +617,8 @@ namespace exec {
617617 void notify_one_sleeping ();
618618 void set_stealing ();
619619 void clear_stealing ();
620+ void set_sleeping ();
621+ void clear_sleeping ();
620622
621623 bwos::lifo_queue<task_base*, numa_allocator<task_base*>> local_queue_;
622624 __intrusive_queue<&task_base::next> pending_queue_{};
@@ -633,7 +635,7 @@ namespace exec {
633635 void run (std::uint32_t index) noexcept ;
634636 void join () noexcept ;
635637
636- alignas (64 ) std::atomic<std::uint32_t> numThiefs_ {};
638+ alignas (64 ) std::atomic<std::uint32_t> numActive_ {};
637639 alignas (64 ) remote_queue_list remotes_;
638640 std::uint32_t threadCount_;
639641 std::uint32_t maxSteals_{threadCount_ + 1 };
@@ -703,6 +705,7 @@ namespace exec {
703705 threads_.reserve (threadCount);
704706
705707 try {
708+ numActive_.store (threadCount << 16u , std::memory_order_relaxed);
706709 for (std::uint32_t i = 0 ; i < threadCount; ++i) {
707710 threads_.emplace_back ([this , i] { run (i); });
708711 }
@@ -725,9 +728,9 @@ namespace exec {
725728 }
726729
727730 inline void static_thread_pool_::run (std::uint32_t threadIndex) noexcept {
731+ STDEXEC_ASSERT (threadIndex < threadCount_);
728732 // NOLINTNEXTLINE(bugprone-unused-return-value)
729733 numa_.bind_to_node (threadStates_[threadIndex]->numa_node ());
730- STDEXEC_ASSERT (threadIndex < threadCount_);
731734 while (true ) {
732735 // Make a blocking call to de-queue a task if we don't already have one.
733736 auto [task, queueIndex] = threadStates_[threadIndex]->pop ();
@@ -946,12 +949,31 @@ namespace exec {
946949 pending_queue_.prepend (std::move (tasks));
947950 }
948951
952+ inline void static_thread_pool_::thread_state::set_sleeping () {
953+ pool_->numActive_ .fetch_sub (1u << 16u , std::memory_order_relaxed);
954+ }
955+
956+ // wakeup a worker thread and maintain the invariant that we always one active thief as long as a potential victim is awake
957+ inline void static_thread_pool_::thread_state::clear_sleeping () {
958+ const std::uint32_t numActive =
959+ pool_->numActive_ .fetch_add (1u << 16u , std::memory_order_relaxed);
960+ if (numActive == 0 ) {
961+ notify_one_sleeping ();
962+ }
963+ }
964+
949965 inline void static_thread_pool_::thread_state::set_stealing () {
950- pool_->numThiefs_ .fetch_add (1 , std::memory_order_relaxed);
966+ const std::uint32_t diff = 1u - (1u << 16u );
967+ pool_->numActive_ .fetch_add (diff, std::memory_order_relaxed);
951968 }
952969
970+ // put a thief to sleep but maintain the invariant that we always have one active thief as long as a potential victim is awake
953971 inline void static_thread_pool_::thread_state::clear_stealing () {
954- if (pool_->numThiefs_ .fetch_sub (1 , std::memory_order_relaxed) == 1 ) {
972+ constexpr std::uint32_t diff = 1 - (1u << 16u );
973+ const std::uint32_t numActive = pool_->numActive_ .fetch_sub (diff, std::memory_order_relaxed);
974+ const std::uint32_t numVictims = numActive >> 16u ;
975+ const std::uint32_t numThiefs = numActive & 0xffffu ;
976+ if (numThiefs == 1 && numVictims != 0 ) {
955977 notify_one_sleeping ();
956978 }
957979 }
@@ -1003,9 +1025,14 @@ namespace exec {
10031025 if (result.task ) {
10041026 return result;
10051027 }
1028+ set_sleeping ();
10061029 cv_.wait (lock);
1030+ lock.unlock ();
1031+ clear_sleeping ();
1032+ }
1033+ if (lock.owns_lock ()) {
1034+ lock.unlock ();
10071035 }
1008- lock.unlock ();
10091036 state_.store (state::running, std::memory_order_relaxed);
10101037 result = try_pop ();
10111038 }
0 commit comments