Skip to content

Commit fe2bf5a

Browse files
author
Kristian Larsson
committed
Add RTS worker sync pause
Introduce a cooperative sync-pause primitive for RTS worker threads. A worker can request a pause, wake its peers, wait until they park at worker-loop safepoints, and release them when the synchronized operation completes. Pause requests fail before the worker pool is running or while another pause is active. Active pause waits time out briefly and re-check rts_exit, so graceful shutdown cannot leave the owner or parked workers asleep on the pause condition.
1 parent 836cce0 commit fe2bf5a

2 files changed

Lines changed: 141 additions & 3 deletions

File tree

base/rts/common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ void acton_free(void* ptr);
3535
char *acton_strdup(const char *s);
3636
char *acton_strndup(const char *s, size_t n);
3737

38+
// Request a synchronized pause of the RTS worker threads.
// Returns 0 once every other worker has parked (always 0 in builds without
// ACTON_THREADS). Returns -1 if the caller has no worker context or its id is
// out of range, the worker pool has not been marked as started, another pause
// is already active, or the RTS is exiting. A successful call is released by
// acton_sync_pause_end() called from the same thread.
int acton_sync_pause_begin(void);
39+
// Release the pause obtained with acton_sync_pause_begin() and wake the parked
// workers. Does nothing unless the calling thread is the current pause owner.
void acton_sync_pause_end(void);
40+
3841
void *acton_gc_malloc(size_t size);
3942
void *acton_gc_malloc_atomic(size_t size);
4043
void *acton_gc_realloc(void* ptr, size_t size);

base/rts/rts.c

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,139 @@ void pin_actor_affinity() { }
234234
void set_actor_affinity(int wthread_id) { }
235235
#endif // ACTON_THREADS
236236

237+
#ifdef ACTON_THREADS
238+
// Mutex held around every access to the sync-pause state below in the code
// visible in this file.
static pthread_mutex_t sync_pause_lock = PTHREAD_MUTEX_INITIALIZER;
239+
// Condition used together with sync_pause_lock: broadcast when a worker parks
// and when a pause is released or aborted.
static pthread_cond_t sync_pause_cond = PTHREAD_COND_INITIALIZER;
240+
// Non-zero while a pause is active.
static int sync_pause_requested = 0;
241+
// Worker id of the thread that requested the active pause, or -1 when none.
static int sync_pause_owner = -1;
242+
// Number of workers that have parked for the active pause.
static int sync_pause_parked_count = 0;
243+
// Set to 1 by sync_pause_workers_started(); pause requests are refused until then.
static int sync_pause_workers_are_started = 0;
244+
// Per-worker flag, indexed by worker id: 1 once that worker has been counted
// in sync_pause_parked_count for the current pause.
static int sync_pause_parked[MAX_WTHREADS];
245+
246+
static int sync_pause_current_worker_id(void) {
247+
WorkerCtx wctx = (WorkerCtx)pthread_getspecific(pkey_wctx);
248+
if (wctx == NULL) {
249+
return -1;
250+
}
251+
return (int)wctx->id;
252+
}
253+
254+
static int sync_pause_target_count(void) {
255+
return (int)num_wthreads;
256+
}
257+
258+
static void sync_pause_clear_parked(void) {
259+
for (int i = 0; i <= num_wthreads; i++) {
260+
sync_pause_parked[i] = 0;
261+
}
262+
}
263+
264+
static void sync_pause_wake_workers(void) {
265+
for (int i = 0; i <= num_wthreads; i++) {
266+
if (i != sync_pause_owner) {
267+
uv_async_send(&wake_ev[i]);
268+
}
269+
}
270+
}
271+
272+
static void sync_pause_wait(void) {
273+
struct timespec ts;
274+
clock_gettime(CLOCK_REALTIME, &ts);
275+
ts.tv_nsec += 10 * 1000 * 1000;
276+
if (ts.tv_nsec >= 1000 * 1000 * 1000) {
277+
ts.tv_sec++;
278+
ts.tv_nsec -= 1000 * 1000 * 1000;
279+
}
280+
pthread_cond_timedwait(&sync_pause_cond, &sync_pause_lock, &ts);
281+
}
282+
283+
int acton_sync_pause_begin(void) {
284+
int owner = sync_pause_current_worker_id();
285+
if (owner < 0 || owner > num_wthreads) {
286+
return -1;
287+
}
288+
289+
pthread_mutex_lock(&sync_pause_lock);
290+
if (!sync_pause_workers_are_started || sync_pause_requested) {
291+
pthread_mutex_unlock(&sync_pause_lock);
292+
return -1;
293+
}
294+
if (rts_exit) {
295+
pthread_mutex_unlock(&sync_pause_lock);
296+
return -1;
297+
}
298+
299+
sync_pause_requested = 1;
300+
sync_pause_owner = owner;
301+
sync_pause_parked_count = 0;
302+
sync_pause_clear_parked();
303+
304+
sync_pause_wake_workers();
305+
while (sync_pause_parked_count < sync_pause_target_count() && !rts_exit) {
306+
sync_pause_wait();
307+
}
308+
if (rts_exit) {
309+
sync_pause_requested = 0;
310+
sync_pause_owner = -1;
311+
sync_pause_parked_count = 0;
312+
pthread_cond_broadcast(&sync_pause_cond);
313+
pthread_mutex_unlock(&sync_pause_lock);
314+
return -1;
315+
}
316+
317+
pthread_mutex_unlock(&sync_pause_lock);
318+
return 0;
319+
}
320+
321+
void acton_sync_pause_end(void) {
322+
int owner = sync_pause_current_worker_id();
323+
if (owner < 0 || owner > num_wthreads) {
324+
return;
325+
}
326+
327+
pthread_mutex_lock(&sync_pause_lock);
328+
if (!sync_pause_requested || sync_pause_owner != owner) {
329+
pthread_mutex_unlock(&sync_pause_lock);
330+
return;
331+
}
332+
333+
sync_pause_requested = 0;
334+
sync_pause_owner = -1;
335+
sync_pause_parked_count = 0;
336+
pthread_cond_broadcast(&sync_pause_cond);
337+
pthread_mutex_unlock(&sync_pause_lock);
338+
}
339+
340+
static void sync_pause_safepoint(void) {
341+
int id = sync_pause_current_worker_id();
342+
if (id < 0 || id >= MAX_WTHREADS) {
343+
return;
344+
}
345+
346+
pthread_mutex_lock(&sync_pause_lock);
347+
while (sync_pause_requested && id != sync_pause_owner && !rts_exit) {
348+
if (!sync_pause_parked[id]) {
349+
sync_pause_parked[id] = 1;
350+
sync_pause_parked_count++;
351+
pthread_cond_broadcast(&sync_pause_cond);
352+
}
353+
sync_pause_wait();
354+
}
355+
pthread_mutex_unlock(&sync_pause_lock);
356+
}
357+
358+
// Record that the worker pool is running. Until this flag is set,
// acton_sync_pause_begin() refuses every pause request.
static void sync_pause_workers_started(void) {
    pthread_mutex_lock(&sync_pause_lock);
    sync_pause_workers_are_started = 1;  // written under the lock, like all sync-pause state
    pthread_mutex_unlock(&sync_pause_lock);
}
363+
#else
364+
// Build without ACTON_THREADS: a pause request succeeds immediately.
int acton_sync_pause_begin(void) {
    return 0;
}
365+
// Build without ACTON_THREADS: there is no pause state to release.
void acton_sync_pause_end(void) {
}
366+
// Build without ACTON_THREADS: the safepoint is a no-op.
static void sync_pause_safepoint(void) {
}
367+
// Build without ACTON_THREADS: there is no started flag to record.
static void sync_pause_workers_started(void) {
}
368+
#endif
369+
237370
void wake_wt(int wtid) {
238371
// We are sometimes optimistically called, i.e. the caller sometimes does
239372
// not really know whether there is new work or not. We check and if there
@@ -1533,6 +1666,7 @@ void wt_work_cb(uv_check_t *ev) {
15331666

15341667
uv_clock_gettime(UV_CLOCK_MONOTONIC, &ts_start);
15351668
while (true) {
1669+
sync_pause_safepoint();
15361670
if (rts_exit) {
15371671
return;
15381672
}
@@ -2605,9 +2739,9 @@ int main(int argc, char **argv) {
26052739
}
26062740

26072741
#ifdef ACTON_THREADS
2608-
if (num_wthreads > MAX_WTHREADS) {
2609-
fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS);
2610-
fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS);
2742+
if (num_wthreads >= MAX_WTHREADS) {
2743+
fprintf(stderr, "ERROR: Maximum of %d worker threads supported.\n", MAX_WTHREADS - 1);
2744+
fprintf(stderr, "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n", argv[0], MAX_WTHREADS - 1);
26112745
exit(1);
26122746
}
26132747
// Determine number of worker threads, normally 1:1 per CPU thread / core
@@ -2858,6 +2992,7 @@ int main(int argc, char **argv) {
28582992
//pthread_setaffinity_np(threads[idx-1], sizeof(cpu_set), &cpu_set);
28592993
}
28602994
}
2995+
sync_pause_workers_started();
28612996

28622997
pthread_attr_destroy(&ss_attr);
28632998
#endif

0 commit comments

Comments
 (0)