diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a1328881..a81ef1f4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -262,7 +262,7 @@ jobs: client_args: "" resolver_mode: mixed min_exfil: "5" - min_download: "25" + min_download: "10" artifact_glob: bench-rust-rust-mixed-* # C repo benchmark disabled (C repo currently broken). # - name: c-c diff --git a/crates/slipstream-client/src/dns.rs b/crates/slipstream-client/src/dns.rs index 0b077b32..71ad6b3f 100644 --- a/crates/slipstream-client/src/dns.rs +++ b/crates/slipstream-client/src/dns.rs @@ -4,10 +4,14 @@ mod poll; mod resolver; mod response; -pub(crate) use debug::maybe_report_debug; +pub(crate) use debug::{ + maybe_report_debug, record_resolver_switch, resolver_switch_reason_catalog, + ResolverSwitchReason, +}; pub(crate) use path::{add_paths, refresh_resolver_path, resolver_mode_to_c}; pub(crate) use poll::{expire_inflight_polls, send_poll_queries}; pub(crate) use resolver::{ - reset_resolver_path, resolve_resolvers, sockaddr_storage_to_socket_addr, ResolverState, + note_active_path_delete_signal, reset_resolver_path, should_failover_active_path, + sockaddr_storage_to_socket_addr, ResolverManager, ResolverState, }; pub(crate) use response::{handle_dns_response, DnsResponseContext}; diff --git a/crates/slipstream-client/src/dns/debug.rs b/crates/slipstream-client/src/dns/debug.rs index 4b60a20f..03e9f2f0 100644 --- a/crates/slipstream-client/src/dns/debug.rs +++ b/crates/slipstream-client/src/dns/debug.rs @@ -1,5 +1,5 @@ use crate::pacing::PacingBudgetSnapshot; -use tracing::debug; +use tracing::{debug, info}; use super::resolver::ResolverState; @@ -25,6 +25,15 @@ pub(crate) struct DebugMetrics { pub(crate) last_report_send_packets: u64, pub(crate) last_report_send_bytes: u64, pub(crate) last_report_polls: u64, + pub(crate) inflight_poll_timeouts: u64, + pub(crate) path_probe_successes: u64, + pub(crate) path_probe_failures: u64, + pub(crate) path_rtt_us: u64, + pub(crate) path_cwnd: u64, + pub(crate) path_bytes_in_transit: u64, + pub(crate) path_pacing_rate: u64, + pub(crate) switch_to_count: u64, + pub(crate) switch_from_count: u64, } impl DebugMetrics { @@ -49,10 +58,99 @@ impl DebugMetrics { last_report_send_packets: 0, last_report_send_bytes: 0, last_report_polls: 0, + inflight_poll_timeouts: 0, + path_probe_successes: 0, + path_probe_failures: 0, + path_rtt_us: 0, + path_cwnd: 0, + path_bytes_in_transit: 0, + path_pacing_rate: 0, + switch_to_count: 0, + switch_from_count: 0, } } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ResolverSwitchReason { + StartupPrimary, + ManualOverride, + ProbeRecovery, + TimeoutStreakExceeded, + HandshakeStall, + LossSpike, + LatencyRegression, + PathUnavailable, + CooldownExpired, +} + +pub(crate) fn resolver_switch_reason_catalog() -> &'static [ResolverSwitchReason] { + static REASONS: [ResolverSwitchReason; 9] = [ + ResolverSwitchReason::StartupPrimary, + ResolverSwitchReason::ManualOverride, + ResolverSwitchReason::ProbeRecovery, + ResolverSwitchReason::TimeoutStreakExceeded, + ResolverSwitchReason::HandshakeStall, + ResolverSwitchReason::LossSpike, + ResolverSwitchReason::LatencyRegression, + ResolverSwitchReason::PathUnavailable, + ResolverSwitchReason::CooldownExpired, + ]; + &REASONS +} + +impl ResolverSwitchReason { + fn as_str(self) -> &'static str { + match self { + ResolverSwitchReason::StartupPrimary => "startup_primary", + ResolverSwitchReason::ManualOverride => "manual_override", + ResolverSwitchReason::ProbeRecovery => "probe_recovery", + ResolverSwitchReason::TimeoutStreakExceeded => "timeout_streak_exceeded", + ResolverSwitchReason::HandshakeStall => "handshake_stall", + ResolverSwitchReason::LossSpike => "loss_spike", + ResolverSwitchReason::LatencyRegression => "latency_regression", + ResolverSwitchReason::PathUnavailable => "path_unavailable", + ResolverSwitchReason::CooldownExpired => "cooldown_expired", + } + } +} + +pub(crate) fn record_resolver_switch( + resolvers: &mut [ResolverState], + from_index: Option, + to_index: usize, + reason: ResolverSwitchReason, +) { + if to_index >= resolvers.len() { + return; + } + + let to_addr = resolvers[to_index].addr; + resolvers[to_index].debug.switch_to_count = + resolvers[to_index].debug.switch_to_count.saturating_add(1); + + let from_addr = if let Some(index) = from_index { + if index < resolvers.len() { + resolvers[index].debug.switch_from_count = + resolvers[index].debug.switch_from_count.saturating_add(1); + Some(resolvers[index].addr) + } else { + None + } + } else { + None + }; + + info!( + "resolver switch: from={} to={} reason={}", + from_addr + .map(|addr| addr.to_string()) + .unwrap_or_else(|| "none".to_string()), + to_addr, + reason.as_str() + ); +} + pub(crate) fn maybe_report_debug( resolver: &mut ResolverState, now: u64, @@ -62,40 +160,44 @@ pub(crate) fn maybe_report_debug( pacing_snapshot: Option, ) { let label = resolver.label(); - let debug = &mut resolver.debug; - if !debug.enabled { + let metrics = &mut resolver.debug; + if !metrics.enabled { return; } - if debug.last_report_at == 0 { - debug.last_report_at = now; + if metrics.last_report_at == 0 { + metrics.last_report_at = now; return; } - let elapsed = now.saturating_sub(debug.last_report_at); + let elapsed = now.saturating_sub(metrics.last_report_at); if elapsed < DEBUG_REPORT_INTERVAL_US { return; } - let dns_delta = debug.dns_responses.saturating_sub(debug.last_report_dns); - let zero_delta = debug.zero_send_loops.saturating_sub(debug.last_report_zero); - let zero_stream_delta = debug + let dns_delta = metrics + .dns_responses + .saturating_sub(metrics.last_report_dns); + let zero_delta = metrics + .zero_send_loops + .saturating_sub(metrics.last_report_zero); + let zero_stream_delta = metrics .zero_send_with_streams - .saturating_sub(debug.last_report_zero_streams); - let data_ready_delta = debug + .saturating_sub(metrics.last_report_zero_streams); + let data_ready_delta = metrics .data_ready_skips - .saturating_sub(debug.last_report_data_ready_skips); - let enq_delta = debug + .saturating_sub(metrics.last_report_data_ready_skips); + let enq_delta = metrics .enqueued_bytes - .saturating_sub(debug.last_report_enqueued); - let send_pkt_delta = debug + .saturating_sub(metrics.last_report_enqueued); + let send_pkt_delta = metrics .send_packets - .saturating_sub(debug.last_report_send_packets); - let send_bytes_delta = debug + .saturating_sub(metrics.last_report_send_packets); + let send_bytes_delta = metrics .send_bytes - .saturating_sub(debug.last_report_send_bytes); - let polls_delta = debug.polls_sent.saturating_sub(debug.last_report_polls); - let enqueue_ms = if debug.last_enqueue_at == 0 { + .saturating_sub(metrics.last_report_send_bytes); + let polls_delta = metrics.polls_sent.saturating_sub(metrics.last_report_polls); + let enqueue_ms = if metrics.last_enqueue_at == 0 { 0 } else { - now.saturating_sub(debug.last_enqueue_at) / 1_000 + now.saturating_sub(metrics.last_enqueue_at) / 1_000 }; let pacing_summary = if let Some(snapshot) = pacing_snapshot { format!( @@ -106,7 +208,7 @@ pub(crate) fn maybe_report_debug( String::new() }; debug!( - "debug: {} dns+={} send_pkts+={} send_bytes+={} polls+={} zero_send+={} zero_send_streams+={} data_ready_skips+={} streams={} enqueued+={} last_enqueue_ms={} pending_polls={} inflight_polls={}{}", + "debug: {} dns+={} send_pkts+={} send_bytes+={} polls+={} zero_send+={} zero_send_streams+={} data_ready_skips+={} streams={} enqueued+={} last_enqueue_ms={} pending_polls={} inflight_polls={} poll_timeouts={} probe_ok={} probe_fail={} path_rtt_us={} path_cwnd={} path_in_transit={} path_pacing_rate={} switches_to={} switches_from={}{}", label, dns_delta, send_pkt_delta, @@ -120,15 +222,66 @@ pub(crate) fn maybe_report_debug( enqueue_ms, pending_polls, inflight_polls, + metrics.inflight_poll_timeouts, + metrics.path_probe_successes, + metrics.path_probe_failures, + metrics.path_rtt_us, + metrics.path_cwnd, + metrics.path_bytes_in_transit, + metrics.path_pacing_rate, + metrics.switch_to_count, + metrics.switch_from_count, pacing_summary ); - debug.last_report_at = now; - debug.last_report_dns = debug.dns_responses; - debug.last_report_zero = debug.zero_send_loops; - debug.last_report_zero_streams = debug.zero_send_with_streams; - debug.last_report_data_ready_skips = debug.data_ready_skips; - debug.last_report_enqueued = debug.enqueued_bytes; - debug.last_report_send_packets = debug.send_packets; - debug.last_report_send_bytes = debug.send_bytes; - debug.last_report_polls = debug.polls_sent; + metrics.last_report_at = now; + metrics.last_report_dns = metrics.dns_responses; + metrics.last_report_zero = metrics.zero_send_loops; + metrics.last_report_zero_streams = metrics.zero_send_with_streams; + metrics.last_report_data_ready_skips = metrics.data_ready_skips; + metrics.last_report_enqueued = metrics.enqueued_bytes; + metrics.last_report_send_packets = metrics.send_packets; + metrics.last_report_send_bytes = metrics.send_bytes; + metrics.last_report_polls = metrics.polls_sent; +} + +#[cfg(test)] +mod tests { + use super::ResolverSwitchReason; + + #[test] + fn resolver_switch_reasons_are_stable_tokens() { + assert_eq!( + ResolverSwitchReason::StartupPrimary.as_str(), + "startup_primary" + ); + assert_eq!( + ResolverSwitchReason::ManualOverride.as_str(), + "manual_override" + ); + assert_eq!( + ResolverSwitchReason::ProbeRecovery.as_str(), + "probe_recovery" + ); + assert_eq!( + ResolverSwitchReason::TimeoutStreakExceeded.as_str(), + "timeout_streak_exceeded" + ); + assert_eq!( + ResolverSwitchReason::HandshakeStall.as_str(), + "handshake_stall" + ); + assert_eq!(ResolverSwitchReason::LossSpike.as_str(), "loss_spike"); + assert_eq!( + ResolverSwitchReason::LatencyRegression.as_str(), + "latency_regression" + ); + assert_eq!( + ResolverSwitchReason::PathUnavailable.as_str(), + "path_unavailable" + ); + assert_eq!( + ResolverSwitchReason::CooldownExpired.as_str(), + "cooldown_expired" + ); + } } diff --git a/crates/slipstream-client/src/dns/path.rs b/crates/slipstream-client/src/dns/path.rs index 5042aa91..25229962 100644 --- a/crates/slipstream-client/src/dns/path.rs +++ b/crates/slipstream-client/src/dns/path.rs @@ -7,10 +7,15 @@ use slipstream_ffi::picoquic::{ use slipstream_ffi::ResolverMode; use tracing::{info, warn}; -use super::resolver::{reset_resolver_path, ResolverState}; +use super::resolver::{ + clear_active_path_suspect, note_active_refresh_failure, reset_resolver_path, + should_failover_active_path, ResolverState, +}; const PATH_PROBE_INITIAL_DELAY_US: u64 = 250_000; const PATH_PROBE_MAX_DELAY_US: u64 = 10_000_000; +const PROBE_FAILURE_LOG_INTERVAL_US: u64 = 10_000_000; +const SKIP_STANDBY_RECURSIVE_PATH_PROBES: bool = true; pub(crate) fn refresh_resolver_path( cnx: *mut picoquic_cnx_t, @@ -23,6 +28,8 @@ pub(crate) fn refresh_resolver_path( if resolver.path_id != path_id { resolver.path_id = path_id; } + resolver.last_path_unavailable_log_at = 0; + clear_active_path_suspect(resolver); return true; } resolver.unique_path_id = None; @@ -30,7 +37,13 @@ pub(crate) fn refresh_resolver_path( let peer = &resolver.storage as *const _ as *const libc::sockaddr; let path_id = unsafe { slipstream_find_path_id_by_addr(cnx, peer) }; if path_id < 0 { - if resolver.added || resolver.path_id >= 0 { + if resolver.is_active() { + let now = unsafe { picoquic_current_time() }; + note_active_refresh_failure(resolver, now); + if should_failover_active_path(resolver, now) { + reset_resolver_path(resolver); + } + } else if resolver.added || resolver.path_id >= 0 { reset_resolver_path(resolver); } return false; @@ -40,6 +53,8 @@ pub(crate) fn refresh_resolver_path( if resolver.path_id != path_id { resolver.path_id = path_id; } + resolver.last_path_unavailable_log_at = 0; + clear_active_path_suspect(resolver); true } @@ -61,6 +76,9 @@ pub(crate) fn add_paths( let mut default_mode = primary_mode; for resolver in resolvers.iter_mut().skip(1) { + if SKIP_STANDBY_RECURSIVE_PATH_PROBES && resolver.mode == ResolverMode::Recursive { + continue; + } if resolver.added { continue; } @@ -86,18 +104,30 @@ pub(crate) fn add_paths( if ret == 0 && path_id >= 0 { resolver.added = true; resolver.path_id = path_id; + resolver.last_probe_failure_log_at = 0; + resolver.debug.path_probe_successes = + resolver.debug.path_probe_successes.saturating_add(1); info!("Added path {}", resolver.addr); continue; } + resolver.debug.path_probe_failures = resolver.debug.path_probe_failures.saturating_add(1); resolver.probe_attempts = resolver.probe_attempts.saturating_add(1); let delay = path_probe_backoff(resolver.probe_attempts); resolver.next_probe_at = now.saturating_add(delay); - warn!( - "Failed adding path {} (attempt {}), retrying in {}ms", - resolver.addr, - resolver.probe_attempts, - delay / 1000 - ); + let should_log = resolver.probe_attempts == 1 + || resolver.probe_attempts.is_power_of_two() + || resolver.last_probe_failure_log_at == 0 + || now.saturating_sub(resolver.last_probe_failure_log_at) + >= PROBE_FAILURE_LOG_INTERVAL_US; + if should_log { + resolver.last_probe_failure_log_at = now; + warn!( + "Failed adding path {} (attempt {}), retrying in {}ms", + resolver.addr, + resolver.probe_attempts, + delay / 1000 + ); + } } if default_mode != primary_mode { diff --git a/crates/slipstream-client/src/dns/poll.rs b/crates/slipstream-client/src/dns/poll.rs index 83e10cd4..234c08d6 100644 --- a/crates/slipstream-client/src/dns/poll.rs +++ b/crates/slipstream-client/src/dns/poll.rs @@ -14,9 +14,9 @@ use slipstream_core::normalize_dual_stack_addr; const AUTHORITATIVE_POLL_TIMEOUT_US: u64 = 5_000_000; -pub(crate) fn expire_inflight_polls(inflight_poll_ids: &mut HashMap, now: u64) { +pub(crate) fn expire_inflight_polls(inflight_poll_ids: &mut HashMap, now: u64) -> usize { if inflight_poll_ids.is_empty() { - return; + return 0; } let expire_before = now.saturating_sub(AUTHORITATIVE_POLL_TIMEOUT_US); let mut expired = Vec::new(); @@ -25,9 +25,11 @@ pub(crate) fn expire_inflight_polls(inflight_poll_ids: &mut HashMap, n expired.push(*id); } } + let expired_count = expired.len(); for id in expired { inflight_poll_ids.remove(&id); } + expired_count } #[allow(clippy::too_many_arguments)] diff --git a/crates/slipstream-client/src/dns/resolver.rs b/crates/slipstream-client/src/dns/resolver.rs index f5c45dce..841cb78d 100644 --- a/crates/slipstream-client/src/dns/resolver.rs +++ b/crates/slipstream-client/src/dns/resolver.rs @@ -1,37 +1,279 @@ use crate::error::ClientError; use crate::pacing::{PacingBudgetSnapshot, PacingPollBudget}; +use slipstream_core::state_machine::ResolverRole; use slipstream_core::{normalize_dual_stack_addr, resolve_host_port}; +use slipstream_ffi::picoquic::picoquic_current_time; use slipstream_ffi::{socket_addr_to_storage, ResolverMode, ResolverSpec}; use std::collections::HashMap; use std::net::SocketAddr; use tracing::warn; use super::debug::DebugMetrics; +use super::debug::ResolverSwitchReason; + +const SELECTOR_COOLDOWN_US: u64 = 10_000_000; +const SELECTOR_PROMOTION_THRESHOLD_PERCENT: u64 = 15; +const SELECTOR_REQUIRED_CONSECUTIVE_WINS: u8 = 3; +const SCORE_UNAVAILABLE_PENALTY: u64 = 2_000_000; +const SCORE_TIMEOUT_PENALTY: u64 = 50_000; +const SCORE_PROBE_FAILURE_PENALTY: u64 = 20_000; +const SCORE_STICKINESS_BIAS: u64 = 10_000; +const PATH_UNAVAILABLE_LOG_INTERVAL_US: u64 = 5_000_000; +const ACTIVE_FAILOVER_MIN_DELETE_EVENTS: u8 = 2; +const ACTIVE_FAILOVER_MIN_REFRESH_FAILURES: u8 = 2; +const ACTIVE_FAILOVER_STALE_PROGRESS_US: u64 = 2_000_000; +const ACTIVE_FAILOVER_SUSPECT_WINDOW_US: u64 = 5_000_000; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ResolverHealthState { + Healthy, + Suspect, + Unavailable, +} + +pub(crate) struct ResolverManager { + resolvers: Vec, + active_index: usize, + last_switch_at: u64, + candidate_index: Option, + candidate_wins: u8, +} pub(crate) struct ResolverState { pub(crate) addr: SocketAddr, pub(crate) storage: libc::sockaddr_storage, pub(crate) local_addr_storage: Option, pub(crate) mode: ResolverMode, + pub(crate) role: ResolverRole, pub(crate) added: bool, pub(crate) path_id: libc::c_int, pub(crate) unique_path_id: Option, pub(crate) probe_attempts: u32, pub(crate) next_probe_at: u64, + pub(crate) last_probe_failure_log_at: u64, + pub(crate) last_path_unavailable_log_at: u64, pub(crate) pending_polls: usize, pub(crate) inflight_poll_ids: HashMap, pub(crate) pacing_budget: Option, pub(crate) last_pacing_snapshot: Option, pub(crate) debug: DebugMetrics, + pub(crate) health: ResolverHealthState, + pub(crate) active_delete_suspect_count: u8, + pub(crate) active_refresh_suspect_count: u8, + pub(crate) active_delete_first_at: u64, + pub(crate) last_progress_at: u64, + pub(crate) last_recursive_poll_sent_at: u64, } impl ResolverState { pub(crate) fn label(&self) -> String { format!( - "path_id={} unique_id={:?} resolver={} mode={:?}", - self.path_id, self.unique_path_id, self.addr, self.mode + "path_id={} unique_id={:?} resolver={} mode={:?} role={:?}", + self.path_id, self.unique_path_id, self.addr, self.mode, self.role ) } + + pub(crate) fn is_active(&self) -> bool { + self.role == ResolverRole::Active + } +} + +impl ResolverManager { + pub(crate) fn from_specs( + resolvers: &[ResolverSpec], + mtu: u32, + debug_poll: bool, + ) -> Result { + let resolved = resolve_resolvers(resolvers, mtu, debug_poll)?; + Self::new(resolved) + } + + fn new(mut resolvers: Vec) -> Result { + if resolvers.is_empty() { + return Err(ClientError::new("At least one resolver is required")); + } + + let active_index = resolvers + .iter() + .position(|resolver| resolver.role == ResolverRole::Active) + .unwrap_or(0); + for (index, resolver) in resolvers.iter_mut().enumerate() { + resolver.role = if index == active_index { + ResolverRole::Active + } else { + ResolverRole::Standby + }; + } + + Ok(Self { + resolvers, + active_index, + last_switch_at: 0, + candidate_index: None, + candidate_wins: 0, + }) + } + + pub(crate) fn active_index(&self) -> usize { + self.active_index + } + + pub(crate) fn set_startup_active_index(&mut self, index: usize) { + if index >= self.resolvers.len() || index == self.active_index { + return; + } + + self.active_index = index; + for (resolver_index, resolver) in self.resolvers.iter_mut().enumerate() { + resolver.role = if resolver_index == index { + ResolverRole::Active + } else { + ResolverRole::Standby + }; + resolver.added = resolver_index == index; + resolver.path_id = if resolver_index == index { 0 } else { -1 }; + resolver.unique_path_id = None; + resolver.local_addr_storage = None; + resolver.pending_polls = 0; + resolver.inflight_poll_ids.clear(); + resolver.last_pacing_snapshot = None; + resolver.probe_attempts = 0; + resolver.next_probe_at = 0; + resolver.health = ResolverHealthState::Healthy; + resolver.active_delete_suspect_count = 0; + resolver.active_refresh_suspect_count = 0; + resolver.active_delete_first_at = 0; + resolver.last_progress_at = 0; + resolver.last_recursive_poll_sent_at = 0; + } + } + + pub(crate) fn active_mut(&mut self) -> &mut ResolverState { + &mut self.resolvers[self.active_index] + } + + pub(crate) fn active(&self) -> &ResolverState { + &self.resolvers[self.active_index] + } + + pub(crate) fn as_slice(&self) -> &[ResolverState] { + &self.resolvers + } + + pub(crate) fn as_mut_slice(&mut self) -> &mut [ResolverState] { + &mut self.resolvers + } + + pub(crate) fn maybe_select_active( + &mut self, + now_us: u64, + ) -> Option<(usize, usize, ResolverSwitchReason)> { + if self.resolvers.len() <= 1 { + return None; + } + + let from = self.active_index; + if !self.resolvers[from].added { + if let Some(to) = self.best_available_index() { + if to != from { + self.set_active(to, now_us); + return Some((from, to, ResolverSwitchReason::PathUnavailable)); + } + } + return None; + } + + let to = self.best_available_index()?; + if to == from { + self.candidate_index = None; + self.candidate_wins = 0; + return None; + } + + let current_score = self.score(from); + let candidate_score = self.score(to); + let required_improvement = + current_score.saturating_mul(SELECTOR_PROMOTION_THRESHOLD_PERCENT) / 100; + let actual_improvement = current_score.saturating_sub(candidate_score); + if actual_improvement < required_improvement { + self.candidate_index = None; + self.candidate_wins = 0; + return None; + } + + if self.candidate_index == Some(to) { + self.candidate_wins = self.candidate_wins.saturating_add(1); + } else { + self.candidate_index = Some(to); + self.candidate_wins = 1; + } + if self.candidate_wins < SELECTOR_REQUIRED_CONSECUTIVE_WINS { + return None; + } + if self.last_switch_at > 0 + && now_us.saturating_sub(self.last_switch_at) < SELECTOR_COOLDOWN_US + { + return None; + } + + self.set_active(to, now_us); + Some((from, to, ResolverSwitchReason::LatencyRegression)) + } + + fn set_active(&mut self, active_index: usize, now_us: u64) { + self.active_index = active_index; + for (index, resolver) in self.resolvers.iter_mut().enumerate() { + resolver.role = if index == active_index { + ResolverRole::Active + } else { + ResolverRole::Standby + }; + } + self.last_switch_at = now_us; + self.candidate_index = None; + self.candidate_wins = 0; + } + + fn best_available_index(&self) -> Option { + let mut best_index = None; + let mut best_score = u64::MAX; + for (index, resolver) in self.resolvers.iter().enumerate() { + if !resolver.added { + continue; + } + let score = self.score(index); + if score < best_score { + best_score = score; + best_index = Some(index); + } + } + best_index + } + + fn score(&self, index: usize) -> u64 { + let resolver = &self.resolvers[index]; + let mut score = resolver.debug.path_rtt_us.max(100_000); + score = score.saturating_add( + resolver + .debug + .inflight_poll_timeouts + .saturating_mul(SCORE_TIMEOUT_PENALTY), + ); + score = score.saturating_add( + resolver + .debug + .path_probe_failures + .saturating_mul(SCORE_PROBE_FAILURE_PENALTY), + ); + score = score.saturating_add(resolver.debug.path_bytes_in_transit / 8); + if !resolver.added { + score = score.saturating_add(SCORE_UNAVAILABLE_PENALTY); + } + if resolver.role == ResolverRole::Active { + score = score.saturating_sub(SCORE_STICKINESS_BIAS); + } + score + } } pub(crate) fn resolve_resolvers( @@ -58,11 +300,18 @@ pub(crate) fn resolve_resolvers( storage: socket_addr_to_storage(addr), local_addr_storage: None, mode: resolver.mode, + role: if is_primary { + ResolverRole::Active + } else { + ResolverRole::Standby + }, added: is_primary, path_id: if is_primary { 0 } else { -1 }, - unique_path_id: if is_primary { Some(0) } else { None }, + unique_path_id: None, probe_attempts: 0, next_probe_at: 0, + last_probe_failure_log_at: 0, + last_path_unavailable_log_at: 0, pending_polls: 0, inflight_poll_ids: HashMap::new(), pacing_budget: match resolver.mode { @@ -71,16 +320,29 @@ pub(crate) fn resolve_resolvers( }, last_pacing_snapshot: None, debug: DebugMetrics::new(debug_poll), + health: ResolverHealthState::Healthy, + active_delete_suspect_count: 0, + active_refresh_suspect_count: 0, + active_delete_first_at: 0, + last_progress_at: 0, + last_recursive_poll_sent_at: 0, }); } Ok(resolved) } pub(crate) fn reset_resolver_path(resolver: &mut ResolverState) { - warn!( - "Path for resolver {} became unavailable; resetting state", - resolver.addr - ); + let now = unsafe { picoquic_current_time() }; + if resolver.last_path_unavailable_log_at == 0 + || now.saturating_sub(resolver.last_path_unavailable_log_at) + >= PATH_UNAVAILABLE_LOG_INTERVAL_US + { + resolver.last_path_unavailable_log_at = now; + warn!( + "Path for resolver {} became unavailable; resetting state", + resolver.addr + ); + } resolver.added = false; resolver.path_id = -1; resolver.unique_path_id = None; @@ -90,6 +352,74 @@ pub(crate) fn reset_resolver_path(resolver: &mut ResolverState) { resolver.last_pacing_snapshot = None; resolver.probe_attempts = 0; resolver.next_probe_at = 0; + resolver.health = ResolverHealthState::Unavailable; + resolver.active_delete_suspect_count = 0; + resolver.active_refresh_suspect_count = 0; + resolver.active_delete_first_at = 0; + resolver.last_recursive_poll_sent_at = 0; +} + +pub(crate) fn clear_active_path_suspect(resolver: &mut ResolverState) { + resolver.health = ResolverHealthState::Healthy; + resolver.active_delete_suspect_count = 0; + resolver.active_refresh_suspect_count = 0; + resolver.active_delete_first_at = 0; +} + +pub(crate) fn note_active_path_delete_signal(resolver: &mut ResolverState, now: u64) { + if resolver.active_delete_first_at == 0 + || now.saturating_sub(resolver.active_delete_first_at) > ACTIVE_FAILOVER_SUSPECT_WINDOW_US + { + resolver.active_delete_first_at = now; + resolver.active_delete_suspect_count = 1; + resolver.active_refresh_suspect_count = 0; + resolver.health = ResolverHealthState::Suspect; + return; + } + + resolver.active_delete_suspect_count = resolver.active_delete_suspect_count.saturating_add(1); + resolver.health = ResolverHealthState::Suspect; +} + +pub(crate) fn note_active_refresh_failure(resolver: &mut ResolverState, now: u64) { + if resolver.active_delete_first_at == 0 + || now.saturating_sub(resolver.active_delete_first_at) > ACTIVE_FAILOVER_SUSPECT_WINDOW_US + { + resolver.active_delete_first_at = now; + resolver.active_delete_suspect_count = 0; + resolver.active_refresh_suspect_count = 1; + resolver.health = ResolverHealthState::Suspect; + return; + } + + resolver.active_refresh_suspect_count = resolver.active_refresh_suspect_count.saturating_add(1); + resolver.health = ResolverHealthState::Suspect; +} + +pub(crate) fn note_resolver_progress(resolver: &mut ResolverState, now: u64) { + resolver.last_progress_at = now; + if resolver.health != ResolverHealthState::Unavailable { + clear_active_path_suspect(resolver); + } +} + +pub(crate) fn should_failover_active_path(resolver: &ResolverState, now: u64) -> bool { + if resolver.health == ResolverHealthState::Unavailable { + return true; + } + if resolver.health != ResolverHealthState::Suspect { + return false; + } + if resolver.active_delete_first_at > 0 + && now.saturating_sub(resolver.active_delete_first_at) > ACTIVE_FAILOVER_SUSPECT_WINDOW_US + { + return false; + } + let stale_progress = resolver.last_progress_at == 0 + || now.saturating_sub(resolver.last_progress_at) >= ACTIVE_FAILOVER_STALE_PROGRESS_US; + stale_progress + && resolver.active_delete_suspect_count >= ACTIVE_FAILOVER_MIN_DELETE_EVENTS + && resolver.active_refresh_suspect_count >= ACTIVE_FAILOVER_MIN_REFRESH_FAILURES } pub(crate) fn sockaddr_storage_to_socket_addr( @@ -100,7 +430,11 @@ pub(crate) fn sockaddr_storage_to_socket_addr( #[cfg(test)] mod tests { - use super::resolve_resolvers; + use super::{ + note_active_path_delete_signal, note_active_refresh_failure, note_resolver_progress, + resolve_resolvers, should_failover_active_path, ResolverManager, + }; + use slipstream_core::state_machine::ResolverRole; use slipstream_core::{AddressFamily, HostPort}; use slipstream_ffi::{ResolverMode, ResolverSpec}; @@ -130,4 +464,131 @@ mod tests { Err(err) => assert!(err.to_string().contains("Duplicate resolver address")), } } + + #[test] + fn manager_tracks_single_active_resolver() { + let resolvers = vec![ + ResolverSpec { + resolver: HostPort { + host: "127.0.0.1".to_string(), + port: 8853, + family: AddressFamily::V4, + }, + mode: ResolverMode::Recursive, + }, + ResolverSpec { + resolver: HostPort { + host: "127.0.0.2".to_string(), + port: 8853, + family: AddressFamily::V4, + }, + mode: ResolverMode::Authoritative, + }, + ]; + + let manager = ResolverManager::from_specs(&resolvers, 900, false) + .expect("resolver manager should initialize"); + + assert_eq!(manager.active_index(), 0); + assert_eq!(manager.as_slice()[0].role, ResolverRole::Active); + assert_eq!(manager.as_slice()[1].role, ResolverRole::Standby); + } + + #[test] + fn manager_switches_active_when_candidate_consistently_better() { + let resolvers = vec![ + ResolverSpec { + resolver: HostPort { + host: "127.0.0.1".to_string(), + port: 8853, + family: AddressFamily::V4, + }, + mode: ResolverMode::Recursive, + }, + ResolverSpec { + resolver: HostPort { + host: "127.0.0.2".to_string(), + port: 8853, + family: AddressFamily::V4, + }, + mode: ResolverMode::Recursive, + }, + ]; + + let mut manager = ResolverManager::from_specs(&resolvers, 900, false) + .expect("resolver manager should initialize"); + manager.as_mut_slice()[0].added = true; + manager.as_mut_slice()[1].added = true; + manager.as_mut_slice()[0].debug.path_rtt_us = 300_000; + manager.as_mut_slice()[1].debug.path_rtt_us = 50_000; + + assert!(manager.maybe_select_active(1_000_000).is_none()); + assert!(manager.maybe_select_active(2_000_000).is_none()); + let switch = manager + .maybe_select_active(11_000_000) + .expect("third consecutive win should switch"); + assert_eq!(switch.0, 0); + assert_eq!(switch.1, 1); + assert_eq!(manager.active_index(), 1); + assert_eq!(manager.active().role, ResolverRole::Active); + } + + #[test] + fn manager_can_set_startup_active_index() { + let resolvers = vec![ + ResolverSpec { + resolver: HostPort { + host: "127.0.0.1".to_string(), + port: 8853, + family: AddressFamily::V4, + }, + mode: ResolverMode::Recursive, + }, + ResolverSpec { + resolver: HostPort { + host: "127.0.0.2".to_string(), + port: 8853, + family: AddressFamily::V4, + }, + mode: ResolverMode::Recursive, + }, + ]; + + let mut manager = ResolverManager::from_specs(&resolvers, 900, false) + .expect("resolver manager should initialize"); + manager.set_startup_active_index(1); + + assert_eq!(manager.active_index(), 1); + assert!(manager.as_slice()[1].added); + assert_eq!(manager.as_slice()[1].path_id, 0); + assert_eq!(manager.as_slice()[1].unique_path_id, None); + assert!(!manager.as_slice()[0].added); + assert_eq!(manager.as_slice()[0].path_id, -1); + assert_eq!(manager.as_slice()[0].unique_path_id, None); + } + + #[test] + fn active_failover_requires_multi_signal_and_stale_progress() { + let resolvers = vec![ResolverSpec { + resolver: HostPort { + host: "127.0.0.1".to_string(), + port: 8853, + family: AddressFamily::V4, + }, + mode: ResolverMode::Recursive, + }]; + let mut manager = ResolverManager::from_specs(&resolvers, 900, false) + .expect("resolver manager should initialize"); + let resolver = manager.active_mut(); + + note_resolver_progress(resolver, 1_000_000); + note_active_path_delete_signal(resolver, 1_100_000); + note_active_refresh_failure(resolver, 1_200_000); + assert!(!should_failover_active_path(resolver, 1_300_000)); + + note_active_path_delete_signal(resolver, 1_400_000); + note_active_refresh_failure(resolver, 1_500_000); + assert!(!should_failover_active_path(resolver, 2_900_000)); + assert!(should_failover_active_path(resolver, 3_200_000)); + } } diff --git a/crates/slipstream-client/src/dns/response.rs b/crates/slipstream-client/src/dns/response.rs index a0f7152a..b0ed01af 100644 --- a/crates/slipstream-client/src/dns/response.rs +++ b/crates/slipstream-client/src/dns/response.rs @@ -7,7 +7,7 @@ use slipstream_ffi::picoquic::{ use slipstream_ffi::{socket_addr_to_storage, ResolverMode}; use std::net::SocketAddr; -use super::resolver::ResolverState; +use super::resolver::{note_resolver_progress, ResolverState}; use slipstream_core::normalize_dual_stack_addr; const MAX_POLL_BURST: usize = PICOQUIC_PACKET_LOOP_RECV_MAX; @@ -80,6 +80,7 @@ pub(crate) fn handle_dns_response( resolver.pending_polls = resolver.pending_polls.saturating_add(1).min(MAX_POLL_BURST); } + note_resolver_progress(resolver, current_time); } } else if let Some(response_id) = response_id { if let Some(resolver) = find_resolver_by_addr(ctx.resolvers, peer) { @@ -87,6 +88,7 @@ pub(crate) fn handle_dns_response( if resolver.mode == ResolverMode::Authoritative { resolver.inflight_poll_ids.remove(&response_id); } + note_resolver_progress(resolver, unsafe { picoquic_current_time() }); } } Ok(()) diff --git a/crates/slipstream-client/src/runtime.rs b/crates/slipstream-client/src/runtime.rs index 57dacfa6..b885cff9 100644 --- a/crates/slipstream-client/src/runtime.rs +++ b/crates/slipstream-client/src/runtime.rs @@ -1,15 +1,29 @@ +mod actions; +mod deadlock; +mod decision; +mod health; +mod loop_policy; mod path; mod setup; +use self::actions::{poll_authoritative_resolver, poll_recursive_resolver, PollDispatch}; +use self::deadlock::AcceptorSaturationTracker; +use self::decision::{reconnect_due_to_acceptor_deadlock, reconnect_due_to_active_path_loss}; +use self::health::{ + compute_last_enqueue_ms, should_log_dual_resolver_health, should_log_flow_blocked, + should_log_health, should_reconnect_for_handshake_stall, should_reconnect_for_resolver_stall, +}; +use self::loop_policy::compute_loop_sleep_policy; use self::path::{ - apply_path_mode, drain_path_events, fetch_path_quality, find_resolver_by_addr_mut, - loop_burst_total, path_poll_burst_max, + apply_path_mode, drain_path_events, fetch_and_record_path_quality, find_resolver_by_addr_mut, + loop_burst_total, maybe_switch_active_resolver, }; use self::setup::{bind_tcp_listener, bind_udp_socket, compute_mtu, map_io}; use crate::dns::{ add_paths, expire_inflight_polls, handle_dns_response, maybe_report_debug, - refresh_resolver_path, resolve_resolvers, resolver_mode_to_c, send_poll_queries, - sockaddr_storage_to_socket_addr, DnsResponseContext, + record_resolver_switch, refresh_resolver_path, resolver_mode_to_c, + resolver_switch_reason_catalog, sockaddr_storage_to_socket_addr, DnsResponseContext, + ResolverManager, ResolverSwitchReason, }; use crate::error::ClientError; use crate::pacing::{cwnd_target_polls, inflight_packet_estimate}; @@ -31,7 +45,7 @@ use slipstream_ffi::{ picoquic_set_default_multipath_option, picoquic_state_enum, slipstream_has_ready_stream, slipstream_is_flow_blocked, slipstream_mixed_cc_algorithm, slipstream_set_cc_override, slipstream_set_cid_limit, slipstream_set_default_path_mode, - PICOQUIC_CONNECTION_ID_MAX_SIZE, PICOQUIC_MAX_PACKET_SIZE, PICOQUIC_PACKET_LOOP_RECV_MAX, + PICOQUIC_CONNECTION_ID_MAX_SIZE, PICOQUIC_PACKET_LOOP_RECV_MAX, PICOQUIC_PACKET_LOOP_SEND_MAX, }, socket_addr_to_storage, take_crypto_errors, ClientConfig, QuicGuard, ResolverMode, @@ -43,7 +57,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, Notify}; use tokio::time::sleep; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; // Protocol defaults; see docs/config.md for details. const SLIPSTREAM_ALPN: &str = "picoquic_sample"; @@ -58,6 +72,10 @@ const MIN_POLL_INTERVAL_US: u64 = 100; /// connection is active. This catches cases where the recursive resolver /// silently stops forwarding queries (rate-limit, anti-tunnel heuristic, etc.). const RESOLVER_STALL_TIMEOUT_US: u64 = 60_000_000; +/// Force reconnect if the connection never reaches ready state within this +/// startup window. With multiple recursive resolvers, rotate the startup +/// resolver index on each stall to avoid sticking on a blocked resolver. +const HANDSHAKE_STALL_TIMEOUT_US: u64 = 30_000_000; /// If a stream is half-closed by the remote side and local upload stays open /// without forward progress for too long, abort the local reader to avoid /// zombie streams exhausting MAX_STREAMS credit. @@ -69,8 +87,12 @@ const ACCEPTOR_SATURATED_TIMEOUT_US: u64 = 30_000_000; /// Periodic health heartbeat log interval (5 minutes). Emits connection /// state at INFO level so we can diagnose silent tunnel deaths. const HEALTH_LOG_INTERVAL_US: u64 = 300_000_000; +const DUAL_RESOLVER_HEALTH_LOG_INTERVAL_US: u64 = 10_000_000; const WATCHDOG_STALE_SECS: u64 = 15; +const WATCHDOG_SELECT_STALE_SECS: u64 = 45; +const WATCHDOG_SELECT_STALE_MAX_STRIKES: u32 = 3; const WATCHDOG_CHECK_INTERVAL: Duration = Duration::from_secs(3); +const ACTIVE_PATH_LOSS_RECONNECT_STREAMS: usize = 32; /// Watchdog that runs on a separate OS thread (not tokio) to detect when the /// single-threaded tokio runtime freezes (e.g. a picoquic C FFI call hangs). @@ -122,6 +144,7 @@ impl Watchdog { .name("watchdog".into()) .spawn(move || { let mut last_check = Instant::now(); + let mut select_stale_strikes = 0u32; while al.load(Ordering::Relaxed) { std::thread::sleep(WATCHDOG_CHECK_INTERVAL); if !al.load(Ordering::Relaxed) { @@ -155,8 +178,34 @@ impl Watchdog { hb.store(now_pico, Ordering::Relaxed); continue; } + let stuck_phase = ph.load(Ordering::Relaxed); + if stuck_phase == PHASE_SELECT { + if stale_us > WATCHDOG_SELECT_STALE_SECS * 1_000_000 { + select_stale_strikes = + select_stale_strikes.saturating_add(1); + eprintln!( + "WATCHDOG: select phase stale for {:.1}s, continuing without abort (phase {}: {}, strikes={}/{})", + stale_us as f64 / 1_000_000.0, + stuck_phase, + phase_name(stuck_phase), + select_stale_strikes, + WATCHDOG_SELECT_STALE_MAX_STRIKES, + ); + if select_stale_strikes >= WATCHDOG_SELECT_STALE_MAX_STRIKES { + eprintln!( + "WATCHDOG: select phase stale persisted for {} checks; aborting process for restart", + select_stale_strikes, + ); + std::process::abort(); + } + hb.store(now_pico, Ordering::Relaxed); + } else { + select_stale_strikes = 0; + } + continue; + } + select_stale_strikes = 0; if stale_us > WATCHDOG_STALE_SECS * 1_000_000 { - let stuck_phase = ph.load(Ordering::Relaxed); eprintln!( "WATCHDOG: main loop stalled for {:.1}s at phase {} ({}), aborting process", stale_us as f64 / 1_000_000.0, @@ -209,9 +258,25 @@ fn drain_disconnected_commands(command_rx: &mut mpsc::UnboundedReceiver dropped } +fn drain_commands_by_connection_state( + cnx: *mut picoquic_cnx_t, + state_ptr: *mut ClientState, + command_rx: &mut mpsc::UnboundedReceiver, +) { + // Only process application commands after the QUIC handshake + // completes. During reconnect the acceptor may queue NewStream + // commands while the connection is still in initial state; + // processing them before ready triggers picoquic errors (0xc). + if unsafe { (*state_ptr).is_ready() } { + drain_commands(cnx, state_ptr, command_rx); + } else { + let _ = drain_disconnected_commands(command_rx); + } +} + pub async fn run_client(config: &ClientConfig<'_>) -> Result { - let domain_len = config.domain.len(); - let mtu = compute_mtu(domain_len)?; + let _ = resolver_switch_reason_catalog(); + let mtu = compute_mtu(config.domain)?; let udp = bind_udp_socket().await?; let (command_tx, mut command_rx) = mpsc::unbounded_channel(); @@ -272,16 +337,24 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { let _state = state; let mut reconnect_delay = Duration::from_millis(RECONNECT_SLEEP_MIN_MS); + let mut preferred_startup_resolver_index = 0usize; loop { - let mut resolvers = resolve_resolvers(config.resolvers, mtu, config.debug_poll)?; - if resolvers.is_empty() { - return Err(ClientError::new("At least one resolver is required")); - } + let mut resolver_manager = + ResolverManager::from_specs(config.resolvers, mtu, config.debug_poll)?; + resolver_manager.set_startup_active_index(preferred_startup_resolver_index); + let active_index = resolver_manager.active_index(); + record_resolver_switch( + resolver_manager.as_mut_slice(), + None, + active_index, + ResolverSwitchReason::StartupPrimary, + ); let mut local_addr_storage = socket_addr_to_storage(udp.local_addr().map_err(map_io)?); let current_time = unsafe { picoquic_current_time() }; + let connect_started_at = current_time; let quic = unsafe { picoquic_create( 8, @@ -320,7 +393,7 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { configure_quic_with_custom(quic, mixed_cc, mtu); // Multipath is only useful with multiple resolvers; leave it off // (picoquic default) for single-resolver to avoid CID exhaustion. - if resolvers.len() > 1 { + if resolver_manager.as_slice().len() > 1 { picoquic_set_default_multipath_option(quic, 1); // Multipath provisions 8 CIDs per path; the default limit of 8 // is too low with 2+ paths. 32 accommodates ~7 paths. @@ -334,12 +407,14 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { slipstream_set_cc_override(override_ptr); } unsafe { - slipstream_set_default_path_mode(resolver_mode_to_c(resolvers[0].mode)); + slipstream_set_default_path_mode(resolver_mode_to_c( + resolver_manager.active_mut().mode, + )); } if let Some(cert) = config.cert { configure_pinned_certificate(quic, cert).map_err(ClientError::new)?; } - let mut server_storage = resolvers[0].storage; + let mut server_storage = resolver_manager.active_mut().storage; // picoquic_create_client_cnx calls picoquic_start_client_cnx internally (see picoquic/quicctx.c). let cnx = unsafe { picoquic_create_client_cnx( @@ -357,7 +432,7 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { return Err(ClientError::new("Could not create QUIC connection")); } - apply_path_mode(cnx, &mut resolvers[0])?; + apply_path_mode(cnx, resolver_manager.active_mut())?; unsafe { picoquic_set_callback(cnx, Some(client_callback), state_ptr as *mut _); @@ -376,18 +451,19 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { let mut dns_id = 1u16; let mut recv_buf = vec![0u8; 4096]; - let mut send_buf = vec![0u8; PICOQUIC_MAX_PACKET_SIZE]; - let packet_loop_send_max = loop_burst_total(&resolvers, PICOQUIC_PACKET_LOOP_SEND_MAX); - let packet_loop_recv_max = loop_burst_total(&resolvers, PICOQUIC_PACKET_LOOP_RECV_MAX); + let mut send_buf = vec![0u8; mtu as usize]; + let packet_loop_send_max = + loop_burst_total(resolver_manager.as_slice(), PICOQUIC_PACKET_LOOP_SEND_MAX); + let packet_loop_recv_max = + loop_burst_total(resolver_manager.as_slice(), PICOQUIC_PACKET_LOOP_RECV_MAX); let mut zero_send_loops = 0u64; let mut zero_send_with_streams = 0u64; let mut data_ready_skips = 0u64; let mut last_flow_block_log_at = 0u64; let mut last_recv_at = 0u64; let mut last_health_log_at = 0u64; - let mut acceptor_saturated_since: u64 = 0; - let mut acceptor_saturated_max: usize = 0; - let mut acceptor_saturated_bytes: u64 = 0; + let mut last_dual_resolver_log_at = 0u64; + let mut acceptor_saturation = AcceptorSaturationTracker::new(ACCEPTOR_SATURATED_TIMEOUT_US); let watchdog = Watchdog::spawn(); // Clear closing flag that picoquic_free (QuicGuard::drop) may have @@ -400,15 +476,7 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { watchdog.pet(); watchdog.set_phase(PHASE_DRAIN_COMMANDS); let current_time = unsafe { picoquic_current_time() }; - // Only process application commands after the QUIC handshake - // completes. During reconnect the acceptor may queue NewStream - // commands while the connection is still in initial state; - // processing them before ready triggers picoquic errors (0xc). - if unsafe { (*state_ptr).is_ready() } { - drain_commands(cnx, state_ptr, &mut command_rx); - } else { - let _ = drain_disconnected_commands(&mut command_rx); - } + drain_commands_by_connection_state(cnx, state_ptr, &mut command_rx); watchdog.set_phase(PHASE_DRAIN_STREAM_DATA); drain_stream_data(cnx, state_ptr); watchdog.set_phase(PHASE_CNX_STATE_CHECK); @@ -428,6 +496,39 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { } let ready = unsafe { (*state_ptr).is_ready() }; + if let Some(stall_us) = should_reconnect_for_handshake_stall( + ready, + current_time, + connect_started_at, + HANDSHAKE_STALL_TIMEOUT_US, + ) { + let resolver_count = resolver_manager.as_slice().len(); + if resolver_count > 1 { + let from_index = resolver_manager.active_index(); + let to_index = (from_index + 1) % resolver_count; + let from_addr = resolver_manager.as_slice()[from_index].addr; + let to_addr = resolver_manager.as_slice()[to_index].addr; + record_resolver_switch( + resolver_manager.as_mut_slice(), + Some(from_index), + to_index, + ResolverSwitchReason::HandshakeStall, + ); + preferred_startup_resolver_index = to_index; + warn!( + "handshake stall for {:.1}s on startup resolver {}; rotating startup resolver to {} and reconnecting", + stall_us as f64 / 1_000_000.0, + from_addr, + to_addr, + ); + } else { + warn!( + "handshake stall for {:.1}s on single resolver; reconnecting", + stall_us as f64 / 1_000_000.0 + ); + } + break; + } if ready { if last_recv_at == 0 { last_recv_at = current_time; @@ -438,18 +539,37 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { if reconnect_delay != Duration::from_millis(RECONNECT_SLEEP_MIN_MS) { reconnect_delay = Duration::from_millis(RECONNECT_SLEEP_MIN_MS); } - add_paths(cnx, &mut resolvers)?; - for resolver in resolvers.iter_mut() { + add_paths(cnx, resolver_manager.as_mut_slice())?; + for resolver in resolver_manager.as_mut_slice().iter_mut() { if resolver.added { apply_path_mode(cnx, resolver)?; } } } - drain_path_events(cnx, &mut resolvers, state_ptr); + let active_path_deleted = + drain_path_events(cnx, resolver_manager.as_mut_slice(), state_ptr); + if active_path_deleted + && reconnect_due_to_active_path_loss( + &mut resolver_manager, + current_time, + &mut preferred_startup_resolver_index, + state_ptr, + ACTIVE_PATH_LOSS_RECONNECT_STREAMS, + ) + { + break; + } - for resolver in resolvers.iter_mut() { + for resolver in resolver_manager.as_mut_slice().iter_mut() { if resolver.mode == ResolverMode::Authoritative { - expire_inflight_polls(&mut resolver.inflight_poll_ids, current_time); + let expired = + expire_inflight_polls(&mut resolver.inflight_poll_ids, current_time); + if expired > 0 { + resolver.debug.inflight_poll_timeouts = resolver + .debug + .inflight_poll_timeouts + .saturating_add(expired as u64); + } } } @@ -459,13 +579,16 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { let delay_us = if delay_us < 0 { 0 } else { delay_us as u64 }; let streams_len_for_sleep = unsafe { (*state_ptr).streams_len() }; let mut has_work = streams_len_for_sleep > 0; - for resolver in resolvers.iter_mut() { + for resolver in resolver_manager.as_mut_slice().iter_mut() { if !refresh_resolver_path(cnx, resolver) { continue; } + if !resolver.is_active() && resolver.mode == ResolverMode::Recursive { + continue; + } let pending_for_sleep = match resolver.mode { ResolverMode::Authoritative => { - let quality = fetch_path_quality(cnx, resolver); + let quality = fetch_and_record_path_quality(cnx, resolver); let snapshot = resolver .pacing_budget .as_mut() @@ -490,12 +613,13 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { } } // Avoid a tight poll loop when idle, but keep the short slice during active transfers. - let timeout_us = if has_work { - delay_us.clamp(MIN_POLL_INTERVAL_US, DNS_POLL_SLICE_US) - } else { - delay_us.max(MIN_POLL_INTERVAL_US) - }; - let timeout = Duration::from_micros(timeout_us); + let sleep_policy = compute_loop_sleep_policy( + delay_us, + has_work, + MIN_POLL_INTERVAL_US, + DNS_POLL_SLICE_US, + ); + let timeout = sleep_policy.timeout; watchdog.set_phase(PHASE_SELECT); let pre_select = Instant::now(); @@ -524,7 +648,7 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { let mut response_ctx = DnsResponseContext { quic, local_addr_storage: &local_addr_storage, - resolvers: &mut resolvers, + resolvers: resolver_manager.as_mut_slice(), }; handle_dns_response(&recv_buf[..size], peer, &mut response_ctx)?; for _ in 1..packet_loop_recv_max { @@ -559,25 +683,33 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { "select overslept: {:.1}s (timeout was {:.1}s, has_work={})", select_elapsed.as_secs_f64(), timeout.as_secs_f64(), - has_work, + sleep_policy.has_work, ); } watchdog.pet(); watchdog.set_phase(PHASE_POST_DRAIN); - if unsafe { (*state_ptr).is_ready() } { - drain_commands(cnx, state_ptr, &mut command_rx); - } else { - let _ = drain_disconnected_commands(&mut command_rx); - } + drain_commands_by_connection_state(cnx, state_ptr, &mut command_rx); drain_stream_data(cnx, state_ptr); - drain_path_events(cnx, &mut resolvers, state_ptr); + let active_path_deleted = + drain_path_events(cnx, resolver_manager.as_mut_slice(), state_ptr); + if active_path_deleted + && reconnect_due_to_active_path_loss( + &mut resolver_manager, + current_time, + &mut preferred_startup_resolver_index, + state_ptr, + ACTIVE_PATH_LOSS_RECONNECT_STREAMS, + ) + { + break; + } let reaped_half_closed = unsafe { let now = picoquic_current_time(); (*state_ptr).reap_stale_half_closed_streams(now, HALF_CLOSE_IDLE_TIMEOUT_US) }; if reaped_half_closed > 0 { - warn!( + info!( "reaped {} stale half-closed stream(s) after {}s idle timeout", reaped_half_closed, HALF_CLOSE_IDLE_TIMEOUT_US / 1_000_000 @@ -586,42 +718,8 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { // Circuit breaker: force reconnect when MAX_STREAMS is fully // consumed and no progress is being made (flow control deadlock). - // - // Three gates prevent false-triggering on healthy saturated connections: - // 1. `used >= max` — acceptor budget fully consumed - // 2. `max` unchanged — peer hasn't extended MAX_STREAMS - // 3. total stream bytes unchanged — no data flowing (true deadlock) - // - // Any gate resetting (MAX_STREAMS extended, or bytes increasing) - // restarts the 30s timer. - { - let (used, max) = unsafe { (*state_ptr).acceptor_metrics() }; - let total_bytes = unsafe { (*state_ptr).total_stream_bytes() }; - if max > 0 && used >= max { - let now = unsafe { picoquic_current_time() }; - if acceptor_saturated_since == 0 - || max != acceptor_saturated_max - || total_bytes != acceptor_saturated_bytes - { - // First saturation, limit extended, or bytes moved — (re)start. - acceptor_saturated_since = now; - acceptor_saturated_max = max; - acceptor_saturated_bytes = total_bytes; - } else if now.saturating_sub(acceptor_saturated_since) - >= ACCEPTOR_SATURATED_TIMEOUT_US - { - warn!( - "acceptor deadlock for {}s ({}/{}, bytes={}, no progress), forcing reconnect", - ACCEPTOR_SATURATED_TIMEOUT_US / 1_000_000, - used, - max, - total_bytes - ); - break; - } - } else { - acceptor_saturated_since = 0; - } + if reconnect_due_to_acceptor_deadlock(state_ptr, &mut acceptor_saturation) { + break; } let mut sent_quic_data = false; @@ -665,8 +763,11 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { // congestion window is full (not just flow control). // The server can only send ACKs inside DNS responses, // so we must keep querying to unblock the CWND. - for resolver in resolvers.iter_mut() { - if resolver.mode == ResolverMode::Recursive && resolver.added { + for resolver in resolver_manager.as_mut_slice().iter_mut() { + if resolver.is_active() + && resolver.mode == ResolverMode::Recursive + && resolver.added + { resolver.pending_polls = resolver.pending_polls.max(1); } } @@ -680,7 +781,9 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { sent_quic_data = true; if let Ok(dest) = sockaddr_storage_to_socket_addr(&addr_to) { let dest = normalize_dual_stack_addr(dest); - if let Some(resolver) = find_resolver_by_addr_mut(&mut resolvers, dest) { + if let Some(resolver) = + find_resolver_by_addr_mut(resolver_manager.as_mut_slice(), dest) + { resolver.local_addr_storage = Some(unsafe { std::ptr::read(&addr_from) }); resolver.debug.send_packets = resolver.debug.send_packets.saturating_add(1); resolver.debug.send_bytes = @@ -715,129 +818,98 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { } } + maybe_switch_active_resolver( + &mut resolver_manager, + current_time, + &mut preferred_startup_resolver_index, + ); + let has_ready_stream = unsafe { slipstream_has_ready_stream(cnx) != 0 }; let flow_blocked = unsafe { slipstream_is_flow_blocked(cnx) != 0 }; let streams_len = unsafe { (*state_ptr).streams_len() }; - if streams_len > 0 && has_ready_stream && flow_blocked { - let now = unsafe { picoquic_current_time() }; - if now.saturating_sub(last_flow_block_log_at) >= FLOW_BLOCKED_LOG_INTERVAL_US { - let metrics = unsafe { (*state_ptr).stream_debug_metrics() }; - let backlog = unsafe { (*state_ptr).stream_backlog_summaries(8) }; - let (enqueued_bytes, last_enqueue_at) = - unsafe { (*state_ptr).debug_snapshot() }; - let last_enqueue_ms = if last_enqueue_at == 0 { - 0 - } else { - now.saturating_sub(last_enqueue_at) / 1_000 - }; - error!( - "connection flow blocked: streams={} streams_with_rx_queued={} queued_bytes_total={} streams_with_recv_fin={} streams_with_send_fin={} streams_discarding={} streams_with_unconsumed_rx={} enqueued_bytes={} last_enqueue_ms={} zero_send_with_streams={} zero_send_loops={} data_ready_skips={} flow_blocked={} has_ready_stream={} backlog={:?}", - streams_len, - metrics.streams_with_rx_queued, - metrics.queued_bytes_total, - metrics.streams_with_recv_fin, - metrics.streams_with_send_fin, - metrics.streams_discarding, - metrics.streams_with_unconsumed_rx, - enqueued_bytes, - last_enqueue_ms, - zero_send_with_streams, - zero_send_loops, - data_ready_skips, - flow_blocked, - has_ready_stream, - backlog - ); - last_flow_block_log_at = now; - } + let now = unsafe { picoquic_current_time() }; + if should_log_flow_blocked( + streams_len, + has_ready_stream, + flow_blocked, + now, + last_flow_block_log_at, + FLOW_BLOCKED_LOG_INTERVAL_US, + ) { + let metrics = unsafe { (*state_ptr).stream_debug_metrics() }; + let backlog = unsafe { (*state_ptr).stream_backlog_summaries(8) }; + let (enqueued_bytes, last_enqueue_at) = unsafe { (*state_ptr).debug_snapshot() }; + let last_enqueue_ms = compute_last_enqueue_ms(now, last_enqueue_at); + error!( + "connection flow blocked: streams={} streams_with_rx_queued={} queued_bytes_total={} streams_with_recv_fin={} streams_with_send_fin={} streams_discarding={} streams_with_unconsumed_rx={} enqueued_bytes={} last_enqueue_ms={} zero_send_with_streams={} zero_send_loops={} data_ready_skips={} flow_blocked={} has_ready_stream={} backlog={:?}", + streams_len, + metrics.streams_with_rx_queued, + metrics.queued_bytes_total, + metrics.streams_with_recv_fin, + metrics.streams_with_send_fin, + metrics.streams_discarding, + metrics.streams_with_unconsumed_rx, + enqueued_bytes, + last_enqueue_ms, + zero_send_with_streams, + zero_send_loops, + data_ready_skips, + flow_blocked, + has_ready_stream, + backlog + ); + last_flow_block_log_at = now; } watchdog.set_phase(PHASE_POLL_QUERIES); - for resolver in resolvers.iter_mut() { + let recursive_multi_resolver_mode = resolver_manager.as_slice().len() > 1 + && resolver_manager + .as_slice() + .iter() + .all(|resolver| resolver.mode == ResolverMode::Recursive); + for resolver in resolver_manager.as_mut_slice().iter_mut() { + if !resolver.is_active() && resolver.mode == ResolverMode::Recursive { + continue; + } if !refresh_resolver_path(cnx, resolver) { continue; } match resolver.mode { ResolverMode::Authoritative => { - let quality = fetch_path_quality(cnx, resolver); - let snapshot = resolver.last_pacing_snapshot; - let pacing_target = snapshot - .map(|snapshot| snapshot.target_inflight) - .unwrap_or_else(|| cwnd_target_polls(quality.cwin, mtu)); - let inflight_packets = - inflight_packet_estimate(quality.bytes_in_transit, mtu); - let mut poll_deficit = pacing_target.saturating_sub(inflight_packets); - // Only suppress polls when the send loop actually produced - // QUIC packets (which act as implicit polls). When CWND is - // full, prepare_next_packet_ex returns nothing—but the server - // can only send ACKs inside DNS responses, so we must keep - // polling to unblock the congestion window. - if has_ready_stream && !flow_blocked && sent_quic_data { - poll_deficit = 0; - } - if poll_deficit > 0 && resolver.debug.enabled { - debug!( - "cc_state: {} cwnd={} in_transit={} rtt_us={} flow_blocked={} deficit={}", - resolver.label(), - quality.cwin, - quality.bytes_in_transit, - quality.rtt, - flow_blocked, - poll_deficit - ); - } - if poll_deficit > 0 { - let burst_max = path_poll_burst_max(resolver); - let mut to_send = poll_deficit.min(burst_max); - send_poll_queries( - cnx, - &udp, - config, - &mut local_addr_storage, - &mut dns_id, - resolver, - &mut to_send, - &mut send_buf, - ) - .await?; - } + let mut dispatch = PollDispatch { + udp: &udp, + config, + local_addr_storage: &mut local_addr_storage, + dns_id: &mut dns_id, + send_buf: &mut send_buf, + }; + poll_authoritative_resolver( + cnx, + &mut dispatch, + resolver, + mtu, + has_ready_stream, + flow_blocked, + sent_quic_data, + ) + .await?; } ResolverMode::Recursive => { - resolver.last_pacing_snapshot = None; - if resolver.pending_polls > 0 { - let burst_max = path_poll_burst_max(resolver); - if resolver.pending_polls > burst_max { - let mut to_send = burst_max; - send_poll_queries( - cnx, - &udp, - config, - &mut local_addr_storage, - &mut dns_id, - resolver, - &mut to_send, - &mut send_buf, - ) - .await?; - resolver.pending_polls = resolver - .pending_polls - .saturating_sub(burst_max) - .saturating_add(to_send); - } else { - let mut pending = resolver.pending_polls; - send_poll_queries( - cnx, - &udp, - config, - &mut local_addr_storage, - &mut dns_id, - resolver, - &mut pending, - &mut send_buf, - ) - .await?; - resolver.pending_polls = pending; - } - } + let mut dispatch = PollDispatch { + udp: &udp, + config, + local_addr_storage: &mut local_addr_storage, + dns_id: &mut dns_id, + send_buf: &mut send_buf, + }; + poll_recursive_resolver( + cnx, + &mut dispatch, + resolver, + recursive_multi_resolver_mode, + has_ready_stream, + flow_blocked, + ) + .await?; } } } @@ -848,25 +920,28 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { // new QUIC handshake resets resolver state. // Only check when streams are active — idle connections legitimately // receive no responses and should not trigger a reconnect. - if last_recv_at > 0 && ready { - let streams_len = unsafe { (*state_ptr).streams_len() }; - let stall_us = current_time.saturating_sub(last_recv_at); - if streams_len > 0 && stall_us >= RESOLVER_STALL_TIMEOUT_US { - warn!( - "resolver stall detected: no DNS responses for {:.1}s, streams={}, forcing reconnect", - stall_us as f64 / 1_000_000.0, - streams_len, - ); - // Don't close here — the post-loop picoquic_close handles it. - break; - } + let streams_len = unsafe { (*state_ptr).streams_len() }; + if let Some(stall_us) = should_reconnect_for_resolver_stall( + ready, + streams_len, + current_time, + last_recv_at, + RESOLVER_STALL_TIMEOUT_US, + ) { + warn!( + "resolver stall detected: no DNS responses for {:.1}s, streams={}, forcing reconnect", + stall_us as f64 / 1_000_000.0, + streams_len, + ); + // Don't close here — the post-loop picoquic_close handles it. + break; } watchdog.set_phase(PHASE_HEALTH_LOG); let report_time = unsafe { picoquic_current_time() }; let (enqueued_bytes, last_enqueue_at) = unsafe { (*state_ptr).debug_snapshot() }; let streams_len = unsafe { (*state_ptr).streams_len() }; - for resolver in resolvers.iter_mut() { + for resolver in resolver_manager.as_mut_slice().iter_mut() { resolver.debug.enqueued_bytes = enqueued_bytes; resolver.debug.last_enqueue_at = last_enqueue_at; resolver.debug.zero_send_loops = zero_send_loops; @@ -878,7 +953,7 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { let inflight_polls = resolver.inflight_poll_ids.len(); let pending_for_debug = match resolver.mode { ResolverMode::Authoritative => { - let quality = fetch_path_quality(cnx, resolver); + let quality = fetch_and_record_path_quality(cnx, resolver); let inflight_packets = inflight_packet_estimate(quality.bytes_in_transit, mtu); resolver @@ -902,7 +977,63 @@ pub async fn run_client(config: &ClientConfig<'_>) -> Result { // Periodic health heartbeat: log key metrics at INFO level so we // can diagnose silent tunnel deaths from production logs. - if ready && report_time.saturating_sub(last_health_log_at) >= HEALTH_LOG_INTERVAL_US { + if should_log_dual_resolver_health( + ready, + resolver_manager.as_slice().len(), + report_time, + last_dual_resolver_log_at, + DUAL_RESOLVER_HEALTH_LOG_INTERVAL_US, + ) { + last_dual_resolver_log_at = report_time; + let active = resolver_manager.active(); + let mut standby_total_pending = 0usize; + let mut standby_total_inflight = 0usize; + let mut standby_total_dns = 0u64; + let mut standby_added = 0usize; + for resolver in resolver_manager.as_slice() { + if resolver.is_active() { + continue; + } + if resolver.added { + standby_added = standby_added.saturating_add(1); + } + standby_total_pending = + standby_total_pending.saturating_add(resolver.pending_polls); + standby_total_inflight = + standby_total_inflight.saturating_add(resolver.inflight_poll_ids.len()); + standby_total_dns = + standby_total_dns.saturating_add(resolver.debug.dns_responses); + } + info!( + "dual-health: active={} active_added={} active_pending={} active_inflight={} active_dns_responses={} active_polls_sent={} active_path_rtt_us={} active_timeouts={} standby_count={} standby_added={} standby_pending={} standby_inflight={} standby_dns_responses={} streams={} recv_age={}s", + active.addr, + active.added, + active.pending_polls, + active.inflight_poll_ids.len(), + active.debug.dns_responses, + active.debug.polls_sent, + active.debug.path_rtt_us, + active.debug.inflight_poll_timeouts, + resolver_manager.as_slice().len().saturating_sub(1), + standby_added, + standby_total_pending, + standby_total_inflight, + standby_total_dns, + streams_len, + if last_recv_at > 0 { + report_time.saturating_sub(last_recv_at) / 1_000_000 + } else { + 0 + }, + ); + } + + if should_log_health( + ready, + report_time, + last_health_log_at, + HEALTH_LOG_INTERVAL_US, + ) { last_health_log_at = report_time; let (acceptor_used, acceptor_max) = unsafe { (*state_ptr).acceptor_metrics() }; let recv_age_s = if last_recv_at > 0 { diff --git a/crates/slipstream-client/src/runtime/actions.rs b/crates/slipstream-client/src/runtime/actions.rs new file mode 100644 index 00000000..9dbbfc57 --- /dev/null +++ b/crates/slipstream-client/src/runtime/actions.rs @@ -0,0 +1,162 @@ +use crate::dns::{send_poll_queries, ResolverState}; +use crate::error::ClientError; +use crate::pacing::{cwnd_target_polls, inflight_packet_estimate}; +use slipstream_ffi::picoquic::picoquic_cnx_t; +use slipstream_ffi::{ClientConfig, ResolverMode}; +use tokio::net::UdpSocket as TokioUdpSocket; +use tracing::debug; + +use super::path::{fetch_path_quality, path_poll_burst_max}; + +const RECURSIVE_POLL_MIN_INTERVAL_US_ACTIVE: u64 = 1_000; +const RECURSIVE_POLL_MIN_INTERVAL_US_IDLE: u64 = 500; +const RECURSIVE_POLL_BURST_ACTIVE_STREAMS: usize = 2; +const RECURSIVE_POLL_BURST_IDLE: usize = 8; + +pub(crate) struct PollDispatch<'a, 'cfg> { + pub(crate) udp: &'a TokioUdpSocket, + pub(crate) config: &'a ClientConfig<'cfg>, + pub(crate) local_addr_storage: &'a mut libc::sockaddr_storage, + pub(crate) dns_id: &'a mut u16, + pub(crate) send_buf: &'a mut [u8], +} + +pub(crate) async fn poll_authoritative_resolver( + cnx: *mut picoquic_cnx_t, + dispatch: &mut PollDispatch<'_, '_>, + resolver: &mut ResolverState, + mtu: u32, + has_ready_stream: bool, + flow_blocked: bool, + sent_quic_data: bool, +) -> Result<(), ClientError> { + let quality = fetch_path_quality(cnx, resolver); + resolver.debug.path_rtt_us = quality.rtt; + resolver.debug.path_cwnd = quality.cwin; + resolver.debug.path_bytes_in_transit = quality.bytes_in_transit; + resolver.debug.path_pacing_rate = quality.pacing_rate; + + let snapshot = resolver.last_pacing_snapshot; + let pacing_target = snapshot + .map(|snapshot| snapshot.target_inflight) + .unwrap_or_else(|| cwnd_target_polls(quality.cwin, mtu)); + let inflight_packets = inflight_packet_estimate(quality.bytes_in_transit, mtu); + let mut poll_deficit = pacing_target.saturating_sub(inflight_packets); + + if has_ready_stream && !flow_blocked && sent_quic_data { + poll_deficit = 0; + } + + if poll_deficit > 0 && resolver.debug.enabled { + debug!( + "cc_state: {} cwnd={} in_transit={} rtt_us={} flow_blocked={} deficit={}", + resolver.label(), + quality.cwin, + quality.bytes_in_transit, + quality.rtt, + flow_blocked, + poll_deficit + ); + } + + if poll_deficit > 0 { + let burst_max = path_poll_burst_max(resolver); + let mut to_send = poll_deficit.min(burst_max); + send_poll_queries( + cnx, + dispatch.udp, + dispatch.config, + dispatch.local_addr_storage, + dispatch.dns_id, + resolver, + &mut to_send, + dispatch.send_buf, + ) + .await?; + } + + Ok(()) +} + +pub(crate) async fn poll_recursive_resolver( + cnx: *mut picoquic_cnx_t, + dispatch: &mut PollDispatch<'_, '_>, + resolver: &mut ResolverState, + multi_resolver_mode: bool, + has_ready_stream: bool, + flow_blocked: bool, +) -> Result<(), ClientError> { + if resolver.mode != ResolverMode::Recursive { + return Ok(()); + } + + resolver.last_pacing_snapshot = None; + if resolver.pending_polls == 0 { + return Ok(()); + } + + let now = unsafe { slipstream_ffi::picoquic::picoquic_current_time() }; + if multi_resolver_mode { + let min_interval = if has_ready_stream { + RECURSIVE_POLL_MIN_INTERVAL_US_ACTIVE + } else { + RECURSIVE_POLL_MIN_INTERVAL_US_IDLE + }; + if resolver.last_recursive_poll_sent_at > 0 + && now.saturating_sub(resolver.last_recursive_poll_sent_at) < min_interval + { + return Ok(()); + } + } + + let burst_max = path_poll_burst_max(resolver); + let burst_cap = if multi_resolver_mode { + if has_ready_stream && !flow_blocked { + RECURSIVE_POLL_BURST_ACTIVE_STREAMS + } else { + RECURSIVE_POLL_BURST_IDLE + } + } else { + burst_max + }; + let burst_max = burst_max.min(burst_cap.max(1)); + let polls_sent_before = resolver.debug.polls_sent; + if resolver.pending_polls > burst_max { + let mut to_send = burst_max; + send_poll_queries( + cnx, + dispatch.udp, + dispatch.config, + dispatch.local_addr_storage, + dispatch.dns_id, + resolver, + &mut to_send, + dispatch.send_buf, + ) + .await?; + resolver.pending_polls = resolver + .pending_polls + .saturating_sub(burst_max) + .saturating_add(to_send); + } else { + let mut pending = resolver.pending_polls; + send_poll_queries( + cnx, + dispatch.udp, + dispatch.config, + dispatch.local_addr_storage, + dispatch.dns_id, + resolver, + &mut pending, + dispatch.send_buf, + ) + .await?; + resolver.pending_polls = pending; + } + + if resolver.debug.polls_sent > polls_sent_before { + resolver.last_recursive_poll_sent_at = now; + } + + Ok(()) +} diff --git a/crates/slipstream-client/src/runtime/deadlock.rs b/crates/slipstream-client/src/runtime/deadlock.rs new file mode 100644 index 00000000..4cdaa805 --- /dev/null +++ b/crates/slipstream-client/src/runtime/deadlock.rs @@ -0,0 +1,76 @@ +pub(crate) struct AcceptorSaturationTracker { + saturated_since: u64, + saturated_max: usize, + saturated_bytes: u64, + timeout_us: u64, +} + +impl AcceptorSaturationTracker { + pub(crate) fn new(timeout_us: u64) -> Self { + Self { + saturated_since: 0, + saturated_max: 0, + saturated_bytes: 0, + timeout_us, + } + } + + pub(crate) fn timeout_us(&self) -> u64 { + self.timeout_us + } + + pub(crate) fn update(&mut self, now: u64, used: usize, max: usize, total_bytes: u64) -> bool { + if max == 0 || used < max { + self.saturated_since = 0; + return false; + } + + if self.saturated_since == 0 + || max != self.saturated_max + || total_bytes != self.saturated_bytes + { + self.saturated_since = now; + self.saturated_max = max; + self.saturated_bytes = total_bytes; + return false; + } + + now.saturating_sub(self.saturated_since) >= self.timeout_us + } +} + +#[cfg(test)] +mod tests { + use super::AcceptorSaturationTracker; + + #[test] + fn does_not_trigger_before_timeout() { + let mut tracker = AcceptorSaturationTracker::new(30); + assert!(!tracker.update(10, 10, 10, 100)); + assert!(!tracker.update(35, 10, 10, 100)); + } + + #[test] + fn triggers_after_timeout_with_no_progress() { + let mut tracker = AcceptorSaturationTracker::new(30); + assert!(!tracker.update(10, 10, 10, 100)); + assert!(tracker.update(40, 10, 10, 100)); + } + + #[test] + fn resets_when_progress_changes() { + let mut tracker = AcceptorSaturationTracker::new(30); + assert!(!tracker.update(10, 10, 10, 100)); + assert!(!tracker.update(25, 10, 10, 200)); + assert!(!tracker.update(40, 10, 10, 200)); + assert!(tracker.update(56, 10, 10, 200)); + } + + #[test] + fn clears_when_no_longer_saturated() { + let mut tracker = AcceptorSaturationTracker::new(30); + assert!(!tracker.update(10, 10, 10, 100)); + assert!(!tracker.update(20, 5, 10, 100)); + assert!(!tracker.update(40, 10, 10, 100)); + } +} diff --git a/crates/slipstream-client/src/runtime/decision.rs b/crates/slipstream-client/src/runtime/decision.rs new file mode 100644 index 00000000..fe014881 --- /dev/null +++ b/crates/slipstream-client/src/runtime/decision.rs @@ -0,0 +1,152 @@ +use super::deadlock::AcceptorSaturationTracker; +use super::path::maybe_switch_active_resolver; +use crate::dns::ResolverManager; +use crate::streams::ClientState; +use slipstream_ffi::picoquic::picoquic_current_time; +use tracing::warn; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ReconnectReason { + ActivePathLoss { + streams_len: usize, + }, + AcceptorDeadlock { + used: usize, + max: usize, + total_bytes: u64, + timeout_us: u64, + }, +} + +pub(crate) fn decide_active_path_loss_reconnect( + streams_len: usize, + active_path_ready: bool, + threshold: usize, +) -> Option { + if streams_len >= threshold && !active_path_ready { + return Some(ReconnectReason::ActivePathLoss { streams_len }); + } + None +} + +pub(crate) fn decide_acceptor_deadlock_reconnect( + deadlock_detected: bool, + used: usize, + max: usize, + total_bytes: u64, + timeout_us: u64, +) -> Option { + if deadlock_detected { + return Some(ReconnectReason::AcceptorDeadlock { + used, + max, + total_bytes, + timeout_us, + }); + } + None +} + +pub(crate) fn reconnect_due_to_active_path_loss( + resolver_manager: &mut ResolverManager, + current_time: u64, + preferred_startup_resolver_index: &mut usize, + state_ptr: *mut ClientState, + threshold: usize, +) -> bool { + maybe_switch_active_resolver( + resolver_manager, + current_time, + preferred_startup_resolver_index, + ); + + let streams_len = unsafe { (*state_ptr).streams_len() }; + let active_path_ready = resolver_manager.active().added; + if let Some(reason) = + decide_active_path_loss_reconnect(streams_len, active_path_ready, threshold) + { + log_reconnect_reason(reason); + return true; + } + false +} + +pub(crate) fn reconnect_due_to_acceptor_deadlock( + state_ptr: *mut ClientState, + acceptor_saturation: &mut AcceptorSaturationTracker, +) -> bool { + let (used, max) = unsafe { (*state_ptr).acceptor_metrics() }; + let total_bytes = unsafe { (*state_ptr).total_stream_bytes() }; + let now = unsafe { picoquic_current_time() }; + let deadlock_detected = acceptor_saturation.update(now, used, max, total_bytes); + if let Some(reason) = decide_acceptor_deadlock_reconnect( + deadlock_detected, + used, + max, + total_bytes, + acceptor_saturation.timeout_us(), + ) { + log_reconnect_reason(reason); + return true; + } + false +} + +fn log_reconnect_reason(reason: ReconnectReason) { + match reason { + ReconnectReason::ActivePathLoss { streams_len } => { + warn!( + "active resolver path deleted with {} streams and no standby path ready; reconnecting to limit reset storm", + streams_len, + ); + } + ReconnectReason::AcceptorDeadlock { + used, + max, + total_bytes, + timeout_us, + } => { + warn!( + "acceptor deadlock for {}s ({}/{}, bytes={}, no progress), forcing reconnect", + timeout_us / 1_000_000, + used, + max, + total_bytes + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::{ + decide_acceptor_deadlock_reconnect, decide_active_path_loss_reconnect, ReconnectReason, + }; + + #[test] + fn active_path_loss_requires_threshold_and_no_ready_path() { + assert_eq!( + decide_active_path_loss_reconnect(32, false, 32), + Some(ReconnectReason::ActivePathLoss { streams_len: 32 }) + ); + assert_eq!(decide_active_path_loss_reconnect(31, false, 32), None); + assert_eq!(decide_active_path_loss_reconnect(64, true, 32), None); + } + + #[test] + fn acceptor_deadlock_decision_requires_detected_flag() { + assert_eq!( + decide_acceptor_deadlock_reconnect(true, 512, 512, 2048, 30_000_000), + Some(ReconnectReason::AcceptorDeadlock { + used: 512, + max: 512, + total_bytes: 2048, + timeout_us: 30_000_000, + }) + ); + assert_eq!( + decide_acceptor_deadlock_reconnect(false, 512, 512, 2048, 30_000_000), + None + ); + } +} diff --git a/crates/slipstream-client/src/runtime/health.rs b/crates/slipstream-client/src/runtime/health.rs new file mode 100644 index 00000000..b7d5d041 --- /dev/null +++ b/crates/slipstream-client/src/runtime/health.rs @@ -0,0 +1,164 @@ +pub(crate) fn should_log_flow_blocked( + streams_len: usize, + has_ready_stream: bool, + flow_blocked: bool, + now: u64, + last_log_at: u64, + interval_us: u64, +) -> bool { + streams_len > 0 + && has_ready_stream + && flow_blocked + && now.saturating_sub(last_log_at) >= interval_us +} + +pub(crate) fn compute_last_enqueue_ms(now: u64, last_enqueue_at: u64) -> u64 { + if last_enqueue_at == 0 { + 0 + } else { + now.saturating_sub(last_enqueue_at) / 1_000 + } +} + +pub(crate) fn should_reconnect_for_resolver_stall( + ready: bool, + streams_len: usize, + current_time: u64, + last_recv_at: u64, + timeout_us: u64, +) -> Option { + if !ready || last_recv_at == 0 || streams_len == 0 { + return None; + } + + let stall_us = current_time.saturating_sub(last_recv_at); + if stall_us >= timeout_us { + return Some(stall_us); + } + + None +} + +pub(crate) fn should_reconnect_for_handshake_stall( + ready: bool, + current_time: u64, + connect_started_at: u64, + timeout_us: u64, +) -> Option { + if ready || connect_started_at == 0 { + return None; + } + + let stall_us = current_time.saturating_sub(connect_started_at); + if stall_us >= timeout_us { + return Some(stall_us); + } + + None +} + +pub(crate) fn should_log_health( + ready: bool, + report_time: u64, + last_health_log_at: u64, + interval_us: u64, +) -> bool { + ready && report_time.saturating_sub(last_health_log_at) >= interval_us +} + +pub(crate) fn should_log_dual_resolver_health( + ready: bool, + resolver_count: usize, + report_time: u64, + last_log_at: u64, + interval_us: u64, +) -> bool { + ready && resolver_count > 1 && report_time.saturating_sub(last_log_at) >= interval_us +} + +#[cfg(test)] +mod tests { + use super::{ + compute_last_enqueue_ms, should_log_dual_resolver_health, should_log_flow_blocked, + should_log_health, should_reconnect_for_handshake_stall, + should_reconnect_for_resolver_stall, + }; + + #[test] + fn flow_blocked_log_requires_all_gates() { + assert!(should_log_flow_blocked(1, true, true, 10_000, 0, 1_000)); + assert!(!should_log_flow_blocked(0, true, true, 10_000, 0, 1_000)); + assert!(!should_log_flow_blocked(1, false, true, 10_000, 0, 1_000)); + assert!(!should_log_flow_blocked(1, true, false, 10_000, 0, 1_000)); + assert!(!should_log_flow_blocked(1, true, true, 1_500, 1_000, 1_000)); + } + + #[test] + fn enqueue_ms_handles_zero_timestamp() { + assert_eq!(compute_last_enqueue_ms(1_000_000, 0), 0); + assert_eq!(compute_last_enqueue_ms(2_000_000, 1_000_000), 1_000); + } + + #[test] + fn resolver_stall_requires_ready_and_streams() { + assert_eq!( + should_reconnect_for_resolver_stall(true, 1, 20_000, 1_000, 10_000), + Some(19_000) + ); + assert_eq!( + should_reconnect_for_resolver_stall(false, 1, 20_000, 1_000, 10_000), + None + ); + assert_eq!( + should_reconnect_for_resolver_stall(true, 0, 20_000, 1_000, 10_000), + None + ); + assert_eq!( + should_reconnect_for_resolver_stall(true, 1, 5_000, 1_000, 10_000), + None + ); + } + + #[test] + fn handshake_stall_requires_not_ready_and_timeout() { + assert_eq!( + should_reconnect_for_handshake_stall(false, 20_000, 1_000, 10_000), + Some(19_000) + ); + assert_eq!( + should_reconnect_for_handshake_stall(true, 20_000, 1_000, 10_000), + None + ); + assert_eq!( + should_reconnect_for_handshake_stall(false, 5_000, 1_000, 10_000), + None + ); + assert_eq!( + should_reconnect_for_handshake_stall(false, 20_000, 0, 10_000), + None + ); + } + + #[test] + fn health_log_gate_requires_ready_and_interval() { + assert!(should_log_health(true, 20_000, 1_000, 10_000)); + assert!(!should_log_health(false, 20_000, 1_000, 10_000)); + assert!(!should_log_health(true, 5_000, 1_000, 10_000)); + } + + #[test] + fn dual_resolver_health_log_gate_requires_multi_resolver() { + assert!(should_log_dual_resolver_health( + true, 2, 20_000, 1_000, 10_000 + )); + assert!(!should_log_dual_resolver_health( + true, 1, 20_000, 1_000, 10_000 + )); + assert!(!should_log_dual_resolver_health( + false, 2, 20_000, 1_000, 10_000 + )); + assert!(!should_log_dual_resolver_health( + true, 2, 5_000, 1_000, 10_000 + )); + } +} diff --git a/crates/slipstream-client/src/runtime/loop_policy.rs b/crates/slipstream-client/src/runtime/loop_policy.rs new file mode 100644 index 00000000..3bcc71c9 --- /dev/null +++ b/crates/slipstream-client/src/runtime/loop_policy.rs @@ -0,0 +1,53 @@ +use std::time::Duration; + +pub(crate) struct LoopSleepPolicy { + pub(crate) has_work: bool, + pub(crate) timeout: Duration, +} + +pub(crate) fn compute_loop_sleep_policy( + delay_us: u64, + has_work: bool, + min_poll_interval_us: u64, + dns_poll_slice_us: u64, +) -> LoopSleepPolicy { + let timeout_us = if has_work { + delay_us.clamp(min_poll_interval_us, dns_poll_slice_us) + } else { + delay_us.max(min_poll_interval_us) + }; + + LoopSleepPolicy { + has_work, + timeout: Duration::from_micros(timeout_us), + } +} + +#[cfg(test)] +mod tests { + use super::compute_loop_sleep_policy; + + #[test] + fn clamps_active_timeout_to_slice() { + let policy = compute_loop_sleep_policy(30_000, true, 100, 5_000); + assert_eq!(policy.timeout.as_micros(), 5_000); + } + + #[test] + fn enforces_min_timeout_when_active() { + let policy = compute_loop_sleep_policy(10, true, 100, 5_000); + assert_eq!(policy.timeout.as_micros(), 100); + } + + #[test] + fn uses_full_delay_when_idle() { + let policy = compute_loop_sleep_policy(30_000, false, 100, 5_000); + assert_eq!(policy.timeout.as_micros(), 30_000); + } + + #[test] + fn enforces_min_timeout_when_idle() { + let policy = compute_loop_sleep_policy(10, false, 100, 5_000); + assert_eq!(policy.timeout.as_micros(), 100); + } +} diff --git a/crates/slipstream-client/src/runtime/path.rs b/crates/slipstream-client/src/runtime/path.rs index 966a4e75..e48ac6f8 100644 --- a/crates/slipstream-client/src/runtime/path.rs +++ b/crates/slipstream-client/src/runtime/path.rs @@ -1,17 +1,20 @@ use crate::dns::{ - refresh_resolver_path, reset_resolver_path, resolver_mode_to_c, - sockaddr_storage_to_socket_addr, ResolverState, + note_active_path_delete_signal, record_resolver_switch, refresh_resolver_path, + reset_resolver_path, resolver_mode_to_c, should_failover_active_path, + sockaddr_storage_to_socket_addr, ResolverManager, ResolverState, }; use crate::error::ClientError; use crate::streams::{ClientState, PathEvent}; use slipstream_core::normalize_dual_stack_addr; use slipstream_ffi::picoquic::{ picoquic_cnx_t, picoquic_get_default_path_quality, picoquic_get_path_addr, - picoquic_get_path_quality, slipstream_get_path_id_from_unique, slipstream_set_path_ack_delay, - slipstream_set_path_mode, PICOQUIC_PACKET_LOOP_SEND_MAX, + picoquic_get_path_quality, slipstream_get_path_id_from_unique, + slipstream_set_default_path_mode, slipstream_set_path_ack_delay, slipstream_set_path_mode, + PICOQUIC_PACKET_LOOP_SEND_MAX, }; use slipstream_ffi::ResolverMode; use std::net::SocketAddr; +use tracing::warn; const AUTHORITATIVE_LOOP_MULTIPLIER: usize = 4; @@ -47,18 +50,52 @@ pub(crate) fn fetch_path_quality( quality } +pub(crate) fn fetch_and_record_path_quality( + cnx: *mut picoquic_cnx_t, + resolver: &mut ResolverState, +) -> slipstream_ffi::picoquic::picoquic_path_quality_t { + let quality = fetch_path_quality(cnx, resolver); + resolver.debug.path_rtt_us = quality.rtt; + resolver.debug.path_cwnd = quality.cwin; + resolver.debug.path_bytes_in_transit = quality.bytes_in_transit; + resolver.debug.path_pacing_rate = quality.pacing_rate; + quality +} + +pub(crate) fn maybe_switch_active_resolver( + resolver_manager: &mut ResolverManager, + current_time: u64, + preferred_startup_resolver_index: &mut usize, +) { + if let Some((from_index, to_index, reason)) = resolver_manager.maybe_select_active(current_time) + { + *preferred_startup_resolver_index = to_index; + record_resolver_switch( + resolver_manager.as_mut_slice(), + Some(from_index), + to_index, + reason, + ); + unsafe { + let mode = resolver_manager.active().mode; + slipstream_set_default_path_mode(resolver_mode_to_c(mode)); + } + } +} + pub(crate) fn drain_path_events( cnx: *mut picoquic_cnx_t, resolvers: &mut [ResolverState], state_ptr: *mut ClientState, -) { +) -> bool { if state_ptr.is_null() { - return; + return false; } let events = unsafe { (*state_ptr).take_path_events() }; if events.is_empty() { - return; + return false; } + let mut active_path_deleted = false; for event in events { match event { PathEvent::Available(unique_path_id) => { @@ -78,11 +115,28 @@ pub(crate) fn drain_path_events( } PathEvent::Deleted(unique_path_id) => { if let Some(resolver) = find_resolver_by_unique_id_mut(resolvers, unique_path_id) { + if resolver.is_active() { + let now = unsafe { slipstream_ffi::picoquic::picoquic_current_time() }; + note_active_path_delete_signal(resolver, now); + if should_failover_active_path(resolver, now) { + active_path_deleted = true; + reset_resolver_path(resolver); + } else { + warn!( + "active path delete observed for resolver {}; waiting for progress-confirmed failover", + resolver.addr + ); + resolver.unique_path_id = None; + } + continue; + } reset_resolver_path(resolver); } } } } + + active_path_deleted } fn path_peer_addr(cnx: *mut picoquic_cnx_t, unique_path_id: u64) -> Option { diff --git a/crates/slipstream-client/src/runtime/setup.rs b/crates/slipstream-client/src/runtime/setup.rs index 5cbf2fd5..00a0fa34 100644 --- a/crates/slipstream-client/src/runtime/setup.rs +++ b/crates/slipstream-client/src/runtime/setup.rs @@ -1,22 +1,19 @@ use crate::error::ClientError; +use slipstream_dns::max_payload_len_for_domain; use socket2::{Domain, Protocol, SockAddr, Socket, Type}; use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use tokio::net::{lookup_host, TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket}; use tracing::warn; -pub(crate) fn compute_mtu(domain_len: usize) -> Result { - if domain_len >= 240 { - return Err(ClientError::new( - "Domain name is too long for DNS transport", - )); - } - let mtu = ((240.0 - domain_len as f64) / 1.6) as u32; +pub(crate) fn compute_mtu(domain: &str) -> Result { + let mtu = + max_payload_len_for_domain(domain).map_err(|err| ClientError::new(err.to_string()))?; if mtu == 0 { return Err(ClientError::new( "MTU computed to zero; check domain length", )); } - Ok(mtu) + u32::try_from(mtu).map_err(|_| ClientError::new("MTU conversion overflow")) } pub(crate) async fn bind_udp_socket() -> Result { diff --git a/crates/slipstream-client/src/streams.rs b/crates/slipstream-client/src/streams.rs index 5c536357..e4dc8f08 100644 --- a/crates/slipstream-client/src/streams.rs +++ b/crates/slipstream-client/src/streams.rs @@ -534,7 +534,7 @@ impl ClientState { let idle_since = fin_at.max(stream.last_tx_progress_at_us); let idle_secs = now_us.saturating_sub(idle_since) as f64 / 1_000_000.0; let _ = read_abort_tx.send(()); - warn!( + info!( "stream {}: stale half-close idle for {:.1}s; aborting local reader (rx_bytes={} tx_bytes={} queued={} recv_state={:?} send_state={:?})", stream_id, idle_secs, @@ -819,7 +819,7 @@ pub(crate) unsafe extern "C" fn client_callback( stream.send_state ); } else { - warn!( + debug!( "stream {}: reset event={} (unknown stream)", stream_id, reason ); @@ -1679,7 +1679,7 @@ pub(crate) fn handle_command( stream.flow.fin_offset ); } else { - warn!("stream {}: tcp read error (unknown stream)", stream_id); + debug!("stream {}: tcp read error (unknown stream)", stream_id); } unsafe { abort_stream_bidi(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR) }; } @@ -1695,7 +1695,7 @@ pub(crate) fn handle_command( stream.flow.fin_offset ); } else { - warn!("stream {}: tcp write error (unknown stream)", stream_id); + debug!("stream {}: tcp write error (unknown stream)", stream_id); } unsafe { abort_stream_bidi(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR) }; } diff --git a/crates/slipstream-core/src/lib.rs b/crates/slipstream-core/src/lib.rs index 61898337..abbd7801 100644 --- a/crates/slipstream-core/src/lib.rs +++ b/crates/slipstream-core/src/lib.rs @@ -5,6 +5,7 @@ pub mod invariants; mod macros; pub mod net; pub mod sip003; +pub mod state_machine; pub mod stream; pub mod tcp; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; diff --git a/crates/slipstream-core/src/state_machine.rs b/crates/slipstream-core/src/state_machine.rs new file mode 100644 index 00000000..6e8a6eb5 --- /dev/null +++ b/crates/slipstream-core/src/state_machine.rs @@ -0,0 +1,357 @@ +use std::fmt; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnState { + Init, + Handshaking, + Active, + Degraded, + Recovering, + Draining, + Closed, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnEvent { + StartHandshake, + HandshakeReady, + HandshakeFailed, + Degrade, + Recover, + Drain, + Close, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResolverHealthState { + Unknown, + Probing, + Healthy, + Degraded, + Cooling, + Dead, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResolverRole { + Active, + Standby, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResolverEvent { + ProbeStarted, + ProbeSucceeded, + ProbeFailed, + MarkDegraded, + StartCooldown, + CooldownExpired, + MarkDead, + Revive, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StreamState { + Idle, + Open, + FinLocal, + FinRemote, + Closed, + Reset, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StreamEvent { + Open, + LocalFin, + RemoteFin, + AckClose, + Reset, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MachineKind { + Connection, + Resolver, + Stream, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TransitionError { + pub machine: MachineKind, + pub state: &'static str, + pub event: &'static str, +} + +impl TransitionError { + fn new(machine: MachineKind, state: &'static str, event: &'static str) -> Self { + Self { + machine, + state, + event, + } + } +} + +impl fmt::Display for TransitionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "invalid {:?} transition: state={} event={}", + self.machine, self.state, self.event + ) + } +} + +impl std::error::Error for TransitionError {} + +pub fn validate_conn_transition( + state: ConnState, + event: ConnEvent, +) -> Result { + if event == ConnEvent::Close { + return match state { + ConnState::Closed => Err(TransitionError::new( + MachineKind::Connection, + conn_state_name(state), + conn_event_name(event), + )), + _ => Ok(ConnState::Closed), + }; + } + + let next = match (state, event) { + (ConnState::Init, ConnEvent::StartHandshake) => ConnState::Handshaking, + (ConnState::Handshaking, ConnEvent::HandshakeReady) => ConnState::Active, + (ConnState::Handshaking, ConnEvent::HandshakeFailed) => ConnState::Recovering, + (ConnState::Active, ConnEvent::Degrade) => ConnState::Degraded, + (ConnState::Active, ConnEvent::Drain) => ConnState::Draining, + (ConnState::Degraded, ConnEvent::Recover) => ConnState::Active, + (ConnState::Degraded, ConnEvent::Drain) => ConnState::Draining, + (ConnState::Recovering, ConnEvent::StartHandshake) => ConnState::Handshaking, + (ConnState::Recovering, ConnEvent::Drain) => ConnState::Draining, + (ConnState::Draining, ConnEvent::Drain) => ConnState::Draining, + _ => { + return Err(TransitionError::new( + MachineKind::Connection, + conn_state_name(state), + conn_event_name(event), + )); + } + }; + + Ok(next) +} + +pub fn validate_resolver_transition( + state: ResolverHealthState, + event: ResolverEvent, +) -> Result { + let next = match (state, event) { + (ResolverHealthState::Unknown, ResolverEvent::ProbeStarted) => ResolverHealthState::Probing, + (ResolverHealthState::Probing, ResolverEvent::ProbeSucceeded) => { + ResolverHealthState::Healthy + } + (ResolverHealthState::Probing, ResolverEvent::ProbeFailed) => ResolverHealthState::Degraded, + (ResolverHealthState::Healthy, ResolverEvent::MarkDegraded) => { + ResolverHealthState::Degraded + } + (ResolverHealthState::Healthy, ResolverEvent::MarkDead) => ResolverHealthState::Dead, + (ResolverHealthState::Degraded, ResolverEvent::ProbeSucceeded) => { + ResolverHealthState::Healthy + } + (ResolverHealthState::Degraded, ResolverEvent::StartCooldown) => { + ResolverHealthState::Cooling + } + (ResolverHealthState::Degraded, ResolverEvent::MarkDead) => ResolverHealthState::Dead, + (ResolverHealthState::Cooling, ResolverEvent::CooldownExpired) => { + ResolverHealthState::Probing + } + (ResolverHealthState::Cooling, ResolverEvent::MarkDead) => ResolverHealthState::Dead, + (ResolverHealthState::Dead, ResolverEvent::Revive) => ResolverHealthState::Probing, + _ => { + return Err(TransitionError::new( + MachineKind::Resolver, + resolver_state_name(state), + resolver_event_name(event), + )); + } + }; + + Ok(next) +} + +pub fn validate_stream_transition( + state: StreamState, + event: StreamEvent, +) -> Result { + let next = match (state, event) { + (StreamState::Idle, StreamEvent::Open) => StreamState::Open, + (StreamState::Open, StreamEvent::LocalFin) => StreamState::FinLocal, + (StreamState::Open, StreamEvent::RemoteFin) => StreamState::FinRemote, + (StreamState::Open, StreamEvent::Reset) => StreamState::Reset, + (StreamState::FinLocal, StreamEvent::RemoteFin) => StreamState::Closed, + (StreamState::FinLocal, StreamEvent::AckClose) => StreamState::Closed, + (StreamState::FinLocal, StreamEvent::Reset) => StreamState::Reset, + (StreamState::FinRemote, StreamEvent::LocalFin) => StreamState::Closed, + (StreamState::FinRemote, StreamEvent::AckClose) => StreamState::Closed, + (StreamState::FinRemote, StreamEvent::Reset) => StreamState::Reset, + (StreamState::Reset, StreamEvent::AckClose) => StreamState::Closed, + _ => { + return Err(TransitionError::new( + MachineKind::Stream, + stream_state_name(state), + stream_event_name(event), + )); + } + }; + + Ok(next) +} + +fn conn_state_name(state: ConnState) -> &'static str { + match state { + ConnState::Init => "Init", + ConnState::Handshaking => "Handshaking", + ConnState::Active => "Active", + ConnState::Degraded => "Degraded", + ConnState::Recovering => "Recovering", + ConnState::Draining => "Draining", + ConnState::Closed => "Closed", + } +} + +fn conn_event_name(event: ConnEvent) -> &'static str { + match event { + ConnEvent::StartHandshake => "StartHandshake", + ConnEvent::HandshakeReady => "HandshakeReady", + ConnEvent::HandshakeFailed => "HandshakeFailed", + ConnEvent::Degrade => "Degrade", + ConnEvent::Recover => "Recover", + ConnEvent::Drain => "Drain", + ConnEvent::Close => "Close", + } +} + +fn resolver_state_name(state: ResolverHealthState) -> &'static str { + match state { + ResolverHealthState::Unknown => "Unknown", + ResolverHealthState::Probing => "Probing", + ResolverHealthState::Healthy => "Healthy", + ResolverHealthState::Degraded => "Degraded", + ResolverHealthState::Cooling => "Cooling", + ResolverHealthState::Dead => "Dead", + } +} + +fn resolver_event_name(event: ResolverEvent) -> &'static str { + match event { + ResolverEvent::ProbeStarted => "ProbeStarted", + ResolverEvent::ProbeSucceeded => "ProbeSucceeded", + ResolverEvent::ProbeFailed => "ProbeFailed", + ResolverEvent::MarkDegraded => "MarkDegraded", + ResolverEvent::StartCooldown => "StartCooldown", + ResolverEvent::CooldownExpired => "CooldownExpired", + ResolverEvent::MarkDead => "MarkDead", + ResolverEvent::Revive => "Revive", + } +} + +fn stream_state_name(state: StreamState) -> &'static str { + match state { + StreamState::Idle => "Idle", + StreamState::Open => "Open", + StreamState::FinLocal => "FinLocal", + StreamState::FinRemote => "FinRemote", + StreamState::Closed => "Closed", + StreamState::Reset => "Reset", + } +} + +fn stream_event_name(event: StreamEvent) -> &'static str { + match event { + StreamEvent::Open => "Open", + StreamEvent::LocalFin => "LocalFin", + StreamEvent::RemoteFin => "RemoteFin", + StreamEvent::AckClose => "AckClose", + StreamEvent::Reset => "Reset", + } +} + +#[cfg(test)] +mod tests { + use super::{ + validate_conn_transition, validate_resolver_transition, validate_stream_transition, + ConnEvent, ConnState, MachineKind, ResolverEvent, ResolverHealthState, StreamEvent, + StreamState, + }; + + #[test] + fn connection_transition_happy_path() { + let state = validate_conn_transition(ConnState::Init, ConnEvent::StartHandshake) + .expect("start handshake should succeed"); + let state = validate_conn_transition(state, ConnEvent::HandshakeReady) + .expect("handshake ready should succeed"); + let state = + validate_conn_transition(state, ConnEvent::Degrade).expect("degrade should succeed"); + let state = + validate_conn_transition(state, ConnEvent::Recover).expect("recover should succeed"); + let state = + validate_conn_transition(state, ConnEvent::Drain).expect("drain should succeed"); + let state = + validate_conn_transition(state, ConnEvent::Close).expect("close should succeed"); + assert_eq!(state, ConnState::Closed); + } + + #[test] + fn rejects_connection_invalid_transition_with_context() { + let err = validate_conn_transition(ConnState::Init, ConnEvent::Recover) + .expect_err("recover from init should fail"); + assert_eq!(err.machine, MachineKind::Connection); + assert_eq!(err.state, "Init"); + assert_eq!(err.event, "Recover"); + } + + #[test] + fn resolver_transition_happy_path() { + let state = + validate_resolver_transition(ResolverHealthState::Unknown, ResolverEvent::ProbeStarted) + .expect("probe start should succeed"); + let state = validate_resolver_transition(state, ResolverEvent::ProbeSucceeded) + .expect("probe success should succeed"); + let state = validate_resolver_transition(state, ResolverEvent::MarkDegraded) + .expect("degrade should succeed"); + let state = validate_resolver_transition(state, ResolverEvent::StartCooldown) + .expect("start cooldown should succeed"); + let state = validate_resolver_transition(state, ResolverEvent::CooldownExpired) + .expect("cooldown expiry should succeed"); + assert_eq!(state, ResolverHealthState::Probing); + } + + #[test] + fn resolver_dead_can_revive_to_probing() { + let state = validate_resolver_transition(ResolverHealthState::Dead, ResolverEvent::Revive) + .expect("revive should succeed"); + assert_eq!(state, ResolverHealthState::Probing); + } + + #[test] + fn stream_transition_happy_path() { + let state = validate_stream_transition(StreamState::Idle, StreamEvent::Open) + .expect("open should succeed"); + let state = validate_stream_transition(state, StreamEvent::LocalFin) + .expect("local fin should succeed"); + let state = validate_stream_transition(state, StreamEvent::RemoteFin) + .expect("remote fin should succeed"); + assert_eq!(state, StreamState::Closed); + } + + #[test] + fn stream_invalid_transition_reports_context() { + let err = validate_stream_transition(StreamState::Idle, StreamEvent::RemoteFin) + .expect_err("remote fin from idle should fail"); + assert_eq!(err.machine, MachineKind::Stream); + assert_eq!(err.state, "Idle"); + assert_eq!(err.event, "RemoteFin"); + } +} diff --git a/crates/slipstream-server/src/target.rs b/crates/slipstream-server/src/target.rs index 132ee4eb..ba6dd12e 100644 --- a/crates/slipstream-server/src/target.rs +++ b/crates/slipstream-server/src/target.rs @@ -6,11 +6,16 @@ use slipstream_core::tcp::{stream_read_limit_chunks, tcp_send_buffer_bytes}; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream as TokioTcpStream; use tokio::sync::{mpsc, watch}; +use tokio::time::sleep; use tracing::{debug, warn}; +const TARGET_CONNECT_MAX_ATTEMPTS: usize = 5; +const TARGET_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(100); + pub(crate) fn spawn_target_connector( key: StreamKey, target_addr: SocketAddr, @@ -22,13 +27,34 @@ pub(crate) fn spawn_target_connector( if *shutdown_rx.borrow() { return; } - let connect = TokioTcpStream::connect(target_addr); - let stream = tokio::select! { - _ = shutdown_rx.changed() => { - return; + let mut stream = Err(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "target connect not attempted", + )); + let mut attempts = 0usize; + for attempt in 1..=TARGET_CONNECT_MAX_ATTEMPTS { + attempts = attempt; + let connect = TokioTcpStream::connect(target_addr); + stream = tokio::select! { + _ = shutdown_rx.changed() => { + return; + } + result = connect => result, + }; + let should_retry = matches!( + stream, + Err(ref err) if err.kind() == std::io::ErrorKind::ConnectionRefused + ) && attempt < TARGET_CONNECT_MAX_ATTEMPTS; + if !should_retry { + break; + } + tokio::select! { + _ = shutdown_rx.changed() => { + return; + } + _ = sleep(TARGET_CONNECT_RETRY_DELAY) => {} } - result = connect => result, - }; + } if *shutdown_rx.borrow() { return; } @@ -74,8 +100,9 @@ pub(crate) fn spawn_target_connector( } Err(err) => { warn!( - "stream {:?}: target connect failed err={} kind={:?}", + "stream {:?}: target connect failed after {} attempt(s) err={} kind={:?}", key.stream_id, + attempts, err, err.kind() ); diff --git a/crates/slipstream-server/src/udp_fallback.rs b/crates/slipstream-server/src/udp_fallback.rs index b6b71148..0c724374 100644 --- a/crates/slipstream-server/src/udp_fallback.rs +++ b/crates/slipstream-server/src/udp_fallback.rs @@ -1,5 +1,5 @@ use slipstream_core::{net::is_transient_udp_error, normalize_dual_stack_addr}; -use slipstream_dns::{decode_query_with_domains, DecodeQueryError}; +use slipstream_dns::{decode_query_with_domains, DecodeQueryError, Rcode}; use slipstream_ffi::picoquic::{ picoquic_cnx_t, picoquic_incoming_packet_ex, picoquic_quic_t, slipstream_disable_ack_delay, }; @@ -391,6 +391,14 @@ fn decode_slot( // Treat empty-question queries (QDCOUNT=0) as non-DNS for fallback. return Ok(DecodeSlotOutcome::Drop); }; + let rcode = if rcode == Rcode::NameError && is_apex_question(&question.name, domains) { + // Recursive resolvers commonly send NS/SOA probes at the delegated apex. + // Replying NOERROR/NODATA keeps delegation healthy without affecting + // TXT payload handling under subdomains. + Rcode::Ok + } else { + rcode + }; Ok(DecodeSlotOutcome::Slot(Slot { peer, id, @@ -406,6 +414,13 @@ fn decode_slot( } } +fn is_apex_question(name: &str, domains: &[&str]) -> bool { + let query = name.trim_end_matches('.'); + domains + .iter() + .any(|domain| query.eq_ignore_ascii_case(domain.trim_end_matches('.'))) +} + fn fallback_bind_addr(fallback_addr: SocketAddr) -> SocketAddr { match fallback_addr { SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), @@ -477,7 +492,9 @@ fn dummy_sockaddr_storage() -> libc::sockaddr_storage { #[cfg(test)] mod tests { use super::*; - use slipstream_dns::{encode_query, QueryParams, CLASS_IN, RR_A}; + use slipstream_dns::{ + decode_query_with_domains, encode_query, DecodeQueryError, QueryParams, CLASS_IN, RR_A, + }; use tokio::sync::mpsc; use tokio::time::{timeout, Duration}; @@ -495,6 +512,30 @@ mod tests { .expect("dns query") } + #[test] + fn apex_name_error_is_overridden_to_noerror() { + let packet = build_dns_query("example.com"); + let decoded = decode_query_with_domains(&packet, &["example.com"]) + .expect_err("apex A query should not decode as data packet"); + let DecodeQueryError::Reply { + question: Some(question), + rcode, + .. + } = decoded + else { + panic!("expected reply error"); + }; + assert_eq!(rcode, Rcode::NameError); + assert!(is_apex_question(&question.name, &["example.com"])); + let effective = + if rcode == Rcode::NameError && is_apex_question(&question.name, &["example.com"]) { + Rcode::Ok + } else { + rcode + }; + assert_eq!(effective, Rcode::Ok); + } + fn spawn_fallback_echo(socket: Arc, notify_tx: mpsc::UnboundedSender>) { tokio::spawn(async move { let mut buf = vec![0u8; MAX_UDP_PACKET_SIZE]; diff --git a/crates/slipstream-server/tests/epipe_reset_e2e.rs b/crates/slipstream-server/tests/epipe_reset_e2e.rs index 8b0c576b..25032727 100644 --- a/crates/slipstream-server/tests/epipe_reset_e2e.rs +++ b/crates/slipstream-server/tests/epipe_reset_e2e.rs @@ -203,7 +203,11 @@ fn epipe_triggers_quic_reset() { let saw_local_error = wait_for_any_log( &client_logs, - &["tcp write error", "tcp read error"], + &[ + "tcp write error", + "tcp read error", + "tcp write channel closed", + ], Duration::from_secs(2), ); if saw_local_error.is_none() { diff --git a/scripts/bench/run_rust_rust_10mb.sh b/scripts/bench/run_rust_rust_10mb.sh index 3f693d8e..7c188d87 100755 --- a/scripts/bench/run_rust_rust_10mb.sh +++ b/scripts/bench/run_rust_rust_10mb.sh @@ -544,6 +544,11 @@ run_client_bench() { stop_target return 1 fi + if ! wait_for_log "Rust client (${label}) ready" "${client_log}" "Connection ready"; then + stop_client + stop_target + return 1 + fi if ! run_bench_client "${label}" "${client_mode}" "${preface_bytes}"; then stop_client stop_target diff --git a/scripts/bench/tcp_bench.py b/scripts/bench/tcp_bench.py index eb438a02..60174618 100755 --- a/scripts/bench/tcp_bench.py +++ b/scripts/bench/tcp_bench.py @@ -50,28 +50,40 @@ def run_server(args: argparse.Namespace) -> int: with socket.create_server((host, port)) as server: server.settimeout(args.timeout) log_event(log_fp, {"ts": time.time(), "event": "listening", "listen": args.listen, "mode": mode}) - conn, addr = server.accept() - with conn: - conn.settimeout(args.timeout) - peer = f"{addr[0]}:{addr[1]}" - log_event(log_fp, {"ts": time.time(), "event": "accept", "peer": peer, "mode": mode}) - total = 0 - start = None - first_payload_ts = None - last_payload_ts = None - if mode == "sink": - while True: - data = conn.recv(args.chunk_size) - if not data: - break - if first_payload_ts is None: - first_payload_ts = time.time() - start = time.perf_counter() - total += len(data) - last_payload_ts = time.time() - if args.bytes and total >= args.bytes: - break - else: + + total = 0 + start = None + first_payload_ts = None + last_payload_ts = None + + if mode == "sink": + while True: + conn, addr = server.accept() + with conn: + conn.settimeout(args.timeout) + peer = f"{addr[0]}:{addr[1]}" + log_event(log_fp, {"ts": time.time(), "event": "accept", "peer": peer, "mode": mode}) + while True: + data = conn.recv(args.chunk_size) + if not data: + break + if first_payload_ts is None: + first_payload_ts = time.time() + start = time.perf_counter() + total += len(data) + last_payload_ts = time.time() + if args.bytes and total >= args.bytes: + break + if args.bytes and total >= args.bytes: + break + if not args.bytes: + break + else: + conn, addr = server.accept() + with conn: + conn.settimeout(args.timeout) + peer = f"{addr[0]}:{addr[1]}" + log_event(log_fp, {"ts": time.time(), "event": "accept", "peer": peer, "mode": mode}) remaining_preface = args.preface_bytes while remaining_preface > 0: data = conn.recv(min(args.chunk_size, remaining_preface)) @@ -89,22 +101,23 @@ def run_server(args: argparse.Namespace) -> int: last_payload_ts = time.time() remaining -= send_len total = args.bytes - elapsed = time.perf_counter() - start if start is not None else 0.0 - log_event( - log_fp, - { - "ts": time.time(), - "event": "done", - "mode": mode, - "bytes": total, - "secs": elapsed, - "first_payload_ts": first_payload_ts, - "last_payload_ts": last_payload_ts, - }, - ) - summarize(f"server {mode}", total, elapsed) - if mode == "source" and args.linger_secs > 0: - time.sleep(args.linger_secs) + + elapsed = time.perf_counter() - start if start is not None else 0.0 + log_event( + log_fp, + { + "ts": time.time(), + "event": "done", + "mode": mode, + "bytes": total, + "secs": elapsed, + "first_payload_ts": first_payload_ts, + "last_payload_ts": last_payload_ts, + }, + ) + summarize(f"server {mode}", total, elapsed) + if mode == "source" and args.linger_secs > 0: + time.sleep(args.linger_secs) if log_fp is not sys.stdout: log_fp.close()