11use anyhow:: Result ;
22use clap:: Args ;
3+ use std:: collections:: HashSet ;
4+ use std:: time:: Duration ;
35
4- use crate :: api:: { Api , Dataset , FetchEventsOptions } ;
6+ use crate :: api:: { Api , Dataset , FetchEventsOptions , LogEntry } ;
57use crate :: config:: Config ;
68use crate :: utils:: formatting:: Table ;
79
@@ -26,6 +28,9 @@ const LOG_FIELDS: &[&str] = &[
2628 "message" ,
2729] ;
2830
31+ /// Maximum number of log entries to keep in memory for deduplication
32+ const MAX_DEDUP_BUFFER_SIZE : usize = 10_000 ;
33+
2934/// Arguments for listing logs
3035#[ derive( Args ) ]
3136pub ( super ) struct ListLogsArgs {
@@ -45,6 +50,14 @@ pub(super) struct ListLogsArgs {
4550 #[ arg( long = "query" , default_value = "" ) ]
4651 #[ arg( help = "Query to filter logs. Example: \" level:error\" " ) ]
4752 query : String ,
53+
54+ #[ arg( long = "live" ) ]
55+ #[ arg( help = "Enable live streaming mode to continuously poll for new logs." ) ]
56+ live : bool ,
57+
58+ #[ arg( long = "poll-interval" , default_value = "2" ) ]
59+ #[ arg( help = "Polling interval in seconds for live streaming mode." ) ]
60+ poll_interval : u64 ,
4861}
4962
5063pub ( super ) fn execute ( args : ListLogsArgs ) -> Result < ( ) > {
@@ -76,7 +89,11 @@ pub(super) fn execute(args: ListLogsArgs) -> Result<()> {
7689 Some ( args. query . as_str ( ) )
7790 } ;
7891
79- execute_single_fetch ( & api, & org, & project, query, LOG_FIELDS , & args)
92+ if args. live {
93+ execute_live_streaming ( & api, & org, & project, query, LOG_FIELDS , & args)
94+ } else {
95+ execute_single_fetch ( & api, & org, & project, query, LOG_FIELDS , & args)
96+ }
8097}
8198
8299fn execute_single_fetch (
@@ -129,3 +146,253 @@ fn execute_single_fetch(
129146
130147 Ok ( ( ) )
131148}
149+
150+ /// Manages deduplication of log entries with a bounded buffer
151+ struct LogDeduplicator {
152+ /// Set of seen log IDs for quick lookup
153+ seen_ids : HashSet < String > ,
154+ /// Buffer of log entries in order (for maintaining size limit)
155+ buffer : Vec < LogEntry > ,
156+ /// Maximum size of the buffer
157+ max_size : usize ,
158+ }
159+
160+ impl LogDeduplicator {
161+ fn new ( max_size : usize ) -> Self {
162+ Self {
163+ seen_ids : HashSet :: new ( ) ,
164+ buffer : Vec :: new ( ) ,
165+ max_size,
166+ }
167+ }
168+
169+ /// Add new logs and return only the ones that haven't been seen before
170+ fn add_logs ( & mut self , new_logs : Vec < LogEntry > ) -> Vec < LogEntry > {
171+ let mut unique_logs = Vec :: new ( ) ;
172+
173+ for log in new_logs {
174+ if !self . seen_ids . contains ( & log. item_id ) {
175+ self . seen_ids . insert ( log. item_id . clone ( ) ) ;
176+ self . buffer . push ( log. clone ( ) ) ;
177+ unique_logs. push ( log) ;
178+ }
179+ }
180+
181+ // Maintain buffer size limit by removing oldest entries
182+ while self . buffer . len ( ) > self . max_size {
183+ let removed_log = self . buffer . remove ( 0 ) ;
184+ self . seen_ids . remove ( & removed_log. item_id ) ;
185+ }
186+
187+ unique_logs
188+ }
189+ }
190+
191+ fn execute_live_streaming (
192+ api : & Api ,
193+ org : & str ,
194+ project : & str ,
195+ query : Option < & str > ,
196+ fields : & [ & str ] ,
197+ args : & ListLogsArgs ,
198+ ) -> Result < ( ) > {
199+ let mut deduplicator = LogDeduplicator :: new ( MAX_DEDUP_BUFFER_SIZE ) ;
200+ let poll_duration = Duration :: from_secs ( args. poll_interval ) ;
201+ let mut consecutive_new_only_count = 0 ;
202+ const WARNING_THRESHOLD : usize = 3 ; // Show warning after 3 consecutive new-only responses
203+
204+ println ! ( "Starting live log streaming..." ) ;
205+ println ! (
206+ "Polling every {} seconds. Press Ctrl+C to stop." ,
207+ args. poll_interval
208+ ) ;
209+
210+ // Set up table with headers and print header once
211+ let mut table = Table :: new ( ) ;
212+ table
213+ . title_row ( )
214+ . add ( "Item ID" )
215+ . add ( "Timestamp" )
216+ . add ( "Severity" )
217+ . add ( "Message" )
218+ . add ( "Trace" ) ;
219+
220+ let mut header_printed = false ;
221+
222+ loop {
223+ let options = FetchEventsOptions {
224+ dataset : Dataset :: OurLogs ,
225+ fields,
226+ project_id : Some ( project) ,
227+ cursor : None ,
228+ query,
229+ per_page : Some ( args. max_rows ) ,
230+ stats_period : Some ( "1h" ) ,
231+ sort : Some ( "-timestamp" ) ,
232+ } ;
233+
234+ match api
235+ . authenticated ( ) ?
236+ . fetch_organization_events ( org, & options)
237+ {
238+ Ok ( logs) => {
239+ let unique_logs = deduplicator. add_logs ( logs) ;
240+
241+ if unique_logs. is_empty ( ) {
242+ consecutive_new_only_count += 1 ;
243+
244+ if consecutive_new_only_count >= WARNING_THRESHOLD && args. query . is_empty ( ) {
245+ eprintln ! (
246+ "\n ⚠️ Warning: No new logs found for {consecutive_new_only_count} consecutive polls."
247+ ) ;
248+
249+ // Reset counter to avoid spam
250+ consecutive_new_only_count = 0 ;
251+ }
252+ } else {
253+ consecutive_new_only_count = 0 ;
254+
255+ // Add new logs to table
256+ for log in unique_logs {
257+ let row = table. add_row ( ) ;
258+ row. add ( & log. item_id )
259+ . add ( & log. timestamp )
260+ . add ( log. severity . as_deref ( ) . unwrap_or ( "" ) )
261+ . add ( log. message . as_deref ( ) . unwrap_or ( "" ) )
262+ . add ( log. trace . as_deref ( ) . unwrap_or ( "" ) ) ;
263+ }
264+
265+ if !header_printed {
266+ // Print header with first data batch so column widths match actual data
267+ table. print_table_start ( ) ;
268+ header_printed = true ;
269+ } else {
270+ // Print only the rows (without table borders) for subsequent batches
271+ table. print_rows_only ( ) ;
272+ }
273+ // Clear rows to free memory but keep the table structure for reuse
274+ table. clear_rows ( ) ;
275+ }
276+ }
277+ Err ( e) => {
278+ eprintln ! ( "Error fetching logs: {e}" ) ;
279+ }
280+ }
281+
282+ std:: thread:: sleep ( poll_duration) ;
283+ }
284+ }
285+
286+ #[ cfg( test) ]
287+ mod tests {
288+ use super :: * ;
289+
290+ fn create_test_log ( id : & str , message : & str ) -> LogEntry {
291+ LogEntry {
292+ item_id : id. to_owned ( ) ,
293+ trace : None ,
294+ severity : Some ( "info" . to_owned ( ) ) ,
295+ timestamp : "2025-01-01T00:00:00Z" . to_owned ( ) ,
296+ message : Some ( message. to_owned ( ) ) ,
297+ }
298+ }
299+
300+ #[ test]
301+ fn test_log_deduplicator_new ( ) {
302+ let deduplicator = LogDeduplicator :: new ( 100 ) ;
303+ assert_eq ! ( deduplicator. seen_ids. len( ) , 0 ) ;
304+ }
305+
306+ #[ test]
307+ fn test_log_deduplicator_add_unique_logs ( ) {
308+ let mut deduplicator = LogDeduplicator :: new ( 10 ) ;
309+
310+ let log1 = create_test_log ( "1" , "test message 1" ) ;
311+ let log2 = create_test_log ( "2" , "test message 2" ) ;
312+
313+ let unique_logs = deduplicator. add_logs ( vec ! [ log1. clone( ) , log2. clone( ) ] ) ;
314+
315+ assert_eq ! ( unique_logs. len( ) , 2 ) ;
316+ assert_eq ! ( deduplicator. seen_ids. len( ) , 2 ) ;
317+ }
318+
319+ #[ test]
320+ fn test_log_deduplicator_deduplicate_logs ( ) {
321+ let mut deduplicator = LogDeduplicator :: new ( 10 ) ;
322+
323+ let log1 = create_test_log ( "1" , "test message 1" ) ;
324+ let log2 = create_test_log ( "2" , "test message 2" ) ;
325+
326+ // Add logs first time
327+ let unique_logs1 = deduplicator. add_logs ( vec ! [ log1. clone( ) , log2. clone( ) ] ) ;
328+ assert_eq ! ( unique_logs1. len( ) , 2 ) ;
329+
330+ // Add same logs again
331+ let unique_logs2 = deduplicator. add_logs ( vec ! [ log1. clone( ) , log2. clone( ) ] ) ;
332+ assert_eq ! ( unique_logs2. len( ) , 0 ) ; // Should be empty as logs already seen
333+
334+ assert_eq ! ( deduplicator. seen_ids. len( ) , 2 ) ;
335+ }
336+
337+ #[ test]
338+ fn test_log_deduplicator_buffer_size_limit ( ) {
339+ let mut deduplicator = LogDeduplicator :: new ( 3 ) ;
340+
341+ // Add 5 logs to a buffer with max size 3
342+ let logs = vec ! [
343+ create_test_log( "1" , "test message 1" ) ,
344+ create_test_log( "2" , "test message 2" ) ,
345+ create_test_log( "3" , "test message 3" ) ,
346+ create_test_log( "4" , "test message 4" ) ,
347+ create_test_log( "5" , "test message 5" ) ,
348+ ] ;
349+
350+ let unique_logs = deduplicator. add_logs ( logs) ;
351+ assert_eq ! ( unique_logs. len( ) , 5 ) ;
352+
353+ // After adding 5 logs to a buffer with max size 3, the oldest 2 should be evicted
354+ // So logs 1 and 2 should no longer be in the seen_ids set
355+ // Adding them again should return them as new logs
356+ let duplicate_logs = vec ! [
357+ create_test_log( "1" , "test message 1" ) ,
358+ create_test_log( "2" , "test message 2" ) ,
359+ ] ;
360+ let duplicate_unique_logs = deduplicator. add_logs ( duplicate_logs) ;
361+ assert_eq ! ( duplicate_unique_logs. len( ) , 2 ) ;
362+
363+ // Test that adding new logs still works
364+ let new_logs = vec ! [ create_test_log( "6" , "test message 6" ) ] ;
365+ let new_unique_logs = deduplicator. add_logs ( new_logs) ;
366+ assert_eq ! ( new_unique_logs. len( ) , 1 ) ;
367+ }
368+
369+ #[ test]
370+ fn test_log_deduplicator_mixed_new_and_old_logs ( ) {
371+ let mut deduplicator = LogDeduplicator :: new ( 10 ) ;
372+
373+ // Add initial logs
374+ let initial_logs = vec ! [
375+ create_test_log( "1" , "test message 1" ) ,
376+ create_test_log( "2" , "test message 2" ) ,
377+ ] ;
378+ let unique_logs1 = deduplicator. add_logs ( initial_logs) ;
379+ assert_eq ! ( unique_logs1. len( ) , 2 ) ;
380+
381+ // Add mix of new and old logs
382+ let mixed_logs = vec ! [
383+ create_test_log( "1" , "test message 1" ) , // old
384+ create_test_log( "3" , "test message 3" ) , // new
385+ create_test_log( "2" , "test message 2" ) , // old
386+ create_test_log( "4" , "test message 4" ) , // new
387+ ] ;
388+ let unique_logs2 = deduplicator. add_logs ( mixed_logs) ;
389+
390+ // Should only return the new logs (3 and 4)
391+ assert_eq ! ( unique_logs2. len( ) , 2 ) ;
392+ assert_eq ! ( unique_logs2[ 0 ] . item_id, "3" ) ;
393+ assert_eq ! ( unique_logs2[ 1 ] . item_id, "4" ) ;
394+
395+ assert_eq ! ( deduplicator. seen_ids. len( ) , 4 ) ;
396+ assert_eq ! ( deduplicator. buffer. len( ) , 4 ) ;
397+ }
398+ }
0 commit comments