diff --git a/packages/envd/internal/services/process/handler/handler.go b/packages/envd/internal/services/process/handler/handler.go index 98108ea881..b3ad0cc511 100644 --- a/packages/envd/internal/services/process/handler/handler.go +++ b/packages/envd/internal/services/process/handler/handler.go @@ -51,11 +51,11 @@ type Handler struct { cancel context.CancelFunc - outCtx context.Context //nolint:containedctx // todo: refactor so this can be removed - outCancel context.CancelFunc + readersDone chan struct{} // closed when stdout/stderr reader goroutines have exited - stdinMu sync.Mutex - stdin io.WriteCloser + stdinMu sync.Mutex + stdin io.WriteCloser + pipeRead []*os.File // read-ends of stdout/stderr DataEvent *MultiplexedChannel[rpc.ProcessEvent_Data] EndEvent *MultiplexedChannel[rpc.ProcessEvent_End] @@ -172,20 +172,15 @@ func New( var outWg sync.WaitGroup - // Create a context for waiting for and cancelling output pipes. - // Cancellation of the process via timeout will propagate and cancel this context too. - outCtx, outCancel := context.WithCancel(ctx) - h := &Handler{ - Config: req.GetProcess(), - cmd: cmd, - Tag: req.Tag, - DataEvent: outMultiplex, - cancel: cancel, - outCtx: outCtx, - outCancel: outCancel, - EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](0), - logger: logger, + Config: req.GetProcess(), + cmd: cmd, + Tag: req.Tag, + DataEvent: outMultiplex, + cancel: cancel, + readersDone: make(chan struct{}), + EndEvent: NewMultiplexedChannel[rpc.ProcessEvent_End](1), + logger: logger, } if req.GetPty() != nil { @@ -206,16 +201,18 @@ func New( n, readErr := tty.Read(buf) if n > 0 { - outMultiplex.Source <- rpc.ProcessEvent_Data{ + event := rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Pty{ Pty: buf[:n], }, }, } + + outMultiplex.Source <- event } - if errors.Is(readErr, io.EOF) { + if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) { break } @@ -229,12 +226,17 @@ func New( h.tty = tty } else { - stdout, err := cmd.StdoutPipe() + stdoutR, stdoutW, err := os.Pipe() if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stdout pipe for command '%s': %w", userCmd, err)) } + cmd.Stdout = stdoutW + stdout := stdoutR + outWg.Go(func() { + defer stdout.Close() + stdoutLogs := make(chan []byte, outputBufferSize) defer close(stdoutLogs) @@ -248,7 +250,7 @@ func New( n, readErr := stdout.Read(buf) if n > 0 { - outMultiplex.Source <- rpc.ProcessEvent_Data{ + event := rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stdout{ Stdout: buf[:n], @@ -256,10 +258,12 @@ func New( }, } + outMultiplex.Source <- event + stdoutLogs <- buf[:n] } - if errors.Is(readErr, io.EOF) { + if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) { break } @@ -271,12 +275,19 @@ func New( } }) - stderr, err := cmd.StderrPipe() + stderrR, stderrW, err := os.Pipe() if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stderr pipe for command '%s': %w", userCmd, err)) } + cmd.Stderr = stderrW + stderr := stderrR + + h.pipeRead = []*os.File{stdoutR, stderrR} + outWg.Go(func() { + defer stderr.Close() + stderrLogs := make(chan []byte, outputBufferSize) defer close(stderrLogs) @@ -290,7 +301,7 @@ func New( n, readErr := stderr.Read(buf) if n > 0 { - outMultiplex.Source <- rpc.ProcessEvent_Data{ + event := rpc.ProcessEvent_Data{ Data: &rpc.ProcessEvent_DataEvent{ Output: &rpc.ProcessEvent_DataEvent_Stderr{ Stderr: buf[:n], @@ -298,10 +309,12 @@ func New( }, } + outMultiplex.Source <- event + stderrLogs <- buf[:n] } - if errors.Is(readErr, io.EOF) { + if errors.Is(readErr, io.EOF) || os.IsTimeout(readErr) { break } @@ -328,9 +341,9 @@ func New( go func() { outWg.Wait() - close(outMultiplex.Source) + close(h.readersDone) - outCancel() + outMultiplex.CloseSource() }() return h, nil @@ -349,10 +362,6 @@ func (p *Handler) SendSignal(signal syscall.Signal) error { return errors.New("process not started") } - if signal == syscall.SIGKILL || signal == syscall.SIGTERM { - p.outCancel() - } - return p.cmd.Process.Signal(signal) } @@ -426,6 +435,16 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) { if err != nil { return 0, fmt.Errorf("error starting process '%s': %w", p.userCommand(), err) } + + // Close parent's copy of the write-ends so readers see EOF + // when the child (and any orphan grandchildren) exit. + if p.cmd.Stdout != nil { + p.cmd.Stdout.(*os.File).Close() + } + + if p.cmd.Stderr != nil { + p.cmd.Stderr.(*os.File).Close() + } } p.logger. @@ -440,11 +459,30 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) { } func (p *Handler) Wait() { - // Wait for the output pipes to be closed or cancelled. - <-p.outCtx.Done() - + // Reap the child. With manual os.Pipe, cmd.Wait does not + // close our read-ends, so readers can finish draining. err := p.cmd.Wait() + // Disable back-pressure so any reader stuck on a full Source + // send unblocks, loops back, and sees EOF. + p.DataEvent.Drain() + + // Set a read deadline on the pipe/pty read-ends so readers drain + // any buffered data (reads with available data return instantly) + // and then exit cleanly instead of blocking forever when an + // orphan grandchild holds the write-end open. + deadline := time.Now().Add(1 * time.Second) + + for _, f := range p.pipeRead { + f.SetReadDeadline(deadline) + } + + if p.tty != nil { + p.tty.SetReadDeadline(deadline) + } + + <-p.readersDone + p.tty.Close() var errMsg *string @@ -466,6 +504,7 @@ func (p *Handler) Wait() { } p.EndEvent.Source <- event + p.EndEvent.CloseSource() p.logger. Info(). diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index 131f097981..81c4a2c683 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -8,12 +8,20 @@ import ( // MultiplexedChannel fans out values written to Source to every subscriber // obtained via Fork. Each subscriber send is guarded by a done channel so // a cancelled consumer can never wedge the fan-out loop. +// +// When no subscribers are connected, the fan-out loop blocks instead of +// draining Source. This lets the Source buffer fill up and back-pressure +// the producer through the pipe. type MultiplexedChannel[T any] struct { Source chan T mu sync.RWMutex channels []*subscriber[T] - exited atomic.Bool + done chan struct{} // closed when run() returns + + subMu sync.Mutex + subSignal chan struct{} // closed+recreated on subscriber list change + closed atomic.Bool // true after CloseSource } type subscriber[T any] struct { @@ -41,7 +49,9 @@ func (s *subscriber[T]) isCancelled() bool { func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] { c := &MultiplexedChannel[T]{ - Source: make(chan T, buffer), + Source: make(chan T, buffer), + done: make(chan struct{}), + subSignal: make(chan struct{}), } go c.run() @@ -51,8 +61,19 @@ func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] { // run is the fan-out loop. It delivers each Source value to every live // subscriber and closes all consumer channels when Source is closed. +// +// When no active subscribers remain after delivering a value, run +// stops consuming from Source until a subscriber appears. This lets +// Source fill up, which back-pressures the reader goroutine, which +// in turn back-pressures the child process through the pipe. func (m *MultiplexedChannel[T]) run() { - for v := range m.Source { + for { + // Wait for either a value from Source or a subscriber to appear. + v, ok := m.receiveWhenReady() + if !ok { + break + } + m.mu.RLock() subs := m.channels m.mu.RUnlock() @@ -70,8 +91,6 @@ func (m *MultiplexedChannel[T]) run() { } } - m.exited.Store(true) - // Close all remaining consumer channels so `for range` loops exit. m.mu.Lock() defer m.mu.Unlock() @@ -81,6 +100,73 @@ func (m *MultiplexedChannel[T]) run() { close(s.ch) } m.channels = nil + + // Signal that run() has finished. Fork() uses this to detect shutdown. + close(m.done) +} + +// receiveWhenReady reads the next value from Source, but only when at +// least one active subscriber exists. When there are no subscribers +// it stops consuming so Source fills up and back-pressures the +// producer. Returns (zero, false) when Source is closed. +// +// Callers that close Source must use CloseSource (not bare close) +// so the fan-out loop wakes up and observes the closed channel. +func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) { + for { + // Grab the signal before checking conditions so any change + // that happens after the check closes the channel we wait on. + m.subMu.Lock() + sig := m.subSignal + m.subMu.Unlock() + + if m.HasSubscribers() || m.closed.Load() { + v, ok = <-m.Source + + return v, ok + } + + <-sig + } +} + +// Drain disables back-pressure so the fan-out drains Source even +// with no subscribers. Call this when the producer is done (e.g. +// child process exited) so readers can flush their last chunks +// without blocking. Source remains open for further sends. +func (m *MultiplexedChannel[T]) Drain() { + m.closed.Store(true) + m.NotifySubscriberChange() +} + +// CloseSource closes the Source channel and wakes the fan-out loop. +func (m *MultiplexedChannel[T]) CloseSource() { + m.closed.Store(true) + close(m.Source) + m.NotifySubscriberChange() +} + +// NotifySubscriberChange wakes the fan-out if it's waiting. +func (m *MultiplexedChannel[T]) NotifySubscriberChange() { + m.subMu.Lock() + defer m.subMu.Unlock() + + close(m.subSignal) + m.subSignal = make(chan struct{}) +} + +// HasSubscribers reports whether any non-cancelled subscriber exists. +func (m *MultiplexedChannel[T]) HasSubscribers() bool { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, s := range m.channels { + if !s.isCancelled() { + return true + } + } + + return false } // Fork registers a new subscriber and returns its channel plus a cancel func. @@ -88,22 +174,26 @@ func (m *MultiplexedChannel[T]) run() { // The channel is bidirectional for backwards compat with start.go which writes // a bootstrap event into it; new callers should treat it as receive-only. func (m *MultiplexedChannel[T]) Fork() (chan T, func()) { - if m.exited.Load() { + select { + case <-m.done: ch := make(chan T) close(ch) return ch, func() {} + default: } m.mu.Lock() defer m.mu.Unlock() // Re-check under lock in case run() finished between the fast path and here. - if m.exited.Load() { + select { + case <-m.done: ch := make(chan T) close(ch) return ch, func() {} + default: } s := &subscriber[T]{ @@ -112,6 +202,7 @@ func (m *MultiplexedChannel[T]) Fork() (chan T, func()) { } m.channels = append(m.channels, s) + m.NotifySubscriberChange() return s.ch, func() { m.remove(s) @@ -124,13 +215,16 @@ func (m *MultiplexedChannel[T]) remove(s *subscriber[T]) { s.cancel() m.mu.Lock() - defer m.mu.Unlock() for i, sub := range m.channels { if sub == s { m.channels = append(m.channels[:i], m.channels[i+1:]...) + m.mu.Unlock() + m.NotifySubscriberChange() return } } + + m.mu.Unlock() } diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index ae79495157..b53f178c78 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -1,7 +1,6 @@ package handler import ( - "runtime" "sync" "testing" "time" @@ -10,8 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -// Tests for MultiplexedChannel fan-out, covering the fix for the goroutine -// leak that occurred when a subscriber disconnected mid-send. +// Tests for MultiplexedChannel fan-out. const multiplexTestTimeout = 500 * time.Millisecond @@ -78,23 +76,22 @@ func TestMultiplexedChannel_BasicFanOut(t *testing.T) { ) } - close(m.Source) + m.CloseSource() wg.Wait() assert.Equal(t, []int{1, 2, 3}, gotA) assert.Equal(t, []int{1, 2, 3}, gotB) } -// Regression: an abandoned consumer must not wedge the fan-out loop. +// An abandoned consumer must not wedge the fan-out loop. func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Parallel() m := NewMultiplexedChannel[int](1) - t.Cleanup(func() { close(m.Source) }) abandoned, cancelAbandoned := m.Fork() - // Consumer reads one value then exits, modeling a disconnected client. + // Consumer reads one value then exits. abandonReader := make(chan struct{}) go func() { <-abandoned @@ -112,7 +109,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Fatal("abandoned consumer should have read its single value") } - // Simulate the handler's deferred cancel after return. + // Cancel the subscriber. cancelDone := make(chan struct{}) go func() { cancelAbandoned() @@ -124,21 +121,33 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) { t.Fatal("cancel func did not return promptly; fan-out is wedged") } - // Producer should still make progress through Source. + // With no subscribers, back-pressure kicks in and further sends block. + sent := 0 for i := 2; i <= 8; i++ { - require.Truef(t, - sendOrTimeout(t, m.Source, i, multiplexTestTimeout), - "send %d should not be back-pressured by an abandoned consumer", i, - ) + if !sendOrTimeout(t, m.Source, i, 100*time.Millisecond) { + break + } + sent++ + } + // Buffer (size 1) plus at most one in-flight value. + assert.LessOrEqual(t, sent, 3, + "fan-out should stop draining Source when no subscribers remain (back-pressure)") + + m.CloseSource() + + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out did not exit after CloseSource") } } -// Regression: an abandoned consumer must not starve other subscribers. +// An abandoned consumer must not starve other subscribers. func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) { t.Parallel() m := NewMultiplexedChannel[int](1) - t.Cleanup(func() { close(m.Source) }) + t.Cleanup(func() { m.CloseSource() }) healthy, cancelHealthy := m.Fork() t.Cleanup(cancelHealthy) @@ -178,7 +187,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) { } } -// cancel must be idempotent and non-blocking even under producer load. +// Cancel must be idempotent and non-blocking even under producer load. func TestMultiplexedChannel_CancelIsIdempotentAndPrompt(t *testing.T) { t.Parallel() @@ -186,7 +195,7 @@ func TestMultiplexedChannel_CancelIsIdempotentAndPrompt(t *testing.T) { _, cancel := m.Fork() - // Concurrently push values without anyone draining the consumer chan. + // Push values without anyone draining. stop := make(chan struct{}) producerDone := make(chan struct{}) go func() { @@ -202,7 +211,7 @@ func TestMultiplexedChannel_CancelIsIdempotentAndPrompt(t *testing.T) { t.Cleanup(func() { close(stop) <-producerDone - close(m.Source) + m.CloseSource() }) // Give the fan-out a chance to enter a per-subscriber select. @@ -243,7 +252,7 @@ func TestMultiplexedChannel_SourceCloseClosesLiveSubscribers(t *testing.T) { "send should succeed", ) - close(m.Source) + m.CloseSource() select { case <-done: @@ -258,15 +267,12 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T) t.Parallel() m := NewMultiplexedChannel[int](0) - close(m.Source) + m.CloseSource() - // Wait for the fan-out goroutine to observe Source close. - deadline := time.Now().Add(multiplexTestTimeout) - for !m.exited.Load() { - if time.Now().After(deadline) { - t.Fatal("fan-out did not mark itself exited after Source close") - } - time.Sleep(time.Millisecond) + select { + case <-m.done: + case <-time.After(multiplexTestTimeout): + t.Fatal("fan-out did not exit after Source close") } cons, cancel := m.Fork() @@ -276,13 +282,60 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T) assert.False(t, ok, "Fork after shutdown must return a pre-closed channel") } -// Goroutine count must return to baseline after cancelled subscribers settle. -func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:paralleltest // relies on a stable goroutine count - const wedges = 16 +// Sending to a buffered Source after the last subscriber cancelled must +// not deadlock. This mirrors the EndEvent pattern in handler.Wait(). +func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testing.T) { + t.Parallel() + + m := NewMultiplexedChannel[int](1) + + _, cancel := m.Fork() + cancel() time.Sleep(50 * time.Millisecond) - runtime.GC() //nolint:revive // intentional: settle goroutines before measuring baseline - before := runtime.NumGoroutine() + + sent := sendOrTimeout(t, m.Source, 1, 2*time.Second) + if !sent { + t.Fatal("Source send deadlocked after last subscriber cancelled") + } + + m.CloseSource() + + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out did not drain buffered value and exit after CloseSource") + } +} + +// CloseSource after the last subscriber is cancelled must not leak +// the fan-out goroutine. Mirrors the LIFO defer order in handleStart. +func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T) { + t.Parallel() + + const iterations = 8 + + for range iterations { + m := NewMultiplexedChannel[int](0) + _, cancel := m.Fork() + + cancel() + time.Sleep(10 * time.Millisecond) + m.CloseSource() + + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out goroutine did not exit after cancel-then-CloseSource") + } + } +} + +// Fan-out goroutine exits after cancelled subscribers and CloseSource. +func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { + t.Parallel() + + const wedges = 16 for range wedges { m := NewMultiplexedChannel[int](1) @@ -290,25 +343,12 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:pa // Park one value so fan-out has work, then cancel mid-iteration. m.Source <- 0 cancel() - close(m.Source) - } + m.CloseSource() - // Allow scheduled goroutines to finish. - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - runtime.GC() //nolint:revive // intentional: help goroutines finalize - runtime.Gosched() - time.Sleep(20 * time.Millisecond) - if runtime.NumGoroutine() <= before+2 { - break + select { + case <-m.done: + case <-time.After(2 * time.Second): + t.Fatal("fan-out goroutine did not exit after cancel + CloseSource") } } - - after := runtime.NumGoroutine() - leaked := after - before - // Small slack for runtime bookkeeping; the old bug leaked >= wedges. - assert.LessOrEqualf(t, leaked, 2, - "goroutine count grew by %d after %d cancelled wedges; "+ - "before=%d after=%d (expected ~0)", leaked, wedges, before, after, - ) } diff --git a/packages/envd/internal/services/process/start.go b/packages/envd/internal/services/process/start.go index ef75a9e6d4..2458ec06ef 100644 --- a/packages/envd/internal/services/process/start.go +++ b/packages/envd/internal/services/process/start.go @@ -98,7 +98,7 @@ func (s *Service) handleStart(ctx context.Context, req *connect.Request[rpc.Star exitChan := make(chan struct{}) startMultiplexer := handler.NewMultiplexedChannel[rpc.ProcessEvent_Start](0) - defer close(startMultiplexer.Source) + defer startMultiplexer.CloseSource() start, startCancel := startMultiplexer.Fork() defer startCancel() diff --git a/packages/envd/internal/services/process/start_test.go b/packages/envd/internal/services/process/start_test.go index 2e851d6d3e..13a5210b89 100644 --- a/packages/envd/internal/services/process/start_test.go +++ b/packages/envd/internal/services/process/start_test.go @@ -2,10 +2,12 @@ package process import ( "context" + "fmt" "net/http" "net/http/httptest" "os" "os/user" + "strings" "testing" "time" @@ -127,3 +129,142 @@ func TestStart_ClientDisconnectMidStream(t *testing.T) { } _ = stream.Close() } + +// TestStart_ProcessSurvivesClientDisconnect verifies that a child +// process keeps running after the Start RPC client disconnects. +// This is intentional: procCtx is context.Background() so clients +// can reconnect via Connect later. +func TestStart_ProcessSurvivesClientDisconnect(t *testing.T) { + t.Parallel() + + client, cleanup := newTestService(t) + defer cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ + Process: &rpc.ProcessConfig{ + Cmd: "timeout", + Args: []string{"5", "yes"}, + }, + })) + require.NoError(t, err) + + // Wait for the start event to get the PID. + require.True(t, stream.Receive(), "expected start event") + startEvt := stream.Msg().GetEvent().GetStart() + require.NotNil(t, startEvt) + pid := int(startEvt.GetPid()) + require.Positive(t, pid) + + // Disconnect the client. + cancel() + for stream.Receive() { + } + _ = stream.Close() + + time.Sleep(200 * time.Millisecond) + + // The child must still be alive — this is the Start-then-Connect contract. + assert.True(t, processAlive(pid), + "child process %d should survive client disconnect", pid) + + // Clean up. + proc, _ := os.FindProcess(pid) + _ = proc.Kill() +} + +// TestStart_FastCommandOutputNotTruncated verifies that a fast command's +// stdout is fully delivered. This catches the bug where cmd.Wait() closing +// the pipe races with outCancel(), causing the reader to drop its last chunk. +func TestStart_FastCommandOutputNotTruncated(t *testing.T) { + t.Parallel() + + client, cleanup := newTestService(t) + defer cleanup() + + const expected = "hello-truncation-test\n" + + for i := range 50 { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + + stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ + Process: &rpc.ProcessConfig{ + Cmd: "echo", + Args: []string{"hello-truncation-test"}, + }, + })) + require.NoError(t, err) + + var stdout strings.Builder + for stream.Receive() { + if data := stream.Msg().GetEvent().GetData(); data != nil { + if out := data.GetStdout(); out != nil { + stdout.Write(out) + } + } + } + require.NoError(t, stream.Err()) + _ = stream.Close() + cancel() + + require.Equalf(t, expected, stdout.String(), "iteration %d: output mismatch", i) + } +} + +// TestStart_OrphanGrandchildDoesNotHangStream verifies that killing a +// process whose child still holds stdout open does not hang the stream. +// The stream must deliver the EndEvent within a reasonable time. +func TestStart_OrphanGrandchildDoesNotHangStream(t *testing.T) { + t.Parallel() + + client, cleanup := newTestService(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + // bash spawns a background child that inherits stdout and keeps + // the pipe open after the parent is killed. + stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{ + Process: &rpc.ProcessConfig{ + Cmd: "bash", + Args: []string{"-c", "sleep 300 & wait"}, + }, + })) + require.NoError(t, err) + + // Wait for start event to get PID. + require.True(t, stream.Receive(), "expected start event") + startEvt := stream.Msg().GetEvent().GetStart() + require.NotNil(t, startEvt) + pid := startEvt.GetPid() + + // Kill the process via SendSignal (same as TestCommandKillNextApp). + _, err = client.SendSignal(ctx, connect.NewRequest(&rpc.SendSignalRequest{ + Signal: rpc.Signal_SIGNAL_SIGKILL, + Process: &rpc.ProcessSelector{ + Selector: &rpc.ProcessSelector_Pid{Pid: pid}, + }, + })) + require.NoError(t, err) + + var gotEnd bool + for stream.Receive() { + if stream.Msg().GetEvent().GetEnd() != nil { + gotEnd = true + } + } + require.NoError(t, stream.Err()) + _ = stream.Close() + + assert.True(t, gotEnd, "stream should deliver EndEvent even with orphan grandchild") +} + +// processAlive checks whether a process with the given PID exists. +func processAlive(pid int) bool { + // /proc//stat exists iff the process is alive (Linux-specific). + _, err := os.Stat(fmt.Sprintf("/proc/%d/stat", pid)) + + return err == nil +} diff --git a/packages/envd/pkg/version.go b/packages/envd/pkg/version.go index a5ccaf4cab..e251abd82f 100644 --- a/packages/envd/pkg/version.go +++ b/packages/envd/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "0.5.17" +const Version = "0.5.18"