Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions base/rts/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ void acton_free(void* ptr);
char *acton_strdup(const char *s);
char *acton_strndup(const char *s, size_t n);

int acton_sync_pause_begin(void);
void acton_sync_pause_end(void);

void *acton_gc_malloc(size_t size);
void *acton_gc_malloc_atomic(size_t size);
void *acton_gc_realloc(void* ptr, size_t size);
Expand Down
134 changes: 131 additions & 3 deletions base/rts/rts.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,132 @@ void pin_actor_affinity() { }
void set_actor_affinity(int wthread_id) { }
#endif // ACTON_THREADS

#ifdef ACTON_THREADS
static pthread_mutex_t sync_pause_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sync_pause_cond = PTHREAD_COND_INITIALIZER;
static int sync_pause_requested = 0;
static int sync_pause_owner = -1;
static int sync_pause_parked_count = 0;
static int sync_pause_workers_are_started = 0;
static int sync_pause_parked[MAX_WTHREADS];

static void sync_pause_clear_parked(void) {
for (int i = 0; i <= num_wthreads; i++) {
sync_pause_parked[i] = 0;
}
}

static void wake_all_wt(void) {
for (int i = 0; i <= num_wthreads; i++) {
uv_async_send(&wake_ev[i]);
}
}

static void sync_pause_wait(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// Use a bounded wait so pause owners and parked workers re-check rts_exit
// even if shutdown starts without a matching condition broadcast.
ts.tv_nsec += 10 * 1000 * 1000;
if (ts.tv_nsec >= 1000 * 1000 * 1000) {
ts.tv_sec++;
ts.tv_nsec -= 1000 * 1000 * 1000;
}
pthread_cond_timedwait(&sync_pause_cond, &sync_pause_lock, &ts);
}

int acton_sync_pause_begin(void) {
WorkerCtx wctx = GET_WCTX();
if (wctx == NULL || wctx->id < 0 || wctx->id > num_wthreads) {
return -1;
}
int owner = (int)wctx->id;

pthread_mutex_lock(&sync_pause_lock);
if (!sync_pause_workers_are_started || sync_pause_requested) {
pthread_mutex_unlock(&sync_pause_lock);
return -1;
}
if (rts_exit) {
pthread_mutex_unlock(&sync_pause_lock);
return -1;
}

sync_pause_requested = 1;
sync_pause_owner = owner;
sync_pause_parked_count = 0;
sync_pause_clear_parked();

wake_all_wt();
while (sync_pause_parked_count < num_wthreads && !rts_exit) {
sync_pause_wait();
}
if (rts_exit) {
sync_pause_requested = 0;
sync_pause_owner = -1;
sync_pause_parked_count = 0;
pthread_cond_broadcast(&sync_pause_cond);
pthread_mutex_unlock(&sync_pause_lock);
return -1;
}

pthread_mutex_unlock(&sync_pause_lock);
return 0;
}

void acton_sync_pause_end(void) {
WorkerCtx wctx = GET_WCTX();
if (wctx == NULL || wctx->id < 0 || wctx->id > num_wthreads) {
return;
}
int owner = (int)wctx->id;

pthread_mutex_lock(&sync_pause_lock);
if (!sync_pause_requested || sync_pause_owner != owner) {
pthread_mutex_unlock(&sync_pause_lock);
return;
}

sync_pause_requested = 0;
sync_pause_owner = -1;
sync_pause_parked_count = 0;
pthread_cond_broadcast(&sync_pause_cond);
pthread_mutex_unlock(&sync_pause_lock);
}

// Called from the worker loop between actor continuations. If a sync pause is
// active, non-owner workers park here while the owner runs the synchronized op.
static void maybe_sync_pause(void) {
WorkerCtx wctx = GET_WCTX();
if (wctx == NULL || wctx->id < 0 || wctx->id >= MAX_WTHREADS) {
return;
}
int id = (int)wctx->id;

pthread_mutex_lock(&sync_pause_lock);
while (sync_pause_requested && id != sync_pause_owner && !rts_exit) {
if (!sync_pause_parked[id]) {
sync_pause_parked[id] = 1;
sync_pause_parked_count++;
pthread_cond_broadcast(&sync_pause_cond);
}
sync_pause_wait();
}
pthread_mutex_unlock(&sync_pause_lock);
}

static void sync_pause_workers_started(void) {
pthread_mutex_lock(&sync_pause_lock);
sync_pause_workers_are_started = 1;
pthread_mutex_unlock(&sync_pause_lock);
}
#else
int acton_sync_pause_begin(void) { return 0; }
void acton_sync_pause_end(void) { }
static void maybe_sync_pause(void) { }
static void sync_pause_workers_started(void) { }
#endif

void wake_wt(int wtid) {
// We are sometimes optimistically called, i.e. the caller sometimes does
// not really know whether there is new work or not. We check and if there
Expand Down Expand Up @@ -1533,6 +1659,7 @@ void wt_work_cb(uv_check_t *ev) {

uv_clock_gettime(UV_CLOCK_MONOTONIC, &ts_start);
while (true) {
maybe_sync_pause();
if (rts_exit) {
return;
}
Expand Down Expand Up @@ -2605,9 +2732,9 @@ int main(int argc, char **argv) {
}

#ifdef ACTON_THREADS
if (num_wthreads > MAX_WTHREADS) {
fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS);
fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS);
if (num_wthreads >= MAX_WTHREADS) {
fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS - 1);
fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS - 1);
exit(1);
}
// Determine number of worker threads, normally 1:1 per CPU thread / core
Expand Down Expand Up @@ -2858,6 +2985,7 @@ int main(int argc, char **argv) {
//pthread_setaffinity_np(threads[idx-1], sizeof(cpu_set), &cpu_set);
}
}
sync_pause_workers_started();

pthread_attr_destroy(&ss_attr);
#endif
Expand Down
1 change: 1 addition & 0 deletions docs/acton-dev-guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- [FFI hooks](builtins/ffi.md)
- [Runtime](runtime/index.md)
- [Scheduler](runtime/scheduler.md)
- [RTS sync pause](runtime/sync_pause.md)
- [Actors](runtime/actors.md)
- [Memory and GC](runtime/memory.md)
- [Tooling](tooling/index.md)
Expand Down
5 changes: 5 additions & 0 deletions docs/acton-dev-guide/src/runtime/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@

This section covers the runtime system, scheduling model, and core services
that the compiler targets.

- [Scheduler](scheduler.md)
- [RTS sync pause](sync_pause.md)
- [Actors](actors.md)
- [Memory and GC](memory.md)
56 changes: 56 additions & 0 deletions docs/acton-dev-guide/src/runtime/sync_pause.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# RTS Sync Pause

The RTS sync pause is a cooperative stop-the-world primitive for worker
threads. It is intended for runtime operations that must publish process-wide
state while no other worker is executing Acton continuations, for example
dynamic library code reload. It is not a general blocking API for arbitrary
threads; callers must be RTS workers.

The public C entry points are declared in `base/rts/common.h`:

```c
int acton_sync_pause_begin(void);
void acton_sync_pause_end(void);
```

`acton_sync_pause_begin()` may only be called after the worker pool has started
and before shutdown. It returns `0` for the worker that owns the pause and `-1`
if the caller is not a worker, shutdown has started, or another pause is already
active. There is no queue of pause requests; callers that race with an active
pause fail and must decide at the higher layer whether to retry or report an
error. `acton_sync_pause_end()` releases the pause only when called by the
worker that owns it.

## Protocol

The pause owner records its worker id, clears the per-worker parked bitmap, and
wakes all worker-thread event loops with `wake_all_wt()`. The wake is just a
libuv poke; sending it to the owner's own loop is harmless because async
callbacks are not run inline and wake notifications may be coalesced.

Workers run `maybe_sync_pause()` at the top of `wt_work_cb()` before dequeuing
the next actor continuation. A non-owner
worker that sees an active pause marks itself parked exactly once, increments
`sync_pause_parked_count`, signals the condition variable, and waits until the
owner releases the pause. The owner waits until `sync_pause_parked_count`
reaches `num_wthreads`, which is the number of workers other than the owner in
the `0..num_wthreads` worker-id range.

The primitive is therefore cooperative. It does not interrupt a continuation
that is already running; the pause is established only once every other worker
has returned to the worker-loop pause check.

## Shutdown

Both the owner and parked workers use a short timed condition-variable wait
rather than an unbounded `pthread_cond_wait()`. This keeps the pause responsive
to `rts_exit` even if shutdown starts while a worker is waiting and no further
condition broadcast arrives. If shutdown is observed while the owner is waiting,
the owner clears the pause state, broadcasts to parked workers, and fails the
pause request.

## Non-Threaded Builds

When Acton is built without RTS threads, `acton_sync_pause_begin()` returns
success and `acton_sync_pause_end()` is a no-op. There are no peer workers to
park in that configuration.