Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,7 @@ impl Processor {
execution_id: &str,
execution_name: &str,
first_invocation: Option<bool>,
execution_status: Option<String>,
) {
if let Err(e) = self
.durable_context_tx
Expand All @@ -1413,6 +1414,7 @@ impl Processor {
execution_id: execution_id.to_owned(),
execution_name: execution_name.to_owned(),
first_invocation,
execution_status,
})
.await
{
Expand Down
5 changes: 5 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub enum ProcessorCommand {
execution_id: String,
execution_name: String,
first_invocation: Option<bool>,
execution_status: Option<String>,
},
OnOutOfMemoryError {
timestamp: i64,
Expand Down Expand Up @@ -391,13 +392,15 @@ impl InvocationProcessorHandle {
execution_id: String,
execution_name: String,
first_invocation: Option<bool>,
execution_status: Option<String>,
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
self.sender
.send(ProcessorCommand::ForwardDurableContext {
request_id,
execution_id,
execution_name,
first_invocation,
execution_status,
})
.await
}
Expand Down Expand Up @@ -617,13 +620,15 @@ impl InvocationProcessorService {
execution_id,
execution_name,
first_invocation,
execution_status,
} => {
self.processor
.forward_durable_context(
&request_id,
&execution_id,
&execution_name,
first_invocation,
execution_status,
)
.await;
}
Expand Down
7 changes: 7 additions & 0 deletions bottlecap/src/logs/lambda/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub struct DurableContextUpdate {
pub execution_id: String,
pub execution_name: String,
pub first_invocation: Option<bool>,
pub execution_status: Option<String>,
}

/// Durable execution context stored per `request_id` in `LambdaProcessor::durable_context_map`.
Expand All @@ -17,6 +18,7 @@ pub struct DurableExecutionContext {
pub execution_id: String,
pub execution_name: String,
pub first_invocation: Option<bool>,
pub execution_status: Option<String>,
}

///
Expand Down Expand Up @@ -66,6 +68,11 @@ pub struct Lambda {
skip_serializing_if = "Option::is_none"
)]
pub first_invocation: Option<bool>,
#[serde(
rename = "durable_function.execution_status",
skip_serializing_if = "Option::is_none"
)]
pub durable_execution_status: Option<String>,
}

impl Message {
Expand Down
53 changes: 49 additions & 4 deletions bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,11 @@ impl LambdaProcessor {
/// `request_id`.
pub fn insert_to_durable_context_map(
&mut self,
request_id: &str, // key
execution_id: &str, // value
execution_name: &str, // value
first_invocation: Option<bool>, // value
request_id: &str, // key
execution_id: &str, // value
execution_name: &str, // value
first_invocation: Option<bool>, // value
execution_status: Option<String>, // value
) {
if self.durable_context_map.contains_key(request_id) {
error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map");
Expand All @@ -567,6 +568,7 @@ impl LambdaProcessor {
execution_id: execution_id.to_string(),
execution_name: execution_name.to_string(),
first_invocation,
execution_status,
},
);
self.drain_held_logs_for_request_id(request_id);
Expand Down Expand Up @@ -660,6 +662,12 @@ impl LambdaProcessor {
if is_platform_log(&log.message.message) {
log.message.lambda.first_invocation = ctx.first_invocation;
}
if log.message.message.starts_with("END RequestId:") {
log.message
.lambda
.durable_execution_status
.clone_from(&ctx.execution_status);
}
Comment thread
lym953 marked this conversation as resolved.
if let Ok(s) = serde_json::to_string(&log) {
// explicitly drop log so we don't accidentally re-use it and push
// duplicate logs to the aggregator
Expand Down Expand Up @@ -2578,4 +2586,41 @@ mod tests {
let batches = aggregator_handle.get_batches().await.unwrap();
assert!(batches.is_empty());
}

#[tokio::test]
async fn test_execution_status_on_end_log() {
for (execution_status, expected) in [
(Some("SUCCEEDED"), serde_json::json!("SUCCEEDED")),
(None, serde_json::Value::Null),
] {
let mut processor = make_processor_for_durable_arn_tests();
processor.is_durable_function = Some(true);
processor.invocation_context.request_id = "req-end".to_string();
processor.insert_to_durable_context_map(
"req-end",
"exec-id-123",
"exec-name-abc",
Some(false),
execution_status.map(str::to_string),
);
let event = TelemetryEvent {
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
record: TelemetryRecord::PlatformRuntimeDone {
request_id: "req-end".to_string(),
status: Status::Success,
error_type: None,
metrics: None,
},
};
let (aggregator_service, aggregator_handle) = AggregatorService::default();
tokio::spawn(async move { aggregator_service.run().await });
processor.process(event, &aggregator_handle).await;
let batches = aggregator_handle.get_batches().await.unwrap();
let logs: Vec<serde_json::Value> = serde_json::from_slice(&batches[0]).unwrap();
assert_eq!(
logs[0]["message"]["lambda"]["durable_function.execution_status"],
expected
);
}
Comment thread
lym953 marked this conversation as resolved.
}
}
3 changes: 3 additions & 0 deletions bottlecap/src/logs/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl LogsProcessor {
&update.execution_id,
&update.execution_name,
update.first_invocation,
update.execution_status,
);
let ready_logs = self.take_ready_logs();
if !ready_logs.is_empty()
Expand All @@ -68,6 +69,7 @@ impl LogsProcessor {
execution_id: &str,
execution_name: &str,
first_invocation: Option<bool>,
execution_status: Option<String>,
) {
match self {
LogsProcessor::Lambda(p) => {
Expand All @@ -76,6 +78,7 @@ impl LogsProcessor {
execution_id,
execution_name,
first_invocation,
execution_status,
);
}
}
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/tags/lambda/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub const DURABLE_EXECUTION_ID_KEY: &str = "aws_lambda.durable_function.executio
pub const DURABLE_EXECUTION_NAME_KEY: &str = "aws_lambda.durable_function.execution_name";
pub const DURABLE_FUNCTION_FIRST_INVOCATION_KEY: &str =
"aws_lambda.durable_function.first_invocation";
pub const DURABLE_FUNCTION_EXECUTION_STATUS_KEY: &str =
"aws_lambda.durable_function.execution_status";

const AWS_ACCOUNT_KEY: &str = "aws_account";
const RESOURCE_KEY: &str = "resource";
Expand Down
5 changes: 5 additions & 0 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,17 @@ impl TraceAgent {
.meta
.get(tags::lambda::tags::DURABLE_FUNCTION_FIRST_INVOCATION_KEY)
.map(|v| v == "true");
let execution_status = span
.meta
.get(tags::lambda::tags::DURABLE_FUNCTION_EXECUTION_STATUS_KEY)
.cloned();
if let Err(e) = invocation_processor_handle
.forward_durable_context(
request_id.clone(),
execution_id.clone(),
execution_name.clone(),
first_invocation,
execution_status,
)
.await
{
Expand Down
Loading