1010//! [`suspend_point`](SnapshotCoordinator::suspend_point) to suspend.
1111//! - The snapshotter waits for every in-flight operation to drain or suspend, takes its snapshot,
1212//! then wakes everyone.
13- //!
14- //! ## Why this is a module
15- //!
16- //! The inner protocol mixes a tagged atomic counter with a mutex-protected
17- //! flag and two condvars. Implementing it correctly on weak memory hardware
18- //! requires careful pairing: every notify must be issued under the same mutex
19- //! the waiters use, and every predicate read must be Acquire-ordered. The
20- //! module hides those details so callers cannot reach in and break them.
21- //!
22- //! ## Test surface
23- //!
24- //! The coordinator is generic over the suspended-operation type, so unit
25- //! tests can use a trivial placeholder (`()` or a small struct) to exercise
26- //! the protocol without dragging in the rest of the backend.
2713
2814use std:: sync:: {
2915 Arc ,
@@ -37,9 +23,6 @@ use crate::utils::ptr_eq_arc::PtrEqArc;
3723
3824/// High bit: set while a snapshot is requested or in flight.
3925/// Low bits: count of operations currently executing (not suspended).
40- ///
41- /// We pack both into one atomic so a single RMW can answer both questions
42- /// (count and bit) without taking the mutex on the operation hot path.
4326const SNAPSHOT_REQUESTED_BIT : usize = 1 << ( usize:: BITS - 1 ) ;
4427
4528/// State protected by the mutex. Kept tiny so critical sections stay short.
@@ -365,10 +348,12 @@ mod tests {
365348 let started_snapshot = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
366349
367350 let coord2 = coord. clone ( ) ;
368- let started2 = started_snapshot. clone ( ) ;
369- let snap_thread = thread:: spawn ( move || {
370- let _phase = coord2. begin_snapshot ( ) ;
371- started2. store ( 1 , Ordering :: Release ) ;
351+ let snap_thread = thread:: spawn ( {
352+ let started_snapshot = started_snapshot. clone ( ) ;
353+ move || {
354+ let _phase = coord2. begin_snapshot ( ) ;
355+ started_snapshot. store ( 1 , Ordering :: Release ) ;
356+ }
372357 } ) ;
373358
374359 // Give the snapshotter time to set the bit and start waiting.
@@ -388,10 +373,12 @@ mod tests {
388373 let started_op = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
389374
390375 let coord2 = coord. clone ( ) ;
391- let started2 = started_op. clone ( ) ;
392- let op_thread = thread:: spawn ( move || {
393- let _g = coord2. begin_operation ( ) ;
394- started2. store ( 1 , Ordering :: Release ) ;
376+ let op_thread = thread:: spawn ( {
377+ let started_op = started_op. clone ( ) ;
378+ move || {
379+ let _guard = coord2. begin_operation ( ) ;
380+ started_op. store ( 1 , Ordering :: Release ) ;
381+ }
395382 } ) ;
396383
397384 thread:: sleep ( Duration :: from_millis ( 50 ) ) ;
@@ -409,14 +396,17 @@ mod tests {
409396
410397 let snapshotter_done = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
411398 let coord_snap = coord. clone ( ) ;
412- let snap_done = snapshotter_done. clone ( ) ;
413- let snap_thread = thread:: spawn ( move || {
414- let phase = coord_snap. begin_snapshot ( ) ;
415- assert_eq ! ( phase. suspended_operations( ) . len( ) , 1 ) ;
416- snap_done. store ( 1 , Ordering :: Release ) ;
417- // Hold the snapshot for a moment so the suspend_point thread
418- // observes `snapshot_requested == true` after waking.
419- thread:: sleep ( Duration :: from_millis ( 20 ) ) ;
399+
400+ let snap_thread = thread:: spawn ( {
401+ let snapshotter_done = snapshotter_done. clone ( ) ;
402+ move || {
403+ let phase = coord_snap. begin_snapshot ( ) ;
404+ assert_eq ! ( phase. suspended_operations( ) . len( ) , 1 ) ;
405+ snapshotter_done. store ( 1 , Ordering :: Release ) ;
406+ // Hold the snapshot for a moment so the suspend_point thread
407+ // observes `snapshot_requested == true` after waking.
408+ thread:: sleep ( Duration :: from_millis ( 20 ) ) ;
409+ }
420410 } ) ;
421411
422412 thread:: sleep ( Duration :: from_millis ( 20 ) ) ;
@@ -439,20 +429,23 @@ mod tests {
439429 timeout : Duration ,
440430 ) -> Arc < std:: sync:: atomic:: AtomicBool > {
441431 let done = Arc :: new ( std:: sync:: atomic:: AtomicBool :: new ( false ) ) ;
442- let done_watch = done. clone ( ) ;
443- thread:: spawn ( move || {
444- let deadline = std:: time:: Instant :: now ( ) + timeout;
445- while std:: time:: Instant :: now ( ) < deadline {
446- if done_watch. load ( Ordering :: Acquire ) {
447- return ;
432+
433+ thread:: spawn ( {
434+ let done_watch = done. clone ( ) ;
435+ move || {
436+ let deadline = std:: time:: Instant :: now ( ) + timeout;
437+ while std:: time:: Instant :: now ( ) < deadline {
438+ if done_watch. load ( Ordering :: Acquire ) {
439+ return ;
440+ }
441+ thread:: sleep ( Duration :: from_millis ( 50 ) ) ;
448442 }
449- thread:: sleep ( Duration :: from_millis ( 50 ) ) ;
443+ eprintln ! (
444+ "[watchdog] {label}: timed out after {timeout:?}, missed-wakeup race likely; \
445+ aborting"
446+ ) ;
447+ std:: process:: abort ( ) ;
450448 }
451- eprintln ! (
452- "[watchdog] {label}: timed out after {timeout:?}, missed-wakeup race likely; \
453- aborting"
454- ) ;
455- std:: process:: abort ( ) ;
456449 } ) ;
457450 done
458451 }
@@ -475,39 +468,46 @@ mod tests {
475468 let mut op_handles = Vec :: new ( ) ;
476469 for _ in 0 ..8 {
477470 let coord = coord. clone ( ) ;
478- let stop = stop. clone ( ) ;
479- op_handles. push ( thread:: spawn ( move || {
480- while !stop. load ( Ordering :: Relaxed ) {
481- let _g = coord. begin_operation ( ) ;
471+ op_handles. push ( thread:: spawn ( {
472+ let stop = stop. clone ( ) ;
473+ move || {
474+ while !stop. load ( Ordering :: Relaxed ) {
475+ let _g = coord. begin_operation ( ) ;
476+ }
482477 }
483478 } ) ) ;
484479 }
485480 let mut snap_handles = Vec :: new ( ) ;
486481 for _ in 0 ..2 {
487- let coord = coord. clone ( ) ;
488- let snapshot_lock = snapshot_lock. clone ( ) ;
489- let snap_count = snap_count. clone ( ) ;
490- snap_handles. push ( thread:: spawn ( move || {
491- for _ in 0 ..200 {
492- let _ser = snapshot_lock. lock ( ) ;
493- let _phase = coord. begin_snapshot ( ) ;
494- snap_count. fetch_add ( 1 , Ordering :: Relaxed ) ;
482+ snap_handles. push ( thread:: spawn ( {
483+ let coord = coord. clone ( ) ;
484+ let snapshot_lock = snapshot_lock. clone ( ) ;
485+ let snap_count = snap_count. clone ( ) ;
486+ move || {
487+ for _ in 0 ..200 {
488+ let _ser = snapshot_lock. lock ( ) ;
489+ let _phase = coord. begin_snapshot ( ) ;
490+ snap_count. fetch_add ( 1 , Ordering :: Relaxed ) ;
491+ }
495492 }
496493 } ) ) ;
497494 }
498495
499496 // Progress watchdog: print snapshot count every second so we can see
500497 // if the test is making progress or actually wedged.
501498 let stop_progress = Arc :: new ( std:: sync:: atomic:: AtomicBool :: new ( false ) ) ;
502- let stop_progress_clone = stop_progress. clone ( ) ;
503- let snap_count_clone = snap_count. clone ( ) ;
504- let progress = thread:: spawn ( move || {
505- while !stop_progress_clone. load ( Ordering :: Relaxed ) {
506- thread:: sleep ( Duration :: from_secs ( 5 ) ) ;
507- eprintln ! (
508- "[stress] snapshots completed: {}" ,
509- snap_count_clone. load( Ordering :: Relaxed ) ,
510- ) ;
499+
500+ let progress = thread:: spawn ( {
501+ let stop_progress = stop_progress. clone ( ) ;
502+ let snap_count = snap_count. clone ( ) ;
503+ move || {
504+ while !stop_progress. load ( Ordering :: Relaxed ) {
505+ thread:: sleep ( Duration :: from_secs ( 1 ) ) ;
506+ eprintln ! (
507+ "[stress] snapshots completed: {}" ,
508+ snap_count. load( Ordering :: Relaxed ) ,
509+ ) ;
510+ }
511511 }
512512 } ) ;
513513
@@ -536,24 +536,28 @@ mod tests {
536536
537537 let mut handles = Vec :: new ( ) ;
538538 for _ in 0 ..8 {
539- let coord = coord. clone ( ) ;
540- let counter = counter. clone ( ) ;
541- handles. push ( thread:: spawn ( move || {
542- for _ in 0 ..200 {
543- let _g = coord. begin_operation ( ) ;
544- counter. fetch_add ( 1 , Ordering :: Relaxed ) ;
539+ handles. push ( thread:: spawn ( {
540+ let coord = coord. clone ( ) ;
541+ let counter = counter. clone ( ) ;
542+ move || {
543+ for _ in 0 ..200 {
544+ let _g = coord. begin_operation ( ) ;
545+ counter. fetch_add ( 1 , Ordering :: Relaxed ) ;
546+ }
545547 }
546548 } ) ) ;
547549 }
548550 for _ in 0 ..2 {
549- let coord = coord. clone ( ) ;
550- let snapshot_lock = snapshot_lock. clone ( ) ;
551- handles. push ( thread:: spawn ( move || {
552- for _ in 0 ..50 {
553- let _ser = snapshot_lock. lock ( ) ;
554- let _phase = coord. begin_snapshot ( ) ;
555- // Pretend to do snapshot work.
556- thread:: sleep ( Duration :: from_micros ( 10 ) ) ;
551+ handles. push ( thread:: spawn ( {
552+ let coord = coord. clone ( ) ;
553+ let snapshot_lock = snapshot_lock. clone ( ) ;
554+ move || {
555+ for _ in 0 ..50 {
556+ let _ser = snapshot_lock. lock ( ) ;
557+ let _phase = coord. begin_snapshot ( ) ;
558+ // Pretend to do snapshot work.
559+ thread:: sleep ( Duration :: from_micros ( 10 ) ) ;
560+ }
557561 }
558562 } ) ) ;
559563 }
0 commit comments