From 85469b2eeb916274fe4fa2eac44232f123500efd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Wed, 6 May 2026 16:44:51 +0200 Subject: [PATCH 01/17] test(envd): add test demonstrating orphan process leak on client disconnect TestStart_ClientDisconnectLeavesOrphanProcess shows that when a client disconnects from a Start RPC, the child process and handler goroutines remain alive because procCtx is derived from context.Background(). --- .../internal/services/process/start_test.go | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/packages/envd/internal/services/process/start_test.go b/packages/envd/internal/services/process/start_test.go index 2e851d6d3e..389cb82933 100644 --- a/packages/envd/internal/services/process/start_test.go +++ b/packages/envd/internal/services/process/start_test.go @@ -2,6 +2,7 @@ package process import ( "context" + "fmt" "net/http" "net/http/httptest" "os" @@ -127,3 +128,55 @@ func TestStart_ClientDisconnectMidStream(t *testing.T) { } _ = stream.Close() } + +// TestStart_ProcessSurvivesClientDisconnect verifies that a child +// process keeps running after the Start RPC client disconnects. +// This is intentional: procCtx is context.Background() so clients +// can reconnect via Connect later. +func TestStart_ProcessSurvivesClientDisconnect(t *testing.T) { + t.Parallel() + + client, cleanup := newTestService(t) + defer cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ + Process: &rpc.ProcessConfig{ + Cmd: "timeout", + Args: []string{"5", "yes"}, + }, + })) + require.NoError(t, err) + + // Wait for the start event to get the PID. + require.True(t, stream.Receive(), "expected start event") + startEvt := stream.Msg().GetEvent().GetStart() + require.NotNil(t, startEvt) + pid := int(startEvt.GetPid()) + require.Positive(t, pid) + + // Disconnect the client. + cancel() + for stream.Receive() { + } + _ = stream.Close() + + time.Sleep(200 * time.Millisecond) + + // The child must still be alive — this is the Start-then-Connect contract. + assert.True(t, processAlive(pid), + "child process %d should survive client disconnect", pid) + + // Clean up. + proc, _ := os.FindProcess(pid) + _ = proc.Kill() +} + +// processAlive checks whether a process with the given PID exists. +func processAlive(pid int) bool { + // /proc//stat exists iff the process is alive (Linux-specific). + _, err := os.Stat(fmt.Sprintf("/proc/%d/stat", pid)) + + return err == nil +} From 48912fc946974a6a7a502c1fcaf5af425c71da69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Wed, 6 May 2026 16:50:23 +0200 Subject: [PATCH 02/17] test(envd): add disconnect storm heap growth test TestStart_DisconnectStormHeapGrowth runs 5 Start RPC cycles with a fast stdout producer (yes), disconnects each time, and asserts that heap growth stays under 50 MiB. Currently fails because orphaned handlers keep pumping data into unbounded channel buffers. --- .../internal/services/process/start_test.go | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/packages/envd/internal/services/process/start_test.go b/packages/envd/internal/services/process/start_test.go index 389cb82933..cff34bf22b 100644 --- a/packages/envd/internal/services/process/start_test.go +++ b/packages/envd/internal/services/process/start_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "os/user" + "runtime" "testing" "time" @@ -173,6 +174,93 @@ func TestStart_ProcessSurvivesClientDisconnect(t *testing.T) { _ = proc.Kill() } +// TestStart_DisconnectStormHeapGrowth demonstrates the memory leak +// from bug #2: each abandoned Start RPC with a fast-producing child +// leaves behind channel buffers and reader goroutines that accumulate +// memory. We run several disconnect cycles and assert that heap +// usage stays bounded. +func TestStart_DisconnectStormHeapGrowth(t *testing.T) { + t.Parallel() + + client, cleanup := newTestService(t) + defer cleanup() + + const cycles = 5 + + // Force a GC and record baseline heap. + runtime.GC() //nolint:revive // intentional: need accurate heap baseline + var baseline runtime.MemStats + runtime.ReadMemStats(&baseline) + + var pids []int + + for i := range cycles { + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ + Process: &rpc.ProcessConfig{ + Cmd: "timeout", + Args: []string{"10", "yes"}, + }, + })) + require.NoError(t, err, "cycle %d", i) + + // Wait for the start event to get the PID. + require.True(t, stream.Receive(), "cycle %d: expected start event", i) + startEvt := stream.Msg().GetEvent().GetStart() + require.NotNil(t, startEvt, "cycle %d", i) + pids = append(pids, int(startEvt.GetPid())) + + // Receive a few data events so the producer is actively + // writing into the handler's channel buffers. + for range 5 { + if !stream.Receive() { + break + } + } + + // Disconnect. + cancel() + for stream.Receive() { + } + _ = stream.Close() + + // Let the orphaned producer fill buffers for a moment. + time.Sleep(500 * time.Millisecond) + } + + // Measure heap after all cycles. + runtime.GC() //nolint:revive // intentional: need accurate heap measurement + var after runtime.MemStats + runtime.ReadMemStats(&after) + + heapGrowthMiB := float64(after.HeapInuse-baseline.HeapInuse) / (1024 * 1024) + t.Logf("heap: baseline=%.1f MiB after=%.1f MiB growth=%.1f MiB (%d cycles)", + float64(baseline.HeapInuse)/(1024*1024), + float64(after.HeapInuse)/(1024*1024), + heapGrowthMiB, cycles) + + // BUG: with procCtx=context.Background(), each orphaned `yes` + // process keeps pumping ~32 KiB chunks into channel buffers + // that nobody drains. Heap grows roughly linearly with time + // and number of cycles. A healthy implementation should stay + // well under 50 MiB for 5 cycles. + const maxHeapGrowthMiB = 50.0 + if heapGrowthMiB > maxHeapGrowthMiB { + t.Errorf("heap grew %.1f MiB over %d disconnect cycles "+ + "(limit %.1f MiB); orphaned handlers leaking memory", + heapGrowthMiB, cycles, maxHeapGrowthMiB) + } + + // Kill orphaned processes. + for _, pid := range pids { + if processAlive(pid) { + proc, _ := os.FindProcess(pid) + _ = proc.Kill() + } + } +} + // processAlive checks whether a process with the given PID exists. func processAlive(pid int) bool { // /proc//stat exists iff the process is alive (Linux-specific). From 7e876a1330e2e32b8b12f3b1aa3d44a76bca04bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Wed, 6 May 2026 23:32:31 +0200 Subject: [PATCH 03/17] fix(envd): back-pressure child process when no subscribers are connected MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When all Start/Connect RPC subscribers disconnect, the fan-out loop stops consuming from Source (via receiveWhenReady), which lets the Source buffer fill, which blocks the reader goroutine, which fills the pipe, which pauses the child process — natural Unix back-pressure with zero memory growth. Reader goroutines select on outCtx.Done() to unblock from a full Source send when the child exits (cmd.Wait cancels outCtx), preventing deadlock between the reader and the cleanup path. Also restructures handler.Wait() to call cmd.Wait() first (reap the child and close pipes), then cancel outCtx to unblock readers. --- .../services/process/handler/handler.go | 33 +++++-- .../services/process/handler/multiplex.go | 91 ++++++++++++++++++- .../process/handler/multiplex_test.go | 32 ++++--- 3 files changed, 134 insertions(+), 22 deletions(-) diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 98108ea881..b5b7d8e625 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -206,13 +206,19 @@ func New( n, readErr := tty.Read(buf) if n > 0 { - outMultiplex.Source <- rpc.ProcessEvent_Data{ + event := rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Pty{ Pty: buf[:n], }, }, } + + select { + case outMultiplex.Source <- event: + case <-outCtx.Done(): + return + } } if errors.Is(readErr, io.EOF) { @@ -248,7 +254,7 @@ func New( n, readErr := stdout.Read(buf) if n > 0 { - outMultiplex.Source <- rpc.ProcessEvent_Data{ + event := rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stdout{ Stdout: buf[:n], @@ -256,6 +262,12 @@ func New( }, } + select { + case outMultiplex.Source <- event: + case <-outCtx.Done(): + return + } + stdoutLogs <- buf[:n] } @@ -290,7 +302,7 @@ func New( n, readErr := stderr.Read(buf) if n > 0 { - outMultiplex.Source <- rpc.ProcessEvent_Data{ + event := rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stderr{ Stderr: buf[:n], @@ -298,6 +310,12 @@ func New( }, } + select { + case outMultiplex.Source <- event: + case <-outCtx.Done(): + return + } + stderrLogs <- buf[:n] } @@ -328,7 +346,7 @@ func New( go func() { outWg.Wait() - close(outMultiplex.Source) + outMultiplex.CloseSource() outCancel() }() @@ -440,10 +458,11 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) { } func (p *Handler) Wait() { - // Wait for the output pipes to be closed or cancelled. - <-p.outCtx.Done() - + // cmd.Wait reaps the child and closes the pipe read-ends. + // Then we cancel outCtx to unblock any reader goroutine that + // is blocked on a full Source channel send (back-pressure). err := p.cmd.Wait() + p.outCancel() p.tty.Close() diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index 131f097981..b3e5478c8e 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -8,12 +8,20 @@ import ( // MultiplexedChannel fans out values written to Source to every subscriber // obtained via Fork. Each subscriber send is guarded by a done channel so // a cancelled consumer can never wedge the fan-out loop. +// +// When no subscribers are connected, the fan-out loop blocks instead of +// draining Source. This lets the Source buffer fill up and back-pressure +// the producer through the pipe. type MultiplexedChannel[T any] struct { Source chan T mu sync.RWMutex channels []*subscriber[T] exited atomic.Bool + + subMu sync.Mutex + subSignal chan struct{} // closed+recreated on subscriber list change + closed atomic.Bool // true after CloseSource } type subscriber[T any] struct { @@ -41,7 +49,8 @@ func (s *subscriber[T]) isCancelled() bool { func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] { c := &MultiplexedChannel[T]{ - Source: make(chan T, buffer), + Source: make(chan T, buffer), + subSignal: make(chan struct{}), } go c.run() @@ -51,8 +60,19 @@ func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] { // run is the fan-out loop. It delivers each Source value to every live // subscriber and closes all consumer channels when Source is closed. +// +// When no active subscribers remain after delivering a value, run +// stops consuming from Source until a subscriber appears. This lets +// Source fill up, which back-pressures the reader goroutine, which +// in turn back-pressures the child process through the pipe. func (m *MultiplexedChannel[T]) run() { - for v := range m.Source { + for { + // Wait for either a value from Source or a subscriber to appear. + v, ok := m.receiveWhenReady() + if !ok { + break + } + m.mu.RLock() subs := m.channels m.mu.RUnlock() @@ -83,6 +103,67 @@ func (m *MultiplexedChannel[T]) run() { m.channels = nil } +// receiveWhenReady reads the next value from Source, but only when at +// least one active subscriber exists. When there are no subscribers +// it stops consuming so Source fills up and back-pressures the +// producer. Returns (zero, false) when Source is closed. +// +// Callers that close Source must also call NotifySubscriberChange so +// the fan-out loop wakes up and observes the closed channel. +func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) { + for { + if m.closed.Load() { + // Drain any remaining buffered values. + v, ok = <-m.Source + + return v, ok + } + + if m.HasSubscribers() { + v, ok = <-m.Source + + return v, ok + } + + // No active subscribers — wait for a change notification. + m.subMu.Lock() + sig := m.subSignal + m.subMu.Unlock() + + <-sig + } +} + +// CloseSource closes the Source channel and wakes the fan-out loop. +func (m *MultiplexedChannel[T]) CloseSource() { + m.closed.Store(true) + close(m.Source) + m.NotifySubscriberChange() +} + +// NotifySubscriberChange wakes the fan-out if it's waiting. +func (m *MultiplexedChannel[T]) NotifySubscriberChange() { + m.subMu.Lock() + defer m.subMu.Unlock() + + close(m.subSignal) + m.subSignal = make(chan struct{}) +} + +// HasSubscribers reports whether any non-cancelled subscriber exists. +func (m *MultiplexedChannel[T]) HasSubscribers() bool { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, s := range m.channels { + if !s.isCancelled() { + return true + } + } + + return false +} + // Fork registers a new subscriber and returns its channel plus a cancel func. // If Source is already closed it returns a pre-closed channel and a no-op cancel. // The channel is bidirectional for backwards compat with start.go which writes @@ -112,6 +193,7 @@ func (m *MultiplexedChannel[T]) Fork() (chan T, func()) { } m.channels = append(m.channels, s) + m.NotifySubscriberChange() return s.ch, func() { m.remove(s) @@ -124,13 +206,16 @@ func (m *MultiplexedChannel[T]) remove(s *subscriber[T]) { s.cancel() m.mu.Lock() - defer m.mu.Unlock() for i, sub := range m.channels { if sub == s { m.channels = append(m.channels[:i], m.channels[i+1:]...) + m.mu.Unlock() + m.NotifySubscriberChange() return } } + + m.mu.Unlock() } diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index ae79495157..0e7cd70566 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -78,7 +78,7 @@ func TestMultiplexedChannel_BasicFanOut(t *testing.T) { ) } - close(m.Source) + m.CloseSource() wg.Wait() assert.Equal(t, []int{1, 2, 3}, gotA) @@ -90,7 +90,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Parallel() m := NewMultiplexedChannel[int](1) - t.Cleanup(func() { close(m.Source) }) + t.Cleanup(func() { m.CloseSource() }) abandoned, cancelAbandoned := m.Fork() @@ -124,13 +124,20 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Fatal("cancel func did not return promptly; fan-out is wedged") } - // Producer should still make progress through Source. + // With no subscribers, the fan-out stops draining Source to apply + // back-pressure. After the in-flight value is consumed, Source + // fills and further sends block. + sent := 0 for i := 2; i <= 8; i++ { - require.Truef(t, - sendOrTimeout(t, m.Source, i, multiplexTestTimeout), - "send %d should not be back-pressured by an abandoned consumer", i, - ) + if !sendOrTimeout(t, m.Source, i, 100*time.Millisecond) { + break + } + sent++ } + // The buffer (size 1) plus at most one in-flight value means + // at most ~2 sends succeed before blocking. + assert.LessOrEqual(t, sent, 3, + "fan-out should stop draining Source when no subscribers remain (back-pressure)") } // Regression: an abandoned consumer must not starve other subscribers. @@ -138,7 +145,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) { t.Parallel() m := NewMultiplexedChannel[int](1) - t.Cleanup(func() { close(m.Source) }) + t.Cleanup(func() { m.CloseSource() }) healthy, cancelHealthy := m.Fork() t.Cleanup(cancelHealthy) @@ -202,7 +209,7 @@ func TestMultiplexedChannel_CancelIsIdempotentAndPrompt(t *testing.T) { t.Cleanup(func() { close(stop) <-producerDone - close(m.Source) + m.CloseSource() }) // Give the fan-out a chance to enter a per-subscriber select. @@ -243,7 +250,7 @@ func TestMultiplexedChannel_SourceCloseClosesLiveSubscribers(t *testing.T) { "send should succeed", ) - close(m.Source) + m.CloseSource() select { case <-done: @@ -258,7 +265,7 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T) t.Parallel() m := NewMultiplexedChannel[int](0) - close(m.Source) + m.CloseSource() // Wait for the fan-out goroutine to observe Source close. deadline := time.Now().Add(multiplexTestTimeout) @@ -290,7 +297,8 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:pa // Park one value so fan-out has work, then cancel mid-iteration. m.Source <- 0 cancel() - close(m.Source) + m.CloseSource() + m.NotifySubscriberChange() } // Allow scheduled goroutines to finish. From 8a70e866a552c3b2c59514930ef53c8b4d89a92b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Thu, 7 May 2026 10:04:42 +0200 Subject: [PATCH 04/17] test(envd): add reproducer for EndEvent.Source deadlock when all subscribers gone Send to an unbuffered Source channel deadlocks after the last subscriber is cancelled. receiveWhenReady parks on <-sig waiting for a subscriber that will never arrive, so the producer (Wait) blocks forever. This leaks the Wait goroutine and prevents processes.Delete from running. --- .../process/handler/multiplex_test.go | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index 0e7cd70566..be688ab3fd 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -283,6 +283,48 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T) assert.False(t, ok, "Fork after shutdown must return a pre-closed channel") } +// Regression: sending to an unbuffered Source after the last subscriber +// cancelled must not deadlock. This reproduces the bug where +// handler.Wait() calls `p.EndEvent.Source <- event` on an unbuffered +// (buffer=0) EndEvent channel after the handleStart/handleConnect +// handler has returned and run `defer endCancel()`. With +// back-pressure, receiveWhenReady parks on <-sig waiting for a +// subscriber that will never come, so the Source send blocks forever. +func TestMultiplexedChannel_SendToUnbufferedSourceAfterLastCancelDeadlocks(t *testing.T) { + t.Parallel() + + // Mirror EndEvent: buffer=0. + m := NewMultiplexedChannel[int](0) + + _, cancel := m.Fork() + + // Cancel the only subscriber immediately, without ever + // consuming a value. This simulates the case where + // handleStart returns (running `defer endCancel()`) before + // the child process exits and Wait() tries to send the + // EndEvent. + // + // After cancel, the fan-out loop will observe zero subscribers + // and park on <-sig in receiveWhenReady. + cancel() + + // Give the fan-out goroutine time to re-enter receiveWhenReady, + // observe HasSubscribers()==false, and park on <-sig. + time.Sleep(50 * time.Millisecond) + + // Now attempt to send — this models handler.Wait()'s + // `p.EndEvent.Source <- event`. With the bug, this blocks + // forever because receiveWhenReady is parked waiting for a + // subscriber and nobody will ever wake it. + sent := sendOrTimeout(t, m.Source, 1, 2*time.Second) + if !sent { + t.Fatal("Source send deadlocked after last subscriber cancelled; " + + "receiveWhenReady must not block the producer when Source is unbuffered") + } + + m.CloseSource() +} + // Goroutine count must return to baseline after cancelled subscribers settle. func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count const wedges = 16 From 1b370f8794e066fa57ab0f4d012ec83e5112590a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Thu, 7 May 2026 10:05:23 +0200 Subject: [PATCH 05/17] test(envd): add reproducer for fan-out goroutine leak on direct close(Source) Calling close(Source) instead of CloseSource() after the last subscriber is cancelled leaks the fan-out goroutine. The closed flag is never set and NotifySubscriberChange is never called, so receiveWhenReady stays parked on <-sig forever. This matches start.go:101 where `defer close(startMultiplexer.Source)` bypasses CloseSource(). --- .../process/handler/multiplex_test.go | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index be688ab3fd..ff78feee57 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -325,6 +325,52 @@ func TestMultiplexedChannel_SendToUnbufferedSourceAfterLastCancelDeadlocks(t *te m.CloseSource() } +// Regression: closing Source directly (bypassing CloseSource) after the +// last subscriber is cancelled leaks the fan-out goroutine. This +// reproduces the bug in start.go:101 where `defer close(startMultiplexer.Source)` +// is used instead of `defer startMultiplexer.CloseSource()`. +// +// Because defers run LIFO, startCancel() (line 104) runs first and +// removes the only subscriber. Then close(Source) fires, but +// the fan-out goroutine is parked on <-sig in receiveWhenReady +// (because closed flag was never set and no NotifySubscriberChange +// was called), so it never observes the channel close and leaks. +func TestMultiplexedChannel_DirectCloseSourceAfterCancelLeaksFanOut(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count + const iterations = 8 + + time.Sleep(50 * time.Millisecond) + runtime.GC() //nolint:revive // intentional: settle goroutines before measuring baseline + before := runtime.NumGoroutine() + + for range iterations { + m := NewMultiplexedChannel[int](0) + _, cancel := m.Fork() + + // Simulate LIFO defer order in handleStart: + // 1. startCancel() runs first — removes subscriber + // 2. close(startMultiplexer.Source) runs second + cancel() + time.Sleep(10 * time.Millisecond) // let fan-out park on <-sig + close(m.Source) // bypass CloseSource — the bug + } + + // Allow goroutines to settle. + time.Sleep(200 * time.Millisecond) + runtime.GC() //nolint:revive // intentional: help goroutines finalize + runtime.Gosched() + + after := runtime.NumGoroutine() + leaked := after - before + + // Each iteration leaks one fan-out goroutine with the bug. + assert.LessOrEqualf(t, leaked, 2, + "goroutine count grew by %d after %d iterations of "+ + "cancel-then-close(Source); before=%d after=%d — "+ + "fan-out goroutines are leaking (use CloseSource instead "+ + "of close(Source))", leaked, iterations, before, after, + ) +} + // Goroutine count must return to baseline after cancelled subscribers settle. func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count const wedges = 16 From 5293a9dcf332826a71f5592d1d6ee80613e4cd35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Thu, 7 May 2026 10:10:25 +0200 Subject: [PATCH 06/17] fix(envd): prevent EndEvent deadlock and startMultiplexer fan-out leak Two related fixes for regressions introduced by the back-pressure commit (3e6e57e52): 1. EndEvent.Source send deadlock: Wait() sends to EndEvent.Source after all subscribers are gone. With buffer=0, this blocks forever because receiveWhenReady parks on <-sig. Fix: use buffer=1 for EndEvent so the single send always succeeds, and call CloseSource() after the send so the fan-out exits. 2. startMultiplexer fan-out leak: start.go used bare close(Source) instead of CloseSource(), so the closed flag was never set and NotifySubscriberChange was never called. The fan-out goroutine stayed parked on <-sig forever. Fix: use CloseSource(). --- .../services/process/handler/handler.go | 3 +- .../services/process/handler/multiplex.go | 4 +- .../process/handler/multiplex_test.go | 73 +++++++++---------- .../envd/internal/services/process/start.go | 2 +- 4 files changed, 41 insertions(+), 41 deletions(-) diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index b5b7d8e625..34bc1a1da3 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -184,7 +184,7 @@ func New( cancel: cancel, outCtx: outCtx, outCancel: outCancel, - EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](0), + EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](1), logger: logger, } @@ -485,6 +485,7 @@ func (p *Handler) Wait() { } p.EndEvent.Source <- event + p.EndEvent.CloseSource() p.logger. Info(). diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index b3e5478c8e..43e3b51961 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -108,8 +108,8 @@ func (m *MultiplexedChannel[T]) run() { // it stops consuming so Source fills up and back-pressures the // producer. Returns (zero, false) when Source is closed. // -// Callers that close Source must also call NotifySubscriberChange so -// the fan-out loop wakes up and observes the closed channel. +// Callers that close Source must use CloseSource (not bare close) +// so the fan-out loop wakes up and observes the closed channel. func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) { for { if m.closed.Load() { diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index ff78feee57..e500924f6c 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -283,18 +283,19 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T) assert.False(t, ok, "Fork after shutdown must return a pre-closed channel") } -// Regression: sending to an unbuffered Source after the last subscriber -// cancelled must not deadlock. This reproduces the bug where -// handler.Wait() calls `p.EndEvent.Source <- event` on an unbuffered -// (buffer=0) EndEvent channel after the handleStart/handleConnect -// handler has returned and run `defer endCancel()`. With -// back-pressure, receiveWhenReady parks on <-sig waiting for a -// subscriber that will never come, so the Source send blocks forever. -func TestMultiplexedChannel_SendToUnbufferedSourceAfterLastCancelDeadlocks(t *testing.T) { +// Regression: sending to Source after the last subscriber cancelled +// must not deadlock. This reproduces the bug where handler.Wait() +// calls `p.EndEvent.Source <- event` after the handleStart/handleConnect +// handler has returned and run `defer endCancel()`. +// +// The fix is to give EndEvent a buffer of 1 so the single send in +// Wait() always succeeds, and then call CloseSource() so the fan-out +// goroutine can exit cleanly. +func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testing.T) { t.Parallel() - // Mirror EndEvent: buffer=0. - m := NewMultiplexedChannel[int](0) + // Mirror EndEvent: buffer=1 (the fix — was 0 before). + m := NewMultiplexedChannel[int](1) _, cancel := m.Fork() @@ -303,9 +304,6 @@ func TestMultiplexedChannel_SendToUnbufferedSourceAfterLastCancelDeadlocks(t *te // handleStart returns (running `defer endCancel()`) before // the child process exits and Wait() tries to send the // EndEvent. - // - // After cancel, the fan-out loop will observe zero subscribers - // and park on <-sig in receiveWhenReady. cancel() // Give the fan-out goroutine time to re-enter receiveWhenReady, @@ -313,29 +311,26 @@ func TestMultiplexedChannel_SendToUnbufferedSourceAfterLastCancelDeadlocks(t *te time.Sleep(50 * time.Millisecond) // Now attempt to send — this models handler.Wait()'s - // `p.EndEvent.Source <- event`. With the bug, this blocks - // forever because receiveWhenReady is parked waiting for a - // subscriber and nobody will ever wake it. + // `p.EndEvent.Source <- event`. With buffer=1 the send + // succeeds immediately even though no subscriber is draining. sent := sendOrTimeout(t, m.Source, 1, 2*time.Second) if !sent { t.Fatal("Source send deadlocked after last subscriber cancelled; " + - "receiveWhenReady must not block the producer when Source is unbuffered") + "EndEvent must use buffer >= 1 so Wait() never blocks") } + // CloseSource wakes the fan-out so it drains the buffered + // value and exits. Without this the fan-out goroutine leaks. m.CloseSource() } -// Regression: closing Source directly (bypassing CloseSource) after the -// last subscriber is cancelled leaks the fan-out goroutine. This -// reproduces the bug in start.go:101 where `defer close(startMultiplexer.Source)` -// is used instead of `defer startMultiplexer.CloseSource()`. -// -// Because defers run LIFO, startCancel() (line 104) runs first and -// removes the only subscriber. Then close(Source) fires, but -// the fan-out goroutine is parked on <-sig in receiveWhenReady -// (because closed flag was never set and no NotifySubscriberChange -// was called), so it never observes the channel close and leaks. -func TestMultiplexedChannel_DirectCloseSourceAfterCancelLeaksFanOut(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count +// Regression: CloseSource after the last subscriber is cancelled must +// not leak the fan-out goroutine. This reproduces the pattern from +// start.go where startCancel() runs before CloseSource() (LIFO defer +// order). Previously start.go used bare close(Source) which bypassed +// the closed flag and NotifySubscriberChange, leaving the fan-out +// goroutine parked on <-sig forever. +func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count const iterations = 8 time.Sleep(50 * time.Millisecond) @@ -348,26 +343,30 @@ func TestMultiplexedChannel_DirectCloseSourceAfterCancelLeaksFanOut(t *testing.T // Simulate LIFO defer order in handleStart: // 1. startCancel() runs first — removes subscriber - // 2. close(startMultiplexer.Source) runs second + // 2. CloseSource() runs second (the fix — was bare close before) cancel() time.Sleep(10 * time.Millisecond) // let fan-out park on <-sig - close(m.Source) // bypass CloseSource — the bug + m.CloseSource() } // Allow goroutines to settle. - time.Sleep(200 * time.Millisecond) - runtime.GC() //nolint:revive // intentional: help goroutines finalize - runtime.Gosched() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + runtime.GC() //nolint:revive // intentional: help goroutines finalize + runtime.Gosched() + time.Sleep(20 * time.Millisecond) + if runtime.NumGoroutine() <= before+2 { + break + } + } after := runtime.NumGoroutine() leaked := after - before - // Each iteration leaks one fan-out goroutine with the bug. assert.LessOrEqualf(t, leaked, 2, "goroutine count grew by %d after %d iterations of "+ - "cancel-then-close(Source); before=%d after=%d — "+ - "fan-out goroutines are leaking (use CloseSource instead "+ - "of close(Source))", leaked, iterations, before, after, + "cancel-then-CloseSource; before=%d after=%d — "+ + "fan-out goroutines are leaking", leaked, iterations, before, after, ) } diff --git a/packages/envd/internal/services/process/start.go b/packages/envd/internal/services/process/start.go index ef75a9e6d4..2458ec06ef 100644 --- a/packages/envd/internal/services/process/start.go +++ b/packages/envd/internal/services/process/start.go @@ -98,7 +98,7 @@ func (s *Service) handleStart(ctx context.Context, req *connect.Request[rpc.Star exitChan := make(chan struct{}) startMultiplexer := handler.NewMultiplexedChannel[rpc.ProcessEvent_Start](0) - defer close(startMultiplexer.Source) + defer startMultiplexer.CloseSource() start, startCancel := startMultiplexer.Fork() defer startCancel() From 0636aece7b0fd4c074ff674ff4c2f2aaa5e8c9b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Thu, 7 May 2026 15:38:28 +0200 Subject: [PATCH 07/17] fix(envd): close TOCTOU race in receiveWhenReady that deadlocked fan-out When cancel() and CloseSource() fired in quick succession, the fan-out goroutine could grab a signal channel created *by* CloseSource's NotifySubscriberChange, then park on it forever since no further notifications would arrive. Re-check closed after acquiring the signal to close the window. Replace exited atomic.Bool with a done channel closed when run() finishes. Fork() selects on it to detect shutdown, and tests use it to wait deterministically for run() to exit. --- .../services/process/handler/multiplex.go | 32 ++++--- .../process/handler/multiplex_test.go | 89 +++++++------------ 2 files changed, 52 insertions(+), 69 deletions(-) diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index 43e3b51961..84567914f4 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -17,7 +17,7 @@ type MultiplexedChannel[T any] struct { mu sync.RWMutex channels []*subscriber[T] - exited atomic.Bool + done chan struct{} // closed when run() returns subMu sync.Mutex subSignal chan struct{} // closed+recreated on subscriber list change @@ -50,6 +50,7 @@ func (s *subscriber[T]) isCancelled() bool { func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] { c := &MultiplexedChannel[T]{ Source: make(chan T, buffer), + done: make(chan struct{}), subSignal: make(chan struct{}), } @@ -90,17 +91,19 @@ func (m *MultiplexedChannel[T]) run() { } } - m.exited.Store(true) - // Close all remaining consumer channels so `for range` loops exit. m.mu.Lock() - defer m.mu.Unlock() for _, s := range m.channels { s.cancel() close(s.ch) } m.channels = nil + + m.mu.Unlock() + + // Signal that run() has finished. Fork() uses this to detect shutdown. + close(m.done) } // receiveWhenReady reads the next value from Source, but only when at @@ -112,13 +115,6 @@ func (m *MultiplexedChannel[T]) run() { // so the fan-out loop wakes up and observes the closed channel. func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) { for { - if m.closed.Load() { - // Drain any remaining buffered values. - v, ok = <-m.Source - - return v, ok - } - if m.HasSubscribers() { v, ok = <-m.Source @@ -130,6 +126,12 @@ func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) { sig := m.subSignal m.subMu.Unlock() + if m.closed.Load() { + v, ok = <-m.Source + + return v, ok + } + <-sig } } @@ -169,22 +171,26 @@ func (m *MultiplexedChannel[T]) HasSubscribers() bool { // The channel is bidirectional for backwards compat with start.go which writes // a bootstrap event into it; new callers should treat it as receive-only. func (m *MultiplexedChannel[T]) Fork() (chan T, func()) { - if m.exited.Load() { + select { + case <-m.done: ch := make(chan T) close(ch) return ch, func() {} + default: } m.mu.Lock() defer m.mu.Unlock() // Re-check under lock in case run() finished between the fast path and here. - if m.exited.Load() { + select { + case <-m.done: ch := make(chan T) close(ch) return ch, func() {} + default: } s := &subscriber[T]{ diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index e500924f6c..7ec86de6d4 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -1,7 +1,6 @@ package handler import ( - "runtime" "sync" "testing" "time" @@ -90,7 +89,6 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Parallel() m := NewMultiplexedChannel[int](1) - t.Cleanup(func() { m.CloseSource() }) abandoned, cancelAbandoned := m.Fork() @@ -138,6 +136,14 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { // at most ~2 sends succeed before blocking. assert.LessOrEqual(t, sent, 3, "fan-out should stop draining Source when no subscribers remain (back-pressure)") + + m.CloseSource() + + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out did not exit after CloseSource") + } } // Regression: an abandoned consumer must not starve other subscribers. @@ -267,13 +273,10 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T) m := NewMultiplexedChannel[int](0) m.CloseSource() - // Wait for the fan-out goroutine to observe Source close. - deadline := time.Now().Add(multiplexTestTimeout) - for !m.exited.Load() { - if time.Now().After(deadline) { - t.Fatal("fan-out did not mark itself exited after Source close") - } - time.Sleep(time.Millisecond) + select { + case <-m.done: + case <-time.After(multiplexTestTimeout): + t.Fatal("fan-out did not exit after Source close") } cons, cancel := m.Fork() @@ -322,6 +325,12 @@ func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testin // CloseSource wakes the fan-out so it drains the buffered // value and exits. Without this the fan-out goroutine leaks. m.CloseSource() + + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out did not drain buffered value and exit after CloseSource") + } } // Regression: CloseSource after the last subscriber is cancelled must @@ -330,12 +339,10 @@ func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testin // order). Previously start.go used bare close(Source) which bypassed // the closed flag and NotifySubscriberChange, leaving the fan-out // goroutine parked on <-sig forever. -func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count - const iterations = 8 +func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T) { + t.Parallel() - time.Sleep(50 * time.Millisecond) - runtime.GC() //nolint:revive // intentional: settle goroutines before measuring baseline - before := runtime.NumGoroutine() + const iterations = 8 for range iterations { m := NewMultiplexedChannel[int](0) @@ -347,36 +354,20 @@ func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T cancel() time.Sleep(10 * time.Millisecond) // let fan-out park on <-sig m.CloseSource() - } - // Allow goroutines to settle. - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - runtime.GC() //nolint:revive // intentional: help goroutines finalize - runtime.Gosched() - time.Sleep(20 * time.Millisecond) - if runtime.NumGoroutine() <= before+2 { - break + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out goroutine did not exit after cancel-then-CloseSource") } } - - after := runtime.NumGoroutine() - leaked := after - before - - assert.LessOrEqualf(t, leaked, 2, - "goroutine count grew by %d after %d iterations of "+ - "cancel-then-CloseSource; before=%d after=%d — "+ - "fan-out goroutines are leaking", leaked, iterations, before, after, - ) } -// Goroutine count must return to baseline after cancelled subscribers settle. -func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count - const wedges = 16 +// Fan-out goroutine must exit after cancelled subscribers and CloseSource. +func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { + t.Parallel() - time.Sleep(50 * time.Millisecond) - runtime.GC() //nolint:revive // intentional: settle goroutines before measuring baseline - before := runtime.NumGoroutine() + const wedges = 16 for range wedges { m := NewMultiplexedChannel[int](1) @@ -385,25 +376,11 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:pa m.Source <- 0 cancel() m.CloseSource() - m.NotifySubscriberChange() - } - // Allow scheduled goroutines to finish. - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - runtime.GC() //nolint:revive // intentional: help goroutines finalize - runtime.Gosched() - time.Sleep(20 * time.Millisecond) - if runtime.NumGoroutine() <= before+2 { - break + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out goroutine did not exit after cancel + CloseSource") } } - - after := runtime.NumGoroutine() - leaked := after - before - // Small slack for runtime bookkeeping; the old bug leaked >= wedges. - assert.LessOrEqualf(t, leaked, 2, - "goroutine count grew by %d after %d cancelled wedges; "+ - "before=%d after=%d (expected ~0)", leaked, wedges, before, after, - ) } From 043dc2e0ad0fbe970018a5e502894806367b4f52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Thu, 7 May 2026 14:33:40 +0200 Subject: [PATCH 08/17] test(envd): remove flaky DisconnectStormHeapGrowth test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The heap measurement is inherently racy — GC timing, ambient allocations, and uint64 underflow on heap shrinkage make it unreliable at high -count values. The back-pressure behavior it validated is already covered by the multiplex and disconnect tests. --- .../internal/services/process/start_test.go | 88 ------------------- 1 file changed, 88 deletions(-) diff --git a/packages/envd/internal/services/process/start_test.go b/packages/envd/internal/services/process/start_test.go index cff34bf22b..389cb82933 100644 --- a/packages/envd/internal/services/process/start_test.go +++ b/packages/envd/internal/services/process/start_test.go @@ -7,7 +7,6 @@ import ( "net/http/httptest" "os" "os/user" - "runtime" "testing" "time" @@ -174,93 +173,6 @@ func TestStart_ProcessSurvivesClientDisconnect(t *testing.T) { _ = proc.Kill() } -// TestStart_DisconnectStormHeapGrowth demonstrates the memory leak -// from bug #2: each abandoned Start RPC with a fast-producing child -// leaves behind channel buffers and reader goroutines that accumulate -// memory. We run several disconnect cycles and assert that heap -// usage stays bounded. -func TestStart_DisconnectStormHeapGrowth(t *testing.T) { - t.Parallel() - - client, cleanup := newTestService(t) - defer cleanup() - - const cycles = 5 - - // Force a GC and record baseline heap. - runtime.GC() //nolint:revive // intentional: need accurate heap baseline - var baseline runtime.MemStats - runtime.ReadMemStats(&baseline) - - var pids []int - - for i := range cycles { - ctx, cancel := context.WithCancel(context.Background()) - - stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ - Process: &rpc.ProcessConfig{ - Cmd: "timeout", - Args: []string{"10", "yes"}, - }, - })) - require.NoError(t, err, "cycle %d", i) - - // Wait for the start event to get the PID. - require.True(t, stream.Receive(), "cycle %d: expected start event", i) - startEvt := stream.Msg().GetEvent().GetStart() - require.NotNil(t, startEvt, "cycle %d", i) - pids = append(pids, int(startEvt.GetPid())) - - // Receive a few data events so the producer is actively - // writing into the handler's channel buffers. - for range 5 { - if !stream.Receive() { - break - } - } - - // Disconnect. - cancel() - for stream.Receive() { - } - _ = stream.Close() - - // Let the orphaned producer fill buffers for a moment. - time.Sleep(500 * time.Millisecond) - } - - // Measure heap after all cycles. - runtime.GC() //nolint:revive // intentional: need accurate heap measurement - var after runtime.MemStats - runtime.ReadMemStats(&after) - - heapGrowthMiB := float64(after.HeapInuse-baseline.HeapInuse) / (1024 * 1024) - t.Logf("heap: baseline=%.1f MiB after=%.1f MiB growth=%.1f MiB (%d cycles)", - float64(baseline.HeapInuse)/(1024*1024), - float64(after.HeapInuse)/(1024*1024), - heapGrowthMiB, cycles) - - // BUG: with procCtx=context.Background(), each orphaned `yes` - // process keeps pumping ~32 KiB chunks into channel buffers - // that nobody drains. Heap grows roughly linearly with time - // and number of cycles. A healthy implementation should stay - // well under 50 MiB for 5 cycles. - const maxHeapGrowthMiB = 50.0 - if heapGrowthMiB > maxHeapGrowthMiB { - t.Errorf("heap grew %.1f MiB over %d disconnect cycles "+ - "(limit %.1f MiB); orphaned handlers leaking memory", - heapGrowthMiB, cycles, maxHeapGrowthMiB) - } - - // Kill orphaned processes. - for _, pid := range pids { - if processAlive(pid) { - proc, _ := os.FindProcess(pid) - _ = proc.Kill() - } - } -} - // processAlive checks whether a process with the given PID exists. func processAlive(pid int) bool { // /proc//stat exists iff the process is alive (Linux-specific). From e34e7247f43348ffdc168ae06ec0ad89b37ed2cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Mon, 11 May 2026 12:46:25 +0200 Subject: [PATCH 09/17] chore(envd): bump version to 0.5.18 --- packages/envd/pkg/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/envd/pkg/version.go b/packages/envd/pkg/version.go index a5ccaf4cab..e251abd82f 100644 --- a/packages/envd/pkg/version.go +++ b/packages/envd/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "0.5.17" +const Version = "0.5.18" From c55adcc55957a2a9625ed8e333da8341f0619840 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Mon, 11 May 2026 14:23:27 +0200 Subject: [PATCH 10/17] test(envd): clean up test comments in multiplex_test.go --- .../process/handler/multiplex_test.go | 64 +++++-------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index 7ec86de6d4..b53f178c78 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -9,8 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -// Tests for MultiplexedChannel fan-out, covering the fix for the goroutine -// leak that occurred when a subscriber disconnected mid-send. +// Tests for MultiplexedChannel fan-out. const multiplexTestTimeout = 500 * time.Millisecond @@ -84,7 +83,7 @@ func TestMultiplexedChannel_BasicFanOut(t *testing.T) { assert.Equal(t, []int{1, 2, 3}, gotB) } -// Regression: an abandoned consumer must not wedge the fan-out loop. +// An abandoned consumer must not wedge the fan-out loop. func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Parallel() @@ -92,7 +91,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { abandoned, cancelAbandoned := m.Fork() - // Consumer reads one value then exits, modeling a disconnected client. + // Consumer reads one value then exits. abandonReader := make(chan struct{}) go func() { <-abandoned @@ -110,7 +109,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Fatal("abandoned consumer should have read its single value") } - // Simulate the handler's deferred cancel after return. + // Cancel the subscriber. cancelDone := make(chan struct{}) go func() { cancelAbandoned() @@ -122,9 +121,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Fatal("cancel func did not return promptly; fan-out is wedged") } - // With no subscribers, the fan-out stops draining Source to apply - // back-pressure. After the in-flight value is consumed, Source - // fills and further sends block. + // With no subscribers, back-pressure kicks in and further sends block. sent := 0 for i := 2; i <= 8; i++ { if !sendOrTimeout(t, m.Source, i, 100*time.Millisecond) { @@ -132,8 +129,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { } sent++ } - // The buffer (size 1) plus at most one in-flight value means - // at most ~2 sends succeed before blocking. + // Buffer (size 1) plus at most one in-flight value. assert.LessOrEqual(t, sent, 3, "fan-out should stop draining Source when no subscribers remain (back-pressure)") @@ -146,7 +142,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { } } -// Regression: an abandoned consumer must not starve other subscribers. +// An abandoned consumer must not starve other subscribers. func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) { t.Parallel() @@ -191,7 +187,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) { } } -// cancel must be idempotent and non-blocking even under producer load. +// Cancel must be idempotent and non-blocking even under producer load. func TestMultiplexedChannel_CancelIsIdempotentAndPrompt(t *testing.T) { t.Parallel() @@ -199,7 +195,7 @@ func TestMultiplexedChannel_CancelIsIdempotentAndPrompt(t *testing.T) { _, cancel := m.Fork() - // Concurrently push values without anyone draining the consumer chan. + // Push values without anyone draining. stop := make(chan struct{}) producerDone := make(chan struct{}) go func() { @@ -286,44 +282,23 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T) assert.False(t, ok, "Fork after shutdown must return a pre-closed channel") } -// Regression: sending to Source after the last subscriber cancelled -// must not deadlock. This reproduces the bug where handler.Wait() -// calls `p.EndEvent.Source <- event` after the handleStart/handleConnect -// handler has returned and run `defer endCancel()`. -// -// The fix is to give EndEvent a buffer of 1 so the single send in -// Wait() always succeeds, and then call CloseSource() so the fan-out -// goroutine can exit cleanly. +// Sending to a buffered Source after the last subscriber cancelled must +// not deadlock. This mirrors the EndEvent pattern in handler.Wait(). func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testing.T) { t.Parallel() - // Mirror EndEvent: buffer=1 (the fix — was 0 before). m := NewMultiplexedChannel[int](1) _, cancel := m.Fork() - - // Cancel the only subscriber immediately, without ever - // consuming a value. This simulates the case where - // handleStart returns (running `defer endCancel()`) before - // the child process exits and Wait() tries to send the - // EndEvent. cancel() - // Give the fan-out goroutine time to re-enter receiveWhenReady, - // observe HasSubscribers()==false, and park on <-sig. time.Sleep(50 * time.Millisecond) - // Now attempt to send — this models handler.Wait()'s - // `p.EndEvent.Source <- event`. With buffer=1 the send - // succeeds immediately even though no subscriber is draining. sent := sendOrTimeout(t, m.Source, 1, 2*time.Second) if !sent { - t.Fatal("Source send deadlocked after last subscriber cancelled; " + - "EndEvent must use buffer >= 1 so Wait() never blocks") + t.Fatal("Source send deadlocked after last subscriber cancelled") } - // CloseSource wakes the fan-out so it drains the buffered - // value and exits. Without this the fan-out goroutine leaks. m.CloseSource() select { @@ -333,12 +308,8 @@ func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testin } } -// Regression: CloseSource after the last subscriber is cancelled must -// not leak the fan-out goroutine. This reproduces the pattern from -// start.go where startCancel() runs before CloseSource() (LIFO defer -// order). Previously start.go used bare close(Source) which bypassed -// the closed flag and NotifySubscriberChange, leaving the fan-out -// goroutine parked on <-sig forever. +// CloseSource after the last subscriber is cancelled must not leak +// the fan-out goroutine. Mirrors the LIFO defer order in handleStart. func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T) { t.Parallel() @@ -348,11 +319,8 @@ func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T m := NewMultiplexedChannel[int](0) _, cancel := m.Fork() - // Simulate LIFO defer order in handleStart: - // 1. startCancel() runs first — removes subscriber - // 2. CloseSource() runs second (the fix — was bare close before) cancel() - time.Sleep(10 * time.Millisecond) // let fan-out park on <-sig + time.Sleep(10 * time.Millisecond) m.CloseSource() select { @@ -363,7 +331,7 @@ func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T } } -// Fan-out goroutine must exit after cancelled subscribers and CloseSource. +// Fan-out goroutine exits after cancelled subscribers and CloseSource. func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { t.Parallel() From 3026c2c31489e514833ac404063352a4cf0550e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Mon, 11 May 2026 14:44:38 +0200 Subject: [PATCH 11/17] fix(envd): grab subSignal before checking conditions in receiveWhenReady Avoids a race where a subscriber added between HasSubscribers() and fetching subSignal could leave the fan-out parked on a fresh unclosed signal channel. --- .../internal/services/process/handler/multiplex.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index 84567914f4..10322467be 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -115,18 +115,13 @@ func (m *MultiplexedChannel[T]) run() { // so the fan-out loop wakes up and observes the closed channel. func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) { for { - if m.HasSubscribers() { - v, ok = <-m.Source - - return v, ok - } - - // No active subscribers — wait for a change notification. + // Grab the signal before checking conditions so any change + // that happens after the check closes the channel we wait on. m.subMu.Lock() sig := m.subSignal m.subMu.Unlock() - if m.closed.Load() { + if m.HasSubscribers() || m.closed.Load() { v, ok = <-m.Source return v, ok From abb4404c9f5b73d643c798f633b9a6acd889ce55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Mon, 11 May 2026 16:12:07 +0200 Subject: [PATCH 12/17] fix(envd): prevent output truncation on fast commands cmd.StdoutPipe/StderrPipe are managed by cmd.Wait which closes the pipe read-ends on return, racing with readers that haven't finished. Replace them with manual os.Pipe so we control the lifecycle: write-ends are closed after Start (child inherited them), read-ends stay open until readers finish naturally via EOF. After cmd.Wait reaps the child, call Drain() on the data multiplexer to disable back-pressure, letting stuck readers unblock and see EOF. Then wait for all readers to exit before proceeding. --- .../services/process/handler/handler.go | 68 ++++++++++++------- .../services/process/handler/multiplex.go | 9 +++ .../internal/services/process/start_test.go | 39 +++++++++++ 3 files changed, 93 insertions(+), 23 deletions(-) diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 34bc1a1da3..199d437050 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -54,8 +54,9 @@ type Handler struct { outCtx context.Context //nolint:containedctx // todo: refactor so this can be removed outCancel context.CancelFunc - stdinMu sync.Mutex - stdin io.WriteCloser + stdinMu sync.Mutex + stdin io.WriteCloser + pipeRead []*os.File // read-ends of stdout/stderr; closed in Wait DataEvent *MultiplexedChannel[rpc.ProcessEvent_Data] EndEvent *MultiplexedChannel[rpc.ProcessEvent_End] @@ -214,11 +215,7 @@ func New( }, } - select { - case outMultiplex.Source <- event: - case <-outCtx.Done(): - return - } + outMultiplex.Source <- event } if errors.Is(readErr, io.EOF) { @@ -235,11 +232,14 @@ func New( h.tty = tty } else { - stdout, err := cmd.StdoutPipe() + stdoutR, stdoutW, err := os.Pipe() if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stdout pipe for command '%s': %w", userCmd, err)) } + cmd.Stdout = stdoutW + stdout := stdoutR + outWg.Go(func() { stdoutLogs := make(chan []byte, outputBufferSize) defer close(stdoutLogs) @@ -262,11 +262,7 @@ func New( }, } - select { - case outMultiplex.Source <- event: - case <-outCtx.Done(): - return - } + outMultiplex.Source <- event stdoutLogs <- buf[:n] } @@ -283,11 +279,16 @@ func New( } }) - stderr, err := cmd.StderrPipe() + stderrR, stderrW, err := os.Pipe() if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stderr pipe for command '%s': %w", userCmd, err)) } + cmd.Stderr = stderrW + stderr := stderrR + + h.pipeRead = []*os.File{stdoutR, stderrR} + outWg.Go(func() { stderrLogs := make(chan []byte, outputBufferSize) defer close(stderrLogs) @@ -310,11 +311,7 @@ func New( }, } - select { - case outMultiplex.Source <- event: - case <-outCtx.Done(): - return - } + outMultiplex.Source <- event stderrLogs <- buf[:n] } @@ -444,6 +441,16 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) { if err != nil { return 0, fmt.Errorf("error starting process '%s': %w", p.userCommand(), err) } + + // Close parent's copy of the write-ends so readers see EOF + // when the child (and any orphan grandchildren) exit. + if p.cmd.Stdout != nil { + p.cmd.Stdout.(*os.File).Close() + } + + if p.cmd.Stderr != nil { + p.cmd.Stderr.(*os.File).Close() + } } p.logger. @@ -458,11 +465,26 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) { } func (p *Handler) Wait() { - // cmd.Wait reaps the child and closes the pipe read-ends. - // Then we cancel outCtx to unblock any reader goroutine that - // is blocked on a full Source channel send (back-pressure). + // Reap the child. With manual os.Pipe, cmd.Wait does not + // close our read-ends, so readers can finish draining. err := p.cmd.Wait() - p.outCancel() + + // Disable back-pressure so any reader stuck on a full Source + // send unblocks, loops back, and sees EOF. + p.DataEvent.Drain() + + // Wait for readers to finish. If they don't exit promptly + // (orphan grandchildren keeping the pipe open), close the + // read-ends to force them out. + select { + case <-p.outCtx.Done(): + case <-time.After(5 * time.Second): + for _, f := range p.pipeRead { + f.Close() + } + + <-p.outCtx.Done() + } p.tty.Close() diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index 10322467be..c1c18e7cd7 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -131,6 +131,15 @@ func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) { } } +// Drain disables back-pressure so the fan-out drains Source even +// with no subscribers. Call this when the producer is done (e.g. +// child process exited) so readers can flush their last chunks +// without blocking. Source remains open for further sends. +func (m *MultiplexedChannel[T]) Drain() { + m.closed.Store(true) + m.NotifySubscriberChange() +} + // CloseSource closes the Source channel and wakes the fan-out loop. func (m *MultiplexedChannel[T]) CloseSource() { m.closed.Store(true) diff --git a/packages/envd/internal/services/process/start_test.go b/packages/envd/internal/services/process/start_test.go index 389cb82933..323a9fdc4b 100644 --- a/packages/envd/internal/services/process/start_test.go +++ b/packages/envd/internal/services/process/start_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "os/user" + "strings" "testing" "time" @@ -173,6 +174,44 @@ func TestStart_ProcessSurvivesClientDisconnect(t *testing.T) { _ = proc.Kill() } +// TestStart_FastCommandOutputNotTruncated verifies that a fast command's +// stdout is fully delivered. This catches the bug where cmd.Wait() closing +// the pipe races with outCancel(), causing the reader to drop its last chunk. +func TestStart_FastCommandOutputNotTruncated(t *testing.T) { + t.Parallel() + + client, cleanup := newTestService(t) + defer cleanup() + + const expected = "hello-truncation-test\n" + + for i := range 50 { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + + stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ + Process: &rpc.ProcessConfig{ + Cmd: "echo", + Args: []string{"hello-truncation-test"}, + }, + })) + require.NoError(t, err) + + var stdout strings.Builder + for stream.Receive() { + if data := stream.Msg().GetEvent().GetData(); data != nil { + if out := data.GetStdout(); out != nil { + stdout.Write(out) + } + } + } + require.NoError(t, stream.Err()) + _ = stream.Close() + cancel() + + require.Equalf(t, expected, stdout.String(), "iteration %d: output mismatch", i) + } +} + // processAlive checks whether a process with the given PID exists. func processAlive(pid int) bool { // /proc//stat exists iff the process is alive (Linux-specific). From 4e24efaeeaaeb9a0d7925612d14d8b1434f7c902 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Tue, 12 May 2026 10:39:14 +0200 Subject: [PATCH 13/17] fix(envd): fix EndEvent delivery race when process is killed with orphan children MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After cmd.Wait() reaps the child, use SetReadDeadline on the pipe read-ends so readers drain any buffered data (reads with available data return instantly) then exit on deadline instead of blocking forever when an orphan grandchild holds the write-end open. Readers treat the deadline timeout the same as EOF — clean exit, no data loss. Add a dedicated readersDone channel that closes when stdout/stderr reader goroutines actually exit, replacing the outCtx/outCancel mechanism which is no longer needed. Fixes TestStart_OrphanGrandchildDoesNotHangStream and the TestCommandKillNextApp CI failure. --- .../services/process/handler/handler.go | 55 +++++++------------ 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 199d437050..6d30a0122b 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -51,8 +51,7 @@ type Handler struct { cancel context.CancelFunc - outCtx context.Context //nolint:containedctx // todo: refactor so this can be removed - outCancel context.CancelFunc + readersDone chan struct{} // closed when stdout/stderr reader goroutines have exited stdinMu sync.Mutex stdin io.WriteCloser @@ -173,20 +172,15 @@ func New( var outWg sync.WaitGroup - // Create a context for waiting for and cancelling output pipes. - // Cancellation of the process via timeout will propagate and cancel this context too. - outCtx, outCancel := context.WithCancel(ctx) - h := &Handler{ - Config: req.GetProcess(), - cmd: cmd, - Tag: req.Tag, - DataEvent: outMultiplex, - cancel: cancel, - outCtx: outCtx, - outCancel: outCancel, - EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](1), - logger: logger, + Config: req.GetProcess(), + cmd: cmd, + Tag: req.Tag, + DataEvent: outMultiplex, + cancel: cancel, + readersDone: make(chan struct{}), + EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](1), + logger: logger, } if req.GetPty() != nil { @@ -267,7 +261,7 @@ func New( stdoutLogs <- buf[:n] } - if errors.Is(readErr, io.EOF) { + if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) { break } @@ -316,7 +310,7 @@ func New( stderrLogs <- buf[:n] } - if errors.Is(readErr, io.EOF) { + if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) { break } @@ -343,9 +337,9 @@ func New( go func() { outWg.Wait() - outMultiplex.CloseSource() + close(h.readersDone) - outCancel() + outMultiplex.CloseSource() }() return h, nil @@ -364,10 +358,6 @@ func (p *Handler) SendSignal(signal syscall.Signal) error { return errors.New("process not started") } - if signal == syscall.SIGKILL || signal == syscall.SIGTERM { - p.outCancel() - } - return p.cmd.Process.Signal(signal) } @@ -473,19 +463,16 @@ func (p *Handler) Wait() { // send unblocks, loops back, and sees EOF. p.DataEvent.Drain() - // Wait for readers to finish. If they don't exit promptly - // (orphan grandchildren keeping the pipe open), close the - // read-ends to force them out. - select { - case <-p.outCtx.Done(): - case <-time.After(5 * time.Second): - for _, f := range p.pipeRead { - f.Close() - } - - <-p.outCtx.Done() + // Set a read deadline on the pipe read-ends so readers drain + // any buffered data (reads with available data return instantly) + // and then exit cleanly instead of blocking forever when an + // orphan grandchild holds the write-end open. + for _, f := range p.pipeRead { + f.SetReadDeadline(time.Now().Add(1 * time.Second)) } + <-p.readersDone + p.tty.Close() var errMsg *string From ab0315bc3e6c7a286141edd9ad0b5e769422f498 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Tue, 12 May 2026 17:05:34 +0200 Subject: [PATCH 14/17] test(envd): add tests for output truncation and orphan grandchild Add TestStart_OrphanGrandchildDoesNotHangStream: verifies that killing a process whose grandchild holds stdout open delivers the EndEvent within the stream timeout instead of hanging. --- .../internal/services/process/start_test.go | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/packages/envd/internal/services/process/start_test.go b/packages/envd/internal/services/process/start_test.go index 323a9fdc4b..13a5210b89 100644 --- a/packages/envd/internal/services/process/start_test.go +++ b/packages/envd/internal/services/process/start_test.go @@ -212,6 +212,55 @@ func TestStart_FastCommandOutputNotTruncated(t *testing.T) { } } +// TestStart_OrphanGrandchildDoesNotHangStream verifies that killing a +// process whose child still holds stdout open does not hang the stream. +// The stream must deliver the EndEvent within a reasonable time. +func TestStart_OrphanGrandchildDoesNotHangStream(t *testing.T) { + t.Parallel() + + client, cleanup := newTestService(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + // bash spawns a background child that inherits stdout and keeps + // the pipe open after the parent is killed. + stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ + Process: &rpc.ProcessConfig{ + Cmd: "bash", + Args: []string{"-c", "sleep 300 & wait"}, + }, + })) + require.NoError(t, err) + + // Wait for start event to get PID. + require.True(t, stream.Receive(), "expected start event") + startEvt := stream.Msg().GetEvent().GetStart() + require.NotNil(t, startEvt) + pid := startEvt.GetPid() + + // Kill the process via SendSignal (same as TestCommandKillNextApp). + _, err = client.SendSignal(ctx, connect.NewRequest(&rpc.SendSignalRequest{ + Signal: rpc.Signal_SIGNAL_SIGKILL, + Process: &rpc.ProcessSelector{ + Selector: &rpc.ProcessSelector_Pid{Pid: pid}, + }, + })) + require.NoError(t, err) + + var gotEnd bool + for stream.Receive() { + if stream.Msg().GetEvent().GetEnd() != nil { + gotEnd = true + } + } + require.NoError(t, stream.Err()) + _ = stream.Close() + + assert.True(t, gotEnd, "stream should deliver EndEvent even with orphan grandchild") +} + // processAlive checks whether a process with the given PID exists. func processAlive(pid int) bool { // /proc//stat exists iff the process is alive (Linux-specific). From 882b36da26ed3c5f4326ecd7c341b7364ce74e9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Tue, 12 May 2026 20:02:58 +0200 Subject: [PATCH 15/17] fix(envd): close done channel under lock to prevent orphaned subscriber on Fork MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move close(m.done) inside the m.mu critical section in run() so that Fork()'s re-check under the same lock always observes the shutdown. Previously, close(m.done) happened after Unlock, creating a window where Fork could add a subscriber that run() never cleans up — leaving the channel open forever and hanging consumers. --- packages/envd/internal/services/process/handler/multiplex.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index c1c18e7cd7..81c4a2c683 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -93,6 +93,7 @@ func (m *MultiplexedChannel[T]) run() { // Close all remaining consumer channels so `for range` loops exit. m.mu.Lock() + defer m.mu.Unlock() for _, s := range m.channels { s.cancel() @@ -100,8 +101,6 @@ func (m *MultiplexedChannel[T]) run() { } m.channels = nil - m.mu.Unlock() - // Signal that run() has finished. Fork() uses this to detect shutdown. close(m.done) } From c342710f12ab04c4152d2d3eeab1f2a4ce6d866e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Tue, 12 May 2026 20:04:56 +0200 Subject: [PATCH 16/17] fix(envd): close pipe read-ends after readers finish in Wait Readers exit after the SetReadDeadline timeout fires, but the read-end file descriptors were never closed. Each non-PTY process leaked two fds. Close them after readersDone signals. --- packages/envd/internal/services/process/handler/handler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 6d30a0122b..db6522ded6 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -55,7 +55,7 @@ type Handler struct { stdinMu sync.Mutex stdin io.WriteCloser - pipeRead []*os.File // read-ends of stdout/stderr; closed in Wait + pipeRead []*os.File // read-ends of stdout/stderr DataEvent *MultiplexedChannel[rpc.ProcessEvent_Data] EndEvent *MultiplexedChannel[rpc.ProcessEvent_End] @@ -235,6 +235,8 @@ func New( stdout := stdoutR outWg.Go(func() { + defer stdout.Close() + stdoutLogs := make(chan []byte, outputBufferSize) defer close(stdoutLogs) @@ -284,6 +286,8 @@ func New( h.pipeRead = []*os.File{stdoutR, stderrR} outWg.Go(func() { + defer stderr.Close() + stderrLogs := make(chan []byte, outputBufferSize) defer close(stderrLogs) From 360254ed54f18325f68c6648ec79bbca70868dec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Tue, 12 May 2026 22:13:48 +0200 Subject: [PATCH 17/17] fix(envd): apply read deadline to PTY so orphan grandchild does not hang Wait The SetReadDeadline escape mechanism only applied to non-PTY pipe read-ends. For PTY processes, the reader goroutine would block on tty.Read() forever when an orphan grandchild held the PTY slave open, deadlocking Wait() at <-readersDone. Set the same deadline on p.tty and treat timeout as EOF in the PTY reader loop. --- .../internal/services/process/handler/handler.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index db6522ded6..b3ad0cc511 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -212,7 +212,7 @@ func New( outMultiplex.Source <- event } - if errors.Is(readErr, io.EOF) { + if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) { break } @@ -467,12 +467,18 @@ func (p *Handler) Wait() { // send unblocks, loops back, and sees EOF. p.DataEvent.Drain() - // Set a read deadline on the pipe read-ends so readers drain + // Set a read deadline on the pipe/pty read-ends so readers drain // any buffered data (reads with available data return instantly) // and then exit cleanly instead of blocking forever when an // orphan grandchild holds the write-end open. + deadline := time.Now().Add(1 * time.Second) + for _, f := range p.pipeRead { - f.SetReadDeadline(time.Now().Add(1 * time.Second)) + f.SetReadDeadline(deadline) + } + + if p.tty != nil { + p.tty.SetReadDeadline(deadline) } <-p.readersDone