Skip to content

Commit 87594b1

Browse files
CaptainMiragemaybeknott
authored andcommitted
fix: WebSocket passthrough and stream timeout decoupling
Inside the MITM TLS session, detect Connection: Upgrade + Upgrade: websocket before hitting the Apps Script relay. Establish a direct TLS connection to the real server (via upstream_socks5 if configured), relay the upgrade handshake, then splice both directions with copy_bidirectional. Apps Script cannot hold persistent connections so the bypass is the only viable path for wss://. Split request_timeout_secs (header/connect, 30s) from a new stream_timeout_secs (per-chunk body idle, default 300s) so large range downloads through Apps Script are not killed mid-transfer by the batch_timeout firing during the body drain phase.
1 parent 40b5386 commit 87594b1

3 files changed

Lines changed: 217 additions & 29 deletions

File tree

src/config.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,22 @@ pub struct Config {
376376
/// retry sooner when a deployment hangs. Floor `5`, ceiling `300`
377377
/// (anything beyond exceeds Apps Script's hard 6-min cap with
378378
/// no benefit).
379+
///
380+
/// This applies to connection establishment and response header
381+
/// arrival only. Body streaming is governed by `stream_timeout_secs`.
379382
#[serde(default = "default_request_timeout_secs")]
380383
pub request_timeout_secs: u64,
381384

385+
/// Per-chunk body streaming idle timeout (seconds). Default `300`.
386+
/// Applies to each individual body chunk read after headers arrive —
387+
/// a chunk that goes silent for longer than this is considered a
388+
/// stalled connection and the request is aborted. Distinct from
389+
/// `request_timeout_secs` so large responses through Apps Script
390+
/// (where each 256 KB range chunk can take 30-90s) are not killed
391+
/// mid-transfer. Floor `10`, ceiling `3600`.
392+
#[serde(default = "default_stream_timeout_secs")]
393+
pub stream_timeout_secs: u64,
394+
382395
/// Optional second-hop exit node, for sites that block traffic
383396
/// from Google datacenter IPs (Apps Script's outbound IP space).
384397
/// Most visibly: Cloudflare-fronted services that flag the GCP IP
@@ -531,6 +544,10 @@ fn default_auto_blacklist_cooldown_secs() -> u64 { 120 }
531544
/// hard-coded `BATCH_TIMEOUT` and Apps Script's typical response cliff.
532545
fn default_request_timeout_secs() -> u64 { 30 }
533546

547+
/// Default for `stream_timeout_secs`: 300s per-chunk idle timeout for
548+
/// body streaming, separate from the header/connect timeout.
549+
fn default_stream_timeout_secs() -> u64 { 300 }
550+
534551
fn default_google_ip() -> String {
535552
"216.239.38.120".into()
536553
}
@@ -766,6 +783,8 @@ pub struct TomlRelay {
766783
pub auto_blacklist_cooldown_secs: u64,
767784
#[serde(default = "default_request_timeout_secs")]
768785
pub request_timeout_secs: u64,
786+
#[serde(default = "default_stream_timeout_secs")]
787+
pub stream_timeout_secs: u64,
769788
}
770789

771790
/// [network] section of config.toml.
@@ -919,6 +938,7 @@ impl From<TomlConfig> for Config {
919938
auto_blacklist_window_secs: t.relay.auto_blacklist_window_secs,
920939
auto_blacklist_cooldown_secs: t.relay.auto_blacklist_cooldown_secs,
921940
request_timeout_secs: t.relay.request_timeout_secs,
941+
stream_timeout_secs: t.relay.stream_timeout_secs,
922942
exit_node: t.exit_node,
923943
}
924944
}
@@ -946,6 +966,7 @@ impl From<&Config> for TomlConfig {
946966
auto_blacklist_window_secs: c.auto_blacklist_window_secs,
947967
auto_blacklist_cooldown_secs: c.auto_blacklist_cooldown_secs,
948968
request_timeout_secs: c.request_timeout_secs,
969+
stream_timeout_secs: c.stream_timeout_secs,
949970
},
950971
network: TomlNetwork {
951972
google_ip: c.google_ip.clone(),

src/domain_fronter.rs

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,12 @@ pub struct DomainFronter {
423423
/// Per-batch HTTP timeout. Mirrors `Config::request_timeout_secs`
424424
/// (#430, masterking32 PR #25). Read by `tunnel_client::fire_batch`
425425
/// so a single config field tunes the timeout used everywhere.
426+
/// Applies to connection establishment and response header arrival only.
426427
batch_timeout: Duration,
428+
/// Per-chunk body streaming idle timeout. Mirrors `Config::stream_timeout_secs`.
429+
/// Applied per-iteration of the body drain loop so large responses
430+
/// through Apps Script are not killed mid-transfer by `batch_timeout`.
431+
stream_timeout: Duration,
427432
/// Optional second-hop exit node (Deno Deploy / fly.io / etc.)
428433
/// to bypass CF-anti-bot blocks on sites that flag Google datacenter
429434
/// IPs (chatgpt.com, claude.ai, grok.com, x.com). Mirrors
@@ -642,6 +647,9 @@ impl DomainFronter {
642647
batch_timeout: Duration::from_secs(
643648
config.request_timeout_secs.clamp(5, 300),
644649
),
650+
stream_timeout: Duration::from_secs(
651+
config.stream_timeout_secs.clamp(10, 3600),
652+
),
645653
exit_node_enabled: config.exit_node.enabled
646654
&& !config.exit_node.relay_url.is_empty()
647655
&& !config.exit_node.psk.is_empty(),
@@ -697,6 +705,11 @@ impl DomainFronter {
697705
self.batch_timeout
698706
}
699707

708+
/// Per-chunk body streaming idle timeout. Clamped to `[10s, 3600s]`.
709+
pub(crate) fn stream_timeout(&self) -> Duration {
710+
self.stream_timeout
711+
}
712+
700713
/// Record one relay call toward the daily budget. Called once per
701714
/// outbound Apps Script fetch. Rolls over both daily counters at
702715
/// 00:00 Pacific Time, matching Apps Script's quota reset cadence
@@ -1533,18 +1546,17 @@ impl DomainFronter {
15331546
})?;
15341547
}
15351548

1536-
// Phase 2: response headers + body drain. Bounded by the
1537-
// caller's deadline. Errors and timeout here are
1538-
// `RequestSent::Maybe` — the request is on the wire and may
1539-
// already have side effects.
1540-
let response_phase = async {
1549+
// Phase 2a: wait for response headers. Bounded by the caller's
1550+
// deadline (`batch_timeout` / `request_timeout_secs`). A timeout
1551+
// here means the relay never responded — safe to retry.
1552+
let header_phase = async {
15411553
let response = response_fut.await.map_err(|e| {
15421554
(
15431555
FronterError::Relay(format!("h2 response: {}", e)),
15441556
RequestSent::Maybe,
15451557
)
15461558
})?;
1547-
let (parts, mut body) = response.into_parts();
1559+
let (parts, body) = response.into_parts();
15481560
let status = parts.status.as_u16();
15491561

15501562
// Convert headers to the (String, String) Vec the rest of
@@ -1557,27 +1569,12 @@ impl DomainFronter {
15571569
headers.push((name.as_str().to_string(), v.to_string()));
15581570
}
15591571
}
1560-
1561-
// Drain body. Release flow-control credit per chunk so
1562-
// large responses don't stall after the initial 4 MB window.
1563-
let mut buf: Vec<u8> = Vec::new();
1564-
while let Some(chunk) = body.data().await {
1565-
let chunk = chunk.map_err(|e| {
1566-
(
1567-
FronterError::Relay(format!("h2 body chunk: {}", e)),
1568-
RequestSent::Maybe,
1569-
)
1570-
})?;
1571-
let n = chunk.len();
1572-
buf.extend_from_slice(&chunk);
1573-
let _ = body.flow_control().release_capacity(n);
1574-
}
1575-
Ok::<_, (FronterError, RequestSent)>((status, headers, buf))
1572+
Ok::<_, (FronterError, RequestSent)>((status, headers, body))
15761573
};
15771574

1578-
let (status, headers, mut buf) = match tokio::time::timeout(
1575+
let (status, headers, mut body) = match tokio::time::timeout(
15791576
response_deadline,
1580-
response_phase,
1577+
header_phase,
15811578
)
15821579
.await
15831580
{
@@ -1586,6 +1583,32 @@ impl DomainFronter {
15861583
Err(_) => return Err((FronterError::Timeout, RequestSent::Maybe)),
15871584
};
15881585

1586+
// Phase 2b: drain body. Each chunk is individually bounded by
1587+
// `stream_timeout` (default 300s) so large responses routed
1588+
// through Apps Script (where a 256 KB range chunk can take 30-90s
1589+
// of wall-clock time) are not killed by the tighter `batch_timeout`.
1590+
// Release flow-control credit per chunk so large responses don't
1591+
// stall after the initial 4 MB window.
1592+
let stream_timeout = self.stream_timeout();
1593+
let mut buf: Vec<u8> = Vec::new();
1594+
loop {
1595+
match tokio::time::timeout(stream_timeout, body.data()).await {
1596+
Ok(None) => break,
1597+
Ok(Some(Ok(chunk))) => {
1598+
let n = chunk.len();
1599+
buf.extend_from_slice(&chunk);
1600+
let _ = body.flow_control().release_capacity(n);
1601+
}
1602+
Ok(Some(Err(e))) => {
1603+
return Err((
1604+
FronterError::Relay(format!("h2 body chunk: {}", e)),
1605+
RequestSent::Maybe,
1606+
));
1607+
}
1608+
Err(_) => return Err((FronterError::Timeout, RequestSent::Maybe)),
1609+
}
1610+
}
1611+
15891612
// Mirror `read_http_response`: if the server gzipped the body
15901613
// (we asked for it via accept-encoding), decompress before
15911614
// handing back so downstream JSON / envelope parsers see plain

src/proxy_server.rs

Lines changed: 149 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1818,7 +1818,7 @@ async fn dispatch_tunnel(
18181818
host,
18191819
port
18201820
);
1821-
run_mitm_then_relay(sock, &host, port, mitm, &fronter).await;
1821+
run_mitm_then_relay(sock, &host, port, mitm, &fronter, &rewrite_ctx.tls_connector, rewrite_ctx.upstream_socks5.as_deref()).await;
18221822
return Ok(());
18231823
}
18241824

@@ -1832,7 +1832,7 @@ async fn dispatch_tunnel(
18321832
port,
18331833
scheme
18341834
);
1835-
relay_http_stream_raw(sock, &host, port, scheme, &fronter).await;
1835+
relay_http_stream_raw(sock, &host, port, scheme, &fronter, &rewrite_ctx.tls_connector, rewrite_ctx.upstream_socks5.as_deref()).await;
18361836
return Ok(());
18371837
}
18381838

@@ -2115,6 +2115,8 @@ async fn run_mitm_then_relay(
21152115
port: u16,
21162116
mitm: Arc<Mutex<MitmCertManager>>,
21172117
fronter: &DomainFronter,
2118+
tls_connector: &TlsConnector,
2119+
upstream_socks5: Option<&str>,
21182120
) {
21192121
// Peek the TLS ClientHello BEFORE minting the MITM cert. When the client
21202122
// resolves the hostname itself (DoH in Chrome/Firefox) and hands us a raw
@@ -2176,7 +2178,7 @@ async fn run_mitm_then_relay(
21762178
// latter would produce an IP-in-Host request that Cloudflare/etc. reject
21772179
// outright.
21782180
loop {
2179-
match handle_mitm_request(&mut tls, &effective_host, port, fronter, "https").await {
2181+
match handle_mitm_request(&mut tls, &effective_host, port, fronter, "https", tls_connector, upstream_socks5).await {
21802182
Ok(true) => continue,
21812183
Ok(false) => break,
21822184
Err(e) => {
@@ -2203,9 +2205,11 @@ async fn relay_http_stream_raw(
22032205
port: u16,
22042206
scheme: &str,
22052207
fronter: &DomainFronter,
2208+
tls_connector: &TlsConnector,
2209+
upstream_socks5: Option<&str>,
22062210
) {
22072211
loop {
2208-
match handle_mitm_request(&mut sock, host, port, fronter, scheme).await {
2212+
match handle_mitm_request(&mut sock, host, port, fronter, scheme, tls_connector, upstream_socks5).await {
22092213
Ok(true) => continue,
22102214
Ok(false) => break,
22112215
Err(e) => {
@@ -2377,12 +2381,139 @@ fn parse_host_port(target: &str) -> (String, u16) {
23772381
}
23782382
}
23792383

2384+
/// Serialise a parsed request back to wire bytes so it can be forwarded to
2385+
/// the real upstream server during WebSocket passthrough. Forwards all headers
2386+
/// except hop-by-hop proxy headers (`Proxy-Connection`, `Proxy-Authorization`).
2387+
fn rebuild_request_bytes(method: &str, path: &str, version: &str, headers: &[(String, String)]) -> Vec<u8> {
2388+
let mut out = Vec::with_capacity(512);
2389+
out.extend_from_slice(method.as_bytes());
2390+
out.push(b' ');
2391+
out.extend_from_slice(path.as_bytes());
2392+
out.push(b' ');
2393+
out.extend_from_slice(version.as_bytes());
2394+
out.extend_from_slice(b"\r\n");
2395+
for (k, v) in headers {
2396+
let kl = k.to_ascii_lowercase();
2397+
if kl == "proxy-connection" || kl == "proxy-authorization" {
2398+
continue;
2399+
}
2400+
out.extend_from_slice(k.as_bytes());
2401+
out.extend_from_slice(b": ");
2402+
out.extend_from_slice(v.as_bytes());
2403+
out.extend_from_slice(b"\r\n");
2404+
}
2405+
out.extend_from_slice(b"\r\n");
2406+
out
2407+
}
2408+
2409+
/// After a WebSocket upgrade is detected inside the MITM TLS session, this
2410+
/// helper connects directly to the real `host:port` (optionally via SOCKS5),
2411+
/// performs a TLS handshake, forwards the upgrade request, relays the 101
2412+
/// response back to the client, then splices both directions until one side
2413+
/// closes. Apps Script cannot hold persistent WebSocket connections, so this
2414+
/// bypasses the relay entirely.
2415+
async fn ws_tls_passthrough<S>(
2416+
client: &mut S,
2417+
host: &str,
2418+
port: u16,
2419+
upgrade_request: &[u8],
2420+
tls_connector: &TlsConnector,
2421+
upstream_socks5: Option<&str>,
2422+
) -> std::io::Result<()>
2423+
where
2424+
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
2425+
{
2426+
let connect_timeout = std::time::Duration::from_secs(15);
2427+
2428+
let tcp = if let Some(proxy) = upstream_socks5 {
2429+
match socks5_connect_via(proxy, host, port).await {
2430+
Ok(s) => s,
2431+
Err(e) => {
2432+
tracing::warn!("ws passthrough: socks5 {} -> {}:{} failed: {}", proxy, host, port, e);
2433+
client.write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n").await?;
2434+
return Ok(());
2435+
}
2436+
}
2437+
} else {
2438+
match tokio::time::timeout(connect_timeout, TcpStream::connect((host, port))).await {
2439+
Ok(Ok(s)) => s,
2440+
Ok(Err(e)) => {
2441+
tracing::warn!("ws passthrough: direct connect to {}:{} failed: {}", host, port, e);
2442+
client.write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n").await?;
2443+
return Ok(());
2444+
}
2445+
Err(_) => {
2446+
tracing::warn!("ws passthrough: connect to {}:{} timed out", host, port);
2447+
client.write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n").await?;
2448+
return Ok(());
2449+
}
2450+
}
2451+
};
2452+
2453+
let server_name = match ServerName::try_from(host.to_string()) {
2454+
Ok(sn) => sn,
2455+
Err(_) => {
2456+
tracing::warn!("ws passthrough: invalid server name {}", host);
2457+
client.write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n").await?;
2458+
return Ok(());
2459+
}
2460+
};
2461+
2462+
let mut server = match tls_connector.connect(server_name, tcp).await {
2463+
Ok(s) => s,
2464+
Err(e) => {
2465+
tracing::warn!("ws passthrough: TLS to {}:{} failed: {}", host, port, e);
2466+
client.write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n").await?;
2467+
return Ok(());
2468+
}
2469+
};
2470+
2471+
// Forward the upgrade request to the real server.
2472+
server.write_all(upgrade_request).await?;
2473+
server.flush().await?;
2474+
2475+
// Read the server's response headers (up to \r\n\r\n) and forward to client.
2476+
let mut resp_buf = Vec::with_capacity(512);
2477+
let mut tmp = [0u8; 1];
2478+
loop {
2479+
server.read_exact(&mut tmp).await?;
2480+
resp_buf.push(tmp[0]);
2481+
if resp_buf.ends_with(b"\r\n\r\n") {
2482+
break;
2483+
}
2484+
if resp_buf.len() > 8192 {
2485+
tracing::warn!("ws passthrough: server response headers too large from {}:{}", host, port);
2486+
return Ok(());
2487+
}
2488+
}
2489+
2490+
// Check the server actually agreed to the upgrade.
2491+
let resp_str = String::from_utf8_lossy(&resp_buf);
2492+
let status_line = resp_str.lines().next().unwrap_or("");
2493+
if !status_line.contains("101") {
2494+
tracing::warn!("ws passthrough: {}:{} refused upgrade ({})", host, port, status_line.trim());
2495+
client.write_all(&resp_buf).await?;
2496+
client.flush().await?;
2497+
return Ok(());
2498+
}
2499+
2500+
client.write_all(&resp_buf).await?;
2501+
client.flush().await?;
2502+
2503+
// Both sides agreed: splice raw bytes bidirectionally.
2504+
tracing::info!("ws passthrough: splicing {}:{}", host, port);
2505+
let _ = tokio::io::copy_bidirectional(client, &mut server).await;
2506+
Ok(())
2507+
}
2508+
23802509
async fn handle_mitm_request<S>(
23812510
stream: &mut S,
23822511
host: &str,
23832512
port: u16,
23842513
fronter: &DomainFronter,
23852514
scheme: &str,
2515+
tls_connector: &TlsConnector,
2516+
upstream_socks5: Option<&str>,
23862517
) -> std::io::Result<bool>
23872518
where
23882519
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
@@ -2415,11 +2546,24 @@ where
24152546
}
24162547
};
24172548

2418-
let (method, path, _version, headers) = match parse_request_head(&head) {
2549+
let (method, path, version, headers) = match parse_request_head(&head) {
24192550
Some(v) => v,
24202551
None => return Ok(false),
24212552
};
24222553

2554+
// WebSocket upgrade: Apps Script cannot relay persistent connections.
2555+
// Detect before read_body (upgrade requests have no body) and splice
2556+
// directly to the real server instead.
2557+
let is_ws_upgrade =
2558+
header_value(&headers, "connection").map(|v| v.to_ascii_lowercase().contains("upgrade")).unwrap_or(false)
2559+
&& header_value(&headers, "upgrade").map(|v| v.eq_ignore_ascii_case("websocket")).unwrap_or(false);
2560+
if is_ws_upgrade {
2561+
tracing::info!("WebSocket upgrade for {}:{} — bypassing Apps Script relay", host, port);
2562+
let raw_request = rebuild_request_bytes(&method, &path, &version, &headers);
2563+
ws_tls_passthrough(stream, host, port, &raw_request, tls_connector, upstream_socks5).await?;
2564+
return Ok(false);
2565+
}
2566+
24232567
let body = read_body(stream, &leftover, &headers).await?;
24242568

24252569
// ── Per-host URL fix-ups ──────────────────────────────────────────

0 commit comments

Comments
 (0)