@@ -19,9 +19,11 @@ use bb8_redis::{
1919 RedisConnectionManager ,
2020} ;
2121use queue:: Queue ;
22+ use rand:: Rng ;
2223pub use session:: { MessageID , SessionID } ;
2324use std:: collections:: HashMap ;
2425use std:: sync:: Arc ;
26+ use std:: time:: SystemTime ;
2527use tokio:: sync:: { Mutex , Semaphore } ;
2628use tokio:: time:: Duration ;
2729use tokio_util:: sync:: { CancellationToken , DropGuard } ;
@@ -67,7 +69,9 @@ pub const DEFAULT_USERS: usize = 100_1000;
6769const READ_COUNT : usize = 100 ;
6870const RETRY_DELAY : Duration = Duration :: from_secs ( 2 ) ;
6971const QUEUE_MAXLEN : usize = 10000 ;
70- const QUEUE_EXPIRE : usize = 3600 ; // queues can live max of 1 hour
72+ const QUEUE_EXPIRE : usize = 1800 ; // 30 minutes (matches MAX_TTL)
73+ const MAX_TTL_MS : u64 = 1800 * 1000 ; // 30 minutes in milliseconds
74+ const XTRIM_SAMPLE_RATE : u32 = 10 ; // 10% of sends trigger XTRIM
7175
7276#[ derive( thiserror:: Error , Debug ) ]
7377pub enum SwitchError {
@@ -315,6 +319,12 @@ impl Sink {
315319 }
316320}
317321
322+ /// A helper function to decide if a probabilistic action should be triggered.
323+ fn passes_chance ( sample_rate : u32 ) -> bool {
324+ // If sample_rate is 10, this creates a 10% chance of being true.
325+ rand:: thread_rng ( ) . gen_range ( 0 ..100 ) < sample_rate
326+ }
327+
318328async fn send (
319329 stream_id : & SessionID ,
320330 pool : & Pool < RedisConnectionManager > ,
@@ -344,6 +354,39 @@ async fn send(
344354 . query_async ( & mut * con)
345355 . await ?;
346356
357+ // The cleanup strategy is multi-layered but it's necessary to ensure
358+ // the system doesn't grow indefinitely.
359+ //
360+ // Without both EXPIRE and XTRIM, the system would be incomplete:
361+ // EXPIRE alone can't clean active streams, and XTRIM alone wouldn't remove streams for clients that have disconnected permanently.
362+ // In short:
363+ // - EXPIRE: It's the garbage collector for abandoned or idle streams.
364+ // - XTRIM MINID: It's the housekeeper for busy streams, preventing them from becoming bloated with expired messages.
365+ //
366+ // Probabilistic time-based trimming (10% of sends).
367+ // This removes messages older than MAX_TTL_MS, which is the maximum time a message is allowed to be in transit.
368+ // Running XTRIM on every send would needlessly tax the system for no meaningful gain.
369+ // The 10% probabilistic approach remains superior because it adapts automatically: more sends = more XTRIM calls.
370+ if passes_chance ( XTRIM_SAMPLE_RATE ) {
371+ let now_ms = SystemTime :: now ( )
372+ . duration_since ( SystemTime :: UNIX_EPOCH )
373+ . unwrap ( )
374+ . as_millis ( ) as u64 ;
375+
376+ let cutoff_ms = now_ms. saturating_sub ( MAX_TTL_MS ) ;
377+
378+ // XTRIM is best-effort; don't fail send if it errors
379+ if let Err ( e) = cmd ( "XTRIM" )
380+ . arg ( stream_id)
381+ . arg ( "MINID" )
382+ . arg ( cutoff_ms)
383+ . query_async :: < i64 > ( & mut * con)
384+ . await
385+ {
386+ log:: debug!( "XTRIM failed for {}: {}" , stream_id, e) ;
387+ }
388+ }
389+
347390 MESSAGE_RX . inc ( ) ;
348391 MESSAGE_RX_BYTES . inc_by ( msg. len ( ) as u64 ) ;
349392
@@ -359,7 +402,7 @@ mod test {
359402
360403 use crate :: { redis, relay:: switch:: SessionID } ;
361404
362- use super :: { ConnectionSender , MessageID , Switch } ;
405+ use super :: { passes_chance , ConnectionSender , MessageID , Switch } ;
363406
364407 #[ derive( Clone ) ]
365408 struct MessageSender {
@@ -464,4 +507,76 @@ mod test {
464507
465508 assert_eq ! ( count, expected_count) ;
466509 }
510+
#[test]
fn test_passes_chance_probability() {
    const TOTAL_RUNS: u32 = 100_000;
    const SAMPLE_RATE: u32 = 10; // 10%

    // Count how many of the independent draws came back `true`.
    let hits = (0..TOTAL_RUNS)
        .filter(|_| passes_chance(SAMPLE_RATE))
        .count();

    let observed_pct = (hits as f64 / f64::from(TOTAL_RUNS)) * 100.0;
    let target_pct = f64::from(SAMPLE_RATE);

    // The observed rate only has to land within a fixed band around the
    // configured rate; the band is wide enough that random variation over
    // this many draws should not trip the assertion.
    let tolerance = 2.0;
    let deviation = (observed_pct - target_pct).abs();
    assert!(
        deviation < tolerance,
        "Actual percentage {:.2}% was not within {:.2}% of expected {}%",
        observed_pct,
        tolerance,
        target_pct
    );
}
537+
/// Checks that cleanup triggers are distributed in proportion to send volume.
///
/// Two simulated senders share the same sample rate; one performs 7 sends per
/// round and the other 3. The busier sender should collect roughly the same
/// share of `passes_chance` hits as its share of the sends.
#[test]
fn test_passes_chance_scales_with_activity() {
    const ROUNDS: u32 = 10000;
    const HIGH_ACTIVITY_PER_ROUND: u32 = 7;
    const LOW_ACTIVITY_PER_ROUND: u32 = 3;
    const SAMPLE_RATE: u32 = 10; // 10%

    let mut high_activity_hits: u32 = 0;
    let mut low_activity_hits: u32 = 0;

    for _ in 0..ROUNDS {
        // In each round, simulate 7 high-activity and 3 low-activity sends.
        for _ in 0..HIGH_ACTIVITY_PER_ROUND {
            if passes_chance(SAMPLE_RATE) {
                high_activity_hits += 1;
            }
        }
        for _ in 0..LOW_ACTIVITY_PER_ROUND {
            if passes_chance(SAMPLE_RATE) {
                low_activity_hits += 1;
            }
        }
    }

    let sends_per_round = HIGH_ACTIVITY_PER_ROUND + LOW_ACTIVITY_PER_ROUND;
    let total_hits = high_activity_hits + low_activity_hits;

    // Zero hits means the sampler never fired. Fail loudly instead of
    // returning early, which would let a broken `passes_chance` pass this
    // test vacuously (and would otherwise divide by zero below).
    assert!(
        total_hits > 0,
        "passes_chance({}) never returned true across {} draws",
        SAMPLE_RATE,
        ROUNDS * sends_per_round
    );

    let high_activity_proportion = f64::from(high_activity_hits) / f64::from(total_hits);

    // The high-activity group is responsible for 7 of every 10 sends, so it
    // should receive roughly 70% of the cleanup triggers.
    let expected_proportion = f64::from(HIGH_ACTIVITY_PER_ROUND) / f64::from(sends_per_round);
    let margin = 0.05; // Allow a 5% margin for randomness

    assert!(
        (high_activity_proportion - expected_proportion).abs() < margin,
        "High-activity group received {:.2}% of cleanups, expected around {:.2}%",
        high_activity_proportion * 100.0,
        expected_proportion * 100.0
    );
}
467582}
0 commit comments