@@ -200,6 +200,30 @@ impl LogDeduplicator {
200200 }
201201}
202202
203+ /// Returns the updated consecutive all-new batch count and whether we should warn.
204+ ///
205+ /// A batch is considered "all-new" if `fetched_count > 0` and `unique_count == fetched_count`.
206+ /// - If the batch is all-new, the counter increments; when it reaches `threshold`, we reset it to 0 and return `should_warn = true`.
207+ /// - If the batch is not all-new (including `fetched_count == 0`), the counter resets to 0 and `should_warn = false`.
208+ fn evaluate_all_new_batch_state (
209+ previous_count : usize ,
210+ fetched_count : usize ,
211+ unique_count : usize ,
212+ threshold : usize ,
213+ ) -> ( usize , bool ) {
214+ let all_new_batch = fetched_count > 0 && unique_count == fetched_count;
215+ if all_new_batch {
216+ let updated = previous_count + 1 ;
217+ if updated >= threshold {
218+ ( 0 , true )
219+ } else {
220+ ( updated, false )
221+ }
222+ } else {
223+ ( 0 , false )
224+ }
225+ }
226+
203227fn execute_live_streaming (
204228 api : & Api ,
205229 org : & str ,
@@ -211,7 +235,7 @@ fn execute_live_streaming(
211235 let mut deduplicator = LogDeduplicator :: new ( MAX_DEDUP_BUFFER_SIZE ) ;
212236 let poll_duration = Duration :: from_secs ( args. poll_interval ) ;
213237 let mut consecutive_new_only_count = 0 ;
214- const WARNING_THRESHOLD : usize = 3 ; // Show message every 3 consecutive empty polls
238+ const WARNING_THRESHOLD : usize = 3 ; // Show message every 3 consecutive all-new polls
215239
216240 println ! ( "Starting live log streaming..." ) ;
217241 println ! (
@@ -230,6 +254,8 @@ fn execute_live_streaming(
230254 . add ( "Trace" ) ;
231255
232256 let mut header_printed = false ;
257+ // Holds a warning message to be printed after the current batch of rows for visibility
258+ let mut pending_warning: Option < String > = None ;
233259
234260 loop {
235261 let options = FetchEventsOptions {
@@ -248,28 +274,31 @@ fn execute_live_streaming(
248274 . fetch_organization_events ( org, & options)
249275 {
250276 Ok ( logs) => {
277+ let fetched_count = logs. len ( ) ;
251278 let unique_logs = deduplicator. add_logs ( logs) ;
252279
253- if unique_logs. is_empty ( ) {
254- consecutive_new_only_count += 1 ;
255-
256- if consecutive_new_only_count >= WARNING_THRESHOLD {
257- if args. query . trim ( ) . is_empty ( ) {
258- eprintln ! ( "\n No logs found in the last {WARNING_THRESHOLD} polls." ) ;
259- } else {
260- eprintln ! (
261- "\n No logs found in the last {WARNING_THRESHOLD} polls. Consider adjusting your query filter: \" {}\" " ,
262- args. query
263- ) ;
264- }
265-
266- // Reset counter to show again after the next threshold
267- consecutive_new_only_count = 0 ;
268- }
269- } else {
270- consecutive_new_only_count = 0 ;
280+ let ( new_count, should_warn) = evaluate_all_new_batch_state (
281+ consecutive_new_only_count,
282+ fetched_count,
283+ unique_logs. len ( ) ,
284+ WARNING_THRESHOLD ,
285+ ) ;
286+ consecutive_new_only_count = new_count;
287+ if should_warn {
288+ let suggestion_suffix = if args. query . trim ( ) . is_empty ( ) {
289+ Cow :: Borrowed ( "" )
290+ } else {
291+ Cow :: Owned ( format ! ( " (current filter: \" {}\" )" , args. query) )
292+ } ;
293+ let msg = format ! (
294+ "Only new logs received in the last {WARNING_THRESHOLD} polls. You may be missing some logs. Consider narrowing your query filter{}." ,
295+ suggestion_suffix
296+ ) ;
297+ pending_warning = Some ( msg) ;
298+ }
271299
272- // Add new logs to table
300+ // Add new logs to table (if any)
301+ if !unique_logs. is_empty ( ) {
273302 for log in unique_logs {
274303 let row = table. add_row ( ) ;
275304 row. add ( & log. item_id )
@@ -290,6 +319,16 @@ fn execute_live_streaming(
290319 // Clear rows to free memory but keep the table structure for reuse
291320 table. clear_rows ( ) ;
292321 }
322+
323+ // Print any pending warning AFTER the batch rows to maximize visibility
324+ if let Some ( msg) = pending_warning. take ( ) {
325+ // Style: bold black text on bright yellow background, with spacing and banner
326+ const BANNER_WIDTH : usize = 100 ;
327+ let line = "=" . repeat ( BANNER_WIDTH ) ;
328+ let reset = "\x1b [0m" ;
329+ let style = "\x1b [30;103;1m" ; // black on bright yellow, bold
330+ eprintln ! ( "\n \n {}\n {} {} {}\n {}\n \n " , line, style, msg, reset, line) ;
331+ }
293332 }
294333 Err ( e) => {
295334 eprintln ! ( "Error fetching logs: {e}" ) ;
@@ -413,6 +452,35 @@ mod tests {
413452 assert_eq ! ( deduplicator. buffer. len( ) , 4 ) ;
414453 }
415454
455+ #[ test]
456+ fn test_evaluate_all_new_batch_state_increments_and_warns ( ) {
457+ let threshold = 3 ;
458+ // First all-new batch
459+ let ( count1, warn1) = evaluate_all_new_batch_state ( 0 , 5 , 5 , threshold) ;
460+ assert_eq ! ( count1, 1 ) ;
461+ assert ! ( !warn1) ;
462+
463+ // Second all-new batch
464+ let ( count2, warn2) = evaluate_all_new_batch_state ( count1, 2 , 2 , threshold) ;
465+ assert_eq ! ( count2, 2 ) ;
466+ assert ! ( !warn2) ;
467+
468+ // Third all-new batch should warn and reset
469+ let ( count3, warn3) = evaluate_all_new_batch_state ( count2, 10 , 10 , threshold) ;
470+ assert_eq ! ( count3, 0 ) ;
471+ assert ! ( warn3) ;
472+
473+ // Non all-new batch resets
474+ let ( count4, warn4) = evaluate_all_new_batch_state ( 2 , 4 , 3 , threshold) ;
475+ assert_eq ! ( count4, 0 ) ;
476+ assert ! ( !warn4) ;
477+
478+ // Empty fetch resets
479+ let ( count5, warn5) = evaluate_all_new_batch_state ( 2 , 0 , 0 , threshold) ;
480+ assert_eq ! ( count5, 0 ) ;
481+ assert ! ( !warn5) ;
482+ }
483+
416484 #[ test]
417485 fn test_is_numeric_project_id_purely_numeric ( ) {
418486 assert ! ( is_numeric_project_id( "123456" ) ) ;
0 commit comments