Skip to content

Commit 6ed9143

Browse files
committed
fix(client): bound select-phase watchdog stalls
1 parent 7227957 commit 6ed9143

1 file changed

Lines changed: 43 additions & 2 deletions

File tree

crates/slipstream-client/src/runtime.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ const HEALTH_LOG_INTERVAL_US: u64 = 300_000_000;
7777
const WATCHDOG_STALE_SECS: u64 = 15;
7878
const WATCHDOG_CHECK_INTERVAL: Duration = Duration::from_secs(3);
7979
const WATCHDOG_ABORT_STRIKES: u32 = 3;
80+
const WATCHDOG_SELECT_RECONNECT_SECS: u64 = 30;
81+
const WATCHDOG_SELECT_ABORT_SECS: u64 = 180;
8082
const ACTIVE_PATH_LOSS_RECONNECT_STREAMS: usize = 32;
8183

8284
/// Watchdog that runs on a separate OS thread (not tokio) to detect when the
@@ -85,11 +87,12 @@ const ACTIVE_PATH_LOSS_RECONNECT_STREAMS: usize = 32;
8587
/// the watchdog emits warnings and aborts; in the select phase it aborts only as a last resort.
8688
///
8789
/// `PHASE_SELECT` may legitimately appear stale under host scheduler jitter,
88-
/// so aborting there creates false outages.
90+
/// so we first request reconnect; abort is only a last-resort cap.
8991
struct Watchdog {
9092
heartbeat: Arc<AtomicU64>,
9193
phase: Arc<AtomicU32>,
9294
alive: Arc<AtomicBool>,
95+
select_reconnect_requested: Arc<AtomicBool>,
9396
_handle: std::thread::JoinHandle<()>,
9497
}
9598

@@ -125,9 +128,11 @@ impl Watchdog {
125128
let heartbeat = Arc::new(AtomicU64::new(0));
126129
let phase = Arc::new(AtomicU32::new(0));
127130
let alive = Arc::new(AtomicBool::new(true));
131+
let select_reconnect_requested = Arc::new(AtomicBool::new(false));
128132
let hb = Arc::clone(&heartbeat);
129133
let ph = Arc::clone(&phase);
130134
let al = Arc::clone(&alive);
135+
let reconnect_flag = Arc::clone(&select_reconnect_requested);
131136
let handle = std::thread::Builder::new()
132137
.name("watchdog".into())
133138
.spawn(move || {
@@ -173,8 +178,26 @@ impl Watchdog {
173178
let stuck_phase = ph.load(Ordering::Relaxed);
174179
let phase_name = phase_name(stuck_phase);
175180
if stuck_phase == PHASE_SELECT {
181+
if stale_us >= WATCHDOG_SELECT_RECONNECT_SECS * 1_000_000 {
182+
let first_request = !reconnect_flag.swap(true, Ordering::Relaxed);
183+
if first_request {
184+
eprintln!(
185+
"WATCHDOG: select phase stale for {:.1}s; requesting reconnect",
186+
stale_us as f64 / 1_000_000.0,
187+
);
188+
}
189+
}
190+
if stale_us >= WATCHDOG_SELECT_ABORT_SECS * 1_000_000 {
191+
eprintln!(
192+
"WATCHDOG: select phase stalled for {:.1}s (phase {} / {}), aborting as last resort",
193+
stale_us as f64 / 1_000_000.0,
194+
stuck_phase,
195+
phase_name,
196+
);
197+
std::process::abort();
198+
}
176199
eprintln!(
177-
"WATCHDOG: stale heartbeat {:.1}s at phase {} ({}), strikes={}, waiting (select phase never aborts)",
200+
"WATCHDOG: stale heartbeat {:.1}s at phase {} ({}), strikes={}, waiting (select phase uses reconnect-first policy)",
178201
stale_us as f64 / 1_000_000.0,
179202
stuck_phase,
180203
phase_name,
@@ -209,6 +232,7 @@ impl Watchdog {
209232
heartbeat,
210233
phase,
211234
alive,
235+
select_reconnect_requested,
212236
_handle: handle,
213237
}
214238
}
@@ -221,6 +245,11 @@ impl Watchdog {
221245
fn set_phase(&self, p: u32) {
222246
self.phase.store(p, Ordering::Relaxed);
223247
}
248+
249+
fn take_select_reconnect_requested(&self) -> bool {
250+
self.select_reconnect_requested
251+
.swap(false, Ordering::Relaxed)
252+
}
224253
}
225254

226255
impl Drop for Watchdog {
@@ -467,6 +496,12 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result<i32, ClientError> {
467496
watchdog.pet();
468497
watchdog.set_phase(PHASE_DRAIN_COMMANDS);
469498
let current_time = unsafe { picoquic_current_time() };
499+
if watchdog.take_select_reconnect_requested() {
500+
warn!(
501+
"watchdog requested reconnect after select-phase stall; recycling connection"
502+
);
503+
break;
504+
}
470505
// Only process application commands after the QUIC handshake
471506
// completes. During reconnect the acceptor may queue NewStream
472507
// commands while the connection is still in initial state;
@@ -694,6 +729,12 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result<i32, ClientError> {
694729
);
695730
}
696731
watchdog.pet();
732+
if watchdog.take_select_reconnect_requested() {
733+
warn!(
734+
"watchdog requested reconnect after select-phase stall; recycling connection"
735+
);
736+
break;
737+
}
697738

698739
watchdog.set_phase(PHASE_POST_DRAIN);
699740
if unsafe { (*state_ptr).is_ready() } {

0 commit comments

Comments
 (0)