Skip to content

Commit cde8dc9

Browse files
committed
fix(envd): prevent output truncation on fast commands
The pipes returned by cmd.StdoutPipe/StderrPipe are owned by exec.Cmd: cmd.Wait closes their read-ends when it returns, racing with reader goroutines that have not finished reading. Replace them with manual os.Pipe pairs so we control the lifecycle: the parent's write-ends are closed after Start (the child has inherited its own copies), and the read-ends stay open until the readers finish naturally on EOF. After cmd.Wait reaps the child, call Drain() on the data multiplexer to disable back-pressure, so readers stuck sending to a full Source channel unblock and go on to see EOF. Then wait for all readers to exit before proceeding.
1 parent 3026c2c commit cde8dc9

3 files changed

Lines changed: 75 additions & 8 deletions

File tree

packages/envd/internal/services/process/handler/handler.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ type Handler struct {
5454
outCtx context.Context //nolint:containedctx // todo: refactor so this can be removed
5555
outCancel context.CancelFunc
5656

57-
stdinMu sync.Mutex
58-
stdin io.WriteCloser
57+
stdinMu sync.Mutex
58+
stdin io.WriteCloser
59+
pipeEnds []*os.File // write-ends of stdout/stderr pipes; closed after Start
5960

6061
DataEvent *MultiplexedChannel[rpc.ProcessEvent_Data]
6162
EndEvent *MultiplexedChannel[rpc.ProcessEvent_End]
@@ -235,11 +236,14 @@ func New(
235236

236237
h.tty = tty
237238
} else {
238-
stdout, err := cmd.StdoutPipe()
239+
stdoutR, stdoutW, err := os.Pipe()
239240
if err != nil {
240241
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stdout pipe for command '%s': %w", userCmd, err))
241242
}
242243

244+
cmd.Stdout = stdoutW
245+
stdout := stdoutR
246+
243247
outWg.Go(func() {
244248
stdoutLogs := make(chan []byte, outputBufferSize)
245249
defer close(stdoutLogs)
@@ -283,11 +287,16 @@ func New(
283287
}
284288
})
285289

286-
stderr, err := cmd.StderrPipe()
290+
stderrR, stderrW, err := os.Pipe()
287291
if err != nil {
288292
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("error creating stderr pipe for command '%s': %w", userCmd, err))
289293
}
290294

295+
cmd.Stderr = stderrW
296+
stderr := stderrR
297+
298+
h.pipeEnds = []*os.File{stdoutW, stderrW}
299+
291300
outWg.Go(func() {
292301
stderrLogs := make(chan []byte, outputBufferSize)
293302
defer close(stderrLogs)
@@ -444,6 +453,12 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) {
444453
if err != nil {
445454
return 0, fmt.Errorf("error starting process '%s': %w", p.userCommand(), err)
446455
}
456+
457+
// Close parent's copy of the pipe write-ends; the child
458+
// inherited them. Readers see EOF once the child (and any descendant that inherited the write-ends) has exited or closed them.
459+
for _, f := range p.pipeEnds {
460+
f.Close()
461+
}
447462
}
448463

449464
p.logger.
@@ -458,11 +473,15 @@ func (p *Handler) Start(requestTimeout time.Duration) (uint32, error) {
458473
}
459474

460475
func (p *Handler) Wait() {
461-
// cmd.Wait reaps the child and closes the pipe read-ends.
462-
// Then we cancel outCtx to unblock any reader goroutine that
463-
// is blocked on a full Source channel send (back-pressure).
476+
// Reap the child. With manual os.Pipe() this does NOT close the
477+
// read-ends, so readers can still drain remaining pipe data.
464478
err := p.cmd.Wait()
465-
p.outCancel()
479+
480+
// Disable back-pressure so the fan-out drains Source even with
481+
// no subscribers. Readers will see EOF (child closed write-end
482+
// on exit), flush their last chunk, and exit.
483+
p.DataEvent.Drain()
484+
<-p.outCtx.Done()
466485

467486
p.tty.Close()
468487

packages/envd/internal/services/process/handler/multiplex.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,15 @@ func (m *MultiplexedChannel[T]) receiveWhenReady() (v T, ok bool) {
131131
}
132132
}
133133

134+
// Drain disables back-pressure so the fan-out drains Source even
135+
// with no subscribers. Call this when the producer is done (e.g.
136+
// child process exited) so readers can flush their last chunks
137+
// without blocking. Source remains open for further sends.
138+
func (m *MultiplexedChannel[T]) Drain() {
139+
m.closed.Store(true)
140+
m.NotifySubscriberChange()
141+
}
142+
134143
// CloseSource closes the Source channel and wakes the fan-out loop.
135144
func (m *MultiplexedChannel[T]) CloseSource() {
136145
m.closed.Store(true)

packages/envd/internal/services/process/start_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http/httptest"
88
"os"
99
"os/user"
10+
"strings"
1011
"testing"
1112
"time"
1213

@@ -173,6 +174,44 @@ func TestStart_ProcessSurvivesClientDisconnect(t *testing.T) {
173174
_ = proc.Kill()
174175
}
175176

177+
// TestStart_FastCommandOutputNotTruncated verifies that a fast command's
178+
// stdout is fully delivered. This catches the bug where cmd.Wait() closing
179+
// the pipe (followed by outCancel()) races with the reader, causing it to drop its last chunk.
180+
func TestStart_FastCommandOutputNotTruncated(t *testing.T) {
181+
t.Parallel()
182+
183+
client, cleanup := newTestService(t)
184+
defer cleanup()
185+
186+
const expected = "hello-truncation-test\n"
187+
188+
for i := range 50 {
189+
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
190+
191+
stream, err := client.Start(ctx, connect.NewRequest(&rpc.StartRequest{
192+
Process: &rpc.ProcessConfig{
193+
Cmd: "echo",
194+
Args: []string{"hello-truncation-test"},
195+
},
196+
}))
197+
require.NoError(t, err)
198+
199+
var stdout strings.Builder
200+
for stream.Receive() {
201+
if data := stream.Msg().GetEvent().GetData(); data != nil {
202+
if out := data.GetStdout(); out != nil {
203+
stdout.Write(out)
204+
}
205+
}
206+
}
207+
require.NoError(t, stream.Err())
208+
_ = stream.Close()
209+
cancel()
210+
211+
require.Equalf(t, expected, stdout.String(), "iteration %d: output mismatch", i)
212+
}
213+
}
214+
176215
// processAlive checks whether a process with the given PID exists.
177216
func processAlive(pid int) bool {
178217
// /proc/<pid>/stat exists iff the process is alive (Linux-specific).

0 commit comments

Comments
 (0)