Skip to content

Commit 6d490fc

Browse files
maikel and ericniebler authored
Make static thread pool sleep when empty (#1476)
Co-authored-by: Eric Niebler <eniebler@nvidia.com>
1 parent e0fa517 commit 6d490fc

1 file changed

Lines changed: 31 additions & 5 deletions

File tree

include/exec/static_thread_pool.hpp

Lines changed: 31 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -608,6 +608,8 @@ namespace exec {
608608
void notify_one_sleeping();
609609
void set_stealing();
610610
void clear_stealing();
611+
void set_sleeping();
612+
void clear_sleeping();
611613

612614
bwos::lifo_queue<task_base*, numa_allocator<task_base*>> local_queue_;
613615
__intrusive_queue<&task_base::next> pending_queue_{};
@@ -624,7 +626,7 @@ namespace exec {
624626
void run(std::uint32_t index) noexcept;
625627
void join() noexcept;
626628

627-
alignas(64) std::atomic<std::uint32_t> numThiefs_{};
629+
alignas(64) std::atomic<std::uint32_t> numActive_{};
628630
alignas(64) remote_queue_list remotes_;
629631
std::uint32_t threadCount_;
630632
std::uint32_t maxSteals_{threadCount_ + 1};
@@ -695,6 +697,7 @@ namespace exec {
695697
threads_.reserve(threadCount);
696698

697699
try {
700+
numActive_.store(threadCount << 16, std::memory_order_relaxed);
698701
for (std::uint32_t i = 0; i < threadCount; ++i) {
699702
threads_.emplace_back([this, i] { run(i); });
700703
}
@@ -717,9 +720,9 @@ namespace exec {
717720
}
718721

719722
inline void static_thread_pool_::run(std::uint32_t threadIndex) noexcept {
723+
STDEXEC_ASSERT(threadIndex < threadCount_);
720724
// NOLINTNEXTLINE(bugprone-unused-return-value)
721725
numa_.bind_to_node(threadStates_[threadIndex]->numa_node());
722-
STDEXEC_ASSERT(threadIndex < threadCount_);
723726
while (true) {
724727
// Make a blocking call to de-queue a task if we don't already have one.
725728
auto [task, queueIndex] = threadStates_[threadIndex]->pop();
@@ -938,12 +941,30 @@ namespace exec {
938941
pending_queue_.prepend(std::move(tasks));
939942
}
940943

944+
// Records that this worker is about to go to sleep by subtracting 1 << 16
// from the pool-wide numActive_ counter (relaxed ordering; the call site in
// this diff is immediately before cv_.wait(lock)).
// clear_stealing() decodes numActive_ as (numVictims << 16) | numThiefs, so
// this decrements the upper-half "victims" count by one.
inline void static_thread_pool_::thread_state::set_sleeping() {
945+
pool_->numActive_.fetch_sub(1 << 16, std::memory_order_relaxed);
946+
}
947+
948+
// Wake up a worker thread and maintain the invariant that we always have one
// active thief as long as a potential victim is awake.
// Adds 1 << 16 back to numActive_ (the inverse of set_sleeping()). fetch_add
// returns the value held *before* the addition, so `numActive == 0` means the
// counter recorded neither victims nor thieves until this call; only in that
// case is notify_one_sleeping() invoked.
949+
inline void static_thread_pool_::thread_state::clear_sleeping() {
950+
const std::uint32_t numActive = pool_->numActive_.fetch_add(1 << 16, std::memory_order_relaxed);
951+
if (numActive == 0) {
952+
notify_one_sleeping();
953+
}
954+
}
955+
941956
// Marks this worker as a thief. The removed line incremented the old
// numThiefs_ counter; the added lines update the combined numActive_ counter
// in a single relaxed fetch_add instead.
inline void static_thread_pool_::thread_state::set_stealing() {
942-
pool_->numThiefs_.fetch_add(1, std::memory_order_relaxed);
957+
// 1 - (1 << 16) converts to 0xFFFF0001 as a uint32_t: added modulo 2^32 it
// raises the low 16 bits by one and lowers the high 16 bits by one, as long
// as the low half does not overflow.
const std::uint32_t diff = 1 - (1 << 16);
958+
pool_->numActive_.fetch_add(diff, std::memory_order_relaxed);
943959
}
944960

961+
// Put a thief to sleep, but maintain the invariant that we always have one
// active thief as long as a potential victim is awake.
// The removed line tested the old numThiefs_ counter; the added lines undo
// set_stealing()'s adjustment on the combined numActive_ counter and inspect
// the value it held before the update.
945962
inline void static_thread_pool_::thread_state::clear_stealing() {
946-
if (pool_->numThiefs_.fetch_sub(1, std::memory_order_relaxed) == 1) {
963+
// Same constant as in set_stealing(); subtracting it reverses that update.
constexpr std::uint32_t diff = 1 - (1 << 16);
964+
// fetch_sub returns the counter value from before the subtraction.
const std::uint32_t numActive = pool_->numActive_.fetch_sub(diff, std::memory_order_relaxed);
965+
// Upper 16 bits of the previous value.
const std::uint32_t numVictims = numActive >> 16;
966+
// Lower 16 bits of the previous value.
const std::uint32_t numThiefs = numActive & 0xffff;
967+
// This caller was the only thief and the victims count was non-zero, so
// notify a sleeping worker.
if (numThiefs == 1 && numVictims != 0) {
947968
notify_one_sleeping();
948969
}
949970
}
@@ -995,9 +1016,14 @@ namespace exec {
9951016
if (result.task) {
9961017
return result;
9971018
}
1019+
set_sleeping();
9981020
cv_.wait(lock);
1021+
lock.unlock();
1022+
clear_sleeping();
1023+
}
1024+
if (lock.owns_lock()) {
1025+
lock.unlock();
9991026
}
1000-
lock.unlock();
10011027
state_.store(state::running, std::memory_order_relaxed);
10021028
result = try_pop();
10031029
}

0 commit comments

Comments
 (0)