@@ -80,11 +80,6 @@ impl FiltersPipeline {
8080 }
8181 }
8282
83- /// Get the number of active batches.
84- pub ( super ) fn active_count ( & self ) -> usize {
85- self . coordinator . active_count ( )
86- }
87-
8883 /// Take completed batches with their buffered filter data for processing.
8984 pub ( super ) fn take_completed_batches ( & mut self ) -> BTreeSet < FiltersBatch > {
9085 std:: mem:: take ( & mut self . completed_batches )
@@ -315,7 +310,7 @@ mod tests {
315310 fn test_pipeline_new ( ) {
316311 let pipeline = FiltersPipeline :: new ( ) ;
317312
318- assert_eq ! ( pipeline. active_count( ) , 0 ) ;
313+ assert_eq ! ( pipeline. coordinator . active_count( ) , 0 ) ;
319314 assert ! ( pipeline. batch_trackers. is_empty( ) ) ;
320315 assert ! ( pipeline. completed_batches. is_empty( ) ) ;
321316 assert_eq ! ( pipeline. target_height, 0 ) ;
@@ -328,7 +323,7 @@ mod tests {
328323 let default_pipeline = FiltersPipeline :: default ( ) ;
329324 let new_pipeline = FiltersPipeline :: new ( ) ;
330325
331- assert_eq ! ( default_pipeline. active_count( ) , new_pipeline. active_count( ) ) ;
326+ assert_eq ! ( default_pipeline. coordinator . active_count( ) , new_pipeline. coordinator . active_count( ) ) ;
332327 assert_eq ! ( default_pipeline. target_height, new_pipeline. target_height) ;
333328 }
334329
@@ -360,7 +355,7 @@ mod tests {
360355
361356 assert ! ( pipeline. batch_trackers. is_empty( ) ) ;
362357 assert ! ( pipeline. completed_batches. is_empty( ) ) ;
363- assert_eq ! ( pipeline. active_count( ) , 0 ) ;
358+ assert_eq ! ( pipeline. coordinator . active_count( ) , 0 ) ;
364359 assert_eq ! ( pipeline. filters_received, 0 ) ;
365360 // 1 batch queued for heights 200-300
366361 assert_eq ! ( pipeline. coordinator. pending_count( ) , 1 ) ;
@@ -609,7 +604,7 @@ mod tests {
609604 assert_eq ! ( timed_out, vec![ 0 ] ) ;
610605 // Batch should be re-queued in coordinator's pending queue
611606 assert_eq ! ( pipeline. coordinator. pending_count( ) , 1 ) ;
612- assert_eq ! ( pipeline. active_count( ) , 0 ) ;
607+ assert_eq ! ( pipeline. coordinator . active_count( ) , 0 ) ;
613608 }
614609
615610 #[ test]
@@ -660,7 +655,7 @@ mod tests {
660655 pipeline. batch_trackers . insert ( 2000 , BatchTracker :: new ( 2999 ) ) ;
661656 pipeline. coordinator . mark_sent ( & [ 0 , 1000 , 2000 ] ) ;
662657
663- assert_eq ! ( pipeline. active_count( ) , 3 ) ;
658+ assert_eq ! ( pipeline. coordinator . active_count( ) , 3 ) ;
664659 assert_eq ! ( pipeline. coordinator. pending_count( ) , 0 ) ;
665660
666661 // Wait for timeout
@@ -672,7 +667,7 @@ mod tests {
672667
673668 // All 3 batches should be in the pending queue, not duplicated
674669 assert_eq ! ( pipeline. coordinator. pending_count( ) , 3 ) ;
675- assert_eq ! ( pipeline. active_count( ) , 0 ) ;
670+ assert_eq ! ( pipeline. coordinator . active_count( ) , 0 ) ;
676671
677672 // Take pending items - should get exactly 3, not more
678673 let pending = pipeline. coordinator . take_pending ( 10 ) ;
@@ -701,7 +696,7 @@ mod tests {
701696 let count = pipeline. send_pending ( & sender, & storage) . await . unwrap ( ) ;
702697
703698 assert_eq ! ( count, 1 ) ;
704- assert_eq ! ( pipeline. active_count( ) , 1 ) ;
699+ assert_eq ! ( pipeline. coordinator . active_count( ) , 1 ) ;
705700 assert ! ( pipeline. batch_trackers. contains_key( & 0 ) ) ;
706701 // No more pending since the single batch was sent
707702 assert_eq ! ( pipeline. coordinator. pending_count( ) , 0 ) ;
@@ -735,7 +730,7 @@ mod tests {
735730 // Should respect MAX_CONCURRENT_FILTER_BATCHES (20)
736731 // 25 batches needed, but only 20 can be in-flight at once
737732 assert_eq ! ( count, MAX_CONCURRENT_FILTER_BATCHES ) ;
738- assert_eq ! ( pipeline. active_count( ) , MAX_CONCURRENT_FILTER_BATCHES ) ;
733+ assert_eq ! ( pipeline. coordinator . active_count( ) , MAX_CONCURRENT_FILTER_BATCHES ) ;
739734 assert_eq ! ( pipeline. batch_trackers. len( ) , MAX_CONCURRENT_FILTER_BATCHES ) ;
740735 // 5 batches still pending
741736 assert_eq ! ( pipeline. coordinator. pending_count( ) , 5 ) ;
@@ -783,7 +778,7 @@ mod tests {
783778
784779 // Should send all 3 batches: 0-999, 1000-1999, 2000-2500
785780 assert_eq ! ( count, 3 ) ;
786- assert_eq ! ( pipeline. active_count( ) , 3 ) ;
781+ assert_eq ! ( pipeline. coordinator . active_count( ) , 3 ) ;
787782 assert_eq ! ( pipeline. coordinator. pending_count( ) , 0 ) ;
788783 }
789784
@@ -827,7 +822,7 @@ mod tests {
827822 // Send request
828823 let sent = pipeline. send_pending ( & sender, & storage) . await . unwrap ( ) ;
829824 assert_eq ! ( sent, 1 ) ;
830- assert_eq ! ( pipeline. active_count( ) , 1 ) ;
825+ assert_eq ! ( pipeline. coordinator . active_count( ) , 1 ) ;
831826
832827 // Receive all filters
833828 for h in 0 ..=99 {
@@ -836,7 +831,7 @@ mod tests {
836831 }
837832
838833 // Batch should be complete
839- assert_eq ! ( pipeline. active_count( ) , 0 ) ;
834+ assert_eq ! ( pipeline. coordinator . active_count( ) , 0 ) ;
840835 assert_eq ! ( pipeline. completed_batches. len( ) , 1 ) ;
841836 assert_eq ! ( pipeline. filters_received, 100 ) ;
842837 assert_eq ! ( pipeline. highest_received, 99 ) ;
@@ -861,7 +856,7 @@ mod tests {
861856
862857 // Send initial request
863858 pipeline. send_pending ( & sender, & storage) . await . unwrap ( ) ;
864- assert_eq ! ( pipeline. active_count( ) , 1 ) ;
859+ assert_eq ! ( pipeline. coordinator . active_count( ) , 1 ) ;
865860 assert_eq ! ( pipeline. coordinator. pending_count( ) , 0 ) ;
866861
867862 // Wait for timeout
@@ -871,14 +866,14 @@ mod tests {
871866 let timed_out = pipeline. handle_timeouts ( ) ;
872867 assert_eq ! ( timed_out. len( ) , 1 ) ;
873868 assert_eq ! ( pipeline. coordinator. pending_count( ) , 1 ) ;
874- assert_eq ! ( pipeline. active_count( ) , 0 ) ;
869+ assert_eq ! ( pipeline. coordinator . active_count( ) , 0 ) ;
875870
876871 // Tracker should still exist for late arrivals
877872 assert ! ( pipeline. batch_trackers. contains_key( & 0 ) ) ;
878873
879874 // Can retry by sending again
880875 pipeline. send_pending ( & sender, & storage) . await . unwrap ( ) ;
881- assert_eq ! ( pipeline. active_count( ) , 1 ) ;
876+ assert_eq ! ( pipeline. coordinator . active_count( ) , 1 ) ;
882877
883878 // Existing tracker is reused (not replaced)
884879 assert ! ( pipeline. batch_trackers. contains_key( & 0 ) ) ;
0 commit comments