@@ -33,7 +33,7 @@ mod cuda;
3333#[ cfg( all( feature = "gui" , any( target_os = "linux" , target_os = "windows" ) ) ) ]
3434mod gui;
3535
36- use std:: collections:: HashMap ;
36+ use std:: collections:: { HashMap , VecDeque } ;
3737use std:: io:: { Cursor , Read } ;
3838use std:: path:: { Path , PathBuf } ;
3939use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
@@ -512,6 +512,55 @@ struct Status {
512512 /// a fatal stop. Surfaced in the tray so a Windows user (whose console logs are
513513 /// swallowed by the GUI subsystem) can tell an idle worker from a broken one.
514514 note : Arc < Mutex < String > > ,
515+ /// Rolling count of work items ("tries") run on the GPU, for a live rate in
516+ /// the tray. Fed per kernel tile by every GPU, so it aggregates across them.
517+ tries : Arc < Throughput > ,
518+ }
519+
520+ /// Timestamped record of GPU work items, for a ~1-minute average "tries/second".
521+ /// Fed per kernel tile (not per fragment) so the rate stays smooth even while a
522+ /// single multi-hundred-billion-item fragment runs for minutes.
523+ #[ derive( Default ) ]
524+ struct Throughput {
525+ /// `(instant, cumulative items)` samples spanning the window; the front is the
526+ /// window anchor, the back the latest total. Bounded to ~[`Self::WINDOW`].
527+ samples : Mutex < VecDeque < ( Instant , u64 ) > > ,
528+ }
529+
530+ impl Throughput {
531+ const WINDOW : Duration = Duration :: from_secs ( 60 ) ;
532+
533+ /// Add `items` just completed on the GPU (per tile). O(1) amortized.
534+ fn record ( & self , items : u64 ) {
535+ if items == 0 {
536+ return ;
537+ }
538+ let now = Instant :: now ( ) ;
539+ let mut s = self . samples . lock ( ) . unwrap ( ) ;
540+ let cum = s. back ( ) . map ( |& ( _, c) | c) . unwrap_or ( 0 ) + items;
541+ s. push_back ( ( now, cum) ) ;
542+ // Keep exactly one sample older than the window as the rate anchor.
543+ while s. len ( ) > 2
544+ && s. get ( 1 )
545+ . is_some_and ( |& ( t, _) | now. duration_since ( t) >= Self :: WINDOW )
546+ {
547+ s. pop_front ( ) ;
548+ }
549+ }
550+
551+ /// Average items/second across the retained window (up to ~1 min), or `None`
552+ /// until two samples span a positive interval. Only read by the GUI tray.
553+ #[ cfg_attr(
554+ not( all( feature = "gui" , any( target_os = "linux" , target_os = "windows" ) ) ) ,
555+ allow( dead_code)
556+ ) ]
557+ fn per_sec ( & self ) -> Option < f64 > {
558+ let s = self . samples . lock ( ) . unwrap ( ) ;
559+ let ( t0, c0) = * s. front ( ) ?;
560+ let ( t1, c1) = * s. back ( ) ?;
561+ let dt = t1. duration_since ( t0) . as_secs_f64 ( ) ;
562+ ( dt > 0.0 ) . then ( || ( c1 - c0) as f64 / dt)
563+ }
515564}
516565
517566impl Status {
@@ -552,6 +601,21 @@ impl Status {
552601 }
553602 }
554603
604+ /// Record `items` work items just run on the GPU (fed per kernel tile).
605+ fn record_tries ( & self , items : u64 ) {
606+ self . tries . record ( items) ;
607+ }
608+
609+ /// Recent GPU throughput in items/second (~1-minute average), or `None` when
610+ /// nothing has run recently. Only read by the GUI tray.
611+ #[ cfg_attr(
612+ not( all( feature = "gui" , any( target_os = "linux" , target_os = "windows" ) ) ) ,
613+ allow( dead_code)
614+ ) ]
615+ fn tries_per_sec ( & self ) -> Option < f64 > {
616+ self . tries . per_sec ( )
617+ }
618+
555619 /// Record why the worker is idle (shown in the tray). Set by the pipeline on
556620 /// pull errors / no-work and by the GUI on a fatal worker stop.
557621 fn set_note ( & self , note : impl Into < String > ) {
@@ -763,7 +827,15 @@ fn run_on_gpu(ordinal: i32, job: ReadyJob, status: &Status) -> Result<FinishedJo
763827 job. manifest . out_cap ,
764828 job. manifest . block ,
765829 job. manifest . tile ,
766- |_done, _total| { } ,
830+ {
831+ // Feed the tray's rate meter per tile: record the item delta since the
832+ // last callback (`done` is cumulative within the fragment).
833+ let mut reported = 0u64 ;
834+ move |done, _total| {
835+ status. record_tries ( done. saturating_sub ( reported) ) ;
836+ reported = done;
837+ }
838+ } ,
767839 // Between tiles, park here while paused so the GPU stops computing
768840 // promptly and the current fragment resumes losslessly.
769841 || status. wait_while_paused ( ) ,
@@ -1169,6 +1241,25 @@ mod tests {
11691241 let _ = std:: fs:: remove_dir_all ( & dir) ;
11701242 }
11711243
1244+ #[ test]
1245+ fn throughput_reports_a_positive_rate ( ) {
1246+ let t = Throughput :: default ( ) ;
1247+ assert ! ( t. per_sec( ) . is_none( ) , "no rate before any samples" ) ;
1248+ t. record ( 0 ) ; // ignored
1249+ t. record ( 1_000_000 ) ;
1250+ thread:: sleep ( Duration :: from_millis ( 30 ) ) ;
1251+ t. record ( 1_000_000 ) ;
1252+ let r = t. per_sec ( ) . expect ( "a rate after two spaced samples" ) ;
1253+ // ~2M items over ~30ms; timing is loose, so only assert it's a sane rate.
1254+ assert ! ( r > 0.0 , "rate should be positive, got {r}" ) ;
1255+ // The window keeps the deque bounded — a burst of samples collapses to the
1256+ // anchor + recents, never unbounded.
1257+ for _ in 0 ..1000 {
1258+ t. record ( 1 ) ;
1259+ }
1260+ assert ! ( t. samples. lock( ) . unwrap( ) . len( ) <= 1002 ) ;
1261+ }
1262+
11721263 #[ test]
11731264 fn pause_gate_blocks_until_resumed ( ) {
11741265 let status = Status :: default ( ) ;
0 commit comments