use std::collections::{HashSet, VecDeque};
use std::time::Duration;

use anyhow::Result;
use clap::Args;

use crate::api::{Api, Dataset, FetchEventsOptions, LogEntry};
use crate::config::Config;
use crate::utils::formatting::Table;
79
@@ -14,6 +16,9 @@ const LOG_FIELDS: &[&str] = &[
1416 "message" ,
1517] ;
1618
/// Maximum number of log entries to keep in memory for deduplication.
///
/// Once this many IDs have been remembered, the oldest are evicted, so an
/// entry older than the window can be reported again if the server returns it.
const MAX_DEDUP_BUFFER_SIZE: usize = 10_000;
21+
1722/// Arguments for listing logs
1823#[ derive( Args ) ]
1924pub ( super ) struct ListLogsArgs {
@@ -26,12 +31,20 @@ pub(super) struct ListLogsArgs {
2631 project : Option < String > ,
2732
2833 #[ arg( long = "max-rows" , default_value = "100" ) ]
29- #[ arg( help = "Maximum number of log entries to fetch and display (max 1000)." ) ]
34+ #[ arg( help = "Maximum number of log entries to fetch and display per request (max 1000)." ) ]
3035 max_rows : usize ,
3136
3237 #[ arg( long = "query" , default_value = "" ) ]
3338 #[ arg( help = "Query to filter logs. Example: \" level:error\" " ) ]
3439 query : String ,
40+
41+ #[ arg( long = "live" ) ]
42+ #[ arg( help = "Enable live streaming mode to continuously poll for new logs." ) ]
43+ live : bool ,
44+
45+ #[ arg( long = "poll-interval" , default_value = "2" ) ]
46+ #[ arg( help = "Polling interval in seconds for live streaming mode (default: 2)." ) ]
47+ poll_interval : u64 ,
3548}
3649
3750pub ( super ) fn execute ( args : ListLogsArgs ) -> Result < ( ) > {
@@ -63,7 +76,11 @@ pub(super) fn execute(args: ListLogsArgs) -> Result<()> {
6376 Some ( args. query . as_str ( ) )
6477 } ;
6578
66- execute_single_fetch ( & api, & org, & project, query, LOG_FIELDS , & args)
79+ if args. live {
80+ execute_live_streaming ( & api, & org, & project, query, LOG_FIELDS , & args)
81+ } else {
82+ execute_single_fetch ( & api, & org, & project, query, LOG_FIELDS , & args)
83+ }
6784}
6885
6986fn execute_single_fetch (
@@ -117,3 +134,252 @@ fn execute_single_fetch(
117134
118135 Ok ( ( ) )
119136}
137+
138+ /// Manages deduplication of log entries with a bounded buffer
139+ struct LogDeduplicator {
140+ /// Set of seen log IDs for quick lookup
141+ seen_ids : HashSet < String > ,
142+ /// Buffer of log entries in order (for maintaining size limit)
143+ buffer : Vec < LogEntry > ,
144+ /// Maximum size of the buffer
145+ max_size : usize ,
146+ }
147+
148+ impl LogDeduplicator {
149+ fn new ( max_size : usize ) -> Self {
150+ Self {
151+ seen_ids : HashSet :: new ( ) ,
152+ buffer : Vec :: new ( ) ,
153+ max_size,
154+ }
155+ }
156+
157+ /// Add new logs and return only the ones that haven't been seen before
158+ fn add_logs ( & mut self , new_logs : Vec < LogEntry > ) -> Vec < LogEntry > {
159+ let mut unique_logs = Vec :: new ( ) ;
160+
161+ for log in new_logs {
162+ if !self . seen_ids . contains ( & log. item_id ) {
163+ self . seen_ids . insert ( log. item_id . clone ( ) ) ;
164+ self . buffer . push ( log. clone ( ) ) ;
165+ unique_logs. push ( log) ;
166+ }
167+ }
168+
169+ // Maintain buffer size limit by removing oldest entries
170+ while self . buffer . len ( ) > self . max_size {
171+ let removed_log = self . buffer . remove ( 0 ) ;
172+ self . seen_ids . remove ( & removed_log. item_id ) ;
173+ }
174+
175+ unique_logs
176+ }
177+
178+ /// Get the number of unique logs seen so far
179+ fn total_seen ( & self ) -> usize {
180+ self . seen_ids . len ( )
181+ }
182+ }
183+
/// Continuously polls the organization events API for new log entries and
/// prints them as table rows.
///
/// Loops forever (until the process is interrupted, e.g. Ctrl+C): each
/// iteration fetches up to `args.max_rows` of the most recent logs sorted by
/// `-timestamp` over a 1h stats period, filters out entries already printed
/// via `LogDeduplicator`, appends the remainder to the table, then sleeps
/// for `args.poll_interval` seconds. Fetch errors are reported to stderr and
/// polling continues.
fn execute_live_streaming(
    api: &Api,
    org: &str,
    project: &str,
    query: Option<&str>,
    fields: &[&str],
    args: &ListLogsArgs,
) -> Result<()> {
    let mut deduplicator = LogDeduplicator::new(MAX_DEDUP_BUFFER_SIZE);
    let poll_duration = Duration::from_secs(args.poll_interval);
    // NOTE(review): despite its name, this counts consecutive polls that
    // yielded *no* new entries (see the `is_empty` branch below) — confirm
    // whether "new-only responses" (a full page of unseen entries, implying
    // possible gaps between polls) was the intended trigger instead.
    let mut consecutive_new_only_count = 0;
    const WARNING_THRESHOLD: usize = 3; // Show warning after 3 consecutive new-only responses

    println!("Starting live log streaming...");
    println!(
        "Polling every {} seconds. Press Ctrl+C to stop.",
        args.poll_interval
    );

    // Print header once
    let mut table = Table::new();
    table
        .title_row()
        .add("Item ID")
        .add("Timestamp")
        .add("Severity")
        .add("Message")
        .add("Trace");

    loop {
        // Options are rebuilt each iteration; no cursor is used, so each poll
        // re-fetches the newest page and relies on deduplication for overlap.
        let options = FetchEventsOptions {
            dataset: Dataset::OurLogs,
            fields,
            project_id: Some(project),
            cursor: None,
            query,
            per_page: Some(args.max_rows),
            stats_period: Some("1h"),
            sort: Some("-timestamp"),
        };

        match api
            .authenticated()?
            .fetch_organization_events(org, &options)
        {
            Ok(logs) => {
                let unique_logs = deduplicator.add_logs(logs);

                if unique_logs.is_empty() {
                    consecutive_new_only_count += 1;

                    // Warning is suppressed when the user already passed a
                    // query, yet the text recommends adding one — see the
                    // NOTE(review) above about the suspected inverted trigger.
                    if consecutive_new_only_count >= WARNING_THRESHOLD && args.query.is_empty() {
                        eprintln!(
                            "\n⚠️ Warning: No new logs found for {consecutive_new_only_count} consecutive polls."
                        );
                        eprintln!(" Consider using --query to filter logs, as you may be missing some entries.");
                        eprintln!(
                            " Example: --query \"level:error\" or --query \"message:*error*\""
                        );

                        // Reset counter to avoid spam
                        consecutive_new_only_count = 0;
                    }
                } else {
                    consecutive_new_only_count = 0;

                    // Print new logs
                    for log in unique_logs {
                        let row = table.add_row();
                        row.add(&log.item_id)
                            .add(&log.timestamp)
                            .add(log.severity.as_deref().unwrap_or(""))
                            .add(log.message.as_deref().unwrap_or(""))
                            .add(log.trace.as_deref().unwrap_or(""));
                    }

                    // NOTE(review): rows accumulate on `table` across
                    // iterations; this assumes `Table::print` emits only
                    // unprinted rows (or that reprinting is acceptable) —
                    // confirm against the `Table` implementation.
                    table.print();
                    println!("Total unique logs seen: {}", deduplicator.total_seen());
                }
            }
            Err(e) => {
                eprintln!("Error fetching logs: {e}");
            }
        }

        // Blocking sleep is fine here: this is a synchronous CLI loop.
        std::thread::sleep(poll_duration);
    }
}
272+
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal `LogEntry` carrying the given ID and message.
    fn create_test_log(id: &str, message: &str) -> LogEntry {
        LogEntry {
            item_id: id.to_owned(),
            trace: None,
            severity: Some("info".to_owned()),
            timestamp: "2025-01-01T00:00:00Z".to_owned(),
            message: Some(message.to_owned()),
        }
    }

    #[test]
    fn test_log_deduplicator_new() {
        // A freshly constructed deduplicator has seen nothing.
        assert_eq!(LogDeduplicator::new(100).total_seen(), 0);
    }

    #[test]
    fn test_log_deduplicator_add_unique_logs() {
        let mut dedup = LogDeduplicator::new(10);

        let batch = vec![
            create_test_log("1", "test message 1"),
            create_test_log("2", "test message 2"),
        ];

        // Both entries are unseen, so both come back and are remembered.
        assert_eq!(dedup.add_logs(batch).len(), 2);
        assert_eq!(dedup.total_seen(), 2);
    }

    #[test]
    fn test_log_deduplicator_deduplicate_logs() {
        let mut dedup = LogDeduplicator::new(10);
        let make_batch = || {
            vec![
                create_test_log("1", "test message 1"),
                create_test_log("2", "test message 2"),
            ]
        };

        // First submission: everything is new.
        assert_eq!(dedup.add_logs(make_batch()).len(), 2);
        // Resubmitting identical IDs yields nothing new.
        assert!(dedup.add_logs(make_batch()).is_empty());
        // The total is unchanged by the duplicate batch.
        assert_eq!(dedup.total_seen(), 2);
    }

    #[test]
    fn test_log_deduplicator_buffer_size_limit() {
        let mut dedup = LogDeduplicator::new(3);

        // Five entries into a capacity-3 buffer: all are new on arrival.
        let first_batch: Vec<LogEntry> = (1..=5)
            .map(|i| create_test_log(&i.to_string(), &format!("test message {i}")))
            .collect();
        assert_eq!(dedup.add_logs(first_batch).len(), 5);

        // The two oldest IDs ("1" and "2") were evicted to honor the limit,
        // so resubmitting them counts as new again.
        let evicted = vec![
            create_test_log("1", "test message 1"),
            create_test_log("2", "test message 2"),
        ];
        assert_eq!(dedup.add_logs(evicted).len(), 2);

        // A brand-new ID is still accepted normally.
        let fresh = dedup.add_logs(vec![create_test_log("6", "test message 6")]);
        assert_eq!(fresh.len(), 1);
    }

    #[test]
    fn test_log_deduplicator_mixed_new_and_old_logs() {
        let mut dedup = LogDeduplicator::new(10);

        // Seed with two entries.
        let seeded = dedup.add_logs(vec![
            create_test_log("1", "test message 1"),
            create_test_log("2", "test message 2"),
        ]);
        assert_eq!(seeded.len(), 2);

        // Interleave already-seen and brand-new IDs.
        let mixed = vec![
            create_test_log("1", "test message 1"), // old
            create_test_log("3", "test message 3"), // new
            create_test_log("2", "test message 2"), // old
            create_test_log("4", "test message 4"), // new
        ];
        let fresh = dedup.add_logs(mixed);

        // Only the unseen IDs come back, preserving their submission order.
        let ids: Vec<&str> = fresh.iter().map(|log| log.item_id.as_str()).collect();
        assert_eq!(ids, ["3", "4"]);

        assert_eq!(dedup.total_seen(), 4);
        assert_eq!(dedup.buffer.len(), 4);
    }
}
0 commit comments