@@ -381,9 +381,11 @@ impl LambdaProcessor {
381381 lambda_message. message . clone ( ) ,
382382 ) ;
383383
384- // Extract log level from JSON (AWS JSON log format / Powertools)
384+ // Extract log level from JSON (AWS JSON log format / Powertools).
385+ // Try "level" first (standard), then fall back to "status" (Datadog convention).
385386 let status = json_obj
386387 . get ( "level" )
388+ . or_else ( || json_obj. get ( "status" ) )
387389 . and_then ( |v| v. as_str ( ) )
388390 . and_then ( map_log_level_to_status)
389391 . map_or (
@@ -1956,4 +1958,168 @@ mod tests {
19561958 let intake_log = processor. get_intake_log ( lambda_message) . unwrap ( ) ;
19571959 assert_eq ! ( intake_log. message. status, "debug" ) ;
19581960 }
1961+
1962+ #[ tokio:: test]
1963+ async fn test_get_intake_log_non_string_level_defaults_to_info ( ) {
1964+ let config = Arc :: new ( config:: Config {
1965+ service : Some ( "test-service" . to_string ( ) ) ,
1966+ tags : HashMap :: from ( [ ( "test" . to_string ( ) , "tags" . to_string ( ) ) ] ) ,
1967+ ..config:: Config :: default ( )
1968+ } ) ;
1969+
1970+ let tags_provider = Arc :: new ( provider:: Provider :: new (
1971+ Arc :: clone ( & config) ,
1972+ LAMBDA_RUNTIME_SLUG . to_string ( ) ,
1973+ & HashMap :: from ( [ ( "function_arn" . to_string ( ) , "test-arn" . to_string ( ) ) ] ) ,
1974+ ) ) ;
1975+
1976+ let ( tx, _rx) = tokio:: sync:: mpsc:: channel ( 2 ) ;
1977+ let mut processor =
1978+ LambdaProcessor :: new ( tags_provider, Arc :: clone ( & config) , tx. clone ( ) , false ) ;
1979+
1980+ // Set request_id
1981+ let start_event = TelemetryEvent {
1982+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 47 ) . unwrap ( ) ,
1983+ record : TelemetryRecord :: PlatformStart {
1984+ request_id : "test-request-id" . to_string ( ) ,
1985+ version : Some ( "test" . to_string ( ) ) ,
1986+ } ,
1987+ } ;
1988+ let start_msg = processor. get_message ( start_event) . await . unwrap ( ) ;
1989+ processor. get_intake_log ( start_msg) . unwrap ( ) ;
1990+
1991+ // level as a number
1992+ let event = TelemetryEvent {
1993+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 48 ) . unwrap ( ) ,
1994+ record : TelemetryRecord :: Function ( Value :: String (
1995+ r#"{"level":42,"message":"numeric level"}"# . to_string ( ) ,
1996+ ) ) ,
1997+ } ;
1998+ let lambda_message = processor. get_message ( event) . await . unwrap ( ) ;
1999+ let intake_log = processor. get_intake_log ( lambda_message) . unwrap ( ) ;
2000+ assert_eq ! ( intake_log. message. status, "info" ) ;
2001+
2002+ // level as null
2003+ let event = TelemetryEvent {
2004+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 49 ) . unwrap ( ) ,
2005+ record : TelemetryRecord :: Function ( Value :: String (
2006+ r#"{"level":null,"message":"null level"}"# . to_string ( ) ,
2007+ ) ) ,
2008+ } ;
2009+ let lambda_message = processor. get_message ( event) . await . unwrap ( ) ;
2010+ let intake_log = processor. get_intake_log ( lambda_message) . unwrap ( ) ;
2011+ assert_eq ! ( intake_log. message. status, "info" ) ;
2012+
2013+ // level as a boolean
2014+ let event = TelemetryEvent {
2015+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 50 ) . unwrap ( ) ,
2016+ record : TelemetryRecord :: Function ( Value :: String (
2017+ r#"{"level":true,"message":"bool level"}"# . to_string ( ) ,
2018+ ) ) ,
2019+ } ;
2020+ let lambda_message = processor. get_message ( event) . await . unwrap ( ) ;
2021+ let intake_log = processor. get_intake_log ( lambda_message) . unwrap ( ) ;
2022+ assert_eq ! ( intake_log. message. status, "info" ) ;
2023+ }
2024+
2025+ #[ tokio:: test]
2026+ async fn test_get_intake_log_ddtags_and_level_combined ( ) {
2027+ let config = Arc :: new ( config:: Config {
2028+ service : Some ( "test-service" . to_string ( ) ) ,
2029+ tags : HashMap :: from ( [ ( "test" . to_string ( ) , "tags" . to_string ( ) ) ] ) ,
2030+ ..config:: Config :: default ( )
2031+ } ) ;
2032+
2033+ let tags_provider = Arc :: new ( provider:: Provider :: new (
2034+ Arc :: clone ( & config) ,
2035+ LAMBDA_RUNTIME_SLUG . to_string ( ) ,
2036+ & HashMap :: from ( [ ( "function_arn" . to_string ( ) , "test-arn" . to_string ( ) ) ] ) ,
2037+ ) ) ;
2038+
2039+ let ( tx, _rx) = tokio:: sync:: mpsc:: channel ( 2 ) ;
2040+ let mut processor =
2041+ LambdaProcessor :: new ( tags_provider, Arc :: clone ( & config) , tx. clone ( ) , false ) ;
2042+
2043+ // Set request_id
2044+ let start_event = TelemetryEvent {
2045+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 47 ) . unwrap ( ) ,
2046+ record : TelemetryRecord :: PlatformStart {
2047+ request_id : "test-request-id" . to_string ( ) ,
2048+ version : Some ( "test" . to_string ( ) ) ,
2049+ } ,
2050+ } ;
2051+ let start_msg = processor. get_message ( start_event) . await . unwrap ( ) ;
2052+ processor. get_intake_log ( start_msg) . unwrap ( ) ;
2053+
2054+ // JSON log with both ddtags and level
2055+ let event = TelemetryEvent {
2056+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 48 ) . unwrap ( ) ,
2057+ record : TelemetryRecord :: Function ( Value :: String (
2058+ r#"{"level":"WARN","message":"warning with tags","ddtags":"env:staging,team:backend"}"#
2059+ . to_string ( ) ,
2060+ ) ) ,
2061+ } ;
2062+ let lambda_message = processor. get_message ( event) . await . unwrap ( ) ;
2063+ let intake_log = processor. get_intake_log ( lambda_message) . unwrap ( ) ;
2064+
2065+ // Level should be extracted
2066+ assert_eq ! ( intake_log. message. status, "warn" ) ;
2067+ // Tags should be extracted and appended
2068+ assert ! ( intake_log. tags. contains( "env:staging" ) ) ;
2069+ assert ! ( intake_log. tags. contains( "team:backend" ) ) ;
2070+ // ddtags should be removed from the message
2071+ assert ! ( !intake_log. message. message. contains( "ddtags" ) ) ;
2072+ }
2073+
2074+ #[ tokio:: test]
2075+ async fn test_get_intake_log_status_field_fallback ( ) {
2076+ let config = Arc :: new ( config:: Config {
2077+ service : Some ( "test-service" . to_string ( ) ) ,
2078+ tags : HashMap :: from ( [ ( "test" . to_string ( ) , "tags" . to_string ( ) ) ] ) ,
2079+ ..config:: Config :: default ( )
2080+ } ) ;
2081+
2082+ let tags_provider = Arc :: new ( provider:: Provider :: new (
2083+ Arc :: clone ( & config) ,
2084+ LAMBDA_RUNTIME_SLUG . to_string ( ) ,
2085+ & HashMap :: from ( [ ( "function_arn" . to_string ( ) , "test-arn" . to_string ( ) ) ] ) ,
2086+ ) ) ;
2087+
2088+ let ( tx, _rx) = tokio:: sync:: mpsc:: channel ( 2 ) ;
2089+ let mut processor =
2090+ LambdaProcessor :: new ( tags_provider, Arc :: clone ( & config) , tx. clone ( ) , false ) ;
2091+
2092+ // Set request_id
2093+ let start_event = TelemetryEvent {
2094+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 47 ) . unwrap ( ) ,
2095+ record : TelemetryRecord :: PlatformStart {
2096+ request_id : "test-request-id" . to_string ( ) ,
2097+ version : Some ( "test" . to_string ( ) ) ,
2098+ } ,
2099+ } ;
2100+ let start_msg = processor. get_message ( start_event) . await . unwrap ( ) ;
2101+ processor. get_intake_log ( start_msg) . unwrap ( ) ;
2102+
2103+ // JSON log with "status" field instead of "level" (Datadog convention)
2104+ let event = TelemetryEvent {
2105+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 48 ) . unwrap ( ) ,
2106+ record : TelemetryRecord :: Function ( Value :: String (
2107+ r#"{"status":"error","message":"something failed"}"# . to_string ( ) ,
2108+ ) ) ,
2109+ } ;
2110+ let lambda_message = processor. get_message ( event) . await . unwrap ( ) ;
2111+ let intake_log = processor. get_intake_log ( lambda_message) . unwrap ( ) ;
2112+ assert_eq ! ( intake_log. message. status, "error" ) ;
2113+
2114+ // "level" takes priority over "status" when both are present
2115+ let event = TelemetryEvent {
2116+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 49 ) . unwrap ( ) ,
2117+ record : TelemetryRecord :: Function ( Value :: String (
2118+ r#"{"level":"WARN","status":"error","message":"both fields"}"# . to_string ( ) ,
2119+ ) ) ,
2120+ } ;
2121+ let lambda_message = processor. get_message ( event) . await . unwrap ( ) ;
2122+ let intake_log = processor. get_intake_log ( lambda_message) . unwrap ( ) ;
2123+ assert_eq ! ( intake_log. message. status, "warn" ) ;
2124+ }
19592125}
0 commit comments