Skip to content

Commit 0b49eda

Browse files
committed
feat(tunnel): prioritize interactive mux operations
Classify tunnel mux messages by dispatch urgency before applying the coalescing wait. Plain connection opens, connect-and-send opens, and data-bearing TCP or UDP operations now bypass the short batching delay once any already queued work has been drained. Empty polling operations and close notices remain batch-friendly so idle long-poll cadence and cleanup traffic can still piggyback without forcing extra Apps Script batches. The change leaves batch serialization, response indexing, payload-size limits, operation-count limits, deployment selection, and Apps Script quota accounting unchanged. It only decides whether the mux should wait for additional operations before processing the current group, reducing avoidable latency for interactive flows while preserving batching behavior for low-urgency traffic. Add focused unit coverage for immediate opening and payload-carrying messages, batchable empty polls and closes, and mixed groups where one urgent operation should short-circuit the wait.
1 parent 40b5386 commit 0b49eda

2 files changed

Lines changed: 116 additions & 0 deletions

File tree

docs/guide.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,8 @@ More deployments = more total concurrency = lower per-session latency. Each batc
232232
- **4 MB payload cap** per batch — well under Apps Script's 50 MB limit
233233
- **30 s timeout** per batch — slow / dead targets can't block other sessions forever
234234

235+
Opening/data-bearing tunnel operations bypass the short coalescing wait; empty polls and close notices stay batch-friendly.
236+
235237
### Full mode quick start
236238

237239
1. Deploy [`CodeFull.gs`](../assets/apps_script/CodeFull.gs) as a Web App on **each Google account** (same steps as `Code.gs`, but use the full-mode script that forwards to your tunnel-node). One deployment per account — the 30-concurrent limit is per account, so multiple deployments on one account share the pool. To scale, use more accounts:

src/tunnel_client.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,30 @@ enum MuxMsg {
300300
},
301301
}
302302

303+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
304+
enum DispatchPriority {
305+
Immediate,
306+
Batch,
307+
}
308+
309+
impl MuxMsg {
310+
fn dispatch_priority(&self) -> DispatchPriority {
311+
match self {
312+
MuxMsg::Connect { .. } | MuxMsg::ConnectData { .. } => DispatchPriority::Immediate,
313+
MuxMsg::Data { data, .. }
314+
| MuxMsg::UdpOpen { data, .. }
315+
| MuxMsg::UdpData { data, .. } => {
316+
if data.is_empty() {
317+
DispatchPriority::Batch
318+
} else {
319+
DispatchPriority::Immediate
320+
}
321+
}
322+
MuxMsg::Close { .. } => DispatchPriority::Batch,
323+
}
324+
}
325+
}
326+
303327
/// Raw, not-yet-encoded form of a batch operation. Lives only inside
304328
/// `mux_loop` and gets converted to `BatchOp` (with base64-encoded `d`)
305329
/// inside `fire_batch`'s spawned task — keeping the encoding work off
@@ -762,6 +786,8 @@ async fn mux_loop(mut rx: mpsc::UnboundedReceiver<MuxMsg>, fronter: Arc<DomainFr
762786
// queue. Once the first op lands, the adaptive coalesce loop waits
763787
// in `coalesce_step` increments (resetting on each new arrival, up
764788
// to `coalesce_max`) so concurrent ops land in the same batch.
789+
// Opening/data-bearing ops bypass that wait after any already-queued
790+
// work is drained; empty polls and closes remain batch-friendly.
765791
match rx.recv().await {
766792
Some(msg) => msgs.push(msg),
767793
None => break,
@@ -775,6 +801,9 @@ async fn mux_loop(mut rx: mpsc::UnboundedReceiver<MuxMsg>, fronter: Arc<DomainFr
775801
// Reset the soft deadline — more ops are arriving.
776802
soft_deadline = tokio::time::Instant::now() + coalesce_step;
777803
}
804+
if has_immediate_priority(&msgs) {
805+
break;
806+
}
778807
let now = tokio::time::Instant::now();
779808
let wait_until = soft_deadline.min(hard_deadline);
780809
if now >= wait_until {
@@ -908,6 +937,11 @@ async fn mux_loop(mut rx: mpsc::UnboundedReceiver<MuxMsg>, fronter: Arc<DomainFr
908937
}
909938
}
910939

940+
fn has_immediate_priority(msgs: &[MuxMsg]) -> bool {
941+
msgs.iter()
942+
.any(|msg| msg.dispatch_priority() == DispatchPriority::Immediate)
943+
}
944+
911945
/// Per-iteration accumulator for `mux_loop`. Owns the three fields that
912946
/// the data-bearing arms used to mutate in lockstep, with a single
913947
/// `push_or_fire` entry point so the cap-then-push pattern lives in one
@@ -2607,6 +2641,86 @@ mod tests {
26072641
);
26082642
}
26092643

2644+
fn batched_reply_for_test() -> BatchedReply {
2645+
oneshot::channel::<Result<(TunnelResponse, String), String>>().0
2646+
}
2647+
2648+
#[test]
2649+
fn mux_priority_marks_opening_and_payload_ops_immediate() {
2650+
let (connect_reply, _connect_rx) = oneshot::channel();
2651+
let connect = MuxMsg::Connect {
2652+
host: "example.com".into(),
2653+
port: 443,
2654+
reply: connect_reply,
2655+
};
2656+
let connect_data = MuxMsg::ConnectData {
2657+
host: "example.com".into(),
2658+
port: 443,
2659+
data: Bytes::from_static(b"\x16\x03\x01"),
2660+
reply: batched_reply_for_test(),
2661+
};
2662+
let data = MuxMsg::Data {
2663+
sid: "sid-1".into(),
2664+
data: Bytes::from_static(b"payload"),
2665+
seq: None,
2666+
wseq: None,
2667+
reply: batched_reply_for_test(),
2668+
};
2669+
let udp_open = MuxMsg::UdpOpen {
2670+
host: "example.com".into(),
2671+
port: 53,
2672+
data: Bytes::from_static(b"dns"),
2673+
reply: batched_reply_for_test(),
2674+
};
2675+
2676+
assert_eq!(connect.dispatch_priority(), DispatchPriority::Immediate);
2677+
assert_eq!(connect_data.dispatch_priority(), DispatchPriority::Immediate);
2678+
assert_eq!(data.dispatch_priority(), DispatchPriority::Immediate);
2679+
assert_eq!(udp_open.dispatch_priority(), DispatchPriority::Immediate);
2680+
}
2681+
2682+
#[test]
2683+
fn mux_priority_keeps_empty_polls_and_closes_batchable() {
2684+
let empty_data = MuxMsg::Data {
2685+
sid: "sid-1".into(),
2686+
data: Bytes::new(),
2687+
seq: None,
2688+
wseq: None,
2689+
reply: batched_reply_for_test(),
2690+
};
2691+
let empty_udp = MuxMsg::UdpData {
2692+
sid: "sid-2".into(),
2693+
data: Bytes::new(),
2694+
reply: batched_reply_for_test(),
2695+
};
2696+
let close = MuxMsg::Close {
2697+
sid: "sid-3".into(),
2698+
};
2699+
2700+
assert_eq!(empty_data.dispatch_priority(), DispatchPriority::Batch);
2701+
assert_eq!(empty_udp.dispatch_priority(), DispatchPriority::Batch);
2702+
assert_eq!(close.dispatch_priority(), DispatchPriority::Batch);
2703+
assert!(!has_immediate_priority(&[empty_data, empty_udp, close]));
2704+
}
2705+
2706+
#[test]
2707+
fn mux_priority_detects_immediate_inside_batchable_group() {
2708+
let empty_poll = MuxMsg::Data {
2709+
sid: "sid-1".into(),
2710+
data: Bytes::new(),
2711+
seq: None,
2712+
wseq: None,
2713+
reply: batched_reply_for_test(),
2714+
};
2715+
let payload = MuxMsg::UdpData {
2716+
sid: "sid-2".into(),
2717+
data: Bytes::from_static(b"packet"),
2718+
reply: batched_reply_for_test(),
2719+
};
2720+
2721+
assert!(has_immediate_priority(&[empty_poll, payload]));
2722+
}
2723+
26102724
#[test]
26112725
fn should_fire_first_op_never_fires() {
26122726
// Empty accumulator: even a single op larger than the payload cap

0 commit comments

Comments
 (0)