fix(envd): discard output safely when no subscriber is connected #2639
arkamar wants to merge 3 commits into
Conversation
…ing process

When a Start/Connect client disconnects while a process is producing output, the fan-out loop drained the Source channel with no subscribers, allocating 32 KiB per read cycle. With a fast producer, envd RSS grew to hundreds of MiB in seconds, OOM-killing sandbox processes.

Readers now reuse a single read buffer and only allocate + send when HasSubscribers is true. The fan-out always consumes from Source and drops values when no subscribers remain. The child process is never blocked, so servers and background processes stay responsive during disconnects.
PR Summary: Medium Risk. Reviewed by Cursor Bugbot for commit 47ee515.
❌ 9 tests failed; 13 flaky tests.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1440f94c7c
Code Review
- Iterating over the `m.channels` slice header outside the lock in the fan-out loop is unsafe and can lead to race conditions where subscribers are skipped or receive duplicate values.
- The `HasSubscribers()` check in the process handler prevents stdout and stderr output from being sent to the system logs when no RPC client is connected, which results in a loss of observability for background processes.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 1440f94.
…er on Fork

Move `close(m.done)` inside the `m.mu` critical section in `run()` so that `Fork()`'s re-check under the same lock always observes the shutdown. Previously, `close(m.done)` happened after Unlock, creating a window where Fork could add a subscriber that `run()` never cleans up: its channel is never closed, leaking the subscriber.
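A minimal sketch of the ordering this commit establishes, assuming a simplified `mux` type (the real `run()`/`Fork()` signatures differ):

```go
package main

import (
	"fmt"
	"sync"
)

// mux is an illustrative stand-in, not the PR's MultiplexedChannel.
type mux struct {
	mu       sync.Mutex
	done     chan struct{}
	channels []chan int
}

// shutdown closes done inside the critical section, so any Fork that
// acquires the lock afterwards is guaranteed to observe the closed channel.
func (m *mux) shutdown() {
	m.mu.Lock()
	defer m.mu.Unlock()
	close(m.done)
}

// Fork re-checks done under the same lock; after shutdown it refuses to
// register a subscriber that run() would never clean up.
func (m *mux) Fork() (chan int, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	select {
	case <-m.done:
		return nil, false // shut down: do not leak a subscriber
	default:
	}
	ch := make(chan int)
	m.channels = append(m.channels, ch)
	return ch, true
}

func main() {
	m := &mux{done: make(chan struct{})}
	_, ok := m.Fork()
	fmt.Println("before shutdown:", ok) // before shutdown: true
	m.shutdown()
	_, ok = m.Fork()
	fmt.Println("after shutdown:", ok) // after shutdown: false
}
```

Because the close and the re-check run under the same mutex, there is no window in which Fork sees a still-open `done` after shutdown has committed.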
1440f94 to 47ee515
Additional findings (outside current diff — PR may have been updated during review):
- 🟣 `packages/envd/internal/services/process/handler/multiplex.go:55-67` — Pre-existing data race in the fan-out loop that this PR leaves intact. `run()` takes a slice-header snapshot of `m.channels` under RLock and iterates after unlocking, while `remove()` concurrently mutates the same backing array in place via `append(m.channels[:i], m.channels[i+1:]...)`. With 3+ subscribers (Start RPC + concurrent Connect RPCs both call `DataEvent.Fork()`) and a non-tail cancel mid-iteration, the same value can be delivered twice to one client and skipped on another — silent stdout/stderr corruption. One-line fix: deep-copy under RLock (`subs := append([]*subscriber[T]{}, m.channels...)`).

Extended reasoning:
What is wrong
In `multiplex.go` the fan-out loop snapshots the subscriber slice header under RLock and then iterates after releasing the lock:

```go
m.mu.RLock()
subs := m.channels // copies header only; shares backing array
m.mu.RUnlock()

for _, s := range subs {
	if s.isCancelled() {
		continue
	}
	select {
	case s.ch <- v:
	case <-s.done:
	}
}
```
Meanwhile `remove()` (line 159) mutates the same backing array in place under the write Lock:

```go
m.channels = append(m.channels[:i], m.channels[i+1:]...)
```
Because `m.channels[:i]` has enough capacity (we are shrinking), `append` does not allocate a new array — it performs an in-place forward memmove. After removing index `i` from a length-N slice, slots `i..N-2` are overwritten with the pointers from `i+1..N-1`, and the trailing slot `N-1` retains its old pointer. This is a well-known Go gotcha (see the SliceTricks wiki note about zeroing the trailing element so the GC can reclaim it).

Step-by-step proof
Start state: `m.channels = [A, B, C]` (cap=3). One Start RPC plus two Connect RPCs gives this exact 3-subscriber shape via `proc.DataEvent.Fork()` (see `start.go:106` and `connect.go:30`).

- `run()` enters with value `v`. It snapshots `subs := m.channels` → header `{ptr=base, len=3, cap=3}`. RUnlock.
- `i=0`: reads `subs[0]=A`, enters `select { case A.ch <- v: case <-A.done: }`. `A.ch` is unbuffered; the goroutine parks.
- Concurrently, A's owner disconnects → `cancel()` calls `remove(A)`. `remove` first calls `s.cancel()` (closes `A.done`) — the fan-out send unblocks via the `<-s.done` branch (A receives nothing). Then `remove` takes the write Lock and executes `append(m.channels[:0], m.channels[1:]...)`. The in-place memmove writes `base[0]=B`, then `base[1]=C`. Result: the backing array is `[B, C, C]`; the `m.channels` header is `{ptr=base, len=2}`. The snapshot `subs` still has `len=3`.
- `i=1`: `range subs` reads `subs[1]` from the shared backing array. It is now `C`. The check `s.isCancelled()` returns false for `C`. Send `v` to `C`. ✓
- `i=2`: `subs[2]` is also `C` (the duplicated trailing pointer). `isCancelled()` is still false. Send `v` to `C` again. ✗
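The in-place shift at the heart of this proof can be reproduced single-threaded. In this standalone sketch the string values stand in for subscriber pointers:

```go
package main

import "fmt"

func main() {
	a, b, c := "A", "B", "C"
	channels := []*string{&a, &b, &c}

	subs := channels // header-only snapshot: len=3, shares backing array

	// Remove index 0 the way remove() does. Capacity is sufficient, so
	// append shifts elements in place instead of allocating a new array.
	channels = append(channels[:0], channels[1:]...)

	for i, s := range subs {
		fmt.Printf("subs[%d] = %s\n", i, *s)
	}
	// subs[0] = B
	// subs[1] = C
	// subs[2] = C   <- duplicated trailing pointer: B is skipped, C doubled
}
```

The snapshot still has length 3 but now sees `[B, C, C]`, which is exactly the duplicated/skipped delivery the concurrent trace above produces.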
Net effect: subscriber `B` misses `v` entirely; subscriber `C` receives `v` twice. For a stdout/stderr stream this is a duplicated chunk in one client and a missing chunk in another — silent corruption of the data stream. (With only 2 subscribers `[A, B]`, removing A leaves `[B, B]` and B receives `v` twice — the bug is actually easier to trigger.)

This is also a textbook data race under the Go memory model: the unsynchronized `subs[i]` reads in `run()` race with the Lock-protected backing-array writes in `remove()`. `go test -race` would flag it.

Why existing code does not prevent it
- The RLock/RUnlock pair only protects the header read; the backing array remains shared.
- The `isCancelled()` check inside the loop does not save the duplicated subscriber — `C` is not cancelled, so the send proceeds the second time.
- `s.cancel()` is called before `remove()` takes the write lock, so the fan-out's per-subscriber select can unblock via `<-s.done` while `remove()` races to shift the backing array — providing the exact timing window.
Trigger conditions and PR relevance
The per-subscriber `case s.ch <- v:` blocks until the gRPC `stream.Send` completes; on a slow network this is arbitrarily long, so the window between iterations is wide. The bug is pre-existing in the strict sense — the snapshot+in-place-shift pattern was introduced in PR #2579 and the two offending lines are not modified here. However, this PR is centrally about fan-out correctness (it rewrites `run()` and `Fork()`, and adds `HasSubscribers()`), and the new "drain to void after cancel" semantics widen the window: previously a cancelled subscriber would wedge the fan-out at the first blocked send, so a race could only corrupt one value; now iteration continues across many values, so the corruption can repeat for every value emitted while two clients remain alive.

Fix
One-line change in `run()` — deep-copy the slice under RLock:

```go
m.mu.RLock()
subs := append([]*subscriber[T]{}, m.channels...)
m.mu.RUnlock()
```
Alternatively, change `remove()` to allocate a new backing array instead of shifting in place (`m.channels = append([]*subscriber[T]{}, append(m.channels[:i:i], m.channels[i+1:]...)...)`), but the snapshot fix is simpler and avoids the trailing-pointer pitfall everywhere.
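The deep-copy fix can be exercised in isolation; the `mux`/`sub` types below are simplified stand-ins for the generic subscriber slice:

```go
package main

import (
	"fmt"
	"sync"
)

type sub struct{ name string }

// mux is an illustrative stand-in for the multiplexer.
type mux struct {
	mu       sync.RWMutex
	channels []*sub
}

// snapshot returns an independent copy of the subscriber slice, so a
// later remove() cannot rewrite the elements the fan-out is iterating over.
func (m *mux) snapshot() []*sub {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return append([]*sub{}, m.channels...)
}

func (m *mux) remove(i int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.channels = append(m.channels[:i], m.channels[i+1:]...)
}

func main() {
	m := &mux{channels: []*sub{{"A"}, {"B"}, {"C"}}}
	subs := m.snapshot()
	m.remove(0) // shifts m.channels in place; subs is unaffected
	for _, s := range subs {
		fmt.Println(s.name)
	}
	// Still prints A, B, C: each subscriber is visited exactly once.
}
```

Because `snapshot` copies the elements, the in-place memmove inside `remove` touches a different backing array and the iteration stays consistent.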

When a Start/Connect client disconnects while a process is producing output, the fan-out loop in MultiplexedChannel blocks trying to deliver to the dead subscriber's channel. This back-pressures the reader goroutine, which allocates a fresh 32 KiB buffer on every Read call, causing envd RSS to grow unboundedly.
The fix makes the fan-out skip cancelled subscribers and keep draining Source to the void when none remain, so the child process is never back-pressured. The reader goroutines now reuse a single buffer and only allocate+copy when `HasSubscribers()` returns true. The `atomic.Bool` exit flag is replaced with a `done` channel — this is more idiomatic for broadcast shutdown signaling and also makes tests deterministic, replacing the old `runtime.NumGoroutine()` polling with a direct `select` on `<-m.done`.

Alternative solution to #2620