@@ -13,10 +13,11 @@ use pulsebeam_core::net::UdpSocket;
1313use std:: collections:: HashMap ;
1414use std:: sync:: Arc ;
1515use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
16- use std:: time:: { Duration , Instant } ;
16+ use std:: time:: Duration ;
1717use tokio:: runtime:: Builder ;
1818use tokio:: sync:: mpsc;
1919use tokio:: task:: JoinSet ;
20+ use tokio:: time:: Instant ;
2021use tracing:: error;
2122use tracing_subscriber:: { EnvFilter , layer:: SubscriberExt , util:: SubscriberInitExt } ;
2223
@@ -140,22 +141,19 @@ impl Default for AgentDelta {
140141
141142#[ derive( Debug ) ]
142143pub struct AgentStatReport {
144+ pub agent_id : usize ,
143145 pub delta : AgentDelta ,
144146}
145147
146148#[ derive( Default ) ]
147149pub struct StatsProcessor {
148- prev_tx_bytes : u64 ,
149- prev_rx_bytes : u64 ,
150150 prev_tx_layers : HashMap < ( Mid , Option < Rid > ) , LayerState > ,
151151 prev_rx_layers : HashMap < ( Mid , Option < Rid > ) , LayerState > ,
152152}
153153
154154#[ derive( Default , Clone ) ]
155155struct LayerState {
156156 packets : u64 ,
157- nacks : u64 ,
158- plis : u64 ,
159157 silent_ticks : u32 ,
160158}
161159
@@ -169,17 +167,15 @@ impl StatsProcessor {
169167 ) -> AgentDelta {
170168 let mut delta = AgentDelta :: default ( ) ;
171169
172- delta. tx_bytes = tx_bytes. saturating_sub ( self . prev_tx_bytes ) ;
173- delta. rx_bytes = rx_bytes. saturating_sub ( self . prev_rx_bytes ) ;
174- self . prev_tx_bytes = tx_bytes;
175- self . prev_rx_bytes = rx_bytes;
170+ delta. tx_bytes = tx_bytes;
171+ delta. rx_bytes = rx_bytes;
176172
177173 for ( mid, rid, packets, nacks, plis) in tx_layers {
178174 let prev = self . prev_tx_layers . entry ( ( mid, rid) ) . or_default ( ) ;
179175 let d_packets = packets. saturating_sub ( prev. packets ) ;
180- delta. tx_packets += d_packets ;
181- delta. tx_nacks += nacks. saturating_sub ( prev . nacks ) ;
182- delta. tx_plis += plis. saturating_sub ( prev . plis ) ;
176+ delta. tx_packets += packets ;
177+ delta. tx_nacks += nacks;
178+ delta. tx_plis += plis;
183179
184180 prev. silent_ticks = if d_packets == 0 {
185181 prev. silent_ticks . saturating_add ( 1 )
@@ -190,16 +186,14 @@ impl StatsProcessor {
190186 delta. tx_active += 1 ;
191187 }
192188 prev. packets = packets;
193- prev. nacks = nacks;
194- prev. plis = plis;
195189 }
196190
197191 for ( mid, rid, packets, nacks, plis, loss) in rx_layers {
198192 let prev = self . prev_rx_layers . entry ( ( mid, rid) ) . or_default ( ) ;
199193 let d_packets = packets. saturating_sub ( prev. packets ) ;
200- delta. rx_packets += d_packets ;
201- delta. rx_nacks += nacks. saturating_sub ( prev . nacks ) ;
202- delta. rx_plis += plis. saturating_sub ( prev . plis ) ;
194+ delta. rx_packets += packets ;
195+ delta. rx_nacks += nacks;
196+ delta. rx_plis += plis;
203197
204198 prev. silent_ticks = if d_packets == 0 {
205199 prev. silent_ticks . saturating_add ( 1 )
@@ -214,8 +208,6 @@ impl StatsProcessor {
214208 }
215209 }
216210 prev. packets = packets;
217- prev. nacks = nacks;
218- prev. plis = plis;
219211 }
220212
221213 delta
@@ -459,29 +451,30 @@ async fn spawn_room(
459451 } ) ;
460452}
461453
454+ #[ derive( Default ) ]
455+ struct AgentHistory {
456+ tx_bytes : u64 ,
457+ rx_bytes : u64 ,
458+ tx_packets : u64 ,
459+ rx_packets : u64 ,
460+ tx_nacks : u64 ,
461+ rx_nacks : u64 ,
462+ tx_plis : u64 ,
463+ rx_plis : u64 ,
464+ }
465+
462466async fn monitor_task ( mut stats_rx : mpsc:: Receiver < AgentStatReport > , state : Arc < SharedState > ) {
463467 let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 5 ) ) ;
464468 interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
465469 let start = Instant :: now ( ) ;
470+ let mut last_interval = start;
466471
467472 let mut forwarding_hist =
468473 Histogram :: < u64 > :: new_with_max ( HISTOGRAM_MAX_US , HISTOGRAM_SIGFIG ) . unwrap ( ) ;
469474 let mut rtt_hist = Histogram :: < u64 > :: new_with_max ( HISTOGRAM_MAX_US , HISTOGRAM_SIGFIG ) . unwrap ( ) ;
470475
471- let mut tx_bytes = 0u64 ;
472- let mut rx_bytes = 0u64 ;
473- let mut tx_nacks = 0u64 ;
474- let mut rx_nacks = 0u64 ;
475- let mut tx_plis = 0u64 ;
476- let mut rx_plis = 0u64 ;
477-
478- let mut tx_active_streams = 0usize ;
479- let mut rx_active_streams = 0usize ;
480-
481- let mut tx_packets = 0u64 ;
482- let mut rx_packets = 0u64 ;
483- let mut rx_loss_sum = 0.0f32 ;
484- let mut rx_loss_count = 0usize ;
476+ let mut agent_latest: HashMap < usize , AgentDelta > = HashMap :: new ( ) ;
477+ let mut agent_prev: HashMap < usize , AgentHistory > = HashMap :: new ( ) ;
485478
486479 let mut consecutive_high_p99 = 0u32 ;
487480 let mut consecutive_high_loss = 0u32 ;
@@ -490,17 +483,65 @@ async fn monitor_task(mut stats_rx: mpsc::Receiver<AgentStatReport>, state: Arc<
490483 tokio:: select! {
491484 biased;
492485
493- _ = interval. tick( ) => {
486+ now = interval. tick( ) => {
487+ let interval_secs = now. duration_since( last_interval) . as_secs_f64( ) ;
488+ last_interval = now;
489+
490+ if interval_secs <= 0.0 {
491+ continue ;
492+ }
493+
494494 let elapsed = start. elapsed( ) . as_secs( ) ;
495495 if elapsed == 0 { continue ; }
496496
497497 let rooms = state. active_rooms. load( Ordering :: Relaxed ) ;
498498 let agents = state. active_agents. load( Ordering :: Relaxed ) ;
499499
500- let tx_mbps = ( tx_bytes * 8 ) as f64 / 1_000_000.0 ;
501- let rx_mbps = ( rx_bytes * 8 ) as f64 / 1_000_000.0 ;
502- let tx_pps = tx_packets;
503- let rx_pps = rx_packets;
500+ let mut total_tx_bytes_delta = 0u64 ;
501+ let mut total_rx_bytes_delta = 0u64 ;
502+ let mut total_tx_packets_delta = 0u64 ;
503+ let mut total_rx_packets_delta = 0u64 ;
504+ let mut total_tx_nacks_delta = 0u64 ;
505+ let mut total_rx_nacks_delta = 0u64 ;
506+ let mut total_tx_plis_delta = 0u64 ;
507+ let mut total_rx_plis_delta = 0u64 ;
508+
509+ let mut tx_active_streams = 0usize ;
510+ let mut rx_active_streams = 0usize ;
511+ let mut rx_loss_sum = 0.0f32 ;
512+ let mut rx_loss_count = 0usize ;
513+
514+ for ( & id, latest) in & agent_latest {
515+ let prev = agent_prev. entry( id) . or_default( ) ;
516+
517+ total_tx_bytes_delta += latest. tx_bytes. saturating_sub( prev. tx_bytes) ;
518+ total_rx_bytes_delta += latest. rx_bytes. saturating_sub( prev. rx_bytes) ;
519+ total_tx_packets_delta += latest. tx_packets. saturating_sub( prev. tx_packets) ;
520+ total_rx_packets_delta += latest. rx_packets. saturating_sub( prev. rx_packets) ;
521+ total_tx_nacks_delta += latest. tx_nacks. saturating_sub( prev. tx_nacks) ;
522+ total_rx_nacks_delta += latest. rx_nacks. saturating_sub( prev. rx_nacks) ;
523+ total_tx_plis_delta += latest. tx_plis. saturating_sub( prev. tx_plis) ;
524+ total_rx_plis_delta += latest. rx_plis. saturating_sub( prev. rx_plis) ;
525+
526+ tx_active_streams += latest. tx_active;
527+ rx_active_streams += latest. rx_active;
528+ rx_loss_sum += latest. rx_loss_sum;
529+ rx_loss_count += latest. rx_loss_count;
530+
531+ prev. tx_bytes = latest. tx_bytes;
532+ prev. rx_bytes = latest. rx_bytes;
533+ prev. tx_packets = latest. tx_packets;
534+ prev. rx_packets = latest. rx_packets;
535+ prev. tx_nacks = latest. tx_nacks;
536+ prev. rx_nacks = latest. rx_nacks;
537+ prev. tx_plis = latest. tx_plis;
538+ prev. rx_plis = latest. rx_plis;
539+ }
540+
541+ let tx_mbps = ( total_tx_bytes_delta as f64 * 8.0 ) / 1_000_000.0 / interval_secs;
542+ let rx_mbps = ( total_rx_bytes_delta as f64 * 8.0 ) / 1_000_000.0 / interval_secs;
543+ let tx_pps = total_tx_packets_delta as f64 / interval_secs;
544+ let rx_pps = total_rx_packets_delta as f64 / interval_secs;
504545
505546 let ( p50, p95, p99) = if forwarding_hist. len( ) > 0 {
506547 (
@@ -536,9 +577,9 @@ async fn monitor_task(mut stats_rx: mpsc::Receiver<AgentStatReport>, state: Arc<
536577 let rtt99_str = rtt99. map( |v| format!( "{:.3}" , v) ) . unwrap_or_else( || "NA" . to_string( ) ) ;
537578
538579 println!(
539- "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}" ,
580+ "{},{},{},{:.3 },{:.3 },{:.1 },{:.1 },{:.2 },{},{},{},{},{},{},{},{},{},{},{},{}" ,
540581 elapsed, rooms, agents, tx_mbps, rx_mbps, tx_pps, rx_pps, avg_loss_pct,
541- tx_nacks , rx_nacks , tx_plis , rx_plis ,
582+ total_tx_nacks_delta , total_rx_nacks_delta , total_tx_plis_delta , total_rx_plis_delta ,
542583 p50_str, p95_str, p99_str, rtt50_str, rtt95_str, rtt99_str,
543584 tx_active_streams, rx_active_streams,
544585 ) ;
@@ -561,49 +602,25 @@ async fn monitor_task(mut stats_rx: mpsc::Receiver<AgentStatReport>, state: Arc<
561602 consecutive_high_loss = 0 ;
562603 }
563604
564- tx_bytes = 0 ;
565- rx_bytes = 0 ;
566- tx_packets = 0 ;
567- rx_packets = 0 ;
568- tx_nacks = 0 ;
569- rx_nacks = 0 ;
570- tx_plis = 0 ;
571- rx_plis = 0 ;
572- tx_active_streams = 0 ;
573- rx_active_streams = 0 ;
574- rx_loss_sum = 0.0 ;
575- rx_loss_count = 0 ;
576605 forwarding_hist. reset( ) ;
577606 rtt_hist. reset( ) ;
578607 }
579608
580609 Some ( report) = stats_rx. recv( ) => {
581- tx_bytes += report. delta. tx_bytes;
582- rx_bytes += report. delta. rx_bytes;
583- tx_packets += report. delta. tx_packets;
584- rx_packets += report. delta. rx_packets;
585- tx_nacks += report. delta. tx_nacks;
586- rx_nacks += report. delta. rx_nacks;
587- tx_plis += report. delta. tx_plis;
588- rx_plis += report. delta. rx_plis;
589- tx_active_streams += report. delta. tx_active;
590- rx_active_streams += report. delta. rx_active;
591- rx_loss_sum += report. delta. rx_loss_sum;
592- rx_loss_count += report. delta. rx_loss_count;
593-
594- for sample in report. delta. forwarding_samples {
595- let _ = forwarding_hist. record( sample) ;
610+ for sample in & report. delta. forwarding_samples {
611+ let _ = forwarding_hist. record( * sample) ;
596612 }
597- for sample in report. delta. rtt_samples {
598- let _ = rtt_hist. record( sample) ;
613+ for sample in & report. delta. rtt_samples {
614+ let _ = rtt_hist. record( * sample) ;
599615 }
616+ agent_latest. insert( report. agent_id, report. delta) ;
600617 }
601618 }
602619 }
603620}
604621
605622async fn spawn_agent (
606- _id : usize ,
623+ id : usize ,
607624 api_url : String ,
608625 room : String ,
609626 is_pub : bool ,
@@ -702,7 +719,7 @@ async fn spawn_agent(
702719 delta. forwarding_samples. push( sample) ;
703720 }
704721
705- if stats_tx. try_send( AgentStatReport { delta } ) . is_err( ) {
722+ if stats_tx. try_send( AgentStatReport { agent_id : id , delta } ) . is_err( ) {
706723 tracing:: warn!( "stats channel full, dropping report" ) ;
707724 }
708725 }
@@ -780,6 +797,9 @@ async fn run_connect(
780797 let mut stats_processor = StatsProcessor :: default ( ) ;
781798 let mut stats_interval = tokio:: time:: interval ( Duration :: from_secs ( 1 ) ) ;
782799 let start = Instant :: now ( ) ;
800+ let mut last_stats = start;
801+
802+ let mut prev_history = AgentHistory :: default ( ) ;
783803
784804 let mut forwarding_hist =
785805 Histogram :: < u64 > :: new_with_max ( HISTOGRAM_MAX_US , HISTOGRAM_SIGFIG ) . unwrap ( ) ;
@@ -812,6 +832,14 @@ async fn run_connect(
812832 }
813833
814834 _ = stats_interval. tick( ) => {
835+ let now = Instant :: now( ) ;
836+ let interval_secs = now. duration_since( last_stats) . as_secs_f64( ) ;
837+ last_stats = now;
838+
839+ if interval_secs <= 0.0 {
840+ continue ;
841+ }
842+
815843 let elapsed = start. elapsed( ) . as_secs( ) ;
816844 let Some ( stats) = agent. get_stats( ) . await else { continue } ;
817845
@@ -841,7 +869,25 @@ async fn run_connect(
841869 } )
842870 } ) ;
843871
844- let delta = stats_processor. process( tx_bytes, rx_bytes, tx_iter, rx_iter) ;
872+ let latest = stats_processor. process( tx_bytes, rx_bytes, tx_iter, rx_iter) ;
873+
874+ let d_tx_bytes = latest. tx_bytes. saturating_sub( prev_history. tx_bytes) ;
875+ let d_rx_bytes = latest. rx_bytes. saturating_sub( prev_history. rx_bytes) ;
876+ let d_tx_packets = latest. tx_packets. saturating_sub( prev_history. tx_packets) ;
877+ let d_rx_packets = latest. rx_packets. saturating_sub( prev_history. rx_packets) ;
878+ let d_tx_nacks = latest. tx_nacks. saturating_sub( prev_history. tx_nacks) ;
879+ let d_rx_nacks = latest. rx_nacks. saturating_sub( prev_history. rx_nacks) ;
880+ let d_tx_plis = latest. tx_plis. saturating_sub( prev_history. tx_plis) ;
881+ let d_rx_plis = latest. rx_plis. saturating_sub( prev_history. rx_plis) ;
882+
883+ prev_history. tx_bytes = latest. tx_bytes;
884+ prev_history. rx_bytes = latest. rx_bytes;
885+ prev_history. tx_packets = latest. tx_packets;
886+ prev_history. rx_packets = latest. rx_packets;
887+ prev_history. tx_nacks = latest. tx_nacks;
888+ prev_history. rx_nacks = latest. rx_nacks;
889+ prev_history. tx_plis = latest. tx_plis;
890+ prev_history. rx_plis = latest. rx_plis;
845891
846892 let ( p50, p95, p99) = if forwarding_hist. len( ) > 0 {
847893 (
@@ -863,13 +909,13 @@ async fn run_connect(
863909 ( None , None , None )
864910 } ;
865911
866- let tx_mbps = ( delta . tx_bytes * 8 ) as f64 / 1_000_000.0 ;
867- let rx_mbps = ( delta . rx_bytes * 8 ) as f64 / 1_000_000.0 ;
868- let tx_pps = delta . tx_packets ;
869- let rx_pps = delta . rx_packets ;
912+ let tx_mbps = ( d_tx_bytes as f64 * 8.0 ) / 1_000_000.0 / interval_secs ;
913+ let rx_mbps = ( d_rx_bytes as f64 * 8.0 ) / 1_000_000.0 / interval_secs ;
914+ let tx_pps = d_tx_packets as f64 / interval_secs ;
915+ let rx_pps = d_rx_packets as f64 / interval_secs ;
870916
871- let avg_loss_pct = if delta . rx_loss_count > 0 {
872- delta . rx_loss_sum / delta . rx_loss_count as f32 * 100.0
917+ let avg_loss_pct = if latest . rx_loss_count > 0 {
918+ latest . rx_loss_sum / latest . rx_loss_count as f32 * 100.0
873919 } else {
874920 0.0
875921 } ;
@@ -882,12 +928,12 @@ async fn run_connect(
882928 let rtt99_str = rtt99. map( |v| format!( "{:>10.3}" , v) ) . unwrap_or_else( || " NA" . to_string( ) ) ;
883929
884930 eprintln!(
885- "{:>8} {:>8.2} {:>8.2} {:>10} {:>10} {:>10.2} {:>10} {:>10} {:>10} {:>10} {} {} {} {} {} {} {:>10} {:>10}" ,
931+ "{:>8} {:>8.2} {:>8.2} {:>10.0 } {:>10.0 } {:>10.2} {:>10} {:>10} {:>10} {:>10} {} {} {} {} {} {} {:>10} {:>10}" ,
886932 elapsed, tx_mbps, rx_mbps, tx_pps, rx_pps, avg_loss_pct,
887- delta . tx_nacks , delta . rx_nacks , delta . tx_plis , delta . rx_plis ,
933+ d_tx_nacks , d_rx_nacks , d_tx_plis , d_rx_plis ,
888934 p50_str, p95_str, p99_str,
889935 rtt50_str, rtt95_str, rtt99_str,
890- delta . tx_active, delta . rx_active
936+ latest . tx_active, latest . rx_active
891937 ) ;
892938
893939 forwarding_hist. reset( ) ;
@@ -941,8 +987,10 @@ mod tests {
941987
942988 assert_eq ! ( delta. tx_bytes, 1000 ) ;
943989 assert_eq ! ( delta. rx_bytes, 2000 ) ;
990+ assert_eq ! ( delta. tx_packets, 100 ) ;
944991 assert_eq ! ( delta. tx_nacks, 1 ) ;
945992 assert_eq ! ( delta. tx_plis, 2 ) ;
993+ assert_eq ! ( delta. rx_packets, 200 ) ;
946994 assert_eq ! ( delta. rx_nacks, 3 ) ;
947995 assert_eq ! ( delta. rx_plis, 4 ) ;
948996 assert_eq ! ( delta. tx_active, 1 ) ;
0 commit comments