Skip to content

Commit ed7aa79

Browse files
feat: per-session pipelining via seq-ordered data ops
1 parent c774851 commit ed7aa79

3 files changed

Lines changed: 3620 additions & 319 deletions

File tree

src/domain_fronter.rs

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,32 @@ pub struct TunnelResponse {
435435
/// `e` only when this is `None` and compatibility is needed.
436436
#[serde(default)]
437437
pub code: Option<String>,
438+
/// Per-session sequence number echoed from the request's `seq` (if any).
439+
/// `None` from servers that don't speak the pipelining protocol — the
440+
/// client treats absence as "not pipelining-capable" and uses the
441+
/// single-in-flight loop. Present and matching the request's seq
442+
/// otherwise; the client routes the response into the per-session
443+
/// reorder buffer keyed by this value.
444+
///
445+
/// `u64` (not `u32`): a long-lived TCP session generating ~100 ops/s
446+
/// would hit `u32::MAX` in ~1.4 years, at which point the server's
447+
/// `expected` counter saturates and every subsequent op is refused. `u64`
448+
/// pushes that horizon past any realistic hardware lifetime.
449+
#[serde(default)]
450+
pub seq: Option<u64>,
451+
/// Capability bitfield, advertised on `connect` / `connect_data`
452+
/// success responses by tunnel-nodes that speak ≥ this protocol
453+
/// version. Bit 0 = `CAPS_PIPELINE_SEQ`. Old tunnel-nodes return
454+
/// `None`; the client opts into pipelining only when the bit is set.
455+
#[serde(default)]
456+
pub caps: Option<u32>,
438457
}
439458

459+
/// Capability bit advertised by a tunnel-node that supports per-session
460+
/// `data`-op sequence numbers (per-session pipelining). Only checked on
461+
/// `connect` / `connect_data` success replies.
462+
pub const CAPS_PIPELINE_SEQ: u32 = 1 << 0;
463+
440464
/// A single op in a batch tunnel request.
441465
#[derive(Serialize, Clone, Debug)]
442466
pub struct BatchOp {
@@ -449,6 +473,16 @@ pub struct BatchOp {
449473
pub port: Option<u16>,
450474
#[serde(skip_serializing_if = "Option::is_none")]
451475
pub d: Option<String>,
476+
/// Per-session monotonic sequence number for `data` ops on a
477+
/// pipelining-enabled session. Lets the tunnel-node enforce
478+
/// in-order processing of ops that arrive in different batches
479+
/// (potentially via different deployments and network paths).
480+
/// Skipped from the wire when `None`; old tunnel-nodes that don't
481+
/// recognize the field ignore it (serde default + structural-only
482+
/// JSON dispatch on the server). `u64` to keep long-lived sessions
483+
/// from saturating — see `TunnelResponse::seq` for the math.
484+
#[serde(skip_serializing_if = "Option::is_none")]
485+
pub seq: Option<u64>,
452486
}
453487

454488
/// Batch tunnel response from Apps Script / tunnel node.
@@ -2793,10 +2827,35 @@ impl DomainFronter {
27932827
/// Like `tunnel_batch_request` but targets a specific deployment ID.
27942828
/// Used by the pipeline mux to pin a batch to a deployment whose
27952829
/// per-account concurrency slot has already been acquired.
2830+
///
2831+
/// Backward-compatible 3-arg signature: forwards to
2832+
/// `tunnel_batch_request_with_timeout` using the configured
2833+
/// `self.batch_timeout`. New callers that need the longer effective
2834+
/// budget of pipelined (seq-ordered) batches should call
2835+
/// `tunnel_batch_request_with_timeout` directly.
27962836
pub async fn tunnel_batch_request_to(
27972837
&self,
27982838
script_id: &str,
27992839
ops: &[BatchOp],
2840+
) -> Result<BatchTunnelResponse, FronterError> {
2841+
self.tunnel_batch_request_with_timeout(script_id, ops, self.batch_timeout)
2842+
.await
2843+
}
2844+
2845+
/// Targets a specific deployment with an explicit per-batch
2846+
/// timeout, applied to both the h2 fast path (via
2847+
/// `h2_relay_request`'s `response_deadline`) and the h1
2848+
/// fallback's header read. The caller computes this: legacy
2849+
/// batches use `self.batch_timeout`, pipelined batches use
2850+
/// `max(self.batch_timeout, PIPELINED_BATCH_TIMEOUT_FLOOR)` so a
2851+
/// valid server-side wait of `SEQ_WAIT_TIMEOUT +
2852+
/// LONGPOLL_DEADLINE` doesn't fire a spurious transport timeout
2853+
/// AND a deployment timeout strike.
2854+
pub async fn tunnel_batch_request_with_timeout(
2855+
&self,
2856+
script_id: &str,
2857+
ops: &[BatchOp],
2858+
batch_timeout: Duration,
28002859
) -> Result<BatchTunnelResponse, FronterError> {
28012860
let mut map = serde_json::Map::new();
28022861
map.insert("k".into(), Value::String(self.auth_key.clone()));
@@ -2813,11 +2872,14 @@ impl DomainFronter {
28132872
// `data`/`udp_data`/`connect` may have already executed
28142873
// upstream when the response framing failed. Replaying the
28152874
// whole batch on h1 risks duplicating every op in it. Only
2816-
// fall back when h2 definitely never sent. Honors
2817-
// user-configured batch_timeout so a slow but legitimate
2818-
// batch isn't cut off at an arbitrary fixed cap.
2875+
// fall back when h2 definitely never sent. Honors the
2876+
// caller-supplied `batch_timeout` (legacy = configured
2877+
// `request_timeout_secs`; pipelined = max(configured,
2878+
// PIPELINED_BATCH_TIMEOUT_FLOOR)) so a slow but legitimate
2879+
// batch isn't cut off at an arbitrary fixed cap, and
2880+
// pipelined-batch server-side waits stay inside the budget.
28192881
match self
2820-
.h2_relay_request(&path, payload.clone(), self.batch_timeout)
2882+
.h2_relay_request(&path, payload.clone(), batch_timeout)
28212883
.await
28222884
{
28232885
Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => {
@@ -2865,8 +2927,14 @@ impl DomainFronter {
28652927
entry.stream.write_all(&payload).await?;
28662928
entry.stream.flush().await?;
28672929

2930+
// Use `batch_timeout` for the per-read deadline on this h1
2931+
// fallback. The legacy 10 s default in `read_http_response`
2932+
// would fire before a pipelined batch's server-side
2933+
// `SEQ_WAIT_TIMEOUT + LONGPOLL_DEADLINE` (~45 s) wait could
2934+
// legitimately complete — that fired as "batch timed out"
2935+
// client-side AND recorded a deployment timeout strike.
28682936
let (mut status, mut resp_headers, mut resp_body) =
2869-
read_http_response(&mut entry.stream).await?;
2937+
read_http_response_with_timeout(&mut entry.stream, batch_timeout).await?;
28702938

28712939
// Follow redirect chain
28722940
for _ in 0..5 {
@@ -2879,7 +2947,7 @@ impl DomainFronter {
28792947
);
28802948
entry.stream.write_all(req.as_bytes()).await?;
28812949
entry.stream.flush().await?;
2882-
let (s, h, b) = read_http_response(&mut entry.stream).await?;
2950+
let (s, h, b) = read_http_response_with_timeout(&mut entry.stream, batch_timeout).await?;
28832951
status = s; resp_headers = h; resp_body = b;
28842952
}
28852953

@@ -3665,14 +3733,41 @@ fn parse_redirect(location: &str) -> (String, Option<String>) {
36653733

3734+
/// Default first-byte / per-read deadline for `read_http_response`.
3735+
/// Single-tunnel-op and relay paths use this — Apps Script normally
3736+
/// streams headers within ~1 s, so 10 s catches stalls without
3737+
/// false-firing on jitter. The batched-tunnel path overrides via
3738+
/// `read_http_response_with_timeout` because pipelined batches can
3739+
/// legitimately wait `SEQ_WAIT_TIMEOUT + LONGPOLL_DEADLINE` before
3740+
/// Apps Script sends a single response byte.
3741+
const DEFAULT_HTTP_READ_TIMEOUT: Duration = Duration::from_secs(10);
3742+
36663743
/// Read a single HTTP/1.1 response from the stream. Keep-alive safe: respects
36673744
/// Content-Length or chunked transfer-encoding.
36683745
async fn read_http_response<S>(stream: &mut S) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
3746+
where
3747+
S: tokio::io::AsyncRead + Unpin,
3748+
{
3749+
read_http_response_with_timeout(stream, DEFAULT_HTTP_READ_TIMEOUT).await
3750+
}
3751+
3752+
/// Like [`read_http_response`] but with a caller-controlled per-read
3753+
/// deadline. Used by the batched-tunnel h1 path so a pipelined batch
3754+
/// whose server-side worst-case wait is `SEQ_WAIT_TIMEOUT (30 s) +
3755+
/// LONGPOLL_DEADLINE (15 s)` doesn't get cut off at the legacy 10 s
3756+
/// header read — that fired before the response could legitimately
3757+
/// arrive, surfacing as a "batch timed out" error client-side AND
3758+
/// recording a timeout strike against an otherwise-healthy
3759+
/// deployment.
3760+
async fn read_http_response_with_timeout<S>(
3761+
stream: &mut S,
3762+
read_timeout: Duration,
3763+
) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
36693764
where
36703765
S: tokio::io::AsyncRead + Unpin,
36713766
{
36723767
let mut buf = Vec::with_capacity(8192);
36733768
let mut tmp = [0u8; 8192];
36743769
let header_end = loop {
3675-
let n = timeout(Duration::from_secs(10), stream.read(&mut tmp)).await
3770+
let n = timeout(read_timeout, stream.read(&mut tmp)).await
36763771
.map_err(|_| FronterError::Timeout)??;
36773772
if n == 0 {
36783773
return Err(FronterError::BadResponse("connection closed before headers".into()));

0 commit comments

Comments
 (0)