@@ -211,27 +211,52 @@ impl LogDeduplicator {
211211 }
212212}
213213
214- /// Returns the updated consecutive all-new batch count and whether we should warn .
214+ /// Tracks consecutive batches of all-new logs and manages warning state .
215215///
216- /// A batch is considered "all-new" if `fetched_count > 0` and `unique_count == fetched_count`.
217- /// - If the batch is all-new, the counter increments; when it reaches `threshold`, we reset it to 0 and return `should_warn = true`.
218- /// - If the batch is not all-new (including `fetched_count == 0`), the counter resets to 0 and `should_warn = false`.
219- fn evaluate_all_new_batch_state (
220- previous_count : usize ,
221- fetched_count : usize ,
222- unique_count : usize ,
223- threshold : usize ,
224- ) -> ( usize , bool ) {
225- let all_new_batch = fetched_count > 0 && unique_count == fetched_count;
226- if all_new_batch {
227- let updated = previous_count + 1 ;
228- if updated >= threshold {
229- ( 0 , true )
216+ /// A batch is "all-new" when every fetched log is unique (no duplicates).
217+ /// This struct tracks how many consecutive all-new batches we've seen and
218+ /// warns when the count reaches the threshold, suggesting the user might be
219+ /// missing some logs due to overly broad filtering.
220+ #[ derive( Debug ) ]
221+ struct ConsecutiveNewOnlyTracker {
222+ consecutive_count : usize ,
223+ warning_threshold : usize ,
224+ }
225+
226+ impl ConsecutiveNewOnlyTracker {
227+ /// Creates a new tracker with the specified warning threshold.
228+ fn new ( warning_threshold : usize ) -> Self {
229+ Self {
230+ consecutive_count : 0 ,
231+ warning_threshold,
232+ }
233+ }
234+
235+ /// Processes a new batch and returns whether to show a warning.
236+ ///
237+ /// A batch is considered "all-new" if `fetched_count > 0` and `unique_count == fetched_count`.
238+ /// Returns `true` when the warning threshold is reached, `false` otherwise.
239+ fn process_batch ( & mut self , fetched_count : usize , unique_count : usize ) -> bool {
240+ let is_all_new_batch = fetched_count > 0 && unique_count == fetched_count;
241+
242+ if is_all_new_batch {
243+ self . consecutive_count += 1 ;
244+ if self . consecutive_count >= self . warning_threshold {
245+ self . consecutive_count = 0 ; // Reset counter
246+ true // Show warning
247+ } else {
248+ false // No warning yet
249+ }
230250 } else {
231- ( updated, false )
251+ self . consecutive_count = 0 ; // Reset counter
252+ false // No warning
232253 }
233- } else {
234- ( 0 , false )
254+ }
255+
256+ /// Gets the current consecutive count (useful for debugging/testing).
257+ #[ cfg( test) ]
258+ fn consecutive_count ( & self ) -> usize {
259+ self . consecutive_count
235260 }
236261}
237262
@@ -245,8 +270,7 @@ fn execute_live_streaming(
245270) -> Result < ( ) > {
246271 let mut deduplicator = LogDeduplicator :: new ( MAX_DEDUP_BUFFER_SIZE ) ;
247272 let poll_duration = Duration :: from_secs ( args. poll_interval ) ;
248- let mut consecutive_new_only_count = 0 ;
249- const WARNING_THRESHOLD : usize = 3 ; // Show message every 3 consecutive all-new polls
273+ let mut new_only_tracker = ConsecutiveNewOnlyTracker :: new ( 3 ) ; // Warn after 3 consecutive batches of only new logs
250274
251275 println ! ( "Starting live log streaming..." ) ;
252276 println ! (
@@ -288,21 +312,16 @@ fn execute_live_streaming(
288312 let fetched_count = logs. len ( ) ;
289313 let unique_logs = deduplicator. add_logs ( logs) ;
290314
291- let ( new_count, should_warn) = evaluate_all_new_batch_state (
292- consecutive_new_only_count,
293- fetched_count,
294- unique_logs. len ( ) ,
295- WARNING_THRESHOLD ,
296- ) ;
297- consecutive_new_only_count = new_count;
315+ let should_warn = new_only_tracker. process_batch ( fetched_count, unique_logs. len ( ) ) ;
298316 if should_warn {
299317 let suggestion_suffix = if args. query . trim ( ) . is_empty ( ) {
300- Cow :: Borrowed ( "" )
318+ ""
301319 } else {
302- Cow :: Owned ( format ! ( " (current filter: \" {}\" )" , args. query) )
320+ & format ! ( " (current filter: \" {}\" )" , args. query)
303321 } ;
304322 let msg = format ! (
305- "Only new logs received in the last {WARNING_THRESHOLD} polls. You may be missing some logs. Consider narrowing your query filter{suggestion_suffix}."
323+ "Only new logs received in the last {} polls. You may be missing some logs. Consider narrowing your query filter{suggestion_suffix}." ,
324+ new_only_tracker. warning_threshold
306325 ) ;
307326 pending_warning = Some ( msg) ;
308327 }
@@ -463,31 +482,39 @@ mod tests {
463482 }
464483
465484 #[ test]
466- fn test_evaluate_all_new_batch_state_increments_and_warns ( ) {
467- let threshold = 3 ;
485+ fn test_consecutive_new_only_tracker_creation ( ) {
486+ let tracker = ConsecutiveNewOnlyTracker :: new ( 5 ) ;
487+ assert_eq ! ( tracker. consecutive_count( ) , 0 ) ;
488+ assert_eq ! ( tracker. warning_threshold, 5 ) ;
489+ }
490+
491+ #[ test]
492+ fn test_consecutive_new_only_tracker_increments_and_warns ( ) {
493+ let mut tracker = ConsecutiveNewOnlyTracker :: new ( 3 ) ;
494+
468495 // First all-new batch
469- let ( count1 , warn1) = evaluate_all_new_batch_state ( 0 , 5 , 5 , threshold ) ;
470- assert_eq ! ( count1 , 1 ) ;
496+ let warn1 = tracker . process_batch ( 5 , 5 ) ;
497+ assert_eq ! ( tracker . consecutive_count ( ) , 1 ) ;
471498 assert ! ( !warn1) ;
472499
473500 // Second all-new batch
474- let ( count2 , warn2) = evaluate_all_new_batch_state ( count1 , 2 , 2 , threshold ) ;
475- assert_eq ! ( count2 , 2 ) ;
501+ let warn2 = tracker . process_batch ( 2 , 2 ) ;
502+ assert_eq ! ( tracker . consecutive_count ( ) , 2 ) ;
476503 assert ! ( !warn2) ;
477504
478505 // Third all-new batch should warn and reset
479- let ( count3 , warn3) = evaluate_all_new_batch_state ( count2 , 10 , 10 , threshold ) ;
480- assert_eq ! ( count3 , 0 ) ;
506+ let warn3 = tracker . process_batch ( 10 , 10 ) ;
507+ assert_eq ! ( tracker . consecutive_count ( ) , 0 ) ;
481508 assert ! ( warn3) ;
482509
483510 // Non all-new batch resets
484- let ( count4 , warn4) = evaluate_all_new_batch_state ( 2 , 4 , 3 , threshold ) ;
485- assert_eq ! ( count4 , 0 ) ;
511+ let warn4 = tracker . process_batch ( 4 , 3 ) ;
512+ assert_eq ! ( tracker . consecutive_count ( ) , 0 ) ;
486513 assert ! ( !warn4) ;
487514
488515 // Empty fetch resets
489- let ( count5 , warn5) = evaluate_all_new_batch_state ( 2 , 0 , 0 , threshold ) ;
490- assert_eq ! ( count5 , 0 ) ;
516+ let warn5 = tracker . process_batch ( 0 , 0 ) ;
517+ assert_eq ! ( tracker . consecutive_count ( ) , 0 ) ;
491518 assert ! ( !warn5) ;
492519 }
493520
0 commit comments