Skip to content

Commit 2b8c417

Browse files
fix(tunnel-node): eliminate atomic buf_len race in reader_task
Bind the read_buf MutexGuard to a variable and store buf_guard.len() while the lock is held, replacing the previously-unlocked fetch_add. A concurrent drain_now() could otherwise acquire the lock, drain the buffer, and store(0) between extend_from_slice and fetch_add, leaving buf_len inflated until the next drain.
1 parent 4a6332e commit 2b8c417

1 file changed

Lines changed: 8 additions & 11 deletions

File tree

tunnel-node/src/main.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -307,17 +307,14 @@ async fn reader_task(mut reader: impl AsyncRead + Unpin, session: Arc<SessionInn
307307
break;
308308
}
309309
Ok(n) => {
310-
// Extend the buffer before notifying. The MutexGuard is
311-
// dropped at the end of the statement, *before* the
312-
// notify_one call below, so any waiter that wakes on the
313-
// notify and then locks read_buf can immediately observe
314-
// the new bytes — no torn read where the wake fires but
315-
// the buffer still looks empty. Notify::notify_one also
316-
// stores a permit if no waiter is currently registered,
317-
// so we never lose an edge across the spawn race in
318-
// wait_for_any_drainable.
319-
session.read_buf.lock().await.extend_from_slice(&buf[..n]);
320-
session.buf_len.fetch_add(n, Ordering::Release);
310+
let mut read_buf = session.read_buf.lock().await;
311+
read_buf.extend_from_slice(&buf[..n]);
312+
session.buf_len.store(read_buf.len(), Ordering::Release);
313+
// Drop the MutexGuard before notify_one so any waiter
314+
// that wakes on the notify and then locks read_buf can
315+
// immediately observe the new bytes — no torn read where
316+
// the wake fires but the buffer still looks empty.
317+
drop(read_buf);
321318
session.notify.notify_one();
322319
}
323320
Err(_) => {

0 commit comments

Comments
 (0)