Skip to content

Commit 7d81e3c

Browse files
committed
Replace FTP pool barrier dispatch with per-worker condvar dispatch
Mirror of Phase 5.3 on the FTP side. Replaces broadcast mark_barrier/ done_barrier dispatch in the persistent thread pool with per-worker (wake_mutex, wake_cond, wake_flag) and pool-level (done_mutex, done_cond, workers_done_count). Main thread wakes only [1, adaptive_workers) helpers and waits for that many done signals. Idle workers stay asleep. Together with the previous controller-state commit (5ef3a61) and the helper-extraction commit (12a3440), the FTP build now uses the same random-walk controller logic as the GIL build via the shared _PyGC_RandomWalkUpdate helper. The adaptive count actually takes effect on FTP now: _PyGC_GetParallelWorkers returns pool->adaptive_workers, so FTP collections scale per-collection like GIL. Component changes: - Include/internal/pycore_gc_barrier.h: add _PyGCBarrier_Resize. Safe only when no one is waiting; used by the FTP dispatch to size phase_barrier to adaptive_workers per dispatch (so internal multi- phase work like update_refs init/compute only waits for active participants). - _PyGCWorkerState: add wake_mutex, wake_cond, wake_flag. - _PyGCThreadPool: add done_mutex, done_cond, workers_done_count, dispatch_in_progress. Drop mark_barrier and done_barrier (no longer used for dispatch). Keep phase_barrier; it's now resized per dispatch. - _PyGC_FTDispatchAndWait: new helper (mirror of GIL _PyGC_DispatchAndWait). Resizes phase_barrier, resets done counter, wakes helpers, runs main's share (worker 0), waits for helper completions. Has reentrancy guard although FTP STW collections shouldn't ever be reentrant; the four call sites assert reentrant == 0. - thread_pool_worker_func: replace mark_barrier wait with per-worker condvar wait; replace done_barrier wait with done_mutex/cond signal. - thread_pool_do_work: removed the adaptive_workers early-return added in the previous commit; no longer needed because idle workers don't wake at all under condvar dispatch. - _PyGC_ThreadPoolInit: initialise the new sync primitives, per-worker condvars, drop mark/done barrier Init. Update error-path cleanup. - _PyGC_ThreadPoolFini: wake all helpers via per-worker condvars (so they see shutdown and exit), Fini per-worker mutex/cond and pool-level done_mutex/done_cond. Drop mark/done barrier Fini. - Four pool dispatch call sites (PropagateAliveWithPool, ParallelUpdateRefsWithPool, ParallelMarkHeapWithPool, ParallelScanHeapWithPool): replace mark_barrier + thread_pool_do_work(0) + done_barrier with _PyGC_FTDispatchAndWait(pool, adaptive_workers). The ScanHeap merge loop now iterates adaptive_workers (only active workers contribute results). - gcmodule.c get_parallel_config (FTP branch): expose both num_workers (configured max) and adaptive_workers (current); mirror of GIL build's config API. Previously was inadvertently returning adaptive_workers as num_workers via _PyGC_GetParallelWorkers, breaking the historical meaning of the num_workers key. - _PyGC_FTParallelGetStats: same num_workers/adaptive_workers split. - _PyGC_GetParallelWorkers: now returns pool->adaptive_workers (the earlier revert of this is now superseded by the dispatch refactor that makes it safe to return the adaptive count). - Loosen assertions in PropagateAliveWithPool and UpdateRefs/MarkHeap/ ScanHeap that previously required state->num_workers == pool->num_workers to allow state->num_workers <= pool->num_workers. Verified end-to-end: - FTP smoke test (20 collections, 4 workers, enable/disable cycle): OK - FTP parallel-GC suite: 177 tests pass, 7 skips - FTP full CPython suite: 481 OK, 0 failures - GIL full CPython suite: 479 OK, 0 failures (no regression) The barrier mark_barrier and done_barrier struct fields can be cleaned up in a follow-up (parallel to GIL Phase 5 cleanup that removed the same fields from _PyParallelGCState).
1 parent 9c77265 commit 7d81e3c

4 files changed

Lines changed: 239 additions & 100 deletions

File tree

Include/internal/pycore_gc_barrier.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,22 @@ _PyGCBarrier_Fini(_PyGCBarrier *barrier)
9797
_PyGC_MUTEX_FINI(&barrier->lock);
9898
}
9999

100+
// Re-size a barrier to a new capacity. Safe to call only when no thread
101+
// is currently inside _PyGCBarrier_Wait for this barrier (typically: at
102+
// the start of each dispatch, before any worker is woken). Used by the
103+
// FTP parallel GC to size the phase_barrier to adaptive_workers each
104+
// dispatch so only active workers participate.
105+
static inline void
106+
_PyGCBarrier_Resize(_PyGCBarrier *barrier, unsigned int new_capacity)
107+
{
108+
assert(new_capacity > 0);
109+
_PyGC_MUTEX_LOCK(&barrier->lock);
110+
barrier->capacity = new_capacity;
111+
barrier->num_left = new_capacity;
112+
// epoch unchanged; no one should be waiting
113+
_PyGC_MUTEX_UNLOCK(&barrier->lock);
114+
}
115+
100116
// Wait at barrier - blocks until all threads arrive
101117
static inline void
102118
_PyGCBarrier_Wait(_PyGCBarrier *barrier)

Include/internal/pycore_gc_ft_parallel.h

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,14 @@ typedef struct _PyGCWorkerState {
273273
// Per-phase atomic stats - copied from TLS after work completes
274274
_PyGCAtomicWorkerStats atomic_stats;
275275
#endif
276+
277+
// Per-worker condvar for targeted wakeup (adaptive worker count).
278+
// Mirrors the GIL parallel GC. Replaces the previous mark_barrier
279+
// broadcast dispatch: main thread sets wake_flag and signals wake_cond
280+
// for only the active workers; idle workers stay asleep.
281+
PyMUTEX_T wake_mutex;
282+
PyCOND_T wake_cond;
283+
int wake_flag; // 0=sleeping, 1=wake up
276284
} _PyGCWorkerState;
277285

278286
// Work descriptor - describes work for a single collection
@@ -328,18 +336,28 @@ typedef struct _PyGCThreadPool {
328336
// Current work descriptor - set by main thread before signalling workers
329337
_PyGCWorkDescriptor *current_work;
330338

331-
// Barrier synchronization (like GIL-based parallel GC)
332-
_PyGCBarrier mark_barrier; // Workers wait here for work
333-
_PyGCBarrier done_barrier; // All workers wait here when done
334-
_PyGCBarrier phase_barrier; // For multi-phase operations
339+
// Per-collection done signaling. Workers signal done_cond when they
340+
// finish; main waits on done_cond until workers_done_count reaches
341+
// the count of woken workers. Mirrors the GIL parallel GC.
342+
PyMUTEX_T done_mutex;
343+
PyCOND_T done_cond;
344+
volatile int workers_done_count; // Protected by done_mutex
345+
346+
// Reentrancy guard for the dispatch path.
347+
int dispatch_in_progress;
348+
349+
// Phase barrier — used INSIDE work functions (e.g. update_refs has
350+
// init/compute phases). Resized to the active worker count at the
351+
// start of each dispatch so only active workers participate.
352+
_PyGCBarrier phase_barrier;
335353

336354
// Worker control
337355
int shutdown; // 1 = pool is shutting down (use atomics)
338356

339357
// Adaptive worker count — see Include/internal/pycore_gc_random_walk.h.
340358
// Same controller as the GIL parallel GC for identical behaviour.
341359
// adaptive_workers ∈ [2, num_workers]; workers with worker_id >=
342-
// adaptive_workers no-op for this collection.
360+
// adaptive_workers do not wake this dispatch.
343361
size_t adaptive_workers;
344362
double prev_cost_per_obj_ns;
345363
uint32_t explore_rng;
@@ -529,29 +547,28 @@ struct _PyGCScanHeapResult {
529547
};
530548

531549
// Get number of parallel GC workers for the next collection.
532-
// Returns 0 if parallel GC is disabled, otherwise the worker count.
550+
// Returns 0 if parallel GC is disabled, otherwise the adaptive worker
551+
// count chosen by the random-walk controller (see pycore_gc_random_walk.h).
533552
//
534-
// Currently returns pool->num_workers (the configured maximum), NOT the
535-
// adaptive count. The pool dispatch path
536-
// (_PyGC_ParallelPropagateAliveWithPool and similar) asserts that the
537-
// passed num_workers matches pool->num_workers and that the internal
538-
// phase_barrier (sized num_workers) receives that many participants.
539-
// Returning adaptive_workers here would break both invariants.
553+
// Pool dispatch (_PyGC_FTDispatchAndWait) wakes only [0, adaptive_workers)
554+
// of the pool's helpers via per-worker condvars; idle workers stay asleep
555+
// and don't enter the work functions. The internal phase_barrier is resized
556+
// to adaptive_workers per dispatch so it only waits for active participants.
540557
//
541-
// Plumbing the adaptive count through requires the pool dispatch refactor:
542-
// per-worker condvars instead of mark_barrier broadcast, dynamically-sized
543-
// (or per-dispatch) phase_barriers. Once that lands, change the body to:
544-
// _PyGCThreadPool *pool = gc->thread_pool;
545-
// return pool ? (int)pool->adaptive_workers : gc->parallel_gc_num_workers;
546-
// and FTP collections will scale per-collection like the GIL build does.
558+
// Falls back to configured num_workers only if the pool isn't initialised
559+
// yet (e.g. enable_parallel hasn't completed). Once pool exists, the
560+
// adaptive count owned by the pool is authoritative.
547561
static inline int
548562
_PyGC_GetParallelWorkers(PyInterpreterState *interp)
549563
{
550564
struct _gc_runtime_state *gc = &interp->gc;
551565
if (!gc->parallel_gc_enabled) {
552566
return 0;
553567
}
554-
// num_workers is required and validated by gc.enable_parallel()
568+
_PyGCThreadPool *pool = gc->thread_pool;
569+
if (pool != NULL) {
570+
return (int)pool->adaptive_workers;
571+
}
555572
return gc->parallel_gc_num_workers;
556573
}
557574

Modules/gcmodule.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -730,8 +730,11 @@ gc_get_parallel_config_impl(PyObject *module)
730730
return NULL;
731731
}
732732

733+
// num_workers = the configured maximum (what enable_parallel was called
734+
// with). adaptive_workers = the current count chosen by the random-walk
735+
// controller; <= num_workers. Mirror of the GIL build's config API.
733736
int num_workers = interp->gc.parallel_gc_enabled ?
734-
_PyGC_GetParallelWorkers(interp) : 0;
737+
interp->gc.parallel_gc_num_workers : 0;
735738
PyObject *workers = PyLong_FromLong(num_workers);
736739
if (workers == NULL || PyDict_SetItemString(result, "num_workers", workers) < 0) {
737740
Py_XDECREF(workers);
@@ -740,6 +743,16 @@ gc_get_parallel_config_impl(PyObject *module)
740743
}
741744
Py_DECREF(workers);
742745

746+
if (interp->gc.parallel_gc_enabled && interp->gc.thread_pool != NULL) {
747+
PyObject *aw = PyLong_FromSize_t(interp->gc.thread_pool->adaptive_workers);
748+
if (aw == NULL || PyDict_SetItemString(result, "adaptive_workers", aw) < 0) {
749+
Py_XDECREF(aw);
750+
Py_DECREF(result);
751+
return NULL;
752+
}
753+
Py_DECREF(aw);
754+
}
755+
743756
// Parallel cleanup is available in FTP builds
744757
if (PyDict_SetItemString(result, "parallel_cleanup", Py_True) < 0) {
745758
Py_DECREF(result);

0 commit comments

Comments
 (0)