Skip to content

Commit 9f45761

Browse files
authored
Aj/fix shutdown deadlock (#746)
We added this check for the platform.Report item because we'd occasionally race past it when shutting down, leading to missing metrics (namely the post-runtime duration for the last request). This predominantly affected hourly/infrequent functions. But that change introduced a race condition: sometimes Lambda would respond to /next with the shutdown event at the same time we received the last invocation's report event. In some cases, Tokio would run the report event first, followed by the response to /next; we would then enter the shutdown event-bus loop and wait for a report event that was never coming. This change also adds a 300ms timeout in case of a deadlock. Jira: https://datadoghq.atlassian.net/browse/SLES-2151
1 parent 25ec01f commit 9f45761

3 files changed

Lines changed: 39 additions & 20 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -669,28 +669,26 @@ async fn extension_loop_active(
669669
}
670670
}
671671

672-
if let NextEventResponse::Shutdown {
673-
shutdown_reason, ..
674-
} = maybe_shutdown_event
675-
{
672+
if let NextEventResponse::Shutdown { .. } = maybe_shutdown_event {
676673
// Redrive/block on any failed payloads
677674
let tf = trace_flusher.clone();
678675
pending_flush_handles
679676
.await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flushers)
680677
.await;
681-
// The Shutdown event we get during a timeout will
682-
// never include a report log
683-
if shutdown_reason != "timeout" {
684-
'shutdown: loop {
685-
tokio::select! {
686-
Some(event) = event_bus.rx.recv() => {
687-
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
688-
if let TelemetryRecord::PlatformReport{ .. } = telemetry_event.record {
689-
// Wait for the report event before shutting down
690-
break 'shutdown;
691-
}
692-
}
678+
// Wait for tombstone event from telemetry listener to ensure all events are processed
679+
'shutdown: loop {
680+
tokio::select! {
681+
Some(event) = event_bus.rx.recv() => {
682+
if let Event::Telemetry(TelemetryEvent { record: TelemetryRecord::PlatformTombstone, .. }) = event {
683+
debug!("Received tombstone event, proceeding with shutdown");
684+
break 'shutdown;
693685
}
686+
handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
687+
}
688+
// Add timeout to prevent hanging indefinitely
689+
() = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => {
690+
debug!("Timeout waiting for tombstone event, proceeding with shutdown");
691+
break 'shutdown;
694692
}
695693
}
696694
}

bottlecap/src/telemetry/events.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ pub enum TelemetryRecord {
150150
/// When unsuccessful, the `error_type` describes what kind of error occurred
151151
error_type: Option<String>,
152152
},
153+
154+
/// Tombstone event to signal shutdown
155+
#[serde(rename = "platform.tombstone")]
156+
PlatformTombstone,
153157
}
154158

155159
/// Type of Initialization

bottlecap/src/telemetry/listener.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
http::{extract_request_body, handler_not_found},
3-
telemetry::events::TelemetryEvent,
3+
telemetry::events::{TelemetryEvent, TelemetryRecord},
44
};
55

66
use axum::{
@@ -10,6 +10,7 @@ use axum::{
1010
routing::post,
1111
Router,
1212
};
13+
use chrono::Utc;
1314
use std::net::SocketAddr;
1415
use tokio::{net::TcpListener, sync::mpsc::Sender};
1516
use tokio_util::sync::CancellationToken;
@@ -46,13 +47,17 @@ impl TelemetryListener {
4647
let router = self.make_router();
4748

4849
let cancel_token_clone = self.cancel_token();
50+
let event_bus_clone = self.event_bus.clone();
4951
tokio::spawn(async move {
5052
let listener = TcpListener::bind(&socket)
5153
.await
5254
.expect("Failed to bind socket");
5355
debug!("Telemetry API | Starting listener on {}", socket);
5456
axum::serve(listener, router)
55-
.with_graceful_shutdown(Self::graceful_shutdown(cancel_token_clone))
57+
.with_graceful_shutdown(Self::graceful_shutdown(
58+
cancel_token_clone,
59+
event_bus_clone,
60+
))
5661
.await
5762
.expect("Failed to start telemetry listener");
5863
});
@@ -69,9 +74,21 @@ impl TelemetryListener {
6974
.with_state(event_bus)
7075
}
7176

72-
async fn graceful_shutdown(cancel_token: CancellationToken) {
77+
async fn graceful_shutdown(cancel_token: CancellationToken, event_bus: Sender<TelemetryEvent>) {
7378
cancel_token.cancelled().await;
74-
debug!("Telemetry API | Shutdown signal received, shutting down");
79+
debug!("Telemetry API | Shutdown signal received, sending tombstone event");
80+
81+
// Send tombstone event to signal shutdown
82+
let tombstone_event = TelemetryEvent {
83+
time: Utc::now(),
84+
record: TelemetryRecord::PlatformTombstone,
85+
};
86+
87+
if let Err(e) = event_bus.send(tombstone_event).await {
88+
debug!("Failed to send tombstone event: {:?}", e);
89+
}
90+
91+
debug!("Telemetry API | Shutting down");
7592
}
7693

7794
async fn handle(State(event_bus): State<Sender<TelemetryEvent>>, request: Request) -> Response {

0 commit comments

Comments (0)