@@ -234,6 +234,132 @@ void pin_actor_affinity() { }
234234void set_actor_affinity (int wthread_id ) { }
235235#endif // ACTON_THREADS
236236
237+ #ifdef ACTON_THREADS
238+ static pthread_mutex_t sync_pause_lock = PTHREAD_MUTEX_INITIALIZER ;
239+ static pthread_cond_t sync_pause_cond = PTHREAD_COND_INITIALIZER ;
240+ static int sync_pause_requested = 0 ;
241+ static int sync_pause_owner = -1 ;
242+ static int sync_pause_parked_count = 0 ;
243+ static int sync_pause_workers_are_started = 0 ;
244+ static int sync_pause_parked [MAX_WTHREADS ];
245+
246+ static void sync_pause_clear_parked (void ) {
247+ for (int i = 0 ; i <= num_wthreads ; i ++ ) {
248+ sync_pause_parked [i ] = 0 ;
249+ }
250+ }
251+
252+ static void wake_all_wt (void ) {
253+ for (int i = 0 ; i <= num_wthreads ; i ++ ) {
254+ uv_async_send (& wake_ev [i ]);
255+ }
256+ }
257+
258+ static void sync_pause_wait (void ) {
259+ struct timespec ts ;
260+ clock_gettime (CLOCK_REALTIME , & ts );
261+ // Use a bounded wait so pause owners and parked workers re-check rts_exit
262+ // even if shutdown starts without a matching condition broadcast.
263+ ts .tv_nsec += 10 * 1000 * 1000 ;
264+ if (ts .tv_nsec >= 1000 * 1000 * 1000 ) {
265+ ts .tv_sec ++ ;
266+ ts .tv_nsec -= 1000 * 1000 * 1000 ;
267+ }
268+ pthread_cond_timedwait (& sync_pause_cond , & sync_pause_lock , & ts );
269+ }
270+
271+ int acton_sync_pause_begin (void ) {
272+ WorkerCtx wctx = GET_WCTX ();
273+ if (wctx == NULL || wctx -> id < 0 || wctx -> id > num_wthreads ) {
274+ return -1 ;
275+ }
276+ int owner = (int )wctx -> id ;
277+
278+ pthread_mutex_lock (& sync_pause_lock );
279+ if (!sync_pause_workers_are_started || sync_pause_requested ) {
280+ pthread_mutex_unlock (& sync_pause_lock );
281+ return -1 ;
282+ }
283+ if (rts_exit ) {
284+ pthread_mutex_unlock (& sync_pause_lock );
285+ return -1 ;
286+ }
287+
288+ sync_pause_requested = 1 ;
289+ sync_pause_owner = owner ;
290+ sync_pause_parked_count = 0 ;
291+ sync_pause_clear_parked ();
292+
293+ wake_all_wt ();
294+ while (sync_pause_parked_count < num_wthreads && !rts_exit ) {
295+ sync_pause_wait ();
296+ }
297+ if (rts_exit ) {
298+ sync_pause_requested = 0 ;
299+ sync_pause_owner = -1 ;
300+ sync_pause_parked_count = 0 ;
301+ pthread_cond_broadcast (& sync_pause_cond );
302+ pthread_mutex_unlock (& sync_pause_lock );
303+ return -1 ;
304+ }
305+
306+ pthread_mutex_unlock (& sync_pause_lock );
307+ return 0 ;
308+ }
309+
310+ void acton_sync_pause_end (void ) {
311+ WorkerCtx wctx = GET_WCTX ();
312+ if (wctx == NULL || wctx -> id < 0 || wctx -> id > num_wthreads ) {
313+ return ;
314+ }
315+ int owner = (int )wctx -> id ;
316+
317+ pthread_mutex_lock (& sync_pause_lock );
318+ if (!sync_pause_requested || sync_pause_owner != owner ) {
319+ pthread_mutex_unlock (& sync_pause_lock );
320+ return ;
321+ }
322+
323+ sync_pause_requested = 0 ;
324+ sync_pause_owner = -1 ;
325+ sync_pause_parked_count = 0 ;
326+ pthread_cond_broadcast (& sync_pause_cond );
327+ pthread_mutex_unlock (& sync_pause_lock );
328+ }
329+
330+ // Called from the worker loop between actor continuations. If a sync pause is
331+ // active, non-owner workers park here while the owner runs the synchronized op.
332+ static void maybe_sync_pause (void ) {
333+ WorkerCtx wctx = GET_WCTX ();
334+ if (wctx == NULL || wctx -> id < 0 || wctx -> id >= MAX_WTHREADS ) {
335+ return ;
336+ }
337+ int id = (int )wctx -> id ;
338+
339+ pthread_mutex_lock (& sync_pause_lock );
340+ while (sync_pause_requested && id != sync_pause_owner && !rts_exit ) {
341+ if (!sync_pause_parked [id ]) {
342+ sync_pause_parked [id ] = 1 ;
343+ sync_pause_parked_count ++ ;
344+ pthread_cond_broadcast (& sync_pause_cond );
345+ }
346+ sync_pause_wait ();
347+ }
348+ pthread_mutex_unlock (& sync_pause_lock );
349+ }
350+
351+ static void sync_pause_workers_started (void ) {
352+ pthread_mutex_lock (& sync_pause_lock );
353+ sync_pause_workers_are_started = 1 ;
354+ pthread_mutex_unlock (& sync_pause_lock );
355+ }
356+ #else
357+ int acton_sync_pause_begin (void ) { return 0 ; }
358+ void acton_sync_pause_end (void ) { }
359+ static void maybe_sync_pause (void ) { }
360+ static void sync_pause_workers_started (void ) { }
361+ #endif
362+
237363void wake_wt (int wtid ) {
238364 // We are sometimes optimistically called, i.e. the caller sometimes does
239365 // not really know whether there is new work or not. We check and if there
@@ -1533,6 +1659,7 @@ void wt_work_cb(uv_check_t *ev) {
15331659
15341660 uv_clock_gettime (UV_CLOCK_MONOTONIC , & ts_start );
15351661 while (true) {
1662+ maybe_sync_pause ();
15361663 if (rts_exit ) {
15371664 return ;
15381665 }
@@ -2605,9 +2732,9 @@ int main(int argc, char **argv) {
26052732 }
26062733
26072734#ifdef ACTON_THREADS
2608- if (num_wthreads > MAX_WTHREADS ) {
2609- fprintf (stderr , "ERROR: Maximum of %d worker threads supported.\n" , MAX_WTHREADS );
2610- fprintf (stderr , "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n" , argv [0 ], MAX_WTHREADS );
2735+ if (num_wthreads >= MAX_WTHREADS ) {
2736+ fprintf (stderr , "ERROR: Maximum of %d worker threads supported.\n" , MAX_WTHREADS - 1 );
2737+ fprintf (stderr , "HINT: Run this program with fewer worker threads: %s --rts-wthreads %d\n" , argv [0 ], MAX_WTHREADS - 1 );
26112738 exit (1 );
26122739 }
26132740 // Determine number of worker threads, normally 1:1 per CPU thread / core
@@ -2858,6 +2985,7 @@ int main(int argc, char **argv) {
28582985 //pthread_setaffinity_np(threads[idx-1], sizeof(cpu_set), &cpu_set);
28592986 }
28602987 }
2988+ sync_pause_workers_started ();
28612989
28622990 pthread_attr_destroy (& ss_attr );
28632991#endif
0 commit comments