Skip to content

Commit bfdd0d6

Browse files
CaptainMiragemaybeknott
authored and committed
fix(domain_fronter): resume large downloads reliably
Handle idempotent exit-node timeouts by falling back to direct Apps Script, route Range: bytes=N- requests through parallel resume streaming, and send TLS close_notify on shutdown so wget/curl can resume from the correct offset.
1 parent 87594b1 commit bfdd0d6

2 files changed

Lines changed: 266 additions & 166 deletions

File tree

src/domain_fronter.rs

Lines changed: 255 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,20 +1808,34 @@ impl DomainFronter {
18081808
return bytes;
18091809
}
18101810
Err(e) if !e.is_retryable() => {
1811-
// The exit node may have already processed this
1812-
// request (h2 post-send failure on a POST etc.).
1813-
// Don't fall through to the direct path — that
1814-
// would re-send to the same destination via Apps
1815-
// Script and duplicate the side effect.
1816-
tracing::warn!(
1817-
"exit node failed for {} and request was already sent ({}); not falling back to direct Apps Script",
1818-
url,
1819-
e,
1820-
);
1821-
self.relay_failures.fetch_add(1, Ordering::Relaxed);
1822-
let inner = e.into_inner();
1823-
self.record_site(url, false, 0, t0.elapsed().as_nanos() as u64);
1824-
return error_response(502, &format!("Relay error: {}", inner));
1811+
// The NonRetryable guard exists to prevent duplicate
1812+
// side-effects on POST/PUT/PATCH/DELETE: if the h2
1813+
// outer call reached Apps Script and timed out, the
1814+
// inner request may have already been executed by the
1815+
// exit node. Falling through would re-send it.
1816+
//
1817+
// For idempotent methods (GET/HEAD/OPTIONS) there are
1818+
// no side-effects, so re-sending via direct Apps Script
1819+
// is always safe. Range downloads are GET — if a script
1820+
// ID hits its 6-minute cap and times out, falling back
1821+
// to direct Apps Script (round-robining to a fresh ID)
1822+
// is the correct behaviour rather than returning 502.
1823+
if is_method_safe_for_fanout(method) {
1824+
tracing::warn!(
1825+
"exit node non-retryable timeout for {} {} — method is idempotent, falling back to direct Apps Script",
1826+
method, url,
1827+
);
1828+
// fall through to the regular relay path below
1829+
} else {
1830+
tracing::warn!(
1831+
"exit node failed for {} {} and request was already sent ({}); not falling back to direct Apps Script",
1832+
method, url, e,
1833+
);
1834+
self.relay_failures.fetch_add(1, Ordering::Relaxed);
1835+
let inner = e.into_inner();
1836+
self.record_site(url, false, 0, t0.elapsed().as_nanos() as u64);
1837+
return error_response(502, &format!("Relay error: {}", inner));
1838+
}
18251839
}
18261840
Err(e) => {
18271841
tracing::warn!(
@@ -2006,10 +2020,53 @@ impl DomainFronter {
20062020
let raw = self.relay(method, url, headers, body).await;
20072021
return write_response_with_head_transform(writer, &raw, &transform_head).await;
20082022
}
2009-
// If the client already sent a Range header, honour it as-is —
2010-
// don't second-guess a caller that knows what bytes they want.
2011-
if headers.iter().any(|(k, _)| k.eq_ignore_ascii_case("range")) {
2023+
// If the client already sent a Range header, inspect it:
2024+
//
2025+
// • bytes=N- or bytes=N-M with N>0 (resume / mid-file seek): route
2026+
// through the parallel chunk path starting at offset N. Passing the
2027+
// raw header to relay() would ask Apps Script to return everything
2028+
// from byte N to EOF in one call — for a 3 GiB file that's well
2029+
// over Apps Script's 50 MiB response cap, guaranteed 504 every try.
2030+
//
2031+
// • bytes=0-M (small specific range from the start): pass through
2032+
// to relay() as-is. On relay failure close cleanly so the client
2033+
// retries with its Range intact rather than restarting from byte 0.
2034+
if let Some(range_val) = headers
2035+
.iter()
2036+
.find(|(k, _)| k.eq_ignore_ascii_case("range"))
2037+
.map(|(_, v)| v.clone())
2038+
{
2039+
if let Some(start) = parse_range_start(&range_val).filter(|&s| s > 0) {
2040+
tracing::debug!(
2041+
"range-parallel-resume: client Range {} for {}; probing from offset {}",
2042+
range_val, url, start,
2043+
);
2044+
return self
2045+
.stream_range_from_offset(
2046+
writer,
2047+
method,
2048+
url,
2049+
headers,
2050+
body,
2051+
start,
2052+
chunk,
2053+
transform_head,
2054+
)
2055+
.await;
2056+
}
2057+
// start == 0 or unparseable — honour as-is with clean-close on failure.
20122058
let raw = self.relay(method, url, headers, body).await;
2059+
let status = split_response(&raw).map(|(s, _, _)| s).unwrap_or(0);
2060+
if status >= 400 || status == 0 {
2061+
tracing::warn!(
2062+
"range relay returned status {} for request {}; closing cleanly so client retries with Range",
2063+
status, url,
2064+
);
2065+
return Err(std::io::Error::other(format!(
2066+
"range relay status {} — closing for clean resume",
2067+
status
2068+
)));
2069+
}
20132070
return write_response_with_head_transform(writer, &raw, &transform_head).await;
20142071
}
20152072

@@ -2216,6 +2273,119 @@ impl DomainFronter {
22162273
write_response_with_head_transform(writer, &raw, &transform_head).await
22172274
}
22182275

2276+
/// Resume a large download from a byte offset by probing at
2277+
/// `[start, start+chunk-1]` and streaming the remaining chunks in
2278+
/// parallel — exactly like the initial download path but starting
2279+
/// mid-file. Called when the client sends `Range: bytes=N-` with
2280+
/// N > 0 (wget `-c`, browser resume). Responds with `206 Partial
2281+
/// Content` so the client appends to its existing partial file.
2282+
async fn stream_range_from_offset<W, F>(
2283+
&self,
2284+
writer: &mut W,
2285+
method: &str,
2286+
url: &str,
2287+
client_headers: &[(String, String)],
2288+
body: &[u8],
2289+
start: u64,
2290+
chunk: u64,
2291+
transform_head: &F,
2292+
) -> std::io::Result<()>
2293+
where
2294+
W: tokio::io::AsyncWrite + Unpin,
2295+
F: Fn(&[u8]) -> Vec<u8>,
2296+
{
2297+
const MAX_PARALLEL: usize = 16;
2298+
2299+
// Strip client's Range header; add our probe range [start, start+chunk-1].
2300+
let mut probe_headers: Vec<(String, String)> = client_headers
2301+
.iter()
2302+
.filter(|(k, _)| !k.eq_ignore_ascii_case("range"))
2303+
.cloned()
2304+
.collect();
2305+
probe_headers.push((
2306+
"Range".into(),
2307+
format!("bytes={}-{}", start, start + chunk - 1),
2308+
));
2309+
2310+
let first = self.relay(method, url, &probe_headers, body).await;
2311+
let (status, resp_headers, resp_body) = match split_response(&first) {
2312+
Some(v) => v,
2313+
None => {
2314+
tracing::warn!(
2315+
"range-parallel-resume: malformed probe response for {}; closing cleanly",
2316+
url
2317+
);
2318+
return Err(std::io::Error::other(
2319+
"range-parallel-resume: malformed probe — closing for clean resume",
2320+
));
2321+
}
2322+
};
2323+
2324+
if status != 206 {
2325+
if status >= 400 {
2326+
tracing::warn!(
2327+
"range-parallel-resume: probe returned {} for {}; closing cleanly",
2328+
status, url,
2329+
);
2330+
return Err(std::io::Error::other(format!(
2331+
"range-parallel-resume: probe status {} — closing for clean resume",
2332+
status,
2333+
)));
2334+
}
2335+
// Non-206 success (origin sent 200 for the full body) — forward as-is.
2336+
return write_response_with_head_transform(writer, &first, transform_head).await;
2337+
}
2338+
2339+
let probe_range =
2340+
match validate_probe_range_at_offset(status, &resp_headers, resp_body, start, start + chunk - 1)
2341+
{
2342+
Some(r) => r,
2343+
None => {
2344+
tracing::warn!(
2345+
"range-parallel-resume: invalid 206 for {}; closing cleanly",
2346+
url,
2347+
);
2348+
return Err(std::io::Error::other(
2349+
"range-parallel-resume: invalid 206 — closing for clean resume",
2350+
));
2351+
}
2352+
};
2353+
let total = probe_range.total;
2354+
2355+
// Probe covered the rest of the file — forward this 206 as-is.
2356+
if (probe_range.end + 1) >= total {
2357+
return write_response_with_head_transform(writer, &first, transform_head).await;
2358+
}
2359+
2360+
let probe_end = probe_range.end;
2361+
let body_total = total - start;
2362+
let expected_chunks = (total - probe_end - 1).div_ceil(chunk);
2363+
tracing::info!(
2364+
"range-parallel-resume: {} total, resuming from byte {}, {} more chunks after probe, up to {} in flight for {}",
2365+
total, start, expected_chunks, MAX_PARALLEL, url,
2366+
);
2367+
2368+
// base_headers for fetch_chunks_stream must not include Range
2369+
// (fetch_chunks_stream adds its own per-chunk Range header).
2370+
let base_headers: Vec<(String, String)> = client_headers
2371+
.iter()
2372+
.filter(|(k, _)| !k.eq_ignore_ascii_case("range"))
2373+
.cloned()
2374+
.collect();
2375+
2376+
let fetches = self.fetch_chunks_stream(
2377+
url,
2378+
&base_headers,
2379+
plan_remaining_ranges(probe_end, total, chunk),
2380+
total,
2381+
MAX_PARALLEL,
2382+
);
2383+
2384+
let head = assemble_206_head(&resp_headers, start, total);
2385+
let head = transform_head(&head);
2386+
stream_chunks_to_writer(writer, &head, resp_body, body_total, fetches, url).await
2387+
}
2388+
22192389
/// Backward-compatible wrapper around `relay_parallel_range_to`
22202390
/// that buffers the full response into a `Vec<u8>` before
22212391
/// returning. Retained so downstream callers (and external
@@ -3431,6 +3601,34 @@ fn validate_probe_range(
34313601
None
34323602
}
34333603

3604+
/// Parse the start byte from a single-range `Range: bytes=N-` or
/// `Range: bytes=N-M` header value.
///
/// Returns `Some(N)` only for a well-formed single byte range. Returns
/// `None` for anything that cannot be treated as "start at offset N":
///
/// * a unit other than `bytes` (the unit is matched case-insensitively),
/// * suffix ranges such as `bytes=-500`,
/// * multi-range sets such as `bytes=0-99,200-299`,
/// * a non-numeric start or end, or a value with no `-` separator,
/// * an inverted range where the end precedes the start.
fn parse_range_start(range_header: &str) -> Option<u64> {
    /// Parse a run of ASCII digits; rejects signs that `u64::from_str`
    /// would otherwise accept (e.g. `+5`).
    fn parse_pos(text: &str) -> Option<u64> {
        if text.is_empty() || !text.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        text.parse::<u64>().ok()
    }

    let (unit, spec) = range_header.trim().split_once('=')?;
    if !unit.eq_ignore_ascii_case("bytes") {
        return None;
    }
    // A range set with several specs is not a simple resume request.
    if spec.contains(',') {
        return None;
    }

    let (first, last) = spec.split_once('-')?;
    let start = parse_pos(first.trim())?;

    // An explicit end must be numeric and must not precede the start.
    let last = last.trim();
    if !last.is_empty() && parse_pos(last)? < start {
        return None;
    }
    Some(start)
}
3609+
3610+
/// Variant of `validate_probe_range` for mid-file resume probes where
3611+
/// `Content-Range: bytes N-M/total` has a non-zero start.
3612+
fn validate_probe_range_at_offset(
3613+
status: u16,
3614+
headers: &[(String, String)],
3615+
body: &[u8],
3616+
req_start: u64,
3617+
req_end: u64,
3618+
) -> Option<ContentRange> {
3619+
if status != 206 {
3620+
return None;
3621+
}
3622+
let range = parse_content_range(headers)?;
3623+
if range.start != req_start || range.end > req_end {
3624+
return None;
3625+
}
3626+
if content_range_matches_body(range, body.len()) {
3627+
return Some(range);
3628+
}
3629+
None
3630+
}
3631+
34343632
fn probe_range_covers_complete_entity(range: ContentRange, requested_end: u64) -> bool {
34353633
// Apps Script may decode a gzip body while preserving the origin's
34363634
// compressed Content-Range. For the synthetic first probe only, a
@@ -3520,6 +3718,46 @@ fn assemble_200_head(src_headers: &[(String, String)], declared_length: u64) ->
35203718
out
35213719
}
35223720

3721+
/// Build a `HTTP/1.1 206 Partial Content` head for the resume streaming
/// path. `start` is the first byte the client requested; `total` is the
/// full file size reported by the origin's `Content-Range`.
///
/// Every header in `src_headers` is copied through except the framing and
/// hop-by-hop headers listed in `SKIPPED` (matched case-insensitively).
/// A fresh `Content-Range: bytes start-(total-1)/total` and
/// `Content-Length: total-start` are then appended, followed by the blank
/// line that ends the head.
///
/// Both derived values use saturating arithmetic, so a degenerate
/// `total == 0` or `start > total` produces a head instead of an
/// arithmetic overflow. Callers are still expected to pass
/// `start < total`.
fn assemble_206_head(src_headers: &[(String, String)], start: u64, total: u64) -> Vec<u8> {
    // Headers describing the upstream framing/connection; they are replaced
    // by the values computed below or are not meaningful downstream.
    const SKIPPED: [&str; 6] = [
        "content-length",
        "content-range",
        "content-encoding",
        "transfer-encoding",
        "connection",
        "keep-alive",
    ];
    let skip = |name: &str| SKIPPED.iter().any(|s| name.eq_ignore_ascii_case(s));

    let length = total.saturating_sub(start);
    let last = total.saturating_sub(1);

    let mut out: Vec<u8> = b"HTTP/1.1 206 Partial Content\r\n".to_vec();
    for (k, v) in src_headers.iter().filter(|(k, _)| !skip(k)) {
        out.extend_from_slice(k.as_bytes());
        out.extend_from_slice(b": ");
        out.extend_from_slice(v.as_bytes());
        out.extend_from_slice(b"\r\n");
    }
    out.extend_from_slice(
        format!(
            "Content-Range: bytes {}-{}/{}\r\nContent-Length: {}\r\n\r\n",
            start, last, total, length,
        )
        .as_bytes(),
    );
    out
}
3760+
35233761
/// Apply `transform_head` to the head block of an HTTP/1.x response
35243762
/// (everything up to and including the first `\r\n\r\n` terminator),
35253763
/// then write the transformed head followed by the unchanged body to

0 commit comments

Comments
 (0)