diff --git a/base/rts/common.h b/base/rts/common.h index d15e8cda4..6ca8f9065 100644 --- a/base/rts/common.h +++ b/base/rts/common.h @@ -35,6 +35,9 @@ void acton_free(void* ptr); char *acton_strdup(const char *s); char *acton_strndup(const char *s, size_t n); +int acton_sync_pause_begin(void); +void acton_sync_pause_end(void); + void *acton_gc_malloc(size_t size); void *acton_gc_malloc_atomic(size_t size); void *acton_gc_realloc(void* ptr, size_t size); diff --git a/base/rts/rts.c b/base/rts/rts.c index 899f7eb0d..5bee45dae 100644 --- a/base/rts/rts.c +++ b/base/rts/rts.c @@ -234,6 +234,132 @@ void pin_actor_affinity() { } void set_actor_affinity(int wthread_id) { } #endif // ACTON_THREADS +#ifdef ACTON_THREADS +static pthread_mutex_t sync_pause_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t sync_pause_cond = PTHREAD_COND_INITIALIZER; +static int sync_pause_requested = 0; +static int sync_pause_owner = -1; +static int sync_pause_parked_count = 0; +static int sync_pause_workers_are_started = 0; +static int sync_pause_parked[MAX_WTHREADS]; + +static void sync_pause_clear_parked(void) { + for (int i = 0; i <= num_wthreads; i++) { + sync_pause_parked[i] = 0; + } +} + +static void wake_all_wt(void) { + for (int i = 0; i <= num_wthreads; i++) { + uv_async_send(&wake_ev[i]); + } +} + +static void sync_pause_wait(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + // Use a bounded wait so pause owners and parked workers re-check rts_exit + // even if shutdown starts without a matching condition broadcast. + ts.tv_nsec += 10 * 1000 * 1000; + if (ts.tv_nsec >= 1000 * 1000 * 1000) { + ts.tv_sec++; + ts.tv_nsec -= 1000 * 1000 * 1000; + } + pthread_cond_timedwait(&sync_pause_cond, &sync_pause_lock, &ts); +} + +int acton_sync_pause_begin(void) { + WorkerCtx wctx = GET_WCTX(); + if (wctx == NULL || wctx->id < 0 || wctx->id > num_wthreads) { + return -1; + } + int owner = (int)wctx->id; + + pthread_mutex_lock(&sync_pause_lock); + if (!sync_pause_workers_are_started || sync_pause_requested) { + pthread_mutex_unlock(&sync_pause_lock); + return -1; + } + if (rts_exit) { + pthread_mutex_unlock(&sync_pause_lock); + return -1; + } + + sync_pause_requested = 1; + sync_pause_owner = owner; + sync_pause_parked_count = 0; + sync_pause_clear_parked(); + + wake_all_wt(); + while (sync_pause_parked_count < num_wthreads && !rts_exit) { + sync_pause_wait(); + } + if (rts_exit) { + sync_pause_requested = 0; + sync_pause_owner = -1; + sync_pause_parked_count = 0; + pthread_cond_broadcast(&sync_pause_cond); + pthread_mutex_unlock(&sync_pause_lock); + return -1; + } + + pthread_mutex_unlock(&sync_pause_lock); + return 0; +} + +void acton_sync_pause_end(void) { + WorkerCtx wctx = GET_WCTX(); + if (wctx == NULL || wctx->id < 0 || wctx->id > num_wthreads) { + return; + } + int owner = (int)wctx->id; + + pthread_mutex_lock(&sync_pause_lock); + if (!sync_pause_requested || sync_pause_owner != owner) { + pthread_mutex_unlock(&sync_pause_lock); + return; + } + + sync_pause_requested = 0; + sync_pause_owner = -1; + sync_pause_parked_count = 0; + pthread_cond_broadcast(&sync_pause_cond); + pthread_mutex_unlock(&sync_pause_lock); +} + +// Called from the worker loop between actor continuations. If a sync pause is +// active, non-owner workers park here while the owner runs the synchronized op. +static void maybe_sync_pause(void) { + WorkerCtx wctx = GET_WCTX(); + if (wctx == NULL || wctx->id < 0 || wctx->id >= MAX_WTHREADS) { + return; + } + int id = (int)wctx->id; + + pthread_mutex_lock(&sync_pause_lock); + while (sync_pause_requested && id != sync_pause_owner && !rts_exit) { + if (!sync_pause_parked[id]) { + sync_pause_parked[id] = 1; + sync_pause_parked_count++; + pthread_cond_broadcast(&sync_pause_cond); + } + sync_pause_wait(); + } + pthread_mutex_unlock(&sync_pause_lock); +} + +static void sync_pause_workers_started(void) { + pthread_mutex_lock(&sync_pause_lock); + sync_pause_workers_are_started = 1; + pthread_mutex_unlock(&sync_pause_lock); +} +#else +int acton_sync_pause_begin(void) { return 0; } +void acton_sync_pause_end(void) { } +static void maybe_sync_pause(void) { } +static void sync_pause_workers_started(void) { } +#endif + void wake_wt(int wtid) { // We are sometimes optimistically called, i.e. the caller sometimes does // not really know whether there is new work or not. We check and if there @@ -1533,6 +1659,7 @@ void wt_work_cb(uv_check_t *ev) { uv_clock_gettime(UV_CLOCK_MONOTONIC, &ts_start); while (true) { + maybe_sync_pause(); if (rts_exit) { return; } @@ -2605,9 +2732,9 @@ int main(int argc, char **argv) { } #ifdef ACTON_THREADS - if (num_wthreads > MAX_WTHREADS) { - fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS); - fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS); + if (num_wthreads >= MAX_WTHREADS) { + fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS - 1); + fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS - 1); exit(1); } // Determine number of worker threads, normally 1:1 per CPU thread / core @@ -2858,6 +2985,7 @@ int main(int argc, char **argv) { //pthread_setaffinity_np(threads[idx-1], sizeof(cpu_set), &cpu_set); } } + sync_pause_workers_started(); pthread_attr_destroy(&ss_attr); #endif diff --git a/docs/acton-dev-guide/src/SUMMARY.md b/docs/acton-dev-guide/src/SUMMARY.md index c724aa551..6b0568360 100644 --- a/docs/acton-dev-guide/src/SUMMARY.md +++ b/docs/acton-dev-guide/src/SUMMARY.md @@ -29,6 +29,7 @@ - [FFI hooks](builtins/ffi.md) - [Runtime](runtime/index.md) - [Scheduler](runtime/scheduler.md) + - [RTS sync pause](runtime/sync_pause.md) - [Actors](runtime/actors.md) - [Memory and GC](runtime/memory.md) - [Tooling](tooling/index.md) diff --git a/docs/acton-dev-guide/src/runtime/index.md b/docs/acton-dev-guide/src/runtime/index.md index ed05df624..5bfd4039e 100644 --- a/docs/acton-dev-guide/src/runtime/index.md +++ b/docs/acton-dev-guide/src/runtime/index.md @@ -2,3 +2,8 @@ This section covers the runtime system, scheduling model, and core services that the compiler targets. + +- [Scheduler](scheduler.md) +- [RTS sync pause](sync_pause.md) +- [Actors](actors.md) +- [Memory and GC](memory.md) diff --git a/docs/acton-dev-guide/src/runtime/sync_pause.md b/docs/acton-dev-guide/src/runtime/sync_pause.md new file mode 100644 index 000000000..8f3de49a1 --- /dev/null +++ b/docs/acton-dev-guide/src/runtime/sync_pause.md @@ -0,0 +1,56 @@ +# RTS Sync Pause + +The RTS sync pause is a cooperative stop-the-world primitive for worker +threads. It is intended for runtime operations that must publish process-wide +state while no other worker is executing Acton continuations, for example +dynamic library code reload. It is not a general blocking API for arbitrary +threads; callers must be RTS workers. + +The public C entry points are declared in `base/rts/common.h`: + +```c +int acton_sync_pause_begin(void); +void acton_sync_pause_end(void); +``` + +`acton_sync_pause_begin()` may only be called after the worker pool has started +and before shutdown. It returns `0` for the worker that owns the pause and `-1` +if the caller is not a worker, shutdown has started, or another pause is already +active. There is no queue of pause requests; callers that race with an active +pause fail and must decide at the higher layer whether to retry or report an +error. `acton_sync_pause_end()` releases the pause only when called by the +worker that owns it. + +## Protocol + +The pause owner records its worker id, clears the per-worker parked bitmap, and +wakes all worker-thread event loops with `wake_all_wt()`. The wake is just a +libuv poke; sending it to the owner's own loop is harmless because async +callbacks are not run inline and wake notifications may be coalesced. + +Workers run `maybe_sync_pause()` at the top of `wt_work_cb()` before dequeuing +the next actor continuation. A non-owner +worker that sees an active pause marks itself parked exactly once, increments +`sync_pause_parked_count`, signals the condition variable, and waits until the +owner releases the pause. The owner waits until `sync_pause_parked_count` +reaches `num_wthreads`, which is the number of workers other than the owner in +the `0..num_wthreads` worker-id range. + +The primitive is therefore cooperative. It does not interrupt a continuation +that is already running; the pause is established only once every other worker +has returned to the worker-loop pause check. + +## Shutdown + +Both the owner and parked workers use a short timed condition-variable wait +rather than an unbounded `pthread_cond_wait()`. This keeps the pause responsive +to `rts_exit` even if shutdown starts while a worker is waiting and no further +condition broadcast arrives. If shutdown is observed while the owner is waiting, +the owner clears the pause state, broadcasts to parked workers, and fails the +pause request. + +## Non-Threaded Builds + +When Acton is built without RTS threads, `acton_sync_pause_begin()` returns +success and `acton_sync_pause_end()` is a no-op. There are no peer workers to +park in that configuration.