30 changes: 20 additions & 10 deletions packages/envd/internal/services/process/handler/handler.go
@@ -242,21 +242,28 @@ func New(
 
 	go logs.LogBufferedDataEvents(stdoutLogs, &stdoutLogger, "data")
 
+	// Reusable read buffer — avoids allocating 32 KiB per Read
+	// cycle when no subscribers are connected.
+	readBuf := make([]byte, stdChunkSize)
+
 	for {
-		buf := make([]byte, stdChunkSize)
+		n, readErr := stdout.Read(readBuf)
 
-		n, readErr := stdout.Read(buf)
+		if n > 0 && outMultiplex.HasSubscribers() {
+			// Copy data out of readBuf so the next Read
+			// doesn't overwrite it during delivery.
+			data := make([]byte, n)
+			copy(data, readBuf[:n])
 
-		if n > 0 {
 			outMultiplex.Source <- rpc.ProcessEvent_Data{
 				Data: &rpc.ProcessEvent_DataEvent{
 					Output: &rpc.ProcessEvent_DataEvent_Stdout{
-						Stdout: buf[:n],
+						Stdout: data,
 					},
 				},
 			}
 
-			stdoutLogs <- buf[:n]
+			stdoutLogs <- data
 		}
 
 		if errors.Is(readErr, io.EOF) {
@@ -284,21 +291,24 @@ func New(
 
 	go logs.LogBufferedDataEvents(stderrLogs, &stderrLogger, "data")
 
+	readBuf := make([]byte, stdChunkSize)
+
 	for {
-		buf := make([]byte, stdChunkSize)
+		n, readErr := stderr.Read(readBuf)
 
-		n, readErr := stderr.Read(buf)
+		if n > 0 && outMultiplex.HasSubscribers() {
+			data := make([]byte, n)
+			copy(data, readBuf[:n])
 
-		if n > 0 {
 			outMultiplex.Source <- rpc.ProcessEvent_Data{
 				Data: &rpc.ProcessEvent_DataEvent{
 					Output: &rpc.ProcessEvent_DataEvent_Stderr{
-						Stderr: buf[:n],
+						Stderr: data,
 					},
 				},
 			}
 
-			stderrLogs <- buf[:n]
+			stderrLogs <- data
 		}
 
 		if errors.Is(readErr, io.EOF) {
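For readers skimming the diff: both hunks swap a per-iteration 32 KiB allocation for a single reusable buffer, and only copy bytes out when someone is actually subscribed. A minimal standalone sketch of that read loop, with hasSubscribers and deliver as stand-ins for envd's outMultiplex.HasSubscribers and Source send (not the PR's actual code):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

const chunkSize = 32 * 1024 // stand-in for stdChunkSize

// pump reads from r into one long-lived buffer and hands copies downstream
// only while a subscriber is listening, mirroring the hunks above.
func pump(r io.Reader, hasSubscribers func() bool, deliver func([]byte)) error {
	readBuf := make([]byte, chunkSize) // allocated once, reused every iteration

	for {
		n, readErr := r.Read(readBuf)
		if n > 0 && hasSubscribers() {
			// Copy before delivery: the next Read will overwrite readBuf.
			data := make([]byte, n)
			copy(data, readBuf[:n])
			deliver(data)
		}

		if errors.Is(readErr, io.EOF) {
			return nil
		}
		if readErr != nil {
			return readErr
		}
	}
}

func main() {
	src := bytes.NewBufferString("hello from the child process")
	_ = pump(src, func() bool { return true }, func(b []byte) { fmt.Printf("%s\n", b) })
}
```

The copy is the price of buffer reuse: without it, a later Read could clobber bytes still sitting in the Source or logs channels.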
39 changes: 33 additions & 6 deletions packages/envd/internal/services/process/handler/multiplex.go
@@ -2,18 +2,21 @@ package handler
 
 import (
 	"sync"
-	"sync/atomic"
 )
 
 // MultiplexedChannel fans out values written to Source to every subscriber
 // obtained via Fork. Each subscriber send is guarded by a done channel so
 // a cancelled consumer can never wedge the fan-out loop.
+//
+// When no subscribers are connected, the fan-out keeps consuming from
+// Source and discards the values. Producers should check HasSubscribers
+// before allocating expensive payloads to avoid GC pressure.
 type MultiplexedChannel[T any] struct {
 	Source chan T
 
 	mu       sync.RWMutex
 	channels []*subscriber[T]
-	exited   atomic.Bool
+	done     chan struct{} // closed when run() returns
 }
 
 type subscriber[T any] struct {
@@ -42,6 +45,7 @@ func (s *subscriber[T]) isCancelled() bool {
 func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] {
 	c := &MultiplexedChannel[T]{
 		Source: make(chan T, buffer),
+		done:   make(chan struct{}),
 	}
 
 	go c.run()
@@ -51,6 +55,10 @@ func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] {
 
 // run is the fan-out loop. It delivers each Source value to every live
 // subscriber and closes all consumer channels when Source is closed.
+//
+// Values are always consumed from Source. When no subscribers exist
+// the value is silently dropped. This keeps the producer (reader
+// goroutine) unblocked so the child process is never back-pressured.
 func (m *MultiplexedChannel[T]) run() {
 	for v := range m.Source {
 		m.mu.RLock()
@@ -70,8 +78,6 @@
 		}
 	}
 
-	m.exited.Store(true)
-
 	// Close all remaining consumer channels so `for range` loops exit.
 	m.mu.Lock()
 	defer m.mu.Unlock()
@@ -81,29 +87,50 @@
 		close(s.ch)
 	}
 	m.channels = nil
+
+	// Signal that run() has finished. Fork() uses this to detect shutdown.
+	close(m.done)
 }
 
+// HasSubscribers reports whether any non-cancelled subscriber exists.
+func (m *MultiplexedChannel[T]) HasSubscribers() bool {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	for _, s := range m.channels {
+		if !s.isCancelled() {
+			return true
+		}
+	}
+
+	return false
+}
+
 // Fork registers a new subscriber and returns its channel plus a cancel func.
 // If Source is already closed it returns a pre-closed channel and a no-op cancel.
 // The channel is bidirectional for backwards compat with start.go which writes
 // a bootstrap event into it; new callers should treat it as receive-only.
 func (m *MultiplexedChannel[T]) Fork() (chan T, func()) {
-	if m.exited.Load() {
+	select {
+	case <-m.done:
 		ch := make(chan T)
 		close(ch)
 
 		return ch, func() {}
+	default:
 	}
 
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
 	// Re-check under lock in case run() finished between the fast path and here.
-	if m.exited.Load() {
+	select {
+	case <-m.done:
 		ch := make(chan T)
 		close(ch)
 
 		return ch, func() {}
+	default:
 	}
 
 	s := &subscriber[T]{
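Putting the multiplex.go changes together, the caller-facing contract is: Fork for a subscriber channel plus a cancel func, HasSubscribers to gate expensive payload construction, and close(Source) to shut everything down. A hand-written usage sketch against that surface (illustrative only, not code from this PR; it assumes it sits inside the handler package next to the type and imports "fmt"):

```go
// Illustrative usage of the fan-out surface shown in the diff above.
func demoMultiplexUsage() {
	m := NewMultiplexedChannel[string](64)

	// Subscriber: Fork hands back a channel plus a cancel func. Cancelling
	// stops deliveries to this subscriber without wedging the fan-out loop.
	events, cancel := m.Fork()
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		for ev := range events { // channel is closed when Source is closed
			fmt.Println("got:", ev)
		}
	}()

	// Producer: skip building payloads nobody will receive, then send.
	if m.HasSubscribers() {
		m.Source <- "stdout chunk"
	}

	// Shutdown: closing Source ends run(), closes every subscriber channel,
	// and closes m.done so a late Fork() gets a pre-closed channel.
	close(m.Source)
	<-done
}
```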
120 changes: 79 additions & 41 deletions packages/envd/internal/services/process/handler/multiplex_test.go
@@ -1,7 +1,6 @@
 package handler
 
 import (
-	"runtime"
 	"sync"
 	"testing"
 	"time"
@@ -10,8 +9,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-// Tests for MultiplexedChannel fan-out, covering the fix for the goroutine
-// leak that occurred when a subscriber disconnected mid-send.
+// Tests for MultiplexedChannel fan-out.
 
 const multiplexTestTimeout = 500 * time.Millisecond
@@ -85,16 +83,17 @@ func TestMultiplexedChannel_BasicFanOut(t *testing.T) {
 	assert.Equal(t, []int{1, 2, 3}, gotB)
 }
 
-// Regression: an abandoned consumer must not wedge the fan-out loop.
+// An abandoned consumer must not wedge the fan-out loop. After the
+// subscriber is cancelled, the fan-out drains Source to void so
+// subsequent sends must not block.
 func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) {
 	t.Parallel()
 
 	m := NewMultiplexedChannel[int](1)
-	t.Cleanup(func() { close(m.Source) })
 
 	abandoned, cancelAbandoned := m.Fork()
 
-	// Consumer reads one value then exits, modeling a disconnected client.
+	// Consumer reads one value then exits.
 	abandonReader := make(chan struct{})
 	go func() {
 		<-abandoned
@@ -112,7 +111,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) {
 		t.Fatal("abandoned consumer should have read its single value")
 	}
 
-	// Simulate the handler's deferred cancel after return.
+	// Cancel the subscriber.
 	cancelDone := make(chan struct{})
 	go func() {
 		cancelAbandoned()
@@ -124,16 +123,24 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) {
 		t.Fatal("cancel func did not return promptly; fan-out is wedged")
 	}
 
-	// Producer should still make progress through Source.
+	// With no subscribers, fan-out drains to void — sends must not block.
 	for i := 2; i <= 8; i++ {
 		require.Truef(t,
 			sendOrTimeout(t, m.Source, i, multiplexTestTimeout),
-			"send %d should not be back-pressured by an abandoned consumer", i,
+			"send %d should not block when fan-out drains to void", i,
 		)
 	}
+
+	close(m.Source)
+
+	select {
+	case <-m.done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("fan-out did not exit after CloseSource")
+	}
 }
 
-// Regression: an abandoned consumer must not starve other subscribers.
+// An abandoned consumer must not starve other subscribers.
 func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) {
 	t.Parallel()
 
@@ -178,15 +185,15 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) {
 	}
 }
 
-// cancel must be idempotent and non-blocking even under producer load.
+// Cancel must be idempotent and non-blocking even under producer load.
 func TestMultiplexedChannel_CancelIsIdempotentAndPrompt(t *testing.T) {
 	t.Parallel()
 
 	m := NewMultiplexedChannel[int](0)
 
 	_, cancel := m.Fork()
 
-	// Concurrently push values without anyone draining the consumer chan.
+	// Push values without anyone draining.
 	stop := make(chan struct{})
 	producerDone := make(chan struct{})
 	go func() {
@@ -260,13 +267,10 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T)
 	m := NewMultiplexedChannel[int](0)
 	close(m.Source)
 
-	// Wait for the fan-out goroutine to observe Source close.
-	deadline := time.Now().Add(multiplexTestTimeout)
-	for !m.exited.Load() {
-		if time.Now().After(deadline) {
-			t.Fatal("fan-out did not mark itself exited after Source close")
-		}
-		time.Sleep(time.Millisecond)
+	select {
+	case <-m.done:
+	case <-time.After(multiplexTestTimeout):
+		t.Fatal("fan-out did not exit after Source close")
 	}
 
 	cons, cancel := m.Fork()
@@ -276,13 +280,60 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T)
 	assert.False(t, ok, "Fork after shutdown must return a pre-closed channel")
 }
 
-// Goroutine count must return to baseline after cancelled subscribers settle.
-func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count
-	const wedges = 16
+// Sending to a buffered Source after the last subscriber cancelled must
+// not deadlock. This mirrors the EndEvent pattern in handler.Wait().
+func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testing.T) {
+	t.Parallel()
+
+	m := NewMultiplexedChannel[int](1)
+
+	_, cancel := m.Fork()
+	cancel()
 
-	time.Sleep(50 * time.Millisecond)
-	runtime.GC() //nolint:revive // intentional: settle goroutines before measuring baseline
-	before := runtime.NumGoroutine()
-
+	sent := sendOrTimeout(t, m.Source, 1, 2*time.Second)
+	if !sent {
+		t.Fatal("Source send deadlocked after last subscriber cancelled")
+	}
+
+	close(m.Source)
+
+	select {
+	case <-m.done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("fan-out did not drain buffered value and exit after CloseSource")
+	}
+}
+
+// CloseSource after the last subscriber is cancelled must not leak
+// the fan-out goroutine. Mirrors the LIFO defer order in handleStart.
+func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T) {
+	t.Parallel()
+
+	const iterations = 8
+
+	for range iterations {
+		m := NewMultiplexedChannel[int](0)
+		_, cancel := m.Fork()
+
+		cancel()
+		time.Sleep(10 * time.Millisecond)
+		close(m.Source)
+
+		select {
+		case <-m.done:
+		case <-time.After(2 * time.Second):
+			t.Fatal("fan-out goroutine did not exit after cancel-then-CloseSource")
+		}
+	}
+}
+
+// Fan-out goroutine exits after cancelled subscribers and CloseSource.
+func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) {
+	t.Parallel()
+
+	const wedges = 16
 
 	for range wedges {
 		m := NewMultiplexedChannel[int](1)
@@ -291,24 +342,11 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count
 		m.Source <- 0
 		cancel()
 		close(m.Source)
-	}
 
-	// Allow scheduled goroutines to finish.
-	deadline := time.Now().Add(2 * time.Second)
-	for time.Now().Before(deadline) {
-		runtime.GC() //nolint:revive // intentional: help goroutines finalize
-		runtime.Gosched()
-		time.Sleep(20 * time.Millisecond)
-		if runtime.NumGoroutine() <= before+2 {
-			break
+		select {
+		case <-m.done:
+		case <-time.After(2 * time.Second):
+			t.Fatal("fan-out goroutine did not exit after cancel + CloseSource")
 		}
 	}
-
-	after := runtime.NumGoroutine()
-	leaked := after - before
-	// Small slack for runtime bookkeeping; the old bug leaked >= wedges.
-	assert.LessOrEqualf(t, leaked, 2,
-		"goroutine count grew by %d after %d cancelled wedges; "+
-		"before=%d after=%d (expected ~0)", leaked, wedges, before, after,
-	)
 }
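The tests above call a sendOrTimeout helper that lives in the unchanged part of multiplex_test.go and is not shown in this diff. Purely so the excerpts read on their own, a helper of that shape would look roughly like this (an assumption about the existing helper, not code from the PR):

```go
// Hypothetical shape of the existing sendOrTimeout helper: try a channel
// send and report whether it completed before the timeout elapsed.
func sendOrTimeout[T any](t *testing.T, ch chan T, v T, timeout time.Duration) bool {
	t.Helper()

	select {
	case ch <- v:
		return true
	case <-time.After(timeout):
		return false
	}
}
```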