@@ -550,6 +550,7 @@ impl LambdaProcessor {
550550 execution_id : & str , // value
551551 execution_name : & str , // value
552552 first_invocation : Option < bool > , // value
553+ execution_status : Option < String > , // value
553554 ) {
554555 if self . durable_context_map . contains_key ( request_id) {
555556 error ! ( "LOGS | insert_to_durable_context_map: request_id={request_id} already in map" ) ;
@@ -567,6 +568,7 @@ impl LambdaProcessor {
567568 execution_id : execution_id. to_string ( ) ,
568569 execution_name : execution_name. to_string ( ) ,
569570 first_invocation,
571+ execution_status,
570572 } ,
571573 ) ;
572574 self . drain_held_logs_for_request_id ( request_id) ;
@@ -660,6 +662,9 @@ impl LambdaProcessor {
660662 if is_platform_log ( & log. message . message ) {
661663 log. message . lambda . first_invocation = ctx. first_invocation ;
662664 }
665+ if log. message . message . starts_with ( "END RequestId:" ) {
666+ log. message . lambda . durable_execution_status = ctx. execution_status . clone ( ) ;
667+ }
663668 if let Ok ( s) = serde_json:: to_string ( & log) {
664669 // explicitly drop log so we don't accidentally re-use it and push
665670 // duplicate logs to the aggregator
@@ -2578,4 +2583,41 @@ mod tests {
25782583 let batches = aggregator_handle. get_batches ( ) . await . unwrap ( ) ;
25792584 assert ! ( batches. is_empty( ) ) ;
25802585 }
2586+
2587+ #[ tokio:: test]
2588+ async fn test_execution_status_on_end_log ( ) {
2589+ for ( execution_status, expected) in [
2590+ ( Some ( "SUCCEEDED" ) , serde_json:: json!( "SUCCEEDED" ) ) ,
2591+ ( None , serde_json:: Value :: Null ) ,
2592+ ] {
2593+ let mut processor = make_processor_for_durable_arn_tests ( ) ;
2594+ processor. is_durable_function = Some ( true ) ;
2595+ processor. invocation_context . request_id = "req-end" . to_string ( ) ;
2596+ processor. insert_to_durable_context_map (
2597+ "req-end" ,
2598+ "exec-id-123" ,
2599+ "exec-name-abc" ,
2600+ Some ( false ) ,
2601+ execution_status. map ( str:: to_string) ,
2602+ ) ;
2603+ let event = TelemetryEvent {
2604+ time : Utc . with_ymd_and_hms ( 2023 , 1 , 7 , 3 , 23 , 47 ) . unwrap ( ) ,
2605+ record : TelemetryRecord :: PlatformRuntimeDone {
2606+ request_id : "req-end" . to_string ( ) ,
2607+ status : Status :: Success ,
2608+ error_type : None ,
2609+ metrics : None ,
2610+ } ,
2611+ } ;
2612+ let ( aggregator_service, aggregator_handle) = AggregatorService :: default ( ) ;
2613+ tokio:: spawn ( async move { aggregator_service. run ( ) . await } ) ;
2614+ processor. process ( event, & aggregator_handle) . await ;
2615+ let batches = aggregator_handle. get_batches ( ) . await . unwrap ( ) ;
2616+ let logs: Vec < serde_json:: Value > = serde_json:: from_slice ( & batches[ 0 ] ) . unwrap ( ) ;
2617+ assert_eq ! (
2618+ logs[ 0 ] [ "message" ] [ "lambda" ] [ "durable_function.execution_status" ] ,
2619+ expected
2620+ ) ;
2621+ }
2622+ }
25812623}
0 commit comments