Skip to content

Commit 100f1c1

Browse files
committed
fix(client): reconnect before aborting select-phase stalls
1 parent a206800 commit 100f1c1

1 file changed

Lines changed: 72 additions & 2 deletions

File tree

crates/slipstream-client/src/runtime.rs

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,20 @@ const RECONNECT_BURST_WARN_COUNT: u32 = 3;
7878
const HEALTH_LOG_INTERVAL_US: u64 = 300_000_000;
7979
/// Staleness threshold in seconds: the watchdog thread treats the heartbeat
/// as stale once `stale_us` exceeds this value (converted to microseconds).
const WATCHDOG_STALE_SECS: u64 = 15;
8080
/// How long the watchdog thread sleeps between heartbeat checks.
const WATCHDOG_CHECK_INTERVAL: Duration = Duration::from_secs(3);
81+
/// Number of consecutive stale checks after which the watchdog aborts the
/// process when the stuck phase is not `PHASE_SELECT`; below this count it
/// only logs and waits. The strike counter resets whenever a check is not stale.
const WATCHDOG_ABORT_STRIKES: u32 = 3;
82+
/// Staleness (seconds) in `PHASE_SELECT` at which the watchdog raises the
/// reconnect-request flag instead of aborting.
const WATCHDOG_SELECT_RECONNECT_SECS: u64 = 30;
83+
/// Staleness (seconds) in `PHASE_SELECT` at which the watchdog gives up on
/// reconnecting and aborts the process as a last resort.
const WATCHDOG_SELECT_ABORT_SECS: u64 = 180;
8184

8285
/// Watchdog that runs on a separate OS thread (not tokio) to detect when the
8386
/// single-threaded tokio runtime freezes (e.g. a picoquic C FFI call hangs).
8487
/// If the main loop hasn't updated the heartbeat for WATCHDOG_STALE_SECS,
85-
/// the watchdog aborts the process so systemd can restart it.
88+
/// the watchdog requests reconnect for select/sleep stalls first, then aborts
89+
/// only as a last resort.
8690
struct Watchdog {
8791
/// Heartbeat value polled by the watchdog thread. A value of 0 makes the
/// thread skip the staleness check entirely.
/// NOTE(review): presumably written by `pet()` — its body is not visible here.
heartbeat: Arc<AtomicU64>,
8892
/// Phase last recorded via `set_phase`; the watchdog thread reads it to
/// report where the loop is stuck and to apply the reconnect-first policy
/// when it equals `PHASE_SELECT`.
phase: Arc<AtomicU32>,
8993
/// Keep-running flag: the watchdog thread loops only while this is true.
alive: Arc<AtomicBool>,
94+
/// Set to true by the watchdog thread when a `PHASE_SELECT` stall reaches
/// `WATCHDOG_SELECT_RECONNECT_SECS`; read and cleared by
/// `take_select_reconnect_requested`.
select_reconnect_requested: Arc<AtomicBool>,
9095
/// Join handle of the spawned thread named "watchdog"; stored but not
/// otherwise used in the code visible here.
_handle: std::thread::JoinHandle<()>,
9196
}
9297

@@ -122,13 +127,16 @@ impl Watchdog {
122127
let heartbeat = Arc::new(AtomicU64::new(0));
123128
let phase = Arc::new(AtomicU32::new(0));
124129
let alive = Arc::new(AtomicBool::new(true));
130+
let select_reconnect_requested = Arc::new(AtomicBool::new(false));
125131
let hb = Arc::clone(&heartbeat);
126132
let ph = Arc::clone(&phase);
127133
let al = Arc::clone(&alive);
134+
let reconnect_flag = Arc::clone(&select_reconnect_requested);
128135
let handle = std::thread::Builder::new()
129136
.name("watchdog".into())
130137
.spawn(move || {
131138
let mut last_check = Instant::now();
139+
let mut stale_strikes = 0u32;
132140
while al.load(Ordering::Relaxed) {
133141
std::thread::sleep(WATCHDOG_CHECK_INTERVAL);
134142
if !al.load(Ordering::Relaxed) {
@@ -137,6 +145,7 @@ impl Watchdog {
137145
let now_instant = Instant::now();
138146
let ts = hb.load(Ordering::Relaxed);
139147
if ts == 0 {
148+
stale_strikes = 0;
140149
last_check = now_instant;
141150
continue;
142151
}
@@ -150,6 +159,7 @@ impl Watchdog {
150159
if stale_us > WATCHDOG_STALE_SECS * 1_000_000
151160
&& own_sleep_us > expected_sleep_us * 3
152161
{
162+
stale_strikes = 0;
153163
let stuck_phase = ph.load(Ordering::Relaxed);
154164
eprintln!(
155165
"WATCHDOG: VPS suspend detected ({:.1}s gap, own sleep {:.1}s), \
@@ -163,14 +173,56 @@ impl Watchdog {
163173
continue;
164174
}
165175
if stale_us > WATCHDOG_STALE_SECS * 1_000_000 {
176+
stale_strikes = stale_strikes.saturating_add(1);
166177
let stuck_phase = ph.load(Ordering::Relaxed);
178+
if stuck_phase == PHASE_SELECT {
179+
if stale_us >= WATCHDOG_SELECT_RECONNECT_SECS * 1_000_000 {
180+
let first_request = !reconnect_flag.swap(true, Ordering::Relaxed);
181+
if first_request {
182+
eprintln!(
183+
"WATCHDOG: select phase stale for {:.1}s; requesting reconnect",
184+
stale_us as f64 / 1_000_000.0,
185+
);
186+
}
187+
}
188+
if stale_us >= WATCHDOG_SELECT_ABORT_SECS * 1_000_000 {
189+
eprintln!(
190+
"WATCHDOG: select phase stalled for {:.1}s at phase {} ({}), aborting as last resort",
191+
stale_us as f64 / 1_000_000.0,
192+
stuck_phase,
193+
phase_name(stuck_phase),
194+
);
195+
std::process::abort();
196+
}
197+
eprintln!(
198+
"WATCHDOG: stale heartbeat {:.1}s at phase {} ({}), strikes={}, waiting (select phase uses reconnect-first policy)",
199+
stale_us as f64 / 1_000_000.0,
200+
stuck_phase,
201+
phase_name(stuck_phase),
202+
stale_strikes,
203+
);
204+
continue;
205+
}
206+
if stale_strikes < WATCHDOG_ABORT_STRIKES {
207+
eprintln!(
208+
"WATCHDOG: stale heartbeat {:.1}s at phase {} ({}), strikes={}, waiting",
209+
stale_us as f64 / 1_000_000.0,
210+
stuck_phase,
211+
phase_name(stuck_phase),
212+
stale_strikes,
213+
);
214+
continue;
215+
}
167216
eprintln!(
168-
"WATCHDOG: main loop stalled for {:.1}s at phase {} ({}), aborting process",
217+
"WATCHDOG: main loop stalled for {:.1}s at phase {} ({}), strikes={}, aborting process",
169218
stale_us as f64 / 1_000_000.0,
170219
stuck_phase,
171220
phase_name(stuck_phase),
221+
stale_strikes,
172222
);
173223
std::process::abort();
224+
} else {
225+
stale_strikes = 0;
174226
}
175227
}
176228
})
@@ -179,6 +231,7 @@ impl Watchdog {
179231
heartbeat,
180232
phase,
181233
alive,
234+
select_reconnect_requested,
182235
_handle: handle,
183236
}
184237
}
@@ -191,6 +244,11 @@ impl Watchdog {
191244
/// Records the phase `p` the main loop is entering, so the watchdog thread
/// can report it and choose its stall policy if the heartbeat goes stale.
fn set_phase(&self, p: u32) {
192245
self.phase.store(p, Ordering::Relaxed);
193246
}
247+
248+
/// Returns whether the watchdog thread has requested a reconnect, and clears
/// the request in the same atomic step, so each request is reported only once.
fn take_select_reconnect_requested(&self) -> bool {
249+
self.select_reconnect_requested
250+
// `swap` returns the previous value while resetting the flag to false.
.swap(false, Ordering::Relaxed)
251+
}
194252
}
195253

196254
impl Drop for Watchdog {
@@ -437,6 +495,12 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result<i32, ClientError> {
437495
loop {
438496
watchdog.pet();
439497
watchdog.set_phase(PHASE_DRAIN_COMMANDS);
498+
if watchdog.take_select_reconnect_requested() {
499+
warn!(
500+
"watchdog requested reconnect after select-phase stall; recycling connection"
501+
);
502+
break;
503+
}
440504
let current_time = unsafe { picoquic_current_time() };
441505
// Only process application commands after the QUIC handshake
442506
// completes. During reconnect the acceptor may queue NewStream
@@ -601,6 +665,12 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result<i32, ClientError> {
601665
);
602666
}
603667
watchdog.pet();
668+
if watchdog.take_select_reconnect_requested() {
669+
warn!(
670+
"watchdog requested reconnect after select-phase stall; recycling connection"
671+
);
672+
break;
673+
}
604674

605675
watchdog.set_phase(PHASE_POST_DRAIN);
606676
if unsafe { (*state_ptr).is_ready() } {

0 commit comments

Comments
 (0)