diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 401524849..75137ab9a 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -3783,6 +3783,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" @@ -3792,11 +3802,14 @@ dependencies = [ "matchers", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index e3745d32a..b7b825b96 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -34,7 +34,7 @@ tokio = { version = "1.47", default-features = false, features = ["macros", "rt- tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } tracing-core = { version = "0.1", default-features = false } -tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log", "json"] } hmac = { version = "0.12", default-features = false } sha2 = { version = "0.10", default-features = false } hex = { version = "0.4", default-features = false, features = ["std"] } diff --git a/bottlecap/LICENSE-3rdparty.csv b/bottlecap/LICENSE-3rdparty.csv index 85e333834..7eb8750fd 100644 --- a/bottlecap/LICENSE-3rdparty.csv +++ b/bottlecap/LICENSE-3rdparty.csv @@ -235,6 +235,7 @@ tracing,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman , Eliza Weisman , David Barsky " tracing-core,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors tracing-log,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors +tracing-serde,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors tracing-subscriber,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman , David Barsky , Tokio Contributors " try-lock,https://github.com/seanmonstar/try-lock,MIT,Sean McArthur twoway,https://github.com/bluss/twoway,MIT OR Apache-2.0,bluss diff --git a/bottlecap/src/logger.rs b/bottlecap/src/logger.rs index c28587a84..72d7c111f 100644 --- a/bottlecap/src/logger.rs +++ b/bottlecap/src/logger.rs @@ -9,6 +9,23 @@ use tracing_subscriber::registry::LookupSpan; #[derive(Debug, Clone, Copy)] pub struct Formatter; +/// Visitor that captures the message from tracing event fields. +struct MessageVisitor(String); + +impl tracing::field::Visit for MessageVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn fmt::Debug) { + if field.name() == "message" { + self.0 = format!("{value:?}"); + } + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "message" { + self.0 = value.to_string(); + } + } +} + impl FormatEvent for Formatter where S: Subscriber + for<'a> LookupSpan<'a>, @@ -20,36 +37,166 @@ where mut writer: format::Writer<'_>, event: &Event<'_>, ) -> fmt::Result { - // Format values from the event's's metadata: let metadata = event.metadata(); - write!(&mut writer, "DD_EXTENSION | {} | ", metadata.level())?; + let level = metadata.level(); - // Format all the spans in the event's span context. + let mut visitor = MessageVisitor(String::new()); + event.record(&mut visitor); + + // Build span context prefix + let mut span_prefix = String::new(); if let Some(scope) = ctx.event_scope() { for span in scope.from_root() { - write!(writer, "{}", span.name())?; - - // `FormattedFields` is a formatted representation of the span's - // fields, which is stored in its extensions by the `fmt` layer's - // `new_span` method. The fields will have been formatted - // by the same field formatter that's provided to the event - // formatter in the `FmtContext`. + span_prefix.push_str(span.name()); let ext = span.extensions(); - let fields = &ext - .get::>() - .expect("will never be `None`"); - - // Skip formatting the fields if the span had no fields. - if !fields.is_empty() { - write!(writer, "{{{fields}}}")?; + if let Some(fields) = ext.get::>() + && !fields.is_empty() + { + span_prefix.push('{'); + span_prefix.push_str(fields); + span_prefix.push('}'); } - write!(writer, ": ")?; + span_prefix.push_str(": "); } } - // Write fields on the event - ctx.field_format().format_fields(writer.by_ref(), event)?; + let message = format!("DD_EXTENSION | {level} | {span_prefix}{}", visitor.0); + + // Use serde_json for safe serialization (handles escaping automatically) + let output = serde_json::json!({ + "level": level.to_string(), + "message": message, + }); + + writeln!(writer, "{output}") + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use tracing::subscriber::with_default; + use tracing_subscriber::fmt::Subscriber; + + /// Captures all output from a tracing subscriber using our `Formatter`. + fn capture_log(f: F) -> String { + let buf = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let buf_clone = buf.clone(); + + let subscriber = Subscriber::builder() + .with_writer(move || -> Box { + Box::new(WriterGuard(buf_clone.clone())) + }) + .with_max_level(tracing::Level::TRACE) + .with_level(true) + .with_target(false) + .without_time() + .event_format(Formatter) + .finish(); + + with_default(subscriber, f); + + let lock = buf.lock().expect("test lock poisoned"); + String::from_utf8(lock.clone()).expect("invalid UTF-8 in log output") + } + + struct WriterGuard(std::sync::Arc>>); + + impl std::io::Write for WriterGuard { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0 + .lock() + .expect("write lock poisoned") + .extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + #[test] + fn test_formatter_outputs_valid_json_with_level() { + let output = capture_log(|| { + tracing::info!("hello world"); + }); + + let parsed: serde_json::Value = + serde_json::from_str(output.trim()).expect("output should be valid JSON"); + + assert_eq!(parsed["level"], "INFO"); + assert!( + parsed["message"] + .as_str() + .unwrap() + .contains("DD_EXTENSION | INFO | hello world") + ); + } + + #[test] + fn test_formatter_error_level() { + let output = capture_log(|| { + tracing::error!("something broke"); + }); + + let parsed: serde_json::Value = + serde_json::from_str(output.trim()).expect("output should be valid JSON"); + assert_eq!(parsed["level"], "ERROR"); + assert!( + parsed["message"] + .as_str() + .unwrap() + .contains("DD_EXTENSION | ERROR | something broke") + ); + } + + #[test] + fn test_formatter_debug_level() { + let output = capture_log(|| { + tracing::debug!("debug details"); + }); + + let parsed: serde_json::Value = + serde_json::from_str(output.trim()).expect("output should be valid JSON"); + assert_eq!(parsed["level"], "DEBUG"); + assert!( + parsed["message"] + .as_str() + .unwrap() + .contains("DD_EXTENSION | DEBUG | debug details") + ); + } + + #[test] + fn test_formatter_escapes_special_characters() { + let output = capture_log(|| { + tracing::info!("message with \"quotes\" and a\nnewline"); + }); + + // The raw output must contain escaped quotes and newlines to be valid JSON + assert!( + output.contains(r#"\"quotes\""#), + "quotes should be escaped in raw JSON" + ); + assert!( + output.contains(r"\n"), + "newline should be escaped in raw JSON" + ); - writeln!(writer) + // And it must parse as valid JSON + let parsed: serde_json::Value = serde_json::from_str(output.trim()) + .expect("output with special chars should be valid JSON"); + let msg = parsed["message"] + .as_str() + .expect("message field should be a string"); + assert!( + msg.contains("\"quotes\""), + "parsed message should contain literal quotes" + ); + assert!( + msg.contains('\n'), + "parsed message should contain literal newline" + ); } } diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 5aeb5970f..8325dde32 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -55,6 +55,22 @@ fn is_oom_error(error_msg: &str) -> bool { .any(|&oom_str| error_msg.contains(oom_str)) } +/// Maps AWS/common log level strings to Datadog log status values. +/// Case-insensitive and accepts both short and long forms +/// (e.g. "WARN"/"WARNING", "INFO"/"INFORMATION", "ERR"/"ERROR"). +/// Returns `None` for unrecognized levels so callers can fall back to a default. +fn map_log_level_to_status(level: &str) -> Option<&'static str> { + match level.to_uppercase().as_str() { + "FATAL" | "CRITICAL" => Some("critical"), + "ERROR" | "ERR" => Some("error"), + "WARN" | "WARNING" => Some("warn"), + "INFO" | "INFORMATION" => Some("info"), + "DEBUG" => Some("debug"), + "TRACE" => Some("trace"), + _ => None, + } +} + impl Processor for LambdaProcessor {} impl LambdaProcessor { @@ -366,6 +382,18 @@ impl LambdaProcessor { lambda_message.message.clone(), ); + // Extract log level from JSON (AWS JSON log format / Powertools). + // Try "level" first (standard), then fall back to "status" (Datadog convention). + let status = json_obj + .get("level") + .or_else(|| json_obj.get("status")) + .and_then(|v| v.as_str()) + .and_then(map_log_level_to_status) + .map_or( + lambda_message.status.clone(), + std::string::ToString::to_string, + ); + IntakeLog { hostname: self.function_arn.clone(), source: LAMBDA_RUNTIME_SLUG.to_string(), @@ -375,7 +403,7 @@ impl LambdaProcessor { message: final_message, lambda: lambda_message.lambda, timestamp: lambda_message.timestamp, - status: lambda_message.status, + status, }, } } else { @@ -1635,4 +1663,464 @@ mod tests { // Should be None because we're not in LMI mode assert_eq!(result.lambda.request_id, None); } + + #[test] + fn test_map_log_level_to_status() { + // AWS JSON log format levels (uppercase) + assert_eq!(map_log_level_to_status("WARN"), Some("warn")); + assert_eq!(map_log_level_to_status("ERROR"), Some("error")); + assert_eq!(map_log_level_to_status("INFO"), Some("info")); + assert_eq!(map_log_level_to_status("DEBUG"), Some("debug")); + assert_eq!(map_log_level_to_status("FATAL"), Some("critical")); + assert_eq!(map_log_level_to_status("TRACE"), Some("trace")); + + // Case-insensitive (lowercase, mixed case, PascalCase) + assert_eq!(map_log_level_to_status("warn"), Some("warn")); + assert_eq!(map_log_level_to_status("error"), Some("error")); + assert_eq!(map_log_level_to_status("Warn"), Some("warn")); + assert_eq!(map_log_level_to_status("Info"), Some("info")); + assert_eq!(map_log_level_to_status("debug"), Some("debug")); + assert_eq!(map_log_level_to_status("Fatal"), Some("critical")); + assert_eq!(map_log_level_to_status("trace"), Some("trace")); + + // Short-form aliases + assert_eq!(map_log_level_to_status("ERR"), Some("error")); + assert_eq!(map_log_level_to_status("err"), Some("error")); + + // Long-form variants (.NET LogLevel names, syslog, etc.) + assert_eq!(map_log_level_to_status("WARNING"), Some("warn")); + assert_eq!(map_log_level_to_status("Warning"), Some("warn")); + assert_eq!(map_log_level_to_status("warning"), Some("warn")); + assert_eq!(map_log_level_to_status("INFORMATION"), Some("info")); + assert_eq!(map_log_level_to_status("Information"), Some("info")); + assert_eq!(map_log_level_to_status("information"), Some("info")); + assert_eq!(map_log_level_to_status("CRITICAL"), Some("critical")); + assert_eq!(map_log_level_to_status("Critical"), Some("critical")); + + // Unrecognized levels + assert_eq!(map_log_level_to_status("UNKNOWN"), None); + assert_eq!(map_log_level_to_status("VERBOSE"), None); + assert_eq!(map_log_level_to_status(""), None); + } + + #[tokio::test] + async fn test_get_intake_log_extracts_level_from_json() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); + + // Set request_id so logs are not orphaned + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + let start_msg = processor.get_message(start_event).await.unwrap(); + processor.get_intake_log(start_msg).unwrap(); + + // Test WARN level + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"timestamp":"2025-08-27T10:25:22.244Z","level":"WARN","message":"This is a warning"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "warn"); + + // Test ERROR level + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 49).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"timestamp":"2025-08-27T10:25:22.244Z","level":"ERROR","message":"This is an error"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "error"); + + // Test FATAL level + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 50).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"timestamp":"2025-08-27T10:25:22.244Z","level":"FATAL","message":"Fatal error"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "critical"); + + // Test DEBUG level + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 51).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"timestamp":"2025-08-27T10:25:22.244Z","level":"DEBUG","message":"Debug info"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "debug"); + + // Test INFO level (should remain "info") + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 52).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"timestamp":"2025-08-27T10:25:22.244Z","level":"INFO","message":"Info message"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "info"); + } + + #[tokio::test] + async fn test_get_intake_log_no_level_defaults_to_info() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); + + // Set request_id + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + let start_msg = processor.get_message(start_event).await.unwrap(); + processor.get_intake_log(start_msg).unwrap(); + + // JSON without level field + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"message":"No level field here"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "info"); + } + + #[tokio::test] + async fn test_get_intake_log_unrecognized_level_defaults_to_info() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); + + // Set request_id + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + let start_msg = processor.get_message(start_event).await.unwrap(); + processor.get_intake_log(start_msg).unwrap(); + + // JSON with unrecognized level + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"level":"VERBOSE","message":"Unknown level"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "info"); + } + + #[tokio::test] + async fn test_platform_event_status_not_overridden_by_level() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); + + // PlatformRuntimeDone with timeout should keep "error" status + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformRuntimeDone { + request_id: "test-request-id".to_string(), + status: Status::Timeout, + error_type: None, + metrics: Some(RuntimeDoneMetrics { + duration_ms: 5000.0, + produced_bytes: Some(42), + }), + }, + }; + + let lambda_message = processor.get_message(event).await.unwrap(); + assert_eq!(lambda_message.status, "error"); + + // The intake log should preserve the "error" status (message is not JSON) + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "error"); + } + + #[tokio::test] + async fn test_extension_json_log_extracts_level() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), true); + + // Extension log from our JSON formatter: {"level":"ERROR","message":"DD_EXTENSION | ERROR | ..."} + // Arrives as a string since it was written to stderr as a JSON line + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Extension(Value::String( + r#"{"level":"ERROR","message":"DD_EXTENSION | ERROR | Extension loop failed"}"# + .to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "error"); + assert!( + intake_log + .message + .message + .contains("DD_EXTENSION | ERROR |") + ); + + // DEBUG level + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(), + record: TelemetryRecord::Extension(Value::String( + r#"{"level":"DEBUG","message":"DD_EXTENSION | DEBUG | Starting extension"}"# + .to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "debug"); + } + + #[tokio::test] + async fn test_get_intake_log_non_string_level_defaults_to_info() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); + + // Set request_id + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + let start_msg = processor.get_message(start_event).await.unwrap(); + processor.get_intake_log(start_msg).unwrap(); + + // level as a number + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"level":42,"message":"numeric level"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "info"); + + // level as null + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 49).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"level":null,"message":"null level"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "info"); + + // level as a boolean + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 50).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"level":true,"message":"bool level"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "info"); + } + + #[tokio::test] + async fn test_get_intake_log_ddtags_and_level_combined() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); + + // Set request_id + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + let start_msg = processor.get_message(start_event).await.unwrap(); + processor.get_intake_log(start_msg).unwrap(); + + // JSON log with both ddtags and level + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"level":"WARN","message":"warning with tags","ddtags":"env:staging,team:backend"}"# + .to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + + // Level should be extracted + assert_eq!(intake_log.message.status, "warn"); + // Tags should be extracted and appended + assert!(intake_log.tags.contains("env:staging")); + assert!(intake_log.tags.contains("team:backend")); + // ddtags should be removed from the message + assert!(!intake_log.message.message.contains("ddtags")); + } + + #[tokio::test] + async fn test_get_intake_log_status_field_fallback() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); + + // Set request_id + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + let start_msg = processor.get_message(start_event).await.unwrap(); + processor.get_intake_log(start_msg).unwrap(); + + // JSON log with "status" field instead of "level" (Datadog convention) + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"status":"error","message":"something failed"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "error"); + + // "level" takes priority over "status" when both are present + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 49).unwrap(), + record: TelemetryRecord::Function(Value::String( + r#"{"level":"WARN","status":"error","message":"both fields"}"#.to_string(), + )), + }; + let lambda_message = processor.get_message(event).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.status, "warn"); + } }