Skip to content

Commit 8da5e33

Browse files
committed
Fix idle CPU usage using progressive backoff and CV based sleep in work-stealing loop
1 parent 39805c2 commit 8da5e33

1 file changed

Lines changed: 69 additions & 11 deletions

File tree

include/dbscan/pbbs/scheduler.h

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@
4949
#include <array>
5050
#include <atomic>
5151
#include <chrono>
52+
#include <condition_variable>
5253
#include <iostream>
5354
#include <memory>
55+
#include <mutex>
5456
#include <stdexcept>
5557
#include <string>
5658
#include <thread>
@@ -159,7 +161,8 @@ struct scheduler {
159161
deques(num_deques),
160162
attempts(num_deques),
161163
spawned_threads(),
162-
finished_flag(false) {
164+
finished_flag(false),
165+
num_sleeping(0) {
163166
// Stopping condition
164167
auto finished = [this]() {
165168
return finished_flag.load(std::memory_order_relaxed);
@@ -177,29 +180,43 @@ struct scheduler {
177180

178181
~scheduler() {
179182
finished_flag.store(true, std::memory_order_relaxed);
183+
// Wake any sleeping threads so they see the finished flag
184+
wake_sleeping();
180185
for (unsigned int i = 1; i < num_threads; i++) {
181186
spawned_threads[i - 1].join();
182187
}
183188
}
184189

185-
// Push onto local stack.
190+
// Push onto local stack. Wakes sleeping threads only if any exist,
191+
// so this is zero-cost during active computation.
186192
void spawn(Job* job) {
187193
int id = worker_id();
188194
deques[id].push_bottom(job);
195+
if (num_sleeping.load(std::memory_order_relaxed) > 0)
196+
wake_sleeping();
189197
}
190198

191199
// Wait for condition: finished().
200+
// Does NOT enter Phase 3 (CV sleep) — instead spins/yields while
201+
// opportunistically executing other stolen work. This prevents deadlock
202+
// where a waiting thread sleeps on the CV but nobody notifies it.
192203
template <typename F>
193204
void wait(F finished, bool conservative = false) {
194-
// Conservative avoids deadlock if scheduler is used in conjunction
195-
// with user locks enclosing a wait.
196205
if (conservative) {
197206
while (!finished()) std::this_thread::yield();
207+
} else {
208+
while (!finished()) {
209+
Job* job = try_pop();
210+
if (!job) {
211+
size_t id = worker_id();
212+
job = try_steal(id);
213+
}
214+
if (job)
215+
(*job)();
216+
else
217+
std::this_thread::yield();
218+
}
198219
}
199-
// If not conservative, schedule within the wait.
200-
// Can deadlock if a stolen job uses same lock as encloses the wait.
201-
else
202-
start(finished);
203220
}
204221

205222
// All scheduler threads quit after this is called.
@@ -247,6 +264,23 @@ struct scheduler {
247264
std::vector<attempt> attempts;
248265
std::vector<std::thread> spawned_threads;
249266
std::atomic<bool> finished_flag;
267+
// Sleep/wake mechanism: threads entering Phase 3 sleep on the CV.
268+
// spawn() only signals when num_sleeping > 0, making it zero-cost
269+
// during active computation. wake_gen is bumped to break the CV wait.
270+
std::mutex sleep_mutex;
271+
std::condition_variable sleep_cv;
272+
std::atomic<int> num_sleeping;
273+
std::atomic<unsigned> wake_gen{0};
274+
275+
void wake_sleeping() {
276+
// Must lock to prevent race: a thread could be between incrementing
277+
// num_sleeping and entering wait(), missing our notify.
278+
{
279+
std::lock_guard<std::mutex> lk(sleep_mutex);
280+
wake_gen.fetch_add(1, std::memory_order_release);
281+
}
282+
sleep_cv.notify_all();
283+
}
250284

251285
// Start an individual scheduler task. Runs until finished().
252286
template <typename F>
@@ -266,21 +300,45 @@ struct scheduler {
266300
}
267301

268302
// Find a job, first trying local stack, then random steals.
303+
// Uses progressive backoff: spin aggressively at first (for latency when
304+
// work is about to appear), then yield, then sleep with increasing duration.
269305
template <typename F>
270306
Job* get_job(F finished) {
271307
if (finished()) return nullptr;
272308
Job* job = try_pop();
273309
if (job) return job;
274310
size_t id = worker_id();
275311
while (true) {
276-
// By coupon collector's problem, this should touch all.
312+
// Phase 1: aggressive spin — covers all deques several times
277313
for (int i = 0; i <= num_deques * 100; i++) {
278314
if (finished()) return nullptr;
279315
job = try_steal(id);
280316
if (job) return job;
281317
}
282-
// If haven't found anything, take a breather.
283-
std::this_thread::sleep_for(std::chrono::nanoseconds(num_deques * 100));
318+
// Phase 2: yield + short spins (transition period)
319+
for (int round = 0; round < 40; round++) {
320+
std::this_thread::yield();
321+
for (int i = 0; i <= num_deques * 20; i++) {
322+
if (finished()) return nullptr;
323+
job = try_steal(id);
324+
if (job) return job;
325+
}
326+
}
327+
// Phase 3: CV sleep (truly idle — no work found after phases 1+2).
328+
// Uses CV with short timeout so threads wake for both new work
329+
// (notified by spawn()) and job completions (checked via finished()).
330+
{
331+
std::unique_lock<std::mutex> lk(sleep_mutex);
332+
auto gen_before = wake_gen.load(std::memory_order_acquire);
333+
num_sleeping.fetch_add(1, std::memory_order_release);
334+
sleep_cv.wait(lk, [&]() {
335+
return finished() ||
336+
wake_gen.load(std::memory_order_acquire) != gen_before;
337+
});
338+
num_sleeping.fetch_sub(1, std::memory_order_release);
339+
}
340+
if (finished()) return nullptr;
341+
// After wake, loop back to Phase 1 (aggressive steal)
284342
}
285343
}
286344

0 commit comments

Comments
 (0)