Skip to content

Commit 1c6033e

Browse files
vahidlazioclaude
andcommitted
fix(pipeline): reduce idle poll flooding and protect data streaks
Three root-cause fixes for the v1.9.28+ pipelining regression where request count explodes and Instagram videos fail to load: 1. Escalating keepalive backoff (20ms→80ms→200ms→500ms→2s) when the pipeline drains to zero in-flight and consecutive empties grow. The pre-pipelining serial loop had this; the new loop sent polls with zero delay, flooding idle sessions. 2. Suppress refill timer at IDLE depth with consecutive empties — the keepalive path with backoff handles that; the refill timer was scheduling new polls every 1s regardless. 3. Stale empty-poll replies no longer break active data streaks. A poll queued before data started flowing returns empty as expected; now it won't increment consecutive_empty or reset consecutive_data during a streak — fixing premature depth drops that killed video streaming throughput. 4. Reduce can_read overflow from +4 to +1 extra in-flight slot to stop upload reads from inflating request count beyond the pipeline depth budget. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2e135d7 commit 1c6033e

1 file changed

Lines changed: 61 additions & 10 deletions

File tree

src/tunnel_client.rs

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,7 +1439,7 @@ async fn tunnel_loop(
14391439
let mut next_data_write_seq: u64 = 0;
14401440
let mut eof_seen = false;
14411441
let mut client_closed = false;
1442-
let mut pending_writes: BTreeMap<u64, (TunnelResponse, String)> = BTreeMap::new();
1442+
let mut pending_writes: BTreeMap<u64, (TunnelResponse, String, bool)> = BTreeMap::new();
14431443

14441444
// Buffered upload data waiting to be sent (when pipeline is full).
14451445
let mut buffered_upload: Option<Bytes> = None;
@@ -1597,12 +1597,12 @@ async fn tunnel_loop(
15971597
next_write_seq += 1;
15981598
while let Some(entry) = pending_writes.first_entry() {
15991599
if *entry.key() != next_write_seq { break; }
1600-
let (_, (buffered_resp, _)) = entry.remove_entry();
1600+
let (_, (buffered_resp, _, _)) = entry.remove_entry();
16011601
let _ = write_tunnel_response(&mut writer, &buffered_resp).await;
16021602
next_write_seq += 1;
16031603
}
16041604
} else {
1605-
pending_writes.insert(meta.seq, (resp, script_id));
1605+
pending_writes.insert(meta.seq, (resp, script_id, meta.was_empty_poll));
16061606
}
16071607
continue;
16081608
}
@@ -1632,6 +1632,41 @@ async fn tunnel_loop(
16321632
}
16331633
}
16341634

1635+
// Escalating backoff: avoid flooding empty polls on idle
1636+
// sessions. Mirrors the pre-pipelining cadence.
1637+
let keepalive_delay = match consecutive_empty {
1638+
0 => Duration::from_millis(20),
1639+
1 => Duration::from_millis(80),
1640+
2 => Duration::from_millis(200),
1641+
3 => Duration::from_millis(500),
1642+
_ => Duration::from_secs(2),
1643+
};
1644+
if consecutive_empty > 0 {
1645+
// Wait for either the backoff timer or client data.
1646+
if !client_closed {
1647+
read_buf.reserve(65536);
1648+
tokio::select! {
1649+
biased;
1650+
result = reader.read_buf(&mut read_buf) => {
1651+
match result {
1652+
Ok(0) => break,
1653+
Ok(n) => {
1654+
consecutive_empty = 0;
1655+
let data = extract_bytes(&mut read_buf, n);
1656+
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
1657+
inflight.push(wrap_reply(meta, reply_rx));
1658+
continue;
1659+
}
1660+
Err(_) => break,
1661+
}
1662+
}
1663+
_ = tokio::time::sleep(keepalive_delay) => {}
1664+
}
1665+
} else {
1666+
tokio::time::sleep(keepalive_delay).await;
1667+
}
1668+
}
1669+
16351670
let (meta, reply_rx) = send_empty_poll(sid, &mut next_send_seq, mux);
16361671
tracing::debug!(
16371672
"sess {}: keepalive poll seq={}", &sid[..sid.len().min(8)], meta.seq
@@ -1640,8 +1675,9 @@ async fn tunnel_loop(
16401675
}
16411676

16421677
// Can we read from the client? Yes if not closed, not eof, and
1643-
// we have room for more inflight ops (fast-path allows +4 extra).
1644-
let can_read = !client_closed && !eof_seen && inflight.len() < max_inflight + 4;
1678+
// we have room for more inflight ops (allow +1 extra for upload
1679+
// data that shouldn't wait for a slot — but not +4 which floods).
1680+
let can_read = !client_closed && !eof_seen && inflight.len() < max_inflight + 1;
16451681

16461682
tokio::select! {
16471683
biased;
@@ -1712,8 +1748,14 @@ async fn tunnel_loop(
17121748
consecutive_data = consecutive_data.saturating_add(1);
17131749
let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
17141750
total_download_bytes += bytes;
1751+
} else if meta.was_empty_poll && consecutive_data > 0 {
1752+
// Stale empty-poll reply during an active data
1753+
// streak — don't penalise the streak. The poll
1754+
// was queued before data started flowing; the
1755+
// empty result is expected.
17151756
} else {
17161757
consecutive_empty = consecutive_empty.saturating_add(1);
1758+
consecutive_data = 0;
17171759
}
17181760
if is_eof {
17191761
eof_seen = true;
@@ -1722,7 +1764,7 @@ async fn tunnel_loop(
17221764
// Flush buffered out-of-order writes.
17231765
while let Some(entry) = pending_writes.first_entry() {
17241766
if *entry.key() != next_write_seq { break; }
1725-
let (_, (buffered_resp, _)) = entry.remove_entry();
1767+
let (_, (buffered_resp, _, buf_was_empty_poll)) = entry.remove_entry();
17261768
let buf_eof = buffered_resp.eof.unwrap_or(false);
17271769
match write_tunnel_response(&mut writer, &buffered_resp).await? {
17281770
WriteOutcome::Wrote => {
@@ -1732,7 +1774,12 @@ async fn tunnel_loop(
17321774
total_download_bytes += bytes;
17331775
}
17341776
WriteOutcome::NoData => {
1735-
consecutive_empty = consecutive_empty.saturating_add(1);
1777+
if buf_was_empty_poll && consecutive_data > 0 {
1778+
// Stale empty poll — don't break data streak.
1779+
} else {
1780+
consecutive_empty = consecutive_empty.saturating_add(1);
1781+
consecutive_data = 0;
1782+
}
17361783
}
17371784
WriteOutcome::BadBase64 => break,
17381785
}
@@ -1742,7 +1789,7 @@ async fn tunnel_loop(
17421789
}
17431790
}
17441791
} else {
1745-
pending_writes.insert(meta.seq, (resp, script_id));
1792+
pending_writes.insert(meta.seq, (resp, script_id, meta.was_empty_poll));
17461793
}
17471794

17481795
// Send buffered upload data now that a slot freed up.
@@ -1810,9 +1857,12 @@ async fn tunnel_loop(
18101857
}
18111858

18121859
// Schedule refill if pipeline needs more polls.
1860+
// Skip refill at IDLE depth with consecutive empties —
1861+
// the keepalive path handles that with proper backoff.
18131862
if !eof_seen
18141863
&& inflight.len() < max_inflight
18151864
&& refill_at.is_none()
1865+
&& !(max_inflight <= INFLIGHT_IDLE && consecutive_empty >= 2)
18161866
{
18171867
refill_at = Some(Box::pin(tokio::time::sleep(
18181868
if max_inflight > INFLIGHT_IDLE { Duration::from_millis(100) } else { Duration::ZERO }
@@ -1854,8 +1904,9 @@ async fn tunnel_loop(
18541904
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
18551905
consecutive_empty = 0;
18561906
inflight.push(wrap_reply(meta, reply_rx));
1857-
} else if inflight.len() < max_inflight + 4 {
1858-
// Fast-path: pipeline full but under +4 extra.
1907+
} else if inflight.len() < max_inflight + 1 {
1908+
// One extra slot for upload data so it doesn't
1909+
// wait for a full pipeline drain.
18591910
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
18601911
consecutive_empty = 0;
18611912
inflight.push(wrap_reply(meta, reply_rx));

0 commit comments

Comments
 (0)