diff --git a/Cargo.lock b/Cargo.lock index 010d0a3463..e4d3e39ac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -953,6 +959,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1195,6 +1207,17 @@ dependencies = [ "serde", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "heck" version = "0.5.0" @@ -1760,6 +1783,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lru" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ea4e65087ff52f3862caff188d489f1fab49a0cb09e01b2e3f1a617b10aaed" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lzma-rs" version = "0.3.0" @@ -2723,6 +2755,7 @@ dependencies = [ "lazy_static", "libc", "log", + "lru", "mac-process-info", "magic_string", "mockito", diff --git a/Cargo.toml b/Cargo.toml index 5aa663d144..3f0b91a5cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ data-encoding = "2.3.3" magic_string = "0.3.4" chrono-tz = 
"0.8.4" secrecy = "0.8.0" +lru = "0.16.0" [dev-dependencies] assert_cmd = "2.0.11" diff --git a/src/api/mod.rs b/src/api/mod.rs index 41ac6f822c..bc8ac2b0d7 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -2519,7 +2519,7 @@ struct LogsResponse { } /// Log entry structure from the logs API -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct LogEntry { #[serde(rename = "sentry.item_id")] pub item_id: String, diff --git a/src/commands/logs/list.rs b/src/commands/logs/list.rs index a21b0840fc..64b97b6d72 100644 --- a/src/commands/logs/list.rs +++ b/src/commands/logs/list.rs @@ -1,18 +1,21 @@ use std::borrow::Cow; +use std::num::NonZeroUsize; +use std::time::Duration; use anyhow::Result; use clap::Args; +use lru::LruCache; -use crate::api::{Api, Dataset, FetchEventsOptions}; +use crate::api::{Api, Dataset, FetchEventsOptions, LogEntry}; use crate::config::Config; use crate::utils::formatting::Table; const MAX_ROWS_RANGE: std::ops::RangeInclusive<usize> = 1..=1000; /// Validate that max_rows is in the allowed range -fn validate_max_rows(s: &str) -> Result<usize> { - let value = s.parse()?; +fn validate_max_rows(s: &str) -> Result<NonZeroUsize> { + let value = s.parse::<usize>()?; if MAX_ROWS_RANGE.contains(&value) { - Ok(value) + NonZeroUsize::new(value).ok_or_else(|| anyhow::anyhow!("max-rows must be greater than 0")) } else { Err(anyhow::anyhow!( "max-rows must be between {} and {}", @@ -22,6 +25,16 @@ fn validate_max_rows(s: &str) -> Result<usize> { } } +/// Validate that poll-interval is a positive integer (> 0) +fn validate_poll_interval(s: &str) -> Result<u64> { + let value = s.parse()?; + if value > 0 { + Ok(value) + } else { + Err(anyhow::anyhow!("poll-interval must be a positive integer")) + } +} + /// Check if a project identifier is numeric (project ID) or string (project slug) fn is_numeric_project_id(project: &str) -> bool { !project.is_empty() && project.chars().all(|c| c.is_ascii_digit()) @@ -50,11 +63,20 @@ pub(super) struct ListLogsArgs { #[arg(long = "max-rows", 
default_value = "100")] #[arg(value_parser = validate_max_rows)] #[arg(help = format!("Maximum number of log entries to fetch and display (max {}).", MAX_ROWS_RANGE.end()))] - max_rows: usize, + max_rows: NonZeroUsize, #[arg(long = "query", default_value = "")] #[arg(help = "Query to filter logs. Example: \"level:error\"")] query: String, + + #[arg(long = "live")] + #[arg(help = "Live stream logs.")] + live: bool, + + #[arg(long = "poll-interval", default_value = "2")] + #[arg(value_parser = validate_poll_interval)] + #[arg(help = "Poll interval in seconds (must be > 0). Only used when --live is specified.")] + poll_interval: u64, } pub(super) fn execute(args: ListLogsArgs) -> Result<()> { @@ -90,7 +112,11 @@ pub(super) fn execute(args: ListLogsArgs) -> Result<()> { (Cow::Owned(query), None) }; - execute_single_fetch(&api, org, project_id, &query, LOG_FIELDS, &args) + if args.live { + execute_live_streaming(&api, org, project_id, &query, LOG_FIELDS, &args) + } else { + execute_single_fetch(&api, org, project_id, &query, LOG_FIELDS, &args) + } } fn execute_single_fetch( @@ -107,7 +133,7 @@ fn execute_single_fetch( project_id, cursor: None, query, - per_page: args.max_rows, + per_page: args.max_rows.get(), stats_period: "90d", sort: "-timestamp", }; @@ -125,7 +151,7 @@ fn execute_single_fetch( .add("Message") .add("Trace"); - for log in logs.iter().take(args.max_rows) { + for log in logs.iter().take(args.max_rows.get()) { let row = table.add_row(); row.add(&log.item_id) .add(&log.timestamp) @@ -143,10 +169,289 @@ fn execute_single_fetch( Ok(()) } +/// Manages deduplication of log entries using an LRU cache +struct LogDeduplicator { + /// LRU cache of seen log IDs + seen_ids: LruCache<String, ()>, +} + +impl LogDeduplicator { + fn new(max_size: NonZeroUsize) -> Self { + Self { + seen_ids: LruCache::new(max_size), + } + } + + /// Add new logs and return an iterator over only the ones that haven't been seen before + fn add_logs<'a>(&'a mut self, new_logs: &'a [LogEntry]) -> impl 
Iterator<Item = &'a LogEntry> { + new_logs + .iter() + .filter(|log| match self.seen_ids.get(&log.item_id) { + // If log ID is in the cache, we have seen it already + Some(_) => false, + + // If log ID is not in the cache, we have not seen it yet + None => { + self.seen_ids.put(log.item_id.clone(), ()); + true + } + }) + } +} + +/// Tracks consecutive batches of all-new logs and manages warning state. +/// +/// A batch is "all-new" when every fetched log is unique (no duplicates). +/// This struct tracks how many consecutive all-new batches we've seen and +/// warns when the count reaches the threshold, suggesting the user might be +/// missing some logs due to overly broad filtering. +#[derive(Debug)] +struct ConsecutiveNewOnlyTracker { + consecutive_count: usize, + warning_threshold: usize, +} + +impl ConsecutiveNewOnlyTracker { + /// Creates a new tracker with the specified warning threshold. + fn new(warning_threshold: usize) -> Self { + Self { + consecutive_count: 0, + warning_threshold, + } + } + + /// Processes a new batch and returns whether to show a warning. + /// + /// A batch is considered "all-new" if `fetched_count > 0` and `unique_count == fetched_count`. + /// Returns `true` when the warning threshold is reached, `false` otherwise. + fn process_batch(&mut self, fetched_count: usize, unique_count: usize) -> bool { + let is_all_new_batch = fetched_count > 0 && unique_count == fetched_count; + + if is_all_new_batch { + self.consecutive_count += 1; + if self.consecutive_count >= self.warning_threshold { + self.consecutive_count = 0; // Reset counter + true // Show warning + } else { + false // No warning yet + } + } else { + self.consecutive_count = 0; // Reset counter + false // No warning + } + } + + /// Gets the current consecutive count (useful for debugging/testing). 
+ #[cfg(test)] + fn consecutive_count(&self) -> usize { + self.consecutive_count + } +} + +fn execute_live_streaming( + api: &Api, + org: &str, + project: Option<&str>, + query: &str, + fields: &[&str], + args: &ListLogsArgs, +) -> Result<()> { + let mut deduplicator = LogDeduplicator::new(args.max_rows); + let poll_duration = Duration::from_secs(args.poll_interval); + let mut new_only_tracker = ConsecutiveNewOnlyTracker::new(3); // Warn after 3 consecutive batches of only new logs + + println!("Starting live log streaming..."); + println!( + "Polling every {} seconds. Press ⌃C to stop.", + args.poll_interval + ); + + loop { + let options = FetchEventsOptions { + dataset: Dataset::Logs, + fields, + project_id: project, + cursor: None, + query, + per_page: args.max_rows.get(), + stats_period: "10m", + sort: "-timestamp", + }; + + match api + .authenticated()? + .fetch_organization_events(org, &options) + { + Ok(logs) => { + let fetched_count = logs.len(); + let unique_logs = deduplicator.add_logs(&logs).collect::<Vec<_>>(); + + let should_warn = new_only_tracker.process_batch(fetched_count, unique_logs.len()); + + // Print new logs in human-readable format + for log in unique_logs { + println!( + "{} | {} | {} | {}", + log.timestamp, + log.severity.as_deref().unwrap_or(""), + log.trace.as_deref().unwrap_or(""), + log.message.as_deref().unwrap_or("") + ); + } + + // Print any pending warning AFTER the batch logs to maximize visibility + if should_warn { + // compute warning message + let suggestion_suffix = if args.query.trim().is_empty() { + "" + } else { + &format!(" (current filter: \"{}\")", args.query) + }; + + let msg = format!( + "Only new logs received in the last {} polls. You may be missing some logs. 
Consider narrowing your query filter{suggestion_suffix}.", + new_only_tracker.warning_threshold + ); + + // Style: bold black text on bright yellow background, with spacing and banner + const BANNER_WIDTH: usize = 100; + let line = "=".repeat(BANNER_WIDTH); + let reset = "\x1b[0m"; + let style = "\x1b[30;103;1m"; // black on bright yellow, bold + eprintln!("\n\n{line}\n{style} {msg} {reset}\n{line}\n\n"); + } + } + Err(e) => { + eprintln!("Error fetching logs: {e}"); + } + } + + std::thread::sleep(poll_duration); + } +} + #[cfg(test)] mod tests { use super::*; + fn create_test_log(id: &str, message: &str) -> LogEntry { + LogEntry { + item_id: id.to_owned(), + trace: None, + severity: Some("info".to_owned()), + timestamp: "2025-01-01T00:00:00Z".to_owned(), + message: Some(message.to_owned()), + } + } + + #[test] + fn test_log_deduplicator_new() { + let deduplicator = LogDeduplicator::new(NonZeroUsize::new(100).unwrap()); + assert_eq!(deduplicator.seen_ids.len(), 0); + } + + #[test] + fn test_log_deduplicator_add_unique_logs() { + let mut deduplicator = LogDeduplicator::new(NonZeroUsize::new(10).unwrap()); + + let log1 = create_test_log("1", "test message 1"); + let log2 = create_test_log("2", "test message 2"); + + let logs = vec![log1.clone(), log2.clone()]; + let unique_logs = deduplicator.add_logs(&logs).collect::<Vec<_>>(); + + assert_eq!(unique_logs.len(), 2); + assert_eq!(deduplicator.seen_ids.len(), 2); + } + + #[test] + fn test_log_deduplicator_deduplicate_logs() { + let mut deduplicator = LogDeduplicator::new(NonZeroUsize::new(10).unwrap()); + + let log1 = create_test_log("1", "test message 1"); + let log2 = create_test_log("2", "test message 2"); + + // Add logs first time + let logs = vec![log1.clone(), log2.clone()]; + let unique_logs1 = deduplicator.add_logs(&logs).collect::<Vec<_>>(); + assert_eq!(unique_logs1.len(), 2); + + // Add same logs again + let unique_logs2 = deduplicator.add_logs(&logs).collect::<Vec<_>>(); + assert_eq!(unique_logs2.len(), 0); // Should be empty 
as logs already seen + + assert_eq!(deduplicator.seen_ids.len(), 2); + } + + #[test] + fn test_log_deduplicator_buffer_size_limit() { + let mut deduplicator = LogDeduplicator::new(NonZeroUsize::new(3).unwrap()); + + // Add 5 logs to a buffer with max size 3 + let logs = vec![ + create_test_log("1", "test message 1"), + create_test_log("2", "test message 2"), + create_test_log("3", "test message 3"), + create_test_log("4", "test message 4"), + create_test_log("5", "test message 5"), + ]; + + let unique_logs = deduplicator.add_logs(&logs).collect::<Vec<_>>(); + assert_eq!(unique_logs.len(), 5); + + // After adding 5 logs to a buffer with max size 3, the oldest 2 should be evicted + // So logs 1 and 2 should no longer be in the seen_ids set + // Adding them again should return them as new logs + let duplicate_logs = vec![ + create_test_log("1", "test message 1"), + create_test_log("2", "test message 2"), + ]; + let duplicate_unique_logs = deduplicator.add_logs(&duplicate_logs).collect::<Vec<_>>(); + assert_eq!(duplicate_unique_logs.len(), 2); + + // Test that adding new logs still works + let new_logs = vec![create_test_log("6", "test message 6")]; + let new_unique_logs = deduplicator.add_logs(&new_logs).collect::<Vec<_>>(); + assert_eq!(new_unique_logs.len(), 1); + } + + #[test] + fn test_consecutive_new_only_tracker_creation() { + let tracker = ConsecutiveNewOnlyTracker::new(5); + assert_eq!(tracker.consecutive_count(), 0); + assert_eq!(tracker.warning_threshold, 5); + } + + #[test] + fn test_consecutive_new_only_tracker_increments_and_warns() { + let mut tracker = ConsecutiveNewOnlyTracker::new(3); + + // First all-new batch + let warn1 = tracker.process_batch(5, 5); + assert_eq!(tracker.consecutive_count(), 1); + assert!(!warn1); + + // Second all-new batch + let warn2 = tracker.process_batch(2, 2); + assert_eq!(tracker.consecutive_count(), 2); + assert!(!warn2); + + // Third all-new batch should warn and reset + let warn3 = tracker.process_batch(10, 10); + 
assert_eq!(tracker.consecutive_count(), 0); + assert!(warn3); + + // Non all-new batch resets + let warn4 = tracker.process_batch(4, 3); + assert_eq!(tracker.consecutive_count(), 0); + assert!(!warn4); + + // Empty fetch resets + let warn5 = tracker.process_batch(0, 0); + assert_eq!(tracker.consecutive_count(), 0); + assert!(!warn5); + } + #[test] fn test_is_numeric_project_id_purely_numeric() { assert!(is_numeric_project_id("123456")); diff --git a/tests/integration/_cases/logs/logs-list-help.trycmd b/tests/integration/_cases/logs/logs-list-help.trycmd index dc4e47d0c4..8329c4874c 100644 --- a/tests/integration/_cases/logs/logs-list-help.trycmd +++ b/tests/integration/_cases/logs/logs-list-help.trycmd @@ -33,9 +33,17 @@ Options: [default: ] + --live + Live stream logs. + --log-level <LOG_LEVEL> Set the log output verbosity. [possible values: trace, debug, info, warn, error] + --poll-interval <POLL_INTERVAL> + Poll interval in seconds (must be > 0). Only used when --live is specified. + + [default: 2] + --quiet Do not print any output while preserving correct exit code. This flag is currently implemented only for selected subcommands. @@ -45,4 +53,4 @@ Options: -h, --help Print help (see a summary with '-h') -``` \ No newline at end of file +```