Skip to content

Commit c1d19b7

Browse files
Pangjipingclaude
andauthored
fix(execd): preserve blank lines in command stdout SSE stream (#902)
* fix(execd): preserve blank lines in command stdout SSE stream readFromPos previously skipped consecutive line terminators when the buffer was empty, dropping standalone newline lines from the SSE output. Emit "\n" for blank lines while keeping the existing line-content emit behavior (terminator is still stripped from non-empty lines). \r\n pairs are coalesced via lastWasCR to avoid duplicate blank emits. Adds a unit test covering blank lines, leading blank, and CRLF, and an end-to-end smoke test that runs `printf 'a\n\nb\n\n\nc\n'` and asserts the stdout event sequence preserves the blanks. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(execd): wait for stdout/stderr tail to flush before complete on Windows Windows runCommand fired OnExecuteComplete immediately after closing the done channel, racing the tail goroutines that emit pending stdout/stderr SSE events. Clients that break on execution_complete then missed final output, e.g. the blank-line smoke assertion saw an empty sequence while the server log showed the events were emitted. Mirror the Linux path: track the tail goroutines with a sync.WaitGroup and wg.Wait() after close(done) so all buffered output drains before the completion event is sent. Also covers the cmd.Start failure path. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(execd): persist CRLF state across tail polls readFromPos previously declared lastWasCR as a per-call local, so the CRLF coalescing only worked when \r and \n were read in the same invocation. When tailStdPipe polls a writer that flushes \r before \n (common on Windows/cmd), the \r and \n can land in separate polls; the second call starts with lastWasCR=false and emits a spurious "\n" blank line for the trailing \n. A bare blank \r\n line split across polls would surface as two blanks. Hoist the state into tailStdPipe and thread it through readFromPos so the CR detection survives between polls. Add regression tests covering split CRLF after content and split blank CRLF. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(execd): make blank-lines smoke command cross-platform Replace POSIX printf with single-quoted format string with a python -c one-liner. cmd /C does not strip single quotes, so the previous command only worked on Windows runners that happened to have Git for Windows in PATH (MSYS2 argv pre-processing strips the quotes); on a bare Windows sandbox the smoke would fail before reaching the filesystem checks. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(execd): branch blank-lines smoke per platform The previous python -c attempt produced a SyntaxError on Windows because Go's syscall.EscapeArg wraps the cmd /C argument in quotes, escaping the inner quotes as \". cmd /C strips the outer quotes (rule 2 of its parser) but leaves the literal \" inside, and MSVCRT's argv parser then treats \" as a literal double-quote character without toggling quote state, so the first embedded space terminates argv[2] and python sees an unterminated string literal. Use a cmd-native echo chain on Windows (no inner quotes, & is sequential) and keep POSIX printf on Linux/macOS. The execd reader collapses CRLF to LF, so both platforms yield the same event sequence. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent db9ec8f commit c1d19b7

4 files changed

Lines changed: 164 additions & 11 deletions

File tree

components/execd/pkg/runtime/command_common.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,14 @@ func (c *Controller) tailStdPipe(file string, onExecute func(text string), done
3232
defer ticker.Stop()
3333

3434
mutex := &sync.Mutex{}
35+
var lastWasCR bool
3536
for {
3637
select {
3738
case <-done:
38-
c.readFromPos(mutex, file, lastPos, onExecute, true)
39+
c.readFromPos(mutex, file, lastPos, onExecute, true, &lastWasCR)
3940
return
4041
case <-ticker.C:
41-
newPos := c.readFromPos(mutex, file, lastPos, onExecute, false)
42+
newPos := c.readFromPos(mutex, file, lastPos, onExecute, false, &lastWasCR)
4243
lastPos = newPos
4344
}
4445
}
@@ -104,7 +105,9 @@ func (c *Controller) combinedOutputFileName(session string) string {
104105
}
105106

106107
// readFromPos streams new content from a file starting at startPos.
107-
func (c *Controller) readFromPos(mutex *sync.Mutex, filepath string, startPos int64, onExecute func(string), flushIncomplete bool) int64 {
108+
// lastWasCR persists CRLF detection across calls so a \r\n pair split between
109+
// two polls does not surface a spurious blank line for the trailing \n.
110+
func (c *Controller) readFromPos(mutex *sync.Mutex, filepath string, startPos int64, onExecute func(string), flushIncomplete bool, lastWasCR *bool) int64 {
108111
if !mutex.TryLock() {
109112
return -1
110113
}
@@ -121,6 +124,15 @@ func (c *Controller) readFromPos(mutex *sync.Mutex, filepath string, startPos in
121124
reader := bufio.NewReader(file)
122125
var buffer bytes.Buffer
123126
var currentPos int64 = startPos
127+
cr := false
128+
if lastWasCR != nil {
129+
cr = *lastWasCR
130+
}
131+
defer func() {
132+
if lastWasCR != nil {
133+
*lastWasCR = cr
134+
}
135+
}()
124136

125137
for {
126138
b, err := reader.ReadByte()
@@ -138,15 +150,22 @@ func (c *Controller) readFromPos(mutex *sync.Mutex, filepath string, startPos in
138150

139151
// Check if it's a line terminator (\n or \r)
140152
if b == '\n' || b == '\r' {
141-
// If buffer has content, output this line
142-
if buffer.Len() > 0 {
153+
switch {
154+
case buffer.Len() > 0:
155+
// Flush the line content without the terminator
143156
onExecute(buffer.String())
144157
buffer.Reset()
158+
case b == '\n' && cr:
159+
// Second half of a \r\n pair; already emitted on \r
160+
default:
161+
// Standalone blank line; surface it so callers see the gap
162+
onExecute("\n")
145163
}
146-
// Skip line terminator
164+
cr = (b == '\r')
147165
continue
148166
}
149167

168+
cr = false
150169
buffer.WriteByte(b)
151170
}
152171

components/execd/pkg/runtime/command_test.go

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestReadFromPos_SplitsOnCRAndLF(t *testing.T) {
4242

4343
var got []string
4444
c := &Controller{}
45-
nextPos := c.readFromPos(mutex, logFile, 0, func(s string) { got = append(got, s) }, false)
45+
nextPos := c.readFromPos(mutex, logFile, 0, func(s string) { got = append(got, s) }, false, nil)
4646

4747
want := []string{"line1", "prog 10%", "prog 20%", "prog 30%", "last"}
4848
require.Len(t, got, len(want))
@@ -59,7 +59,7 @@ func TestReadFromPos_SplitsOnCRAndLF(t *testing.T) {
5959
_ = f.Close()
6060

6161
got = got[:0]
62-
c.readFromPos(mutex, logFile, nextPos, func(s string) { got = append(got, s) }, false)
62+
c.readFromPos(mutex, logFile, nextPos, func(s string) { got = append(got, s) }, false, nil)
6363
want = []string{"tail1", "tail2"}
6464
require.Len(t, got, len(want))
6565
for i := range want {
@@ -77,7 +77,7 @@ func TestReadFromPos_LongLine(t *testing.T) {
7777

7878
var got []string
7979
c := &Controller{}
80-
c.readFromPos(&sync.Mutex{}, logFile, 0, func(s string) { got = append(got, s) }, false)
80+
c.readFromPos(&sync.Mutex{}, logFile, 0, func(s string) { got = append(got, s) }, false, nil)
8181

8282
require.Len(t, got, 1, "expected one token")
8383
require.Equal(t, strings.TrimSuffix(longLine, "\n"), got[0], "long line mismatch")
@@ -98,15 +98,86 @@ func TestReadFromPos_FlushesTrailingLine(t *testing.T) {
9898
}
9999

100100
// First read: should only get complete lines with newlines
101-
pos := c.readFromPos(mutex, file, 0, onExecute, false)
101+
pos := c.readFromPos(mutex, file, 0, onExecute, false, nil)
102102
assert.GreaterOrEqual(t, pos, int64(0))
103103
assert.Equal(t, []string{"line1"}, lines)
104104

105105
// Flush at end: should output the last line (without newline)
106-
c.readFromPos(mutex, file, pos, onExecute, true)
106+
c.readFromPos(mutex, file, pos, onExecute, true, nil)
107107
assert.Equal(t, []string{"line1", "lastline-without-newline"}, lines)
108108
}
109109

110+
func TestReadFromPos_PreservesBlankLines(t *testing.T) {
111+
tmp := t.TempDir()
112+
logFile := filepath.Join(tmp, "stdout.log")
113+
114+
// Mix of single newlines, consecutive blank lines, leading blank, and CRLF.
115+
initial := "a\n\nb\n\n\nc\n\r\nd\n"
116+
require.NoError(t, os.WriteFile(logFile, []byte(initial), 0o644))
117+
118+
var got []string
119+
c := &Controller{}
120+
c.readFromPos(&sync.Mutex{}, logFile, 0, func(s string) { got = append(got, s) }, false, nil)
121+
122+
want := []string{"a", "\n", "b", "\n", "\n", "c", "\n", "d"}
123+
require.Equal(t, want, got)
124+
}
125+
126+
// TestReadFromPos_CRLFAcrossPolls ensures a \r\n pair that arrives in two
127+
// successive polls does not emit a spurious blank line for the trailing \n.
128+
// Reproduces the regression on Windows/cmd writers that flush \r before \n.
129+
func TestReadFromPos_CRLFAcrossPolls(t *testing.T) {
130+
tmp := t.TempDir()
131+
logFile := filepath.Join(tmp, "stdout.log")
132+
133+
require.NoError(t, os.WriteFile(logFile, []byte("a\r"), 0o644))
134+
135+
var got []string
136+
c := &Controller{}
137+
mutex := &sync.Mutex{}
138+
var lastWasCR bool
139+
pos := c.readFromPos(mutex, logFile, 0, func(s string) { got = append(got, s) }, false, &lastWasCR)
140+
require.Equal(t, []string{"a"}, got)
141+
require.True(t, lastWasCR, "CR state must persist for next poll")
142+
143+
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0o644)
144+
require.NoError(t, err)
145+
_, err = f.WriteString("\nb\n")
146+
require.NoError(t, err)
147+
_ = f.Close()
148+
149+
got = got[:0]
150+
c.readFromPos(mutex, logFile, pos, func(s string) { got = append(got, s) }, false, &lastWasCR)
151+
require.Equal(t, []string{"b"}, got, "trailing \\n of split CRLF must not emit a blank line")
152+
}
153+
154+
// TestReadFromPos_BlankCRLFAcrossPolls ensures a blank \r\n line split across
155+
// polls is emitted as a single blank, not duplicated.
156+
func TestReadFromPos_BlankCRLFAcrossPolls(t *testing.T) {
157+
tmp := t.TempDir()
158+
logFile := filepath.Join(tmp, "stdout.log")
159+
160+
require.NoError(t, os.WriteFile(logFile, []byte("\r"), 0o644))
161+
162+
var got []string
163+
c := &Controller{}
164+
mutex := &sync.Mutex{}
165+
var lastWasCR bool
166+
pos := c.readFromPos(mutex, logFile, 0, func(s string) { got = append(got, s) }, false, &lastWasCR)
167+
require.Equal(t, []string{"\n"}, got)
168+
require.True(t, lastWasCR)
169+
170+
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0o644)
171+
require.NoError(t, err)
172+
_, err = f.WriteString("\n")
173+
require.NoError(t, err)
174+
_ = f.Close()
175+
176+
got = got[:0]
177+
c.readFromPos(mutex, logFile, pos, func(s string) { got = append(got, s) }, false, &lastWasCR)
178+
require.Empty(t, got, "trailing \\n of split blank CRLF must not emit a second blank")
179+
}
180+
110181
func TestRunCommand_Echo(t *testing.T) {
111182
if goruntime.GOOS == "windows" {
112183
t.Skip("bash not available on windows")

components/execd/pkg/runtime/command_windows.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"os"
2525
"os/exec"
2626
"strconv"
27+
"sync"
2728
"time"
2829

2930
"github.com/alibaba/opensandbox/execd/pkg/jupyter/execute"
@@ -57,15 +58,21 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest
5758
cmd.Env = mergeEnvs(os.Environ(), extraEnv)
5859

5960
done := make(chan struct{}, 1)
61+
var wg sync.WaitGroup
62+
wg.Add(2)
6063
safego.Go(func() {
64+
defer wg.Done()
6165
c.tailStdPipe(c.stdoutFileName(session), request.Hooks.OnExecuteStdout, done)
6266
})
6367
safego.Go(func() {
68+
defer wg.Done()
6469
c.tailStdPipe(c.stderrFileName(session), request.Hooks.OnExecuteStderr, done)
6570
})
6671

6772
err = cmd.Start()
6873
if err != nil {
74+
close(done)
75+
wg.Wait()
6976
request.Hooks.OnExecuteError(&execute.ErrorOutput{EName: "CommandExecError", EValue: err.Error()})
7077
log.Error("CommandExecError: error starting commands: %v", err)
7178
return nil
@@ -80,6 +87,7 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest
8087

8188
err = cmd.Wait()
8289
close(done)
90+
wg.Wait()
8391
if err != nil {
8492
var eName, eValue string
8593
var traceback []string

components/execd/tests/smoke_api.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,58 @@ def fetch_logs(cmd_id: str, cursor: int = 0):
9191
return r.text, r.headers.get("EXECD-COMMANDS-TAIL-CURSOR")
9292

9393

94+
def run_command_blank_lines():
95+
"""
96+
Foreground command whose stdout contains consecutive newlines must surface
97+
blank-line events instead of dropping them. Regression test for the
98+
readFromPos fix that preserves empty lines (a\n\nb -> ["a", "\n", "b"]).
99+
"""
100+
url = f"{BASE_URL}/command"
101+
# Pick a shell-native command per platform so the regression covers both
102+
# POSIX (LF-only) and Windows cmd (CRLF) byte streams without depending on
103+
# Git for Windows / MSYS argv mangling. The execd reader collapses CRLF to
104+
# LF, so both produce ["a", "\n", "b", "\n", "\n", "c"].
105+
if os.name == "nt":
106+
# cmd /C echo chain: each segment writes "<text>\r\n"; "echo." writes
107+
# a bare "\r\n". Order is deterministic because "&" is sequential.
108+
command = "echo a&echo.&echo b&echo.&echo.&echo c"
109+
else:
110+
# printf emits exact bytes: a\n\nb\n\n\nc\n
111+
command = "printf 'a\\n\\nb\\n\\n\\nc\\n'"
112+
payload = {
113+
"command": command,
114+
"background": False,
115+
}
116+
117+
stdout_texts = []
118+
saw_complete = False
119+
with session.post(url, json=payload, stream=True, timeout=15) as resp:
120+
expect(resp.status_code == 200, f"SSE start failed: {resp.status_code} {resp.text}")
121+
for line in resp.iter_lines():
122+
if not line:
123+
continue
124+
try:
125+
if line.startswith(b"data:"):
126+
data = json.loads(line[len(b"data:") :].decode())
127+
else:
128+
data = json.loads(line.decode())
129+
except Exception:
130+
continue
131+
event_type = data.get("type")
132+
if event_type == "stdout":
133+
stdout_texts.append(data.get("text", ""))
134+
elif event_type == "execution_complete":
135+
saw_complete = True
136+
break
137+
138+
expect(saw_complete, "did not observe execution_complete")
139+
want = ["a", "\n", "b", "\n", "\n", "c"]
140+
expect(
141+
stdout_texts == want,
142+
f"blank-line stdout sequence mismatch: got {stdout_texts!r}, want {want!r}",
143+
)
144+
145+
94146
def sse_disconnect_should_stop_ping():
95147
"""
96148
Open an SSE stream for a long-running command, receive init, then close the
@@ -248,6 +300,9 @@ def main():
248300
sse_disconnect_should_stop_ping()
249301
print("[+] SSE disconnect handled")
250302

303+
run_command_blank_lines()
304+
print("[+] run_command preserves blank lines")
305+
251306
cmd_id = sse_get_command_id()
252307
print(f"[+] command id: {cmd_id}")
253308

0 commit comments

Comments
 (0)