Skip to content

Commit 5e92189

Browse files
lym953 and claude authored
[SVLS-8722] feat(logs): add aws_lambda.durable_function.first_invocation to START/END/REPORT logs (#1158)
## Summary

- Extracts the `aws_lambda.durable_function.first_invocation` tag from the `aws.lambda` span (sent by the tracer) and adds it as an attribute on `START`, `END`, and `REPORT` platform logs for durable functions. The UI will use this tag to identify the durable execution's start time and apply the time filter correctly.
- The tag flows through the existing `DurableContextUpdate` pipeline: `trace_agent` → `processor_service` → lifecycle processor → logs agent.
- `START`/`END`/`REPORT` logs are already held until durable context arrives (from #1053); `first_invocation` arrives in the same update, so no additional holding logic is needed.
- Non-platform (function) logs are unaffected.
- Renames serialized durable log attributes to the `durable_function.*` namespace for consistency with the span tags.

## Test plan

### Automated testing

The touched unit tests passed.

### Manual testing

`START` and `END` logs for the first invocation have `first_invocation: true`.

<img width="532" height="338" alt="image" src="https://github.com/user-attachments/assets/1df450c6-d54f-4e81-89ee-fbedc5e4a48e" />

`START` and `END` logs for subsequent invocations have `first_invocation: false`.

<img width="522" height="289" alt="image" src="https://github.com/user-attachments/assets/2488d3cd-d05e-4824-9ab8-707c3249f907" />

Other logs are not affected.

<img width="518" height="397" alt="image" src="https://github.com/user-attachments/assets/e64d231e-26de-4f2b-a8f0-799ca7fc1dcd" />

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent d668142 commit 5e92189

File tree

7 files changed

+88
-35
lines changed

7 files changed

+88
-35
lines changed

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,13 +1375,15 @@ impl Processor {
13751375
request_id: &str,
13761376
execution_id: &str,
13771377
execution_name: &str,
1378+
first_invocation: Option<bool>,
13781379
) {
13791380
if let Err(e) = self
13801381
.durable_context_tx
13811382
.send(DurableContextUpdate {
13821383
request_id: request_id.to_owned(),
13831384
execution_id: execution_id.to_owned(),
13841385
execution_name: execution_name.to_owned(),
1386+
first_invocation,
13851387
})
13861388
.await
13871389
{

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ pub enum ProcessorCommand {
115115
request_id: String,
116116
execution_id: String,
117117
execution_name: String,
118+
first_invocation: Option<bool>,
118119
},
119120
OnOutOfMemoryError {
120121
timestamp: i64,
@@ -392,12 +393,14 @@ impl InvocationProcessorHandle {
392393
request_id: String,
393394
execution_id: String,
394395
execution_name: String,
396+
first_invocation: Option<bool>,
395397
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
396398
self.sender
397399
.send(ProcessorCommand::ForwardDurableContext {
398400
request_id,
399401
execution_id,
400402
execution_name,
403+
first_invocation,
401404
})
402405
.await
403406
}
@@ -619,9 +622,15 @@ impl InvocationProcessorService {
619622
request_id,
620623
execution_id,
621624
execution_name,
625+
first_invocation,
622626
} => {
623627
self.processor
624-
.forward_durable_context(&request_id, &execution_id, &execution_name)
628+
.forward_durable_context(
629+
&request_id,
630+
&execution_id,
631+
&execution_name,
632+
first_invocation,
633+
)
625634
.await;
626635
}
627636
ProcessorCommand::OnOutOfMemoryError { timestamp } => {

bottlecap/src/logs/lambda/mod.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ pub struct DurableContextUpdate {
88
pub request_id: String,
99
pub execution_id: String,
1010
pub execution_name: String,
11+
pub first_invocation: Option<bool>,
1112
}
1213

1314
/// Durable execution context stored per `request_id` in `LambdaProcessor::durable_context_map`.
1415
#[derive(Clone, Debug)]
1516
pub struct DurableExecutionContext {
1617
pub execution_id: String,
1718
pub execution_name: String,
19+
pub first_invocation: Option<bool>,
1820
}
1921

2022
///
@@ -49,10 +51,21 @@ pub struct Message {
4951
pub struct Lambda {
5052
pub arn: String,
5153
pub request_id: Option<String>,
52-
#[serde(skip_serializing_if = "Option::is_none")]
54+
#[serde(
55+
rename = "durable_function.execution_id",
56+
skip_serializing_if = "Option::is_none"
57+
)]
5358
pub durable_execution_id: Option<String>,
54-
#[serde(skip_serializing_if = "Option::is_none")]
59+
#[serde(
60+
rename = "durable_function.execution_name",
61+
skip_serializing_if = "Option::is_none"
62+
)]
5563
pub durable_execution_name: Option<String>,
64+
#[serde(
65+
rename = "durable_function.first_invocation",
66+
skip_serializing_if = "Option::is_none"
67+
)]
68+
pub first_invocation: Option<bool>,
5669
}
5770

5871
impl Message {

bottlecap/src/logs/lambda/processor.rs

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ fn is_oom_error(error_msg: &str) -> bool {
8686
.any(|&oom_str| error_msg.contains(oom_str))
8787
}
8888

89+
/// Returns `true` when `message` begins with a START, END, or REPORT platform-log prefix.
///
/// These are the only logs that carry the `first_invocation` attribute.
///
/// The check is a case-sensitive prefix match on the message text alone: leading
/// whitespace or a different letter case makes it return `false`, and any message
/// that happens to begin with one of these prefixes is treated as a platform log.
fn is_platform_log(message: &str) -> bool {
    // Keep the recognised prefixes in one table so the set is defined in a single place.
    const PLATFORM_LOG_PREFIXES: [&str; 3] =
        ["START RequestId:", "END RequestId:", "REPORT RequestId:"];

    PLATFORM_LOG_PREFIXES
        .iter()
        .any(|&prefix| message.starts_with(prefix))
}
96+
8997
/// Parses a Lambda durable execution ARN and returns `(execution_id, execution_name)`.
9098
///
9199
/// Expected format:
@@ -538,9 +546,10 @@ impl LambdaProcessor {
538546
/// `request_id`.
539547
pub fn insert_to_durable_context_map(
540548
&mut self,
541-
request_id: &str, // key
542-
execution_id: &str, // value
543-
execution_name: &str, // value
549+
request_id: &str, // key
550+
execution_id: &str, // value
551+
execution_name: &str, // value
552+
first_invocation: Option<bool>, // value
544553
) {
545554
if self.durable_context_map.contains_key(request_id) {
546555
error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map");
@@ -557,6 +566,7 @@ impl LambdaProcessor {
557566
DurableExecutionContext {
558567
execution_id: execution_id.to_string(),
559568
execution_name: execution_name.to_string(),
569+
first_invocation,
560570
},
561571
);
562572
self.drain_held_logs_for_request_id(request_id);
@@ -589,13 +599,8 @@ impl LambdaProcessor {
589599
self.held_logs_order.retain(|r| r != request_id);
590600
let durable_ctx = self.durable_context_map.get(request_id).cloned();
591601
if let Some(ctx) = durable_ctx {
592-
for mut log in held {
593-
log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone());
594-
log.message.lambda.durable_execution_name = Some(ctx.execution_name.clone());
595-
if let Ok(s) = serde_json::to_string(&log) {
596-
drop(log);
597-
self.ready_logs.push(s);
598-
}
602+
for log in held {
603+
self.set_durable_context_and_mark_ready(log, &ctx);
599604
}
600605
}
601606
}
@@ -614,13 +619,8 @@ impl LambdaProcessor {
614619
if let Some(ctx) = durable_ctx {
615620
// If the request_id is in the durable context map, set durable execution id
616621
// and execution name, and add logs to ready_logs.
617-
for mut log in logs {
618-
log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone());
619-
log.message.lambda.durable_execution_name =
620-
Some(ctx.execution_name.clone());
621-
if let Ok(s) = serde_json::to_string(&log) {
622-
self.ready_logs.push(s);
623-
}
622+
for log in logs {
623+
self.set_durable_context_and_mark_ready(log, &ctx);
624624
}
625625
} else {
626626
// No context yet — keep logs in held_logs until the aws.lambda span arrives.
@@ -648,6 +648,26 @@ impl LambdaProcessor {
648648
}
649649
}
650650

651+
/// Applies durable execution context to a log and pushes it to `ready_logs`.
652+
/// `first_invocation` is set only for platform logs (START/END/REPORT).
653+
fn set_durable_context_and_mark_ready(
654+
&mut self,
655+
mut log: IntakeLog,
656+
ctx: &DurableExecutionContext,
657+
) {
658+
log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone());
659+
log.message.lambda.durable_execution_name = Some(ctx.execution_name.clone());
660+
if is_platform_log(&log.message.message) {
661+
log.message.lambda.first_invocation = ctx.first_invocation;
662+
}
663+
if let Ok(s) = serde_json::to_string(&log) {
664+
// explicitly drop log so we don't accidentally re-use it and push
665+
// duplicate logs to the aggregator
666+
drop(log);
667+
self.ready_logs.push(s);
668+
}
669+
}
670+
651671
/// Stashes a log in `held_logs` under `request_id`, waiting for durable context.
652672
///
653673
/// If `held_logs` is at capacity and `request_id` is a new key, the oldest key is evicted:
@@ -685,7 +705,7 @@ impl LambdaProcessor {
685705
/// - `Some(false)` → serialize and push straight to `ready_logs`.
686706
/// - `Some(true)` → mark this log as ready to be aggregated if its `request_id` is already in `durable_context_map`
687707
/// (context was populated by an `aws.lambda` span); otherwise stash in `held_logs`.
688-
fn queue_log_after_rules(&mut self, mut log: IntakeLog) {
708+
fn queue_log_after_rules(&mut self, log: IntakeLog) {
689709
// Durable execution SDK logs already carry execution context extracted from executionArn.
690710
if log.message.lambda.durable_execution_id.is_some() {
691711
if let Ok(serialized_log) = serde_json::to_string(&log) {
@@ -730,14 +750,7 @@ impl LambdaProcessor {
730750

731751
match durable_ctx {
732752
Some(ctx) => {
733-
log.message.lambda.durable_execution_id = Some(ctx.execution_id);
734-
log.message.lambda.durable_execution_name = Some(ctx.execution_name);
735-
if let Ok(serialized_log) = serde_json::to_string(&log) {
736-
// explicitly drop log so we don't accidentally re-use it and push
737-
// duplicate logs to the aggregator
738-
drop(log);
739-
self.ready_logs.push(serialized_log);
740-
}
753+
self.set_durable_context_and_mark_ready(log, &ctx);
741754
}
742755
None => {
743756
if let Some(rid) = log.message.lambda.request_id.clone() {
@@ -2515,11 +2528,11 @@ mod tests {
25152528
assert_eq!(batches.len(), 1);
25162529
let logs: Vec<serde_json::Value> = serde_json::from_slice(&batches[0]).unwrap();
25172530
assert_eq!(
2518-
logs[0]["message"]["lambda"]["durable_execution_id"],
2531+
logs[0]["message"]["lambda"]["durable_function.execution_id"],
25192532
"my-id"
25202533
);
25212534
assert_eq!(
2522-
logs[0]["message"]["lambda"]["durable_execution_name"],
2535+
logs[0]["message"]["lambda"]["durable_function.execution_name"],
25232536
"my-name"
25242537
);
25252538
}

bottlecap/src/logs/processor.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ impl LogsProcessor {
5252
&update.request_id,
5353
&update.execution_id,
5454
&update.execution_name,
55+
update.first_invocation,
5556
);
5657
let ready_logs = self.take_ready_logs();
5758
if !ready_logs.is_empty()
@@ -66,10 +67,16 @@ impl LogsProcessor {
6667
request_id: &str,
6768
execution_id: &str,
6869
execution_name: &str,
70+
first_invocation: Option<bool>,
6971
) {
7072
match self {
7173
LogsProcessor::Lambda(p) => {
72-
p.insert_to_durable_context_map(request_id, execution_id, execution_name);
74+
p.insert_to_durable_context_map(
75+
request_id,
76+
execution_id,
77+
execution_name,
78+
first_invocation,
79+
);
7380
}
7481
}
7582
}

bottlecap/src/tags/lambda/tags.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ const ACCOUNT_ID_KEY: &str = "account_id";
5454
pub const REQUEST_ID_KEY: &str = "request_id";
5555
pub const DURABLE_EXECUTION_ID_KEY: &str = "aws_lambda.durable_function.execution_id";
5656
pub const DURABLE_EXECUTION_NAME_KEY: &str = "aws_lambda.durable_function.execution_name";
57+
pub const DURABLE_FUNCTION_FIRST_INVOCATION_KEY: &str =
58+
"aws_lambda.durable_function.first_invocation";
5759

5860
const AWS_ACCOUNT_KEY: &str = "aws_account";
5961
const RESOURCE_KEY: &str = "resource";

bottlecap/src/traces/trace_agent.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -599,15 +599,22 @@ impl TraceAgent {
599599
span.meta
600600
.get(tags::lambda::tags::DURABLE_EXECUTION_NAME_KEY),
601601
)
602-
&& let Err(e) = invocation_processor_handle
602+
{
603+
let first_invocation = span
604+
.meta
605+
.get(tags::lambda::tags::DURABLE_FUNCTION_FIRST_INVOCATION_KEY)
606+
.map(|v| v == "true");
607+
if let Err(e) = invocation_processor_handle
603608
.forward_durable_context(
604609
request_id.clone(),
605610
execution_id.clone(),
606611
execution_name.clone(),
612+
first_invocation,
607613
)
608614
.await
609-
{
610-
error!("Failed to forward durable context to processor: {e}");
615+
{
616+
error!("Failed to forward durable context to processor: {e}");
617+
}
611618
}
612619

613620
handle_reparenting(&mut reparenting_info, &mut span);

0 commit comments

Comments
 (0)