From 8ac65ab8766419b0bf76f87f1523f64e92ad26ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Wed, 13 May 2026 12:54:49 +0200 Subject: [PATCH 1/2] fix(envd): stop memory exhaustion when client disconnects from streaming process When a Start/Connect client disconnects while a process is producing output, the fan-out loop drained the Source channel with no subscribers, allocating 32 KiB per read cycle. With a fast producer, envd RSS grew to hundreds of MiB in seconds, OOM-killing sandbox processes. Readers now reuse a single read buffer and only allocate + send when HasSubscribers is true. The fan-out always consumes from Source and drops values with no subscribers. The child process is never blocked so servers and background processes stay responsive during disconnects. --- .../services/process/handler/handler.go | 27 ++++++++++++------- .../services/process/handler/multiplex.go | 14 ++++++++++ 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 98108ea881..fca00c0b8f 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "os/user" + "slices" "strconv" "strings" "sync" @@ -242,21 +243,25 @@ func New( go logs.LogBufferedDataEvents(stdoutLogs, &stdoutLogger, "data") + // Reusable read buffer to avoid allocation per Read cycle when no + // subscribers are connected. + readBuf := make([]byte, stdChunkSize) + for { - buf := make([]byte, stdChunkSize) + n, readErr := stdout.Read(readBuf) - n, readErr := stdout.Read(buf) + if n > 0 && outMultiplex.HasSubscribers() { + data := slices.Clone(readBuf[:n]) - if n > 0 { outMultiplex.Source <- rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stdout{ - Stdout: buf[:n], + Stdout: data, }, }, } - stdoutLogs <- buf[:n] + stdoutLogs <- data } if errors.Is(readErr, io.EOF) { @@ -284,21 +289,23 @@ func New( go logs.LogBufferedDataEvents(stderrLogs, &stderrLogger, "data") + readBuf := make([]byte, stdChunkSize) + for { - buf := make([]byte, stdChunkSize) + n, readErr := stderr.Read(readBuf) - n, readErr := stderr.Read(buf) + if n > 0 && outMultiplex.HasSubscribers() { + data := slices.Clone(readBuf[:n]) - if n > 0 { outMultiplex.Source <- rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stderr{ - Stderr: buf[:n], + Stderr: data, }, }, } - stderrLogs <- buf[:n] + stderrLogs <- data } if errors.Is(readErr, io.EOF) { diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index ff699812db..9b9b3ad12d 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -84,6 +84,20 @@ func (m *MultiplexedChannel[T]) run() { m.channels = nil } +// HasSubscribers reports whether any non-cancelled subscriber exists. +func (m *MultiplexedChannel[T]) HasSubscribers() bool { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, s := range m.channels { + if !s.isCancelled() { + return true + } + } + + return false +} + // Fork registers a new subscriber and returns its channel plus a cancel func. // If Source is already closed it returns a pre-closed channel and a no-op cancel. // The channel is bidirectional for backwards compat with start.go which writes From 1069a1e0a9a47fbf3ffe95b6854ae1c52c21a176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Mon, 11 May 2026 12:46:25 +0200 Subject: [PATCH 2/2] chore(envd): bump version to 0.5.19 --- packages/envd/pkg/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/envd/pkg/version.go b/packages/envd/pkg/version.go index e251abd82f..01857d46e4 100644 --- a/packages/envd/pkg/version.go +++ b/packages/envd/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "0.5.18" +const Version = "0.5.19"