Expand JSON-object log bodies into OTLP log attributes.

src/config/user.rs
  * Adds log_json_body_max_attributes(): reads
    DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES and returns 25 when the variable is
    unset or does not parse as usize. A value of 0 is accepted as-is.

src/otlp/log_mutations.rs
  * Adds json_value_to_any_value(): strings, booleans and numbers map to the
    matching AnyValue variant, null becomes the string "null", and arrays or
    nested objects are stored as their serialized JSON text.
  * Adds try_expand_json_body_to_attributes(): returns one KeyValue per
    top-level key when the trimmed body parses as a JSON object with at most
    `max_attributes` keys. Returns None otherwise, and always when the limit
    is 0.
  * map_logs_to_otlp() appends those attributes for non-platform logs. The
    log body is left untouched by this change.

Review notes (to confirm against the full files, which this patch only shows
in part):
  * Integers above i64::MAX fail as_i64() and are taken by the as_f64()
    branch, so they are exported as DoubleValue and may lose precision.
  * `attributes.extend(json_attrs)` appends without de-duplication, so a JSON
    key equal to an existing attribute key (e.g. "faas.invocation_id") would
    yield two attributes with the same key.
  * The limit is read from the environment at the call site inside
    map_logs_to_otlp(); if that code runs once per record, consider reading it
    once per batch instead.

diff --git a/src/config/user.rs b/src/config/user.rs
index 8c21827..6be3ba5 100644
--- a/src/config/user.rs
+++ b/src/config/user.rs
@@ -35,6 +35,13 @@ pub fn max_event_payload_size() -> usize {
     }
 }
 
+pub fn log_json_body_max_attributes() -> usize {
+    match std::env::var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES") {
+        Ok(val) => val.parse::<usize>().unwrap_or(25),
+        Err(_) => 25,
+    }
+}
+
 pub fn request_timeout_ms() -> u64 {
     match std::env::var("DASH0_REQUEST_TIMEOUT") {
         Ok(val) => val.parse::<u64>().unwrap_or(2000),
@@ -117,7 +124,7 @@ mod tests {
     use super::{
         is_auto_instrumented_disabled, is_send_on_invocation_end,
         is_telemetry_log_collection_disabled, is_telemetry_metrics_disabled,
-        is_telemetry_traces_disabled, max_event_payload_size,
+        is_telemetry_traces_disabled, log_json_body_max_attributes, max_event_payload_size,
     };
     use serial_test::serial;
 
@@ -288,4 +295,35 @@ mod tests {
         std::env::remove_var("DASH0_DISABLE_TELEMETRY_TRACES");
         std::env::remove_var("AWS_LAMBDA_EXEC_WRAPPER");
     }
+
+    #[test]
+    #[serial]
+    fn log_json_body_max_attributes_defaults_to_25() {
+        std::env::remove_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES");
+        assert_eq!(log_json_body_max_attributes(), 25);
+    }
+
+    #[test]
+    #[serial]
+    fn log_json_body_max_attributes_parses_value() {
+        std::env::set_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES", "10");
+        assert_eq!(log_json_body_max_attributes(), 10);
+        std::env::remove_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES");
+    }
+
+    #[test]
+    #[serial]
+    fn log_json_body_max_attributes_handles_invalid_value() {
+        std::env::set_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES", "not_a_number");
+        assert_eq!(log_json_body_max_attributes(), 25);
+        std::env::remove_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES");
+    }
+
+    #[test]
+    #[serial]
+    fn log_json_body_max_attributes_zero_is_valid() {
+        std::env::set_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES", "0");
+        assert_eq!(log_json_body_max_attributes(), 0);
+        std::env::remove_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES");
+    }
 }
diff --git a/src/otlp/log_mutations.rs b/src/otlp/log_mutations.rs
index 008c120..170f325 100644
--- a/src/otlp/log_mutations.rs
+++ b/src/otlp/log_mutations.rs
@@ -1,7 +1,7 @@
 use crate::state::invocation_data::TelemetryLog;
 use crate::state::invocation_entry;
 use chrono::DateTime;
-use opentelemetry_proto::tonic::common::v1::AnyValue;
+use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
 
 /// Formats a platform.report log record into a CloudWatch-style REPORT message.
@@ -139,6 +139,50 @@ fn severity_text_to_number(severity: &str) -> i32 {
     }
 }
 
+fn json_value_to_any_value(v: &serde_json::Value) -> AnyValue {
+    use opentelemetry_proto::tonic::common::v1::any_value::Value;
+    let inner = match v {
+        serde_json::Value::String(s) => Value::StringValue(s.clone()),
+        serde_json::Value::Bool(b) => Value::BoolValue(*b),
+        serde_json::Value::Number(n) => {
+            if let Some(i) = n.as_i64() {
+                Value::IntValue(i)
+            } else if let Some(f) = n.as_f64() {
+                Value::DoubleValue(f)
+            } else {
+                Value::StringValue(n.to_string())
+            }
+        }
+        serde_json::Value::Null => Value::StringValue("null".to_string()),
+        other => Value::StringValue(
+            serde_json::to_string(other).unwrap_or_else(|_| other.to_string()),
+        ),
+    };
+    AnyValue { value: Some(inner) }
+}
+
+fn try_expand_json_body_to_attributes(
+    body: &str,
+    max_attributes: usize,
+) -> Option<Vec<KeyValue>> {
+    if max_attributes == 0 {
+        return None;
+    }
+    let parsed = serde_json::from_str::<serde_json::Value>(body.trim()).ok()?;
+    let obj = parsed.as_object()?;
+    if obj.len() > max_attributes {
+        return None;
+    }
+    Some(
+        obj.iter()
+            .map(|(k, v)| KeyValue {
+                key: k.clone(),
+                value: Some(json_value_to_any_value(v)),
+            })
+            .collect(),
+    )
+}
+
 pub fn build_payload_log(
     payload: &str,
     payload_type: &str,
@@ -292,6 +336,13 @@ pub fn map_logs_to_otlp(logs: &[TelemetryLog]) -> Vec<LogRecord> {
             "INFO".to_string()
         };
 
+        if !is_platform_log {
+            let max_attrs = crate::config::user::log_json_body_max_attributes();
+            if let Some(json_attrs) = try_expand_json_body_to_attributes(&body_message, max_attrs) {
+                attributes.extend(json_attrs);
+            }
+        }
+
         let log_record = LogRecord {
             time_unix_nano: timestamp_nanos,
             observed_time_unix_nano: timestamp_nanos,
@@ -318,6 +369,7 @@ pub fn map_logs_to_otlp(logs: &[TelemetryLog]) -> Vec<LogRecord> {
 mod tests {
     use super::*;
     use opentelemetry_proto::tonic::common::v1::any_value::Value;
+    use serial_test::serial;
     use serde_json::json;
 
     fn get_string_value(any_value: &Option<AnyValue>) -> Option<String> {
@@ -1134,4 +1186,342 @@ mod tests {
             .unwrap();
         assert_eq!(body, "just a plain log message");
     }
+
+    // --- json_value_to_any_value unit tests ---
+
+    fn get_any_value_kind(
+        av: &AnyValue,
+    ) -> &opentelemetry_proto::tonic::common::v1::any_value::Value {
+        av.value.as_ref().unwrap()
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_string() {
+        let v = serde_json::Value::String("hello".to_string());
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::StringValue(s) => assert_eq!(s, "hello"),
+            _ => panic!("expected StringValue"),
+        }
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_bool_true() {
+        let v = serde_json::Value::Bool(true);
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::BoolValue(b) => assert!(*b),
+            _ => panic!("expected BoolValue"),
+        }
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_bool_false() {
+        let v = serde_json::Value::Bool(false);
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::BoolValue(b) => assert!(!b),
+            _ => panic!("expected BoolValue"),
+        }
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_integer() {
+        let v = serde_json::Value::Number(42.into());
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::IntValue(i) => assert_eq!(*i, 42),
+            _ => panic!("expected IntValue"),
+        }
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_float() {
+        let v: serde_json::Value = serde_json::from_str("3.14").unwrap();
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::DoubleValue(f) => assert!((f - 3.14).abs() < 1e-9),
+            _ => panic!("expected DoubleValue"),
+        }
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_null() {
+        let v = serde_json::Value::Null;
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::StringValue(s) => assert_eq!(s, "null"),
+            _ => panic!("expected StringValue(null)"),
+        }
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_nested_object() {
+        let v: serde_json::Value = json!({"a": 1});
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::StringValue(s) => {
+                let parsed: serde_json::Value = serde_json::from_str(s).unwrap();
+                assert_eq!(parsed, json!({"a": 1}));
+            }
+            _ => panic!("expected StringValue for nested object"),
+        }
+    }
+
+    #[test]
+    fn test_json_value_to_any_value_array() {
+        let v: serde_json::Value = json!([1, 2, 3]);
+        let av = json_value_to_any_value(&v);
+        match get_any_value_kind(&av) {
+            Value::StringValue(s) => {
+                let parsed: serde_json::Value = serde_json::from_str(s).unwrap();
+                assert_eq!(parsed, json!([1, 2, 3]));
+            }
+            _ => panic!("expected StringValue for array"),
+        }
+    }
+
+    // --- try_expand_json_body_to_attributes unit tests ---
+
+    #[test]
+    fn test_try_expand_plain_string_returns_none() {
+        assert!(try_expand_json_body_to_attributes("hello world", 25).is_none());
+    }
+
+    #[test]
+    fn test_try_expand_json_array_returns_none() {
+        assert!(try_expand_json_body_to_attributes("[1,2,3]", 25).is_none());
+    }
+
+    #[test]
+    fn test_try_expand_json_number_returns_none() {
+        assert!(try_expand_json_body_to_attributes("42", 25).is_none());
+    }
+
+    #[test]
+    fn test_try_expand_empty_object() {
+        let result = try_expand_json_body_to_attributes("{}", 25);
+        assert!(result.is_some());
+        assert!(result.unwrap().is_empty());
+    }
+
+    #[test]
+    fn test_try_expand_small_object() {
+        let result = try_expand_json_body_to_attributes(r#"{"a":"1","b":"2"}"#, 25);
+        assert!(result.is_some());
+        let attrs = result.unwrap();
+        assert_eq!(attrs.len(), 2);
+        let keys: Vec<&str> = attrs.iter().map(|kv| kv.key.as_str()).collect();
+        assert!(keys.contains(&"a"));
+        assert!(keys.contains(&"b"));
+    }
+
+    #[test]
+    fn test_try_expand_exceeds_threshold_returns_none() {
+        let obj: serde_json::Map<String, serde_json::Value> = (0..26)
+            .map(|i| (format!("key{}", i), serde_json::Value::String(i.to_string())))
+            .collect();
+        let body = serde_json::to_string(&obj).unwrap();
+        assert!(try_expand_json_body_to_attributes(&body, 25).is_none());
+    }
+
+    #[test]
+    fn test_try_expand_at_threshold() {
+        let obj: serde_json::Map<String, serde_json::Value> = (0..25)
+            .map(|i| (format!("key{}", i), serde_json::Value::String(i.to_string())))
+            .collect();
+        let body = serde_json::to_string(&obj).unwrap();
+        let result = try_expand_json_body_to_attributes(&body, 25);
+        assert!(result.is_some());
+        assert_eq!(result.unwrap().len(), 25);
+    }
+
+    #[test]
+    fn test_try_expand_threshold_zero_returns_none() {
+        assert!(try_expand_json_body_to_attributes(r#"{"a":"1"}"#, 0).is_none());
+    }
+
+    #[test]
+    fn test_try_expand_handles_trailing_newline() {
+        let result = try_expand_json_body_to_attributes("{\"k\":\"v\"}\n", 25);
+        assert!(result.is_some());
+        assert_eq!(result.unwrap().len(), 1);
+    }
+
+    // --- map_logs_to_otlp integration tests for JSON expansion ---
+
+    #[test]
+    fn test_json_body_expands_attributes_and_keeps_body() {
+        let logs = vec![TelemetryLog {
+            time: "2023-10-26T12:00:00.000Z".to_string(),
+            r#type: "function".to_string(),
+            record: json!(r#"{"level":"info","msg":"hello"}"#),
+            invocation_id: Some("inv-json-1".to_string()),
+            trace_id: None,
+            span_id: None,
+            custom_attributes: vec![],
+        }];
+
+        let result = map_logs_to_otlp(&logs);
+        assert_eq!(result.len(), 1);
+        let log = &result[0];
+
+        // Body should be the original JSON string
+        assert_eq!(
+            get_string_value(&log.body),
+            Some(r#"{"level":"info","msg":"hello"}"#.to_string())
+        );
+
+        // Attributes: faas.invocation_id + level + msg
+        assert_eq!(log.attributes.len(), 3);
+        assert_eq!(log.attributes[0].key, "faas.invocation_id");
+        let json_keys: Vec<&str> = log.attributes[1..]
+            .iter()
+            .map(|kv| kv.key.as_str())
+            .collect();
+        assert!(json_keys.contains(&"level"));
+        assert!(json_keys.contains(&"msg"));
+    }
+
+    #[test]
+    fn test_non_json_body_no_extra_attributes() {
+        let logs = vec![TelemetryLog {
+            time: "2023-10-26T12:00:00.000Z".to_string(),
+            r#type: "function".to_string(),
+            record: json!("plain log message"),
+            invocation_id: Some("inv-plain-1".to_string()),
+            trace_id: None,
+            span_id: None,
+            custom_attributes: vec![],
+        }];
+
+        let result = map_logs_to_otlp(&logs);
+        assert_eq!(result.len(), 1);
+        // Only faas.invocation_id, no JSON expansion
+        assert_eq!(result[0].attributes.len(), 1);
+        assert_eq!(result[0].attributes[0].key, "faas.invocation_id");
+    }
+
+    #[test]
+    fn test_json_array_body_no_extra_attributes() {
+        let logs = vec![TelemetryLog {
+            time: "2023-10-26T12:00:00.000Z".to_string(),
+            r#type: "function".to_string(),
+            record: json!("[1,2,3]"),
+            invocation_id: Some("inv-arr-1".to_string()),
+            trace_id: None,
+            span_id: None,
+            custom_attributes: vec![],
+        }];
+
+        let result = map_logs_to_otlp(&logs);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].attributes.len(), 1);
+        assert_eq!(result[0].attributes[0].key, "faas.invocation_id");
+    }
+
+    #[test]
+    fn test_node_prefix_with_json_body_expands_attributes() {
+        let logs = vec![TelemetryLog {
+            time: "2026-03-02T11:53:38.040Z".to_string(),
+            r#type: "function".to_string(),
+            record: json!(
+                "2026-03-02T11:53:38.040Z\tf0ae1bc7-ae4b-4317-abf9-3a562e3127ef\tINFO\t{\"key\":\"val\"}"
+            ),
+            invocation_id: Some("inv-node-json".to_string()),
+            trace_id: None,
+            span_id: None,
+            custom_attributes: vec![],
+        }];
+
+        let result = map_logs_to_otlp(&logs);
+        assert_eq!(result.len(), 1);
+        let log = &result[0];
+
+        // Severity should be extracted from Node prefix
+        assert_eq!(log.severity_text, "INFO");
+        // Body should be the JSON part after prefix stripping
+        assert_eq!(get_string_value(&log.body), Some(r#"{"key":"val"}"#.to_string()));
+        // Attributes: faas.invocation_id + key
+        assert_eq!(log.attributes.len(), 2);
+        let json_key_attr = log.attributes.iter().find(|kv| kv.key == "key");
+        assert!(json_key_attr.is_some());
+        assert_eq!(
+            get_string_value(&json_key_attr.unwrap().value),
+            Some("val".to_string())
+        );
+    }
+
+    #[test]
+    fn test_platform_log_no_json_expansion() {
+        let logs = vec![TelemetryLog {
+            time: "2025-12-07T12:09:06.523Z".to_string(),
+            r#type: "platform.start".to_string(),
+            record: json!({
+                "requestId": "test-no-expand",
+                "version": "$LATEST"
+            }),
+            invocation_id: Some("inv-plat-1".to_string()),
+            trace_id: None,
+            span_id: None,
+            custom_attributes: vec![],
+        }];
+
+        let result = map_logs_to_otlp(&logs);
+        assert_eq!(result.len(), 1);
+        // Only faas.invocation_id — platform logs must not trigger JSON expansion
+        assert_eq!(result[0].attributes.len(), 1);
+        assert_eq!(result[0].attributes[0].key, "faas.invocation_id");
+    }
+
+    #[test]
+    #[serial]
+    fn test_json_body_exceeds_configured_threshold_no_expansion() {
+        std::env::set_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES", "2");
+
+        let logs = vec![TelemetryLog {
+            time: "2023-10-26T12:00:00.000Z".to_string(),
+            r#type: "function".to_string(),
+            record: json!(r#"{"a":"1","b":"2","c":"3"}"#),
+            invocation_id: Some("inv-thresh-1".to_string()),
+            trace_id: None,
+            span_id: None,
+            custom_attributes: vec![],
+        }];
+
+        let result = map_logs_to_otlp(&logs);
+        std::env::remove_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES");
+
+        assert_eq!(result.len(), 1);
+        // Body kept as-is, no JSON expansion
+        assert_eq!(
+            get_string_value(&result[0].body),
+            Some(r#"{"a":"1","b":"2","c":"3"}"#.to_string())
+        );
+        assert_eq!(result[0].attributes.len(), 1);
+        assert_eq!(result[0].attributes[0].key, "faas.invocation_id");
+    }
+
+    #[test]
+    #[serial]
+    fn test_json_expansion_disabled_with_zero_threshold() {
+        std::env::set_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES", "0");
+
+        let logs = vec![TelemetryLog {
+            time: "2023-10-26T12:00:00.000Z".to_string(),
+            r#type: "function".to_string(),
+            record: json!(r#"{"a":"1"}"#),
+            invocation_id: Some("inv-zero-1".to_string()),
+            trace_id: None,
+            span_id: None,
+            custom_attributes: vec![],
+        }];
+
+        let result = map_logs_to_otlp(&logs);
+        std::env::remove_var("DASH0_LOG_JSON_BODY_MAX_ATTRIBUTES");
+
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].attributes.len(), 1);
+        assert_eq!(result[0].attributes[0].key, "faas.invocation_id");
+    }
 }