Skip to content

Commit a3d3b1f

Browse files
authored
Combine handle_next_invocation and next_event. Don't block for PlatformReport on timeout shutdowns (#692)
1. Refactors handle_next_invocation to call next_event. The methods are separate because we call next_event in an idle loop, but otherwise we always call them together from main, so this simplifies things. 2. In the shutdown loop, only block for the report line if the shutdown isn't a timeout. On timeouts, we won't get a report log. That's not a problem for most use cases: the sandbox will re-initialize, the telemetry API will re-send events, and then we can forward events on the next invocation. But if a function continuously times out, we may not forward the custom `task timed out` log until a few invocations down the line. ~I'll verify if this is true with OOMs~ OOMs can be variable, so I've added a new line to the END log to explain the status reason. For Node it's a runtimeExit: <img width="941" alt="image" src="https://github.com/user-attachments/assets/e22a56e1-9276-488d-b333-29ec644489bd" />
1 parent 1bd3b67 commit a3d3b1f

3 files changed

Lines changed: 52 additions & 29 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -496,12 +496,11 @@ async fn extension_loop_active(
496496
start_time.elapsed().as_millis().to_string()
497497
);
498498
// first invoke we must call next
499-
let next_lambda_response = next_event(client, &r.extension_id).await;
500499
let mut pending_flush_handles = PendingFlushHandles::new();
501500
let mut last_continuous_flush_error = false;
502-
handle_next_invocation(next_lambda_response, invocation_processor.clone()).await;
501+
handle_next_invocation(client, &r.extension_id, invocation_processor.clone()).await;
503502
loop {
504-
let shutdown;
503+
let maybe_shutdown_event;
505504

506505
let current_flush_decision = flush_control.evaluate_flush_decision();
507506
if current_flush_decision == FlushDecision::End {
@@ -542,8 +541,8 @@ async fn extension_loop_active(
542541
&mut race_flush_interval,
543542
)
544543
.await;
545-
let next_response = next_event(client, &r.extension_id).await;
546-
shutdown = handle_next_invocation(next_response, invocation_processor.clone()).await;
544+
maybe_shutdown_event =
545+
handle_next_invocation(client, &r.extension_id, invocation_processor.clone()).await;
547546
} else {
548547
//Periodic flush scenario, flush at top of invocation
549548
if current_flush_decision == FlushDecision::Continuous && !last_continuous_flush_error {
@@ -593,7 +592,8 @@ async fn extension_loop_active(
593592
// If we get platform.runtimeDone or platform.runtimeReport
594593
// That's fine, we still wait to break until we get the response from next
595594
// and then we break to determine if we'll flush or not
596-
let next_lambda_response = next_event(client, &r.extension_id);
595+
let next_lambda_response =
596+
handle_next_invocation(client, &r.extension_id, invocation_processor.clone());
597597
tokio::pin!(next_lambda_response);
598598
'next_invocation: loop {
599599
tokio::select! {
@@ -607,7 +607,7 @@ async fn extension_loop_active(
607607
race_flush_interval.reset();
608608
// Thank you for not removing race_flush_interval.reset();
609609

610-
shutdown = handle_next_invocation(next_response, invocation_processor.clone()).await;
610+
maybe_shutdown_event= next_response;
611611
// Need to break here to re-call next
612612
break 'next_invocation;
613613
}
@@ -629,19 +629,26 @@ async fn extension_loop_active(
629629
}
630630
}
631631

632-
if shutdown {
632+
if let NextEventResponse::Shutdown {
633+
shutdown_reason, ..
634+
} = maybe_shutdown_event
635+
{
633636
// Redrive/block on any failed payloads
634637
let tf = trace_flusher.clone();
635638
pending_flush_handles
636639
.await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flusher)
637640
.await;
638-
'shutdown: loop {
639-
tokio::select! {
640-
Some(event) = event_bus.rx.recv() => {
641-
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
642-
if let TelemetryRecord::PlatformReport{ .. } = telemetry_event.record {
643-
// Wait for the report event before shutting down
644-
break 'shutdown;
641+
// The Shutdown event we get during a timeout will
642+
// never include a report log
643+
if shutdown_reason != "timeout" {
644+
'shutdown: loop {
645+
tokio::select! {
646+
Some(event) = event_bus.rx.recv() => {
647+
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
648+
if let TelemetryRecord::PlatformReport{ .. } = telemetry_event.record {
649+
// Wait for the report event before shutting down
650+
break 'shutdown;
651+
}
645652
}
646653
}
647654
}
@@ -759,39 +766,44 @@ async fn handle_event_bus_event(
759766
}
760767

761768
async fn handle_next_invocation(
762-
next_response: Result<NextEventResponse>,
769+
client: &Client,
770+
ext_id: &str,
763771
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
764-
) -> bool {
772+
) -> NextEventResponse {
773+
let next_response = next_event(client, ext_id).await;
765774
match next_response {
766775
Ok(NextEventResponse::Invoke {
767-
request_id,
776+
ref request_id,
768777
deadline_ms,
769-
invoked_function_arn,
778+
ref invoked_function_arn,
770779
}) => {
771780
debug!(
772781
"Invoke event {}; deadline: {}, invoked_function_arn: {}",
773-
request_id, deadline_ms, invoked_function_arn
782+
request_id.clone(),
783+
deadline_ms,
784+
invoked_function_arn.clone()
774785
);
775786
let mut p = invocation_processor.lock().await;
776-
p.on_invoke_event(request_id);
787+
p.on_invoke_event(request_id.into());
777788
drop(p);
778-
false
779789
}
780790
Ok(NextEventResponse::Shutdown {
781-
shutdown_reason,
791+
ref shutdown_reason,
782792
deadline_ms,
783793
}) => {
784794
let mut p = invocation_processor.lock().await;
785795
p.on_shutdown_event();
786796
println!("Exiting: {shutdown_reason}, deadline: {deadline_ms}");
787-
true
788797
}
789-
Err(err) => {
798+
Err(ref err) => {
790799
eprintln!("Error: {err:?}");
791800
println!("Exiting");
792-
true
793801
}
794802
}
803+
next_response.unwrap_or(NextEventResponse::Shutdown {
804+
shutdown_reason: "panic".into(),
805+
deadline_ms: 0,
806+
})
795807
}
796808

797809
fn setup_tag_provider(

bottlecap/src/logs/lambda/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ impl Message {
4343
request_id: Option<String>,
4444
function_arn: String,
4545
timestamp: i64,
46+
status: Option<String>,
4647
) -> Message {
4748
Message {
4849
message,
@@ -51,7 +52,7 @@ impl Message {
5152
request_id,
5253
},
5354
timestamp,
54-
status: "info".to_string(),
55+
status: status.unwrap_or("info".to_string()),
5556
}
5657
}
5758
}

bottlecap/src/logs/lambda/processor.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl LambdaProcessor {
103103
None,
104104
self.function_arn.clone(),
105105
event.time.timestamp_millis(),
106+
None,
106107
));
107108
}
108109

@@ -125,6 +126,7 @@ impl LambdaProcessor {
125126
None,
126127
self.function_arn.clone(),
127128
event.time.timestamp_millis(),
129+
None,
128130
))
129131
},
130132
// TODO: check if we could do anything with the fields from `PlatformInitReport`
@@ -153,18 +155,24 @@ impl LambdaProcessor {
153155
Some(request_id),
154156
self.function_arn.clone(),
155157
event.time.timestamp_millis(),
158+
None,
156159
))
157160
},
158-
TelemetryRecord::PlatformRuntimeDone { request_id, status, metrics, .. } => { // TODO: check what to do with rest of the fields
161+
TelemetryRecord::PlatformRuntimeDone { request_id, status, metrics, error_type, .. } => { // TODO: check what to do with rest of the fields
159162
if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
160163
error!("Failed to send PlatformRuntimeDone to the main event bus: {}", e);
161164
}
162165

163166
let mut message = format!("END RequestId: {request_id}");
167+
let mut result_status = "info".to_string();
164168
if let Some(metrics) = metrics {
165169
self.invocation_context.runtime_duration_ms = metrics.duration_ms;
166170
if status == Status::Timeout {
167171
message.push_str(&format!(" Task timed out after {:.2} seconds", metrics.duration_ms / 1000.0));
172+
result_status = "error".to_string();
173+
} else if status == Status::Error {
174+
message.push_str(&format!(" Task failed: {:?}", error_type.unwrap_or_default()));
175+
result_status = "error".to_string();
168176
}
169177
}
170178
// Remove the `request_id` since no more orphan logs will be processed with this one
@@ -175,6 +183,7 @@ impl LambdaProcessor {
175183
Some(request_id),
176184
self.function_arn.clone(),
177185
event.time.timestamp_millis(),
186+
Some(result_status),
178187
))
179188
},
180189
TelemetryRecord::PlatformReport { request_id, metrics, .. } => { // TODO: check what to do with rest of the fields
@@ -209,6 +218,7 @@ impl LambdaProcessor {
209218
Some(request_id),
210219
self.function_arn.clone(),
211220
event.time.timestamp_millis(),
221+
None,
212222
))
213223
},
214224
// TODO: PlatformInitRuntimeDone
@@ -522,7 +532,7 @@ mod tests {
522532
request_id: Some("test-request-id".to_string()),
523533
},
524534
timestamp: 1_673_061_827_000,
525-
status: "info".to_string(),
535+
status: "error".to_string(),
526536
},
527537
),
528538

0 commit comments

Comments
 (0)