Skip to content

Commit f1bc82f

Browse files
fix(tunnel-node): bound per-session memory to prevent OOM
Validated locally: cd tunnel-node && cargo test (38 passed, 0 failed).
1 parent 56ca356 commit f1bc82f

1 file changed

Lines changed: 35 additions & 3 deletions

File tree

tunnel-node/src/main.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ const UDP_QUEUE_LIMIT: usize = 256;
8383
/// a maximum-size IPv4 datagram without truncation.
8484
const UDP_RECV_BUF_BYTES: usize = 65536;
8585

86+
/// Hard cap on each TCP session's `read_buf`. The reader_task pauses
87+
/// when the buffer reaches this size and resumes once the client drains
88+
/// it below the cap. Bounds per-session memory to ~32 MB regardless of
89+
/// upstream throughput — prevents OOM when fast upstreams (video,
90+
/// downloads) outpace Apps Script's 2-7 s polling interval.
91+
const READ_BUF_CAP: usize = 32 * 1024 * 1024;
92+
8693
/// Maximum raw bytes per TCP drain that we hand back to Apps Script in
8794
/// one batch response. Apps Script's hard cap on Web App response body
8895
/// is ~50 MiB. Accounting for base64 encoding (1.33×) and JSON envelope
@@ -252,6 +259,16 @@ fn create_udpgw_session() -> ManagedSession {
252259
async fn reader_task(mut reader: impl AsyncRead + Unpin, session: Arc<SessionInner>) {
253260
let mut buf = vec![0u8; 2 * 1024 * 1024];
254261
loop {
262+
// Backpressure: pause reads when the buffer hits READ_BUF_CAP.
263+
// Bounds per-session memory regardless of upstream throughput.
264+
// The drain on the batch side drops the lock between calls, so
265+
// this sleep loop will observe the updated size and resume.
266+
loop {
267+
if session.read_buf.lock().await.len() < READ_BUF_CAP {
268+
break;
269+
}
270+
tokio::time::sleep(Duration::from_millis(50)).await;
271+
}
255272
match reader.read(&mut buf).await {
256273
Ok(0) => {
257274
session.eof.store(true, Ordering::Release);
@@ -927,7 +944,10 @@ async fn handle_batch(
927944
sessions.get(&sid).map(|s| s.inner.clone())
928945
};
929946
if let Some(inner) = inner {
930-
*inner.last_active.lock().await = Instant::now();
947+
// last_active is bumped only on real uplink activity
948+
// here, or on actual downstream drain below. Empty
949+
// polls must not refresh it — matches udp_data branch.
950+
let mut had_uplink = false;
931951
if let Some(ref data_b64) = op.d {
932952
if !data_b64.is_empty() {
933953
// Decode first; only count this op as a real
@@ -947,6 +967,7 @@ async fn handle_batch(
947967
};
948968
if !bytes.is_empty() {
949969
had_writes_or_connects = true;
970+
had_uplink = true;
950971
tracing::info!(
951972
"session {} upload {}B wseq={:?}",
952973
&sid[..sid.len().min(8)], bytes.len(), op.wseq,
@@ -998,6 +1019,9 @@ async fn handle_batch(
9981019
}
9991020
}
10001021
}
1022+
if had_uplink {
1023+
*inner.last_active.lock().await = Instant::now();
1024+
}
10011025
tcp_drains.push((i, sid, inner, op.seq));
10021026
} else {
10031027
results.push((i, eof_response(sid, op.seq)));
@@ -1205,6 +1229,7 @@ async fn handle_batch(
12051229
let drained = all_data.len();
12061230
if drained > 0 {
12071231
tracing::info!("session {} drained {}KB", &sid[..sid.len().min(8)], drained / 1024);
1232+
*inner.last_active.lock().await = Instant::now();
12081233
}
12091234
if final_eof {
12101235
tcp_eof_sids.push(sid.clone());
@@ -1219,7 +1244,7 @@ async fn handle_batch(
12191244
let mut sessions = state.sessions.lock().await;
12201245
for sid in &tcp_eof_sids {
12211246
if let Some(s) = sessions.remove(sid) {
1222-
s.reader_handle.abort();
1247+
s.abort_all();
12231248
tracing::info!("session {} closed by remote (batch)", sid);
12241249
}
12251250
}
@@ -1566,15 +1591,22 @@ async fn cleanup_task(
15661591
{
15671592
let mut map = sessions.lock().await;
15681593
let mut stale = Vec::new();
1594+
let mut total_read_buf: usize = 0;
15691595
for (k, s) in map.iter() {
15701596
let last = *s.inner.last_active.lock().await;
1597+
total_read_buf += s.inner.read_buf.lock().await.len();
15711598
if now.duration_since(last) > Duration::from_secs(300) {
15721599
stale.push(k.clone());
15731600
}
15741601
}
1602+
tracing::info!(
1603+
"cleanup: {} tcp sessions, {:.1} MB total read_buf",
1604+
map.len(),
1605+
total_read_buf as f64 / (1024.0 * 1024.0),
1606+
);
15751607
for k in &stale {
15761608
if let Some(s) = map.remove(k) {
1577-
s.reader_handle.abort();
1609+
s.abort_all();
15781610
tracing::info!("reaped idle session {}", k);
15791611
}
15801612
}

0 commit comments

Comments
 (0)