Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
85469b2
test(envd): add test demonstrating orphan process leak on client disc…
arkamar May 6, 2026
48912fc
test(envd): add disconnect storm heap growth test
arkamar May 6, 2026
7e876a1
fix(envd): back-pressure child process when no subscribers are connected
arkamar May 6, 2026
8a70e86
test(envd): add reproducer for EndEvent.Source deadlock when all subs…
arkamar May 7, 2026
1b370f8
test(envd): add reproducer for fan-out goroutine leak on direct close…
arkamar May 7, 2026
5293a9d
fix(envd): prevent EndEvent deadlock and startMultiplexer fan-out leak
arkamar May 7, 2026
0636aec
fix(envd): close TOCTOU race in receiveWhenReady that deadlocked fan-out
arkamar May 7, 2026
043dc2e
test(envd): remove flaky DisconnectStormHeapGrowth test
arkamar May 7, 2026
e34e724
chore(envd): bump version to 0.5.18
arkamar May 11, 2026
c55adcc
test(envd): clean up test comments in multiplex_test.go
arkamar May 11, 2026
3026c2c
fix(envd): grab subSignal before checking conditions in receiveWhenReady
arkamar May 11, 2026
abb4404
fix(envd): prevent output truncation on fast commands
arkamar May 11, 2026
4e24efa
fix(envd): fix EndEvent delivery race when process is killed with orp…
arkamar May 12, 2026
ab0315b
test(envd): add tests for output truncation and orphan grandchild
arkamar May 12, 2026
882b36d
fix(envd): close done channel under lock to prevent orphaned subscrib…
arkamar May 12, 2026
c342710
fix(envd): close pipe read-ends after readers finish in Wait
arkamar May 12, 2026
360254e
fix(envd): apply read deadline to PTY so orphan grandchild does not h…
arkamar May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 73 additions & 34 deletions packages/envd/internal/services/process/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type Handler struct {

cancel context.CancelFunc

outCtx context.Context //nolint:containedctx // todo: refactor so this can be removed
outCancel context.CancelFunc
readersDone chan struct{} // closed when stdout/stderr reader goroutines have exited

stdinMu sync.Mutex
stdin io.WriteCloser
stdinMu sync.Mutex
stdin io.WriteCloser
pipeRead []*os.File // read-ends of stdout/stderr

DataEvent *MultiplexedChannel[rpc.ProcessEvent_Data]
EndEvent *MultiplexedChannel[rpc.ProcessEvent_End]
Expand Down Expand Up @@ -172,20 +172,15 @@ func New(

var outWg sync.WaitGroup

// Create a context for waiting for and cancelling output pipes.
// Cancellation of the process via timeout will propagate and cancel this context too.
outCtx, outCancel := context.WithCancel(ctx)

h := &Handler{
Config: req.GetProcess(),
cmd: cmd,
Tag: req.Tag,
DataEvent: outMultiplex,
cancel: cancel,
outCtx: outCtx,
outCancel: outCancel,
EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](0),
logger: logger,
Config: req.GetProcess(),
cmd: cmd,
Tag: req.Tag,
DataEvent: outMultiplex,
cancel: cancel,
readersDone: make(chan struct{}),
EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](1),
logger: logger,
}

if req.GetPty() != nil {
Expand All @@ -206,16 +201,18 @@ func New(
n, readErr := tty.Read(buf)

if n > 0 {
outMultiplex.Source <- rpc.ProcessEvent_Data{
event := rpc.ProcessEvent_Data{
Data: &rpc.ProcessEvent_DataEvent{
Output: &rpc.ProcessEvent_DataEvent_Pty{
Pty: buf[:n],
},
},
}

outMultiplex.Source <- event
}

if errors.Is(readErr, io.EOF) {
if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) {
break
}

Expand All @@ -229,12 +226,17 @@ func New(

h.tty = tty
} else {
stdout, err := cmd.StdoutPipe()
stdoutR, stdoutW, err := os.Pipe()
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stdout pipe for command '%s': %w", userCmd, err))
}

cmd.Stdout = stdoutW
stdout := stdoutR

outWg.Go(func() {
defer stdout.Close()

stdoutLogs := make(chan []byte, outputBufferSize)
defer close(stdoutLogs)

Expand All @@ -248,18 +250,20 @@ func New(
n, readErr := stdout.Read(buf)

if n > 0 {
outMultiplex.Source <- rpc.ProcessEvent_Data{
event := rpc.ProcessEvent_Data{
Data: &rpc.ProcessEvent_DataEvent{
Output: &rpc.ProcessEvent_DataEvent_Stdout{
Stdout: buf[:n],
},
},
}

outMultiplex.Source <- event

stdoutLogs <- buf[:n]
}

if errors.Is(readErr, io.EOF) {
if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) {
break
}

Expand All @@ -271,12 +275,19 @@ func New(
}
})

stderr, err := cmd.StderrPipe()
stderrR, stderrW, err := os.Pipe()
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stderr pipe for command '%s': %w", userCmd, err))
}

cmd.Stderr = stderrW
stderr := stderrR

h.pipeRead = []*os.File{stdoutR, stderrR}

outWg.Go(func() {
defer stderr.Close()

stderrLogs := make(chan []byte, outputBufferSize)
defer close(stderrLogs)

Expand All @@ -290,18 +301,20 @@ func New(
n, readErr := stderr.Read(buf)

if n > 0 {
outMultiplex.Source <- rpc.ProcessEvent_Data{
event := rpc.ProcessEvent_Data{
Data: &rpc.ProcessEvent_DataEvent{
Output: &rpc.ProcessEvent_DataEvent_Stderr{
Stderr: buf[:n],
},
},
}

outMultiplex.Source <- event

stderrLogs <- buf[:n]
}

if errors.Is(readErr, io.EOF) {
if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) {
break
}

Expand All @@ -328,9 +341,9 @@ func New(
go func() {
outWg.Wait()

close(outMultiplex.Source)
close(h.readersDone)

outCancel()
outMultiplex.CloseSource()
}()

return h, nil
Expand All @@ -349,10 +362,6 @@ func (p *Handler) SendSignal(signal syscall.Signal) error {
return errors.New("process not started")
}

if signal == syscall.SIGKILL || signal == syscall.SIGTERM {
p.outCancel()
}

return p.cmd.Process.Signal(signal)
}

Expand Down Expand Up @@ -426,6 +435,16 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) {
if err != nil {
return 0, fmt.Errorf("error starting process '%s': %w", p.userCommand(), err)
}

Comment thread
arkamar marked this conversation as resolved.
// Close parent's copy of the write-ends so readers see EOF
// when the child (and any orphan grandchildren) exit.
if p.cmd.Stdout != nil {
p.cmd.Stdout.(*os.File).Close()
}

if p.cmd.Stderr != nil {
p.cmd.Stderr.(*os.File).Close()
}
}

p.logger.
Expand All @@ -440,11 +459,30 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) {
}

func (p *Handler) Wait() {
// Wait for the output pipes to be closed or cancelled.
<-p.outCtx.Done()

// Reap the child. With manual os.Pipe, cmd.Wait does not
// close our read-ends, so readers can finish draining.
err := p.cmd.Wait()

// Disable back-pressure so any reader stuck on a full Source
// send unblocks, loops back, and sees EOF.
p.DataEvent.Drain()

// Set a read deadline on the pipe/pty read-ends so readers drain
// any buffered data (reads with available data return instantly)
// and then exit cleanly instead of blocking forever when an
// orphan grandchild holds the write-end open.
deadline := time.Now().Add(1 * time.Second)

for _, f := range p.pipeRead {
f.SetReadDeadline(deadline)
}

if p.tty != nil {
p.tty.SetReadDeadline(deadline)
}
Comment thread
cursor[bot] marked this conversation as resolved.

<-p.readersDone
Comment thread
cursor[bot] marked this conversation as resolved.

p.tty.Close()

var errMsg *string
Expand All @@ -466,6 +504,7 @@ func (p *Handler) Wait() {
}

p.EndEvent.Source <- event
p.EndEvent.CloseSource()

p.logger.
Info().
Expand Down
Loading
Loading