Skip to content

Commit 53a9b6b

Browse files
lym953claude
andcommitted
[SVLS-8583] Set durable_function.execution_status on END platform logs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 8de97e0 commit 53a9b6b

File tree

7 files changed

+70
-4
lines changed

7 files changed

+70
-4
lines changed

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,7 @@ impl Processor {
14051405
execution_id: &str,
14061406
execution_name: &str,
14071407
first_invocation: Option<bool>,
1408+
execution_status: Option<String>,
14081409
) {
14091410
if let Err(e) = self
14101411
.durable_context_tx
@@ -1413,6 +1414,7 @@ impl Processor {
14131414
execution_id: execution_id.to_owned(),
14141415
execution_name: execution_name.to_owned(),
14151416
first_invocation,
1417+
execution_status,
14161418
})
14171419
.await
14181420
{

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ pub enum ProcessorCommand {
115115
execution_id: String,
116116
execution_name: String,
117117
first_invocation: Option<bool>,
118+
execution_status: Option<String>,
118119
},
119120
OnOutOfMemoryError {
120121
timestamp: i64,
@@ -391,13 +392,15 @@ impl InvocationProcessorHandle {
391392
execution_id: String,
392393
execution_name: String,
393394
first_invocation: Option<bool>,
395+
execution_status: Option<String>,
394396
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
395397
self.sender
396398
.send(ProcessorCommand::ForwardDurableContext {
397399
request_id,
398400
execution_id,
399401
execution_name,
400402
first_invocation,
403+
execution_status,
401404
})
402405
.await
403406
}
@@ -617,13 +620,15 @@ impl InvocationProcessorService {
617620
execution_id,
618621
execution_name,
619622
first_invocation,
623+
execution_status,
620624
} => {
621625
self.processor
622626
.forward_durable_context(
623627
&request_id,
624628
&execution_id,
625629
&execution_name,
626630
first_invocation,
631+
execution_status,
627632
)
628633
.await;
629634
}

bottlecap/src/logs/lambda/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub struct DurableContextUpdate {
99
pub execution_id: String,
1010
pub execution_name: String,
1111
pub first_invocation: Option<bool>,
12+
pub execution_status: Option<String>,
1213
}
1314

1415
/// Durable execution context stored per `request_id` in `LambdaProcessor::durable_context_map`.
@@ -17,6 +18,7 @@ pub struct DurableExecutionContext {
1718
pub execution_id: String,
1819
pub execution_name: String,
1920
pub first_invocation: Option<bool>,
21+
pub execution_status: Option<String>,
2022
}
2123

2224
///
@@ -66,6 +68,11 @@ pub struct Lambda {
6668
skip_serializing_if = "Option::is_none"
6769
)]
6870
pub first_invocation: Option<bool>,
71+
#[serde(
72+
rename = "durable_function.execution_status",
73+
skip_serializing_if = "Option::is_none"
74+
)]
75+
pub durable_execution_status: Option<String>,
6976
}
7077

7178
impl Message {

bottlecap/src/logs/lambda/processor.rs

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,10 +546,11 @@ impl LambdaProcessor {
546546
/// `request_id`.
547547
pub fn insert_to_durable_context_map(
548548
&mut self,
549-
request_id: &str, // key
550-
execution_id: &str, // value
551-
execution_name: &str, // value
552-
first_invocation: Option<bool>, // value
549+
request_id: &str, // key
550+
execution_id: &str, // value
551+
execution_name: &str, // value
552+
first_invocation: Option<bool>, // value
553+
execution_status: Option<String>, // value
553554
) {
554555
if self.durable_context_map.contains_key(request_id) {
555556
error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map");
@@ -567,6 +568,7 @@ impl LambdaProcessor {
567568
execution_id: execution_id.to_string(),
568569
execution_name: execution_name.to_string(),
569570
first_invocation,
571+
execution_status,
570572
},
571573
);
572574
self.drain_held_logs_for_request_id(request_id);
@@ -660,6 +662,9 @@ impl LambdaProcessor {
660662
if is_platform_log(&log.message.message) {
661663
log.message.lambda.first_invocation = ctx.first_invocation;
662664
}
665+
if log.message.message.starts_with("END RequestId:") {
666+
log.message.lambda.durable_execution_status = ctx.execution_status.clone();
667+
}
663668
if let Ok(s) = serde_json::to_string(&log) {
664669
// explicitly drop log so we don't accidentally re-use it and push
665670
// duplicate logs to the aggregator
@@ -2578,4 +2583,41 @@ mod tests {
25782583
let batches = aggregator_handle.get_batches().await.unwrap();
25792584
assert!(batches.is_empty());
25802585
}
2586+
2587+
#[tokio::test]
2588+
async fn test_execution_status_on_end_log() {
2589+
for (execution_status, expected) in [
2590+
(Some("SUCCEEDED"), serde_json::json!("SUCCEEDED")),
2591+
(None, serde_json::Value::Null),
2592+
] {
2593+
let mut processor = make_processor_for_durable_arn_tests();
2594+
processor.is_durable_function = Some(true);
2595+
processor.invocation_context.request_id = "req-end".to_string();
2596+
processor.insert_to_durable_context_map(
2597+
"req-end",
2598+
"exec-id-123",
2599+
"exec-name-abc",
2600+
Some(false),
2601+
execution_status.map(str::to_string),
2602+
);
2603+
let event = TelemetryEvent {
2604+
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
2605+
record: TelemetryRecord::PlatformRuntimeDone {
2606+
request_id: "req-end".to_string(),
2607+
status: Status::Success,
2608+
error_type: None,
2609+
metrics: None,
2610+
},
2611+
};
2612+
let (aggregator_service, aggregator_handle) = AggregatorService::default();
2613+
tokio::spawn(async move { aggregator_service.run().await });
2614+
processor.process(event, &aggregator_handle).await;
2615+
let batches = aggregator_handle.get_batches().await.unwrap();
2616+
let logs: Vec<serde_json::Value> = serde_json::from_slice(&batches[0]).unwrap();
2617+
assert_eq!(
2618+
logs[0]["message"]["lambda"]["durable_function.execution_status"],
2619+
expected
2620+
);
2621+
}
2622+
}
25812623
}

bottlecap/src/logs/processor.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl LogsProcessor {
5353
&update.execution_id,
5454
&update.execution_name,
5555
update.first_invocation,
56+
update.execution_status,
5657
);
5758
let ready_logs = self.take_ready_logs();
5859
if !ready_logs.is_empty()
@@ -68,6 +69,7 @@ impl LogsProcessor {
6869
execution_id: &str,
6970
execution_name: &str,
7071
first_invocation: Option<bool>,
72+
execution_status: Option<String>,
7173
) {
7274
match self {
7375
LogsProcessor::Lambda(p) => {
@@ -76,6 +78,7 @@ impl LogsProcessor {
7678
execution_id,
7779
execution_name,
7880
first_invocation,
81+
execution_status,
7982
);
8083
}
8184
}

bottlecap/src/tags/lambda/tags.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ pub const DURABLE_EXECUTION_ID_KEY: &str = "aws_lambda.durable_function.executio
5656
pub const DURABLE_EXECUTION_NAME_KEY: &str = "aws_lambda.durable_function.execution_name";
5757
pub const DURABLE_FUNCTION_FIRST_INVOCATION_KEY: &str =
5858
"aws_lambda.durable_function.first_invocation";
59+
pub const DURABLE_FUNCTION_EXECUTION_STATUS_KEY: &str =
60+
"aws_lambda.durable_function.execution_status";
5961

6062
const AWS_ACCOUNT_KEY: &str = "aws_account";
6163
const RESOURCE_KEY: &str = "resource";

bottlecap/src/traces/trace_agent.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,12 +606,17 @@ impl TraceAgent {
606606
.meta
607607
.get(tags::lambda::tags::DURABLE_FUNCTION_FIRST_INVOCATION_KEY)
608608
.map(|v| v == "true");
609+
let execution_status = span
610+
.meta
611+
.get(tags::lambda::tags::DURABLE_FUNCTION_EXECUTION_STATUS_KEY)
612+
.cloned();
609613
if let Err(e) = invocation_processor_handle
610614
.forward_durable_context(
611615
request_id.clone(),
612616
execution_id.clone(),
613617
execution_name.clone(),
614618
first_invocation,
619+
execution_status.clone(),
615620
)
616621
.await
617622
{

0 commit comments

Comments
 (0)