@@ -188,20 +188,20 @@ impl LogDeduplicator {
188188 }
189189 }
190190
191- /// Add new logs and return only the ones that haven't been seen before
192- fn add_logs ( & mut self , new_logs : Vec < LogEntry > ) -> Vec < LogEntry > {
193- let mut unique_logs = Vec :: new ( ) ;
194-
195- for log in new_logs {
196- // If the log ID is not in the cache, it's a new log
197- if self . seen_ids . get ( & log . item_id ) . is_none ( ) {
198- // Add to cache (this will evict oldest entries if at capacity)
199- self . seen_ids . put ( log. item_id . clone ( ) , ( ) ) ;
200- unique_logs . push ( log ) ;
201- }
202- }
203-
204- unique_logs
191+ /// Add new logs and return an iterator overonly the ones that haven't been seen before
192+ fn add_logs < ' a > ( & ' a mut self , new_logs : & ' a [ LogEntry ] ) -> impl Iterator < Item = & ' a LogEntry > {
193+ new_logs
194+ . iter ( )
195+ . filter ( | log| match self . seen_ids . get ( & log . item_id ) {
196+ // If log ID is in the cache, we have seen it already
197+ Some ( _ ) => false ,
198+
199+ // If log ID is not in the cache, we have not seen it yet
200+ None => {
201+ self . seen_ids . put ( log . item_id . clone ( ) , ( ) ) ;
202+ true
203+ }
204+ } )
205205 }
206206}
207207
@@ -268,13 +268,10 @@ fn execute_live_streaming(
268268
269269 println ! ( "Starting live log streaming..." ) ;
270270 println ! (
271- "Polling every {} seconds. Press Ctrl+ C to stop." ,
271+ "Polling every {} seconds. Press ⌃ C to stop." ,
272272 args. poll_interval
273273 ) ;
274274
275- // Holds a warning message to be printed after the current batch of logs for visibility
276- let mut pending_warning: Option < String > = None ;
277-
278275 loop {
279276 let options = FetchEventsOptions {
280277 dataset : Dataset :: Logs ,
@@ -293,37 +290,35 @@ fn execute_live_streaming(
293290 {
294291 Ok ( logs) => {
295292 let fetched_count = logs. len ( ) ;
296- let unique_logs = deduplicator. add_logs ( logs) ;
293+ let unique_logs = deduplicator. add_logs ( & logs) . collect :: < Vec < _ > > ( ) ;
297294
298295 let should_warn = new_only_tracker. process_batch ( fetched_count, unique_logs. len ( ) ) ;
296+
297+ // Print new logs in human-readable format
298+ for log in unique_logs {
299+ println ! (
300+ "{} | {} | {} | {}" ,
301+ log. timestamp,
302+ log. severity. as_deref( ) . unwrap_or( "" ) ,
303+ log. trace. as_deref( ) . unwrap_or( "" ) ,
304+ log. message. as_deref( ) . unwrap_or( "" )
305+ ) ;
306+ }
307+
308+ // Print any pending warning AFTER the batch logs to maximize visibility
299309 if should_warn {
310+ // compute warning message
300311 let suggestion_suffix = if args. query . trim ( ) . is_empty ( ) {
301312 ""
302313 } else {
303314 & format ! ( " (current filter: \" {}\" )" , args. query)
304315 } ;
316+
305317 let msg = format ! (
306318 "Only new logs received in the last {} polls. You may be missing some logs. Consider narrowing your query filter{suggestion_suffix}." ,
307319 new_only_tracker. warning_threshold
308320 ) ;
309- pending_warning = Some ( msg) ;
310- }
311321
312- // Print new logs in human-readable format
313- if !unique_logs. is_empty ( ) {
314- for log in unique_logs {
315- println ! (
316- "{} | {} | {} | {}" ,
317- log. timestamp,
318- log. severity. as_deref( ) . unwrap_or( "" ) ,
319- log. trace. as_deref( ) . unwrap_or( "" ) ,
320- log. message. as_deref( ) . unwrap_or( "" )
321- ) ;
322- }
323- }
324-
325- // Print any pending warning AFTER the batch logs to maximize visibility
326- if let Some ( msg) = pending_warning. take ( ) {
327322 // Style: bold black text on bright yellow background, with spacing and banner
328323 const BANNER_WIDTH : usize = 100 ;
329324 let line = "=" . repeat ( BANNER_WIDTH ) ;
@@ -368,7 +363,8 @@ mod tests {
368363 let log1 = create_test_log ( "1" , "test message 1" ) ;
369364 let log2 = create_test_log ( "2" , "test message 2" ) ;
370365
371- let unique_logs = deduplicator. add_logs ( vec ! [ log1. clone( ) , log2. clone( ) ] ) ;
366+ let logs = vec ! [ log1. clone( ) , log2. clone( ) ] ;
367+ let unique_logs = deduplicator. add_logs ( & logs) . collect :: < Vec < _ > > ( ) ;
372368
373369 assert_eq ! ( unique_logs. len( ) , 2 ) ;
374370 assert_eq ! ( deduplicator. seen_ids. len( ) , 2 ) ;
@@ -382,11 +378,12 @@ mod tests {
382378 let log2 = create_test_log ( "2" , "test message 2" ) ;
383379
384380 // Add logs first time
385- let unique_logs1 = deduplicator. add_logs ( vec ! [ log1. clone( ) , log2. clone( ) ] ) ;
381+ let logs = vec ! [ log1. clone( ) , log2. clone( ) ] ;
382+ let unique_logs1 = deduplicator. add_logs ( & logs) . collect :: < Vec < _ > > ( ) ;
386383 assert_eq ! ( unique_logs1. len( ) , 2 ) ;
387384
388385 // Add same logs again
389- let unique_logs2 = deduplicator. add_logs ( vec ! [ log1 . clone ( ) , log2 . clone ( ) ] ) ;
386+ let unique_logs2 = deduplicator. add_logs ( & logs ) . collect :: < Vec < _ > > ( ) ;
390387 assert_eq ! ( unique_logs2. len( ) , 0 ) ; // Should be empty as logs already seen
391388
392389 assert_eq ! ( deduplicator. seen_ids. len( ) , 2 ) ;
@@ -405,7 +402,7 @@ mod tests {
405402 create_test_log( "5" , "test message 5" ) ,
406403 ] ;
407404
408- let unique_logs = deduplicator. add_logs ( logs) ;
405+ let unique_logs = deduplicator. add_logs ( & logs) . collect :: < Vec < _ > > ( ) ;
409406 assert_eq ! ( unique_logs. len( ) , 5 ) ;
410407
411408 // After adding 5 logs to a buffer with max size 3, the oldest 2 should be evicted
@@ -415,12 +412,12 @@ mod tests {
415412 create_test_log( "1" , "test message 1" ) ,
416413 create_test_log( "2" , "test message 2" ) ,
417414 ] ;
418- let duplicate_unique_logs = deduplicator. add_logs ( duplicate_logs) ;
415+ let duplicate_unique_logs = deduplicator. add_logs ( & duplicate_logs) . collect :: < Vec < _ > > ( ) ;
419416 assert_eq ! ( duplicate_unique_logs. len( ) , 2 ) ;
420417
421418 // Test that adding new logs still works
422419 let new_logs = vec ! [ create_test_log( "6" , "test message 6" ) ] ;
423- let new_unique_logs = deduplicator. add_logs ( new_logs) ;
420+ let new_unique_logs = deduplicator. add_logs ( & new_logs) . collect :: < Vec < _ > > ( ) ;
424421 assert_eq ! ( new_unique_logs. len( ) , 1 ) ;
425422 }
426423
0 commit comments