Skip to content

Commit 7d070d3

Browse files
feat(tunnel): event-driven drain with adaptive long-poll
1 parent f4eb166 commit 7d070d3

2 files changed

Lines changed: 718 additions & 62 deletions

File tree

src/tunnel_client.rs

Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
5959
/// op (version mismatch). Must match `tunnel-node/src/main.rs`.
6060
const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
6161

62+
/// Empty poll round-trip latency below which we conclude the tunnel-node
63+
/// is *not* long-polling (legacy fixed-sleep drain instead). On a
64+
/// long-poll-capable server an empty poll with no upstream push either
65+
/// returns near `LONGPOLL_DEADLINE` (~5 s) or comes back early *with*
66+
/// pushed bytes — neither matches a fast empty reply. Threshold sits
67+
/// well above the legacy `~350 ms` drain and well below the long-poll
68+
/// floor, so network jitter on either side won't false-trigger.
69+
const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500);
70+
6271
/// Ports where the *server* speaks first (SMTP banner, SSH identification,
6372
/// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes
6473
/// gains nothing and just adds handshake latency — skip the pre-read.
@@ -102,6 +111,16 @@ pub struct TunnelMux {
102111
/// `connect_data` as unsupported. Subsequent sessions skip the
103112
/// optimistic path entirely and go straight to plain connect + data.
104113
connect_data_unsupported: Arc<AtomicBool>,
114+
/// Set to `true` after we observe an empty poll round-trip that
115+
/// returned in less than `LEGACY_DETECT_THRESHOLD` with no data.
116+
/// On a long-poll-capable tunnel-node, an empty poll either returns
117+
/// quickly *with data* (push arrived) or holds open until the
118+
/// server's `LONGPOLL_DEADLINE`. A fast empty reply means the server
119+
/// is doing the legacy fixed-sleep drain — in that mode, hammering
120+
/// idle sessions at the new 500 ms cadence wastes Apps Script quota
121+
/// for no benefit, so the loop reverts to the pre-long-poll
122+
/// "skip empty polls when idle" behavior.
123+
server_no_longpoll: Arc<AtomicBool>,
105124
/// Pre-read observability. Lets an operator see whether the 50 ms
106125
/// wait-for-first-bytes is pulling its weight:
107126
/// * `preread_win` — client sent bytes in time, bundled with connect
@@ -134,6 +153,7 @@ impl TunnelMux {
134153
Arc::new(Self {
135154
tx,
136155
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
156+
server_no_longpoll: Arc::new(AtomicBool::new(false)),
137157
preread_win: AtomicU64::new(0),
138158
preread_loss: AtomicU64::new(0),
139159
preread_skip_port: AtomicU64::new(0),
@@ -159,6 +179,19 @@ impl TunnelMux {
159179
}
160180
}
161181

182+
fn server_no_longpoll(&self) -> bool {
183+
self.server_no_longpoll.load(Ordering::Relaxed)
184+
}
185+
186+
fn mark_server_no_longpoll(&self) {
187+
if !self.server_no_longpoll.swap(true, Ordering::Relaxed) {
188+
tracing::warn!(
189+
"tunnel-node returned an empty poll faster than {:?}; assuming legacy (no long-poll) drain — falling back to skip-empty-when-idle to avoid quota waste",
190+
LEGACY_DETECT_THRESHOLD,
191+
);
192+
}
193+
}
194+
162195
fn record_preread_win(&self, port: u16, elapsed: Duration) {
163196
self.preread_win.fetch_add(1, Ordering::Relaxed);
164197
self.preread_win_total_us
@@ -649,14 +682,27 @@ async fn tunnel_loop(
649682
let mut consecutive_empty = 0u32;
650683

651684
loop {
685+
// Cadence depends on whether the tunnel-node is doing long-poll
686+
// drains. With long-poll, the server holds empty polls open up
687+
// to its `LONGPOLL_DEADLINE` (~5 s currently), so the client
688+
// can keep this read timeout short — the wait is on the wire,
689+
// not here. Against a *legacy* tunnel-node (no long-poll, fast
690+
// empty replies), the same short cadence + always-poll behavior
691+
// would generate continuous round-trips on idle sessions and
692+
// burn Apps Script quota. The `server_no_longpoll` flag detects
693+
// the legacy case from reply latency below and reverts to the
694+
// pre-long-poll cadence: long sleep on local read, skip empty
695+
// polls when sustained-idle.
696+
let legacy_mode = mux.server_no_longpoll();
652697
let client_data = if let Some(data) = pending_client_data.take() {
653698
Some(data)
654699
} else {
655-
let read_timeout = match consecutive_empty {
656-
0 => Duration::from_millis(20),
657-
1 => Duration::from_millis(80),
658-
2 => Duration::from_millis(200),
659-
_ => Duration::from_secs(30),
700+
let read_timeout = match (legacy_mode, consecutive_empty) {
701+
(_, 0) => Duration::from_millis(20),
702+
(_, 1) => Duration::from_millis(80),
703+
(_, 2) => Duration::from_millis(200),
704+
(false, _) => Duration::from_millis(500),
705+
(true, _) => Duration::from_secs(30),
660706
};
661707

662708
match tokio::time::timeout(read_timeout, reader.read(&mut buf)).await {
@@ -670,13 +716,21 @@ async fn tunnel_loop(
670716
}
671717
};
672718

673-
if client_data.is_none() && consecutive_empty > 3 {
719+
// Legacy-server skip: against a non-long-polling tunnel-node,
720+
// an empty poll is wasted work — fast-empty reply, no push
721+
// delivery benefit. Preserve the pre-long-poll behavior of
722+
// going quiet after a few empties. Long-poll-capable servers
723+
// skip this branch and always send the empty op so the server
724+
// can hold it open.
725+
if legacy_mode && client_data.is_none() && consecutive_empty > 3 {
674726
continue;
675727
}
676728

677729
let data = client_data.unwrap_or_default();
730+
let was_empty_poll = data.is_empty();
678731

679732
let (reply_tx, reply_rx) = oneshot::channel();
733+
let send_at = Instant::now();
680734
mux.send(MuxMsg::Data {
681735
sid: sid.to_string(),
682736
data,
@@ -701,6 +755,21 @@ async fn tunnel_loop(
701755
}
702756
};
703757

758+
// Legacy-server detection: an empty-in/empty-out round trip
759+
// that finishes well under `LEGACY_DETECT_THRESHOLD` is
760+
// structurally impossible on a long-poll-capable tunnel-node
761+
// (the server holds the response either until data arrives or
762+
// until its long-poll deadline). One observation flips the
763+
// sticky flag for the rest of this process. Skip the check
764+
// once already in legacy mode — the comparison is cheap, but
765+
// calling `mark_server_no_longpoll` repeatedly muddies logs.
766+
if !legacy_mode && was_empty_poll {
767+
let reply_was_empty = resp.d.as_deref().map(str::is_empty).unwrap_or(true);
768+
if reply_was_empty && send_at.elapsed() < LEGACY_DETECT_THRESHOLD {
769+
mux.mark_server_no_longpoll();
770+
}
771+
}
772+
704773
if let Some(ref e) = resp.e {
705774
tracing::debug!("tunnel error: {}", e);
706775
break;
@@ -844,6 +913,7 @@ mod tests {
844913
let mux = Arc::new(TunnelMux {
845914
tx,
846915
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
916+
server_no_longpoll: Arc::new(AtomicBool::new(false)),
847917
preread_win: AtomicU64::new(0),
848918
preread_loss: AtomicU64::new(0),
849919
preread_skip_port: AtomicU64::new(0),
@@ -932,6 +1002,21 @@ mod tests {
9321002
assert!(mux.connect_data_unsupported());
9331003
}
9341004

1005+
/// `server_no_longpoll` must be sticky too: once we see a legacy
1006+
/// fast-empty reply, every subsequent session uses the legacy idle
1007+
/// cadence (long read timeout + skip-empty) for the rest of the
1008+
/// process. Flipping it back per-session would either thrash the
1009+
/// cadence or double the detection cost.
1010+
#[test]
1011+
fn no_longpoll_cache_is_sticky() {
1012+
let (mux, _rx) = mux_for_test();
1013+
assert!(!mux.server_no_longpoll());
1014+
mux.mark_server_no_longpoll();
1015+
assert!(mux.server_no_longpoll());
1016+
mux.mark_server_no_longpoll(); // idempotent
1017+
assert!(mux.server_no_longpoll());
1018+
}
1019+
9351020
#[test]
9361021
fn preread_counters_track_each_outcome() {
9371022
let (mux, _rx) = mux_for_test();

0 commit comments

Comments
 (0)