From 30b5322637fd9c81e040812ee5253ee4016bda72 Mon Sep 17 00:00:00 2001 From: Kristian Larsson Date: Thu, 21 May 2026 22:06:52 +0200 Subject: [PATCH] Add RTS worker sync pause Introduce a cooperative sync-pause primitive for RTS worker threads. A worker can request a pause, wake its peers, wait until they park at worker-loop pause checks, and release them when the synchronized operation completes. Pause requests fail before the worker pool is running or while another pause is active. Active pause waits time out briefly and re-check rts_exit, so graceful shutdown cannot leave the owner or parked workers asleep on the pause condition. --- base/rts/common.h | 3 + base/rts/rts.c | 134 +++++++++++++++++- docs/acton-dev-guide/src/SUMMARY.md | 1 + docs/acton-dev-guide/src/runtime/index.md | 5 + .../acton-dev-guide/src/runtime/sync_pause.md | 56 ++++++++ 5 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 docs/acton-dev-guide/src/runtime/sync_pause.md diff --git a/base/rts/common.h b/base/rts/common.h index d15e8cda4..6ca8f9065 100644 --- a/base/rts/common.h +++ b/base/rts/common.h @@ -35,6 +35,9 @@ void acton_free(void* ptr); char *acton_strdup(const char *s); char *acton_strndup(const char *s, size_t n); +int acton_sync_pause_begin(void); +void acton_sync_pause_end(void); + void *acton_gc_malloc(size_t size); void *acton_gc_malloc_atomic(size_t size); void *acton_gc_realloc(void* ptr, size_t size); diff --git a/base/rts/rts.c b/base/rts/rts.c index 899f7eb0d..5bee45dae 100644 --- a/base/rts/rts.c +++ b/base/rts/rts.c @@ -234,6 +234,132 @@ void pin_actor_affinity() { } void set_actor_affinity(int wthread_id) { } #endif // ACTON_THREADS +#ifdef ACTON_THREADS +static pthread_mutex_t sync_pause_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t sync_pause_cond = PTHREAD_COND_INITIALIZER; +static int sync_pause_requested = 0; +static int sync_pause_owner = -1; +static int sync_pause_parked_count = 0; +static int sync_pause_workers_are_started = 0; +static int sync_pause_parked[MAX_WTHREADS]; + +static void sync_pause_clear_parked(void) { + for (int i = 0; i <= num_wthreads; i++) { + sync_pause_parked[i] = 0; + } +} + +static void wake_all_wt(void) { + for (int i = 0; i <= num_wthreads; i++) { + uv_async_send(&wake_ev[i]); + } +} + +static void sync_pause_wait(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + // Use a bounded wait so pause owners and parked workers re-check rts_exit + // even if shutdown starts without a matching condition broadcast. + ts.tv_nsec += 10 * 1000 * 1000; + if (ts.tv_nsec >= 1000 * 1000 * 1000) { + ts.tv_sec++; + ts.tv_nsec -= 1000 * 1000 * 1000; + } + pthread_cond_timedwait(&sync_pause_cond, &sync_pause_lock, &ts); +} + +int acton_sync_pause_begin(void) { + WorkerCtx wctx = GET_WCTX(); + if (wctx == NULL || wctx->id < 0 || wctx->id > num_wthreads) { + return -1; + } + int owner = (int)wctx->id; + + pthread_mutex_lock(&sync_pause_lock); + if (!sync_pause_workers_are_started || sync_pause_requested) { + pthread_mutex_unlock(&sync_pause_lock); + return -1; + } + if (rts_exit) { + pthread_mutex_unlock(&sync_pause_lock); + return -1; + } + + sync_pause_requested = 1; + sync_pause_owner = owner; + sync_pause_parked_count = 0; + sync_pause_clear_parked(); + + wake_all_wt(); + while (sync_pause_parked_count < num_wthreads && !rts_exit) { + sync_pause_wait(); + } + if (rts_exit) { + sync_pause_requested = 0; + sync_pause_owner = -1; + sync_pause_parked_count = 0; + pthread_cond_broadcast(&sync_pause_cond); + pthread_mutex_unlock(&sync_pause_lock); + return -1; + } + + pthread_mutex_unlock(&sync_pause_lock); + return 0; +} + +void acton_sync_pause_end(void) { + WorkerCtx wctx = GET_WCTX(); + if (wctx == NULL || wctx->id < 0 || wctx->id > num_wthreads) { + return; + } + int owner = (int)wctx->id; + + pthread_mutex_lock(&sync_pause_lock); + if (!sync_pause_requested || sync_pause_owner != owner) { + pthread_mutex_unlock(&sync_pause_lock); + return; + } + + sync_pause_requested = 0; + sync_pause_owner = -1; + sync_pause_parked_count = 0; + pthread_cond_broadcast(&sync_pause_cond); + pthread_mutex_unlock(&sync_pause_lock); +} + +// Called from the worker loop between actor continuations. If a sync pause is +// active, non-owner workers park here while the owner runs the synchronized op. +static void maybe_sync_pause(void) { + WorkerCtx wctx = GET_WCTX(); + if (wctx == NULL || wctx->id < 0 || wctx->id >= MAX_WTHREADS) { + return; + } + int id = (int)wctx->id; + + pthread_mutex_lock(&sync_pause_lock); + while (sync_pause_requested && id != sync_pause_owner && !rts_exit) { + if (!sync_pause_parked[id]) { + sync_pause_parked[id] = 1; + sync_pause_parked_count++; + pthread_cond_broadcast(&sync_pause_cond); + } + sync_pause_wait(); + } + pthread_mutex_unlock(&sync_pause_lock); +} + +static void sync_pause_workers_started(void) { + pthread_mutex_lock(&sync_pause_lock); + sync_pause_workers_are_started = 1; + pthread_mutex_unlock(&sync_pause_lock); +} +#else +int acton_sync_pause_begin(void) { return 0; } +void acton_sync_pause_end(void) { } +static void maybe_sync_pause(void) { } +static void sync_pause_workers_started(void) { } +#endif + void wake_wt(int wtid) { // We are sometimes optimistically called, i.e. the caller sometimes does // not really know whether there is new work or not. We check and if there @@ -1533,6 +1659,7 @@ void wt_work_cb(uv_check_t *ev) { uv_clock_gettime(UV_CLOCK_MONOTONIC, &ts_start); while (true) { + maybe_sync_pause(); if (rts_exit) { return; } @@ -2605,9 +2732,9 @@ int main(int argc, char **argv) { } #ifdef ACTON_THREADS - if (num_wthreads > MAX_WTHREADS) { - fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS); - fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS); + if (num_wthreads >= MAX_WTHREADS) { + fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS - 1); + fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS - 1); exit(1); } // Determine number of worker threads, normally 1:1 per CPU thread / core @@ -2858,6 +2985,7 @@ int main(int argc, char **argv) { //pthread_setaffinity_np(threads[idx-1], sizeof(cpu_set), &cpu_set); } } + sync_pause_workers_started(); pthread_attr_destroy(&ss_attr); #endif diff --git a/docs/acton-dev-guide/src/SUMMARY.md b/docs/acton-dev-guide/src/SUMMARY.md index c724aa551..6b0568360 100644 --- a/docs/acton-dev-guide/src/SUMMARY.md +++ b/docs/acton-dev-guide/src/SUMMARY.md @@ -29,6 +29,7 @@ - [FFI hooks](builtins/ffi.md) - [Runtime](runtime/index.md) - [Scheduler](runtime/scheduler.md) + - [RTS sync pause](runtime/sync_pause.md) - [Actors](runtime/actors.md) - [Memory and GC](runtime/memory.md) - [Tooling](tooling/index.md) diff --git a/docs/acton-dev-guide/src/runtime/index.md b/docs/acton-dev-guide/src/runtime/index.md index ed05df624..5bfd4039e 100644 --- a/docs/acton-dev-guide/src/runtime/index.md +++ b/docs/acton-dev-guide/src/runtime/index.md @@ -2,3 +2,8 @@ This section covers the runtime system, scheduling model, and core services that the compiler targets. + +- [Scheduler](scheduler.md) +- [RTS sync pause](sync_pause.md) +- [Actors](actors.md) +- [Memory and GC](memory.md) diff --git a/docs/acton-dev-guide/src/runtime/sync_pause.md b/docs/acton-dev-guide/src/runtime/sync_pause.md new file mode 100644 index 000000000..8f3de49a1 --- /dev/null +++ b/docs/acton-dev-guide/src/runtime/sync_pause.md @@ -0,0 +1,56 @@ +# RTS Sync Pause + +The RTS sync pause is a cooperative stop-the-world primitive for worker +threads. It is intended for runtime operations that must publish process-wide +state while no other worker is executing Acton continuations, for example +dynamic library code reload. It is not a general blocking API for arbitrary +threads; callers must be RTS workers. + +The public C entry points are declared in `base/rts/common.h`: + +```c +int acton_sync_pause_begin(void); +void acton_sync_pause_end(void); +``` + +`acton_sync_pause_begin()` may only be called after the worker pool has started +and before shutdown. It returns `0` for the worker that owns the pause and `-1` +if the caller is not a worker, shutdown has started, or another pause is already +active. There is no queue of pause requests; callers that race with an active +pause fail and must decide at the higher layer whether to retry or report an +error. `acton_sync_pause_end()` releases the pause only when called by the +worker that owns it. + +## Protocol + +The pause owner records its worker id, clears the per-worker parked bitmap, and +wakes all worker-thread event loops with `wake_all_wt()`. The wake is just a +libuv poke; sending it to the owner's own loop is harmless because async +callbacks are not run inline and wake notifications may be coalesced. + +Workers run `maybe_sync_pause()` at the top of `wt_work_cb()` before dequeuing +the next actor continuation. A non-owner +worker that sees an active pause marks itself parked exactly once, increments +`sync_pause_parked_count`, signals the condition variable, and waits until the +owner releases the pause. The owner waits until `sync_pause_parked_count` +reaches `num_wthreads`, which is the number of workers other than the owner in +the `0..num_wthreads` worker-id range. + +The primitive is therefore cooperative. It does not interrupt a continuation +that is already running; the pause is established only once every other worker +has returned to the worker-loop pause check. + +## Shutdown + +Both the owner and parked workers use a short timed condition-variable wait +rather than an unbounded `pthread_cond_wait()`. This keeps the pause responsive +to `rts_exit` even if shutdown starts while a worker is waiting and no further +condition broadcast arrives. If shutdown is observed while the owner is waiting, +the owner clears the pause state, broadcasts to parked workers, and fails the +pause request. + +## Non-Threaded Builds + +When Acton is built without RTS threads, `acton_sync_pause_begin()` returns +success and `acton_sync_pause_end()` is a no-op. There are no peer workers to +park in that configuration.