Skip to content

Commit 8ac65ab

Browse files
committed
fix(envd): stop memory exhaustion when client disconnects from streaming process
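When a Start/Connect client disconnects while a process is still producing output, the stdout/stderr readers kept allocating a fresh buffer on every read cycle (32 KiB per cycle) while the fan-out loop drained the Source channel with no subscribers to deliver to. With a fast producer, envd RSS grew to hundreds of MiB in seconds, which led to sandbox processes being OOM-killed. Readers now reuse a single read buffer and only allocate a copy and send it when HasSubscribers() returns true. The fan-out still always consumes from Source and drops values when there are no subscribers. The child process is never blocked, so servers and background processes stay responsive during disconnects. Note that, because the send to the stdout/stderr log channels sits inside the same check, output read while no subscriber is connected is not forwarded to those log channels either.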
1 parent a2bb1cb commit 8ac65ab

2 files changed

Lines changed: 31 additions & 10 deletions

File tree

packages/envd/internal/services/process/handler/handler.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"os/exec"
1010
"os/user"
11+
"slices"
1112
"strconv"
1213
"strings"
1314
"sync"
@@ -242,21 +243,25 @@ func New(
242243

243244
go logs.LogBufferedDataEvents(stdoutLogs, &stdoutLogger, "data")
244245

246+
// Reusable read buffer to avoid allocation per Read cycle when no
247+
// subscribers are connected.
248+
readBuf := make([]byte, stdChunkSize)
249+
245250
for {
246-
buf := make([]byte, stdChunkSize)
251+
n, readErr := stdout.Read(readBuf)
247252

248-
n, readErr := stdout.Read(buf)
253+
if n > 0 && outMultiplex.HasSubscribers() {
254+
data := slices.Clone(readBuf[:n])
249255

250-
if n > 0 {
251256
outMultiplex.Source <- rpc.ProcessEvent_Data{
252257
Data: &rpc.ProcessEvent_DataEvent{
253258
Output: &rpc.ProcessEvent_DataEvent_Stdout{
254-
Stdout: buf[:n],
259+
Stdout: data,
255260
},
256261
},
257262
}
258263

259-
stdoutLogs <- buf[:n]
264+
stdoutLogs <- data
260265
}
261266

262267
if errors.Is(readErr, io.EOF) {
@@ -284,21 +289,23 @@ func New(
284289

285290
go logs.LogBufferedDataEvents(stderrLogs, &stderrLogger, "data")
286291

292+
readBuf := make([]byte, stdChunkSize)
293+
287294
for {
288-
buf := make([]byte, stdChunkSize)
295+
n, readErr := stderr.Read(readBuf)
289296

290-
n, readErr := stderr.Read(buf)
297+
if n > 0 && outMultiplex.HasSubscribers() {
298+
data := slices.Clone(readBuf[:n])
291299

292-
if n > 0 {
293300
outMultiplex.Source <- rpc.ProcessEvent_Data{
294301
Data: &rpc.ProcessEvent_DataEvent{
295302
Output: &rpc.ProcessEvent_DataEvent_Stderr{
296-
Stderr: buf[:n],
303+
Stderr: data,
297304
},
298305
},
299306
}
300307

301-
stderrLogs <- buf[:n]
308+
stderrLogs <- data
302309
}
303310

304311
if errors.Is(readErr, io.EOF) {

packages/envd/internal/services/process/handler/multiplex.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,20 @@ func (m *MultiplexedChannel[T]) run() {
8484
m.channels = nil
8585
}
8686

87+
// HasSubscribers reports whether at least one entry in m.channels is not cancelled; it holds the read lock on m.mu for the duration of the scan.
88+
func (m *MultiplexedChannel[T]) HasSubscribers() bool {
89+
m.mu.RLock()
90+
defer m.mu.RUnlock()
91+
92+
for _, s := range m.channels {
93+
if !s.isCancelled() {
94+
return true
95+
}
96+
}
97+
98+
return false
99+
}
100+
87101
// Fork registers a new subscriber and returns its channel plus a cancel func.
88102
// If Source is already closed it returns a pre-closed channel and a no-op cancel.
89103
// The channel is bidirectional for backwards compat with start.go which writes

0 commit comments

Comments
 (0)