Commit 3148405

Pangjiping and claude authored
fix(execd): defer SSE response headers until first event fires (#912)

* test(e2e): harden flaky cross-SDK sandbox tests

  Recurring failures in `real-e2e.yml` traced to three races:

  1. Go `TestE2E_FullLifecycle` pings execd via the server proxy the moment sandbox state flips to Running; execd's TCP listener can accept before routes register, surfacing as `read: connection reset by peer` on `/proxy/44772/ping`. Wrap Ping + the first SSE RunCommand in `require.Eventually` retries.
  2. Go `TestE2E_PauseResume` and `TestFilesystem_SetPermissions` hit `opensandbox: empty sse stream` immediately after Resume / readiness. Add `runCommandWithRetry` helper in base_e2e_test.go and use it in the affected call sites.
  3. Java/C# NetworkPolicy tests asserted egress blocking after a fixed `Thread.sleep`/`Task.Delay`; the sidecar accepts the sandbox before iptables/proxy rules apply, so curl occasionally succeeds and `assertNotNull(error)` fails. Replace fixed sleeps with `waitUntilEgressBlocks` / `WaitUntilEgressBlocksAsync` helpers that poll curl until the policy actually blocks (or fail with the last observation).

  Also retry transient single-line stdout drops on the workingDirectory `pwd` checks (JS, Java) and the C# env-injection baseline: the same SSE blank-line / first-event race that has bitten multiple SDKs.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(execd): defer SSE response headers until first event fires

  `RunCommand`, `RunCode`, and `RunInSession` previously committed the response as `text/event-stream` (via `setupSSEResponse`) before invoking the runtime. If the runtime returned a synchronous error, the handler then called `RespondError`, which set Content-Type to `application/json` and wrote a JSON body on top of the already-committed event-stream response. Examples of such synchronous errors:

  - `stdLogDescriptor` could not (re)create `/tmp` after a sandbox restart
  - `pathutil.ExpandPathWithEnv` failed to resolve a working directory under transient env conditions
  - `buildCredential` raced with a uid/gid lookup

  Clients saw HTTP 200 with `text/event-stream` and a JSON body that no SSE parser could decode, producing zero events; the Go SDK reported `opensandbox: empty sse stream` and JS/Java/C# SDKs surfaced the same race as missing init events or a vanished single-line stdout (e.g. `pwd` with `workingDirectory: "/tmp"` returning empty `stdout[0]`).

  Make `setupSSEResponse` idempotent (guarded by `sync.Once`) and call it lazily from `writeSingleEvent`, so headers commit only once an event is actually being written. Drop the eager `setupSSEResponse` calls in the three streaming endpoints. Pre-execution synchronous errors now flow through `RespondError` cleanly with `application/json`, and successful runs still emit `text/event-stream` on the first event.

  Add three regression tests:

  - `TestRunCodeSyncErrorEmitsJSONNotSSE`
  - `TestRunInSessionSyncErrorEmitsJSONNotSSE`
  - `TestRunCodeSuccessStillEmitsSSE`

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(e2e): drop SDK-side retries now that execd SSE bug is fixed

  The execd "empty sse stream" / dropped-event race is fixed at the source in the previous commit (lazy SSE headers so synchronous runtime errors return JSON, not a half-formed event-stream). Real SDK clients do not retry these failures, so the e2e suite shouldn't either: the retries were masking the bug, not exercising production behaviour.

  Revert the SDK-side workarounds:

  - JS `pwd workingDirectory:/tmp` retry loop
  - Java echo + pwd retry loops
  - C# env-injection `RunWithRetryAsync` wrapper
  - Go `runCommandWithRetry` helper and its callers in `TestE2E_PauseResume` and `TestFilesystem_SetPermissions`

  Keep the targeted polls that cover other races not addressed by the execd fix:

  - Go `TestE2E_FullLifecycle` execd-Ping `Eventually`: this test bypasses the high-level SDK and pings the server-side proxy directly; the proxy drops the very first connection in the gap between sandbox state Running and execd routes registering. Real users go through `CreateSandbox`/`WaitUntilReady`, which already handles this; the low-level test does not.
  - Java/C# `waitUntilEgressBlocks` polls: egress sidecar policy is applied asynchronously after the sandbox is marked ready, so a fixed sleep is inherently flaky. This is a separate readiness-gating bug that should be addressed server-side.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
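The failure mode described in the commit message can be reproduced in isolation with the standard library. The sketch below is not the execd handler: `eagerHandler`, `observeEager`, the `/command` path, and the JSON body are hypothetical stand-ins, and `httptest.ResponseRecorder` stands in for a real connection. It only illustrates that once headers are flushed, a later attempt to switch to a JSON error has no effect on the status or Content-Type the client sees.

```go
package main

import (
	"net/http"
	"net/http/httptest"
)

// eagerHandler mimics the pre-fix ordering: SSE headers are set and flushed
// before a (hypothetical) runtime call, which then fails synchronously.
func eagerHandler(w http.ResponseWriter, _ *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	if f, ok := w.(http.Flusher); ok {
		f.Flush() // commits status 200 and the headers set so far
	}

	// Too late: status and headers are already committed, so these two
	// calls cannot change what the client observes.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusInternalServerError)
	_, _ = w.Write([]byte(`{"message":"boom"}`))
}

// observeEager runs eagerHandler against an in-memory recorder and returns
// the status, Content-Type, and body a client would observe.
func observeEager() (int, string, string) {
	rec := httptest.NewRecorder()
	eagerHandler(rec, httptest.NewRequest(http.MethodPost, "/command", nil))
	res := rec.Result()
	defer res.Body.Close()
	return res.StatusCode, res.Header.Get("Content-Type"), rec.Body.String()
}
```

Calling `observeEager()` from a `main` function or a test yields a 200 response labelled `text/event-stream` whose body is plain JSON, which is the shape an SSE parser would decode as zero events.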
1 parent 83115a5 commit 3148405

8 files changed

Lines changed: 200 additions & 34 deletions

components/execd/pkg/web/controller/basic.go

Lines changed: 3 additions & 1 deletion
@@ -18,14 +18,16 @@ import (
 	"encoding/json"
 	"net/http"
 	"strconv"
+	"sync"
 
 	"github.com/gin-gonic/gin"
 
 	"github.com/alibaba/opensandbox/execd/pkg/web/model"
 )
 
 type basicController struct {
-	ctx *gin.Context
+	ctx          *gin.Context
+	sseSetupOnce sync.Once
 }
 
 func newBasicController(ctx *gin.Context) *basicController {

components/execd/pkg/web/controller/codeinterpreting.go

Lines changed: 6 additions & 2 deletions
@@ -171,7 +171,9 @@ func (c *CodeInterpretingController) RunCode() {
 	}
 	runCodeRequest.Hooks = eventsHandler
 
-	c.setupSSEResponse()
+	// SSE headers are committed lazily on the first event write
+	// (see writeSingleEvent), so a synchronous error from Execute below can
+	// still be surfaced as a structured JSON error response.
 	err = codeRunner.Execute(runCodeRequest)
 	if err != nil {
 		recordExecution("failure")
@@ -400,7 +402,9 @@ func (c *CodeInterpretingController) RunInSession() {
 	}
 	runReq.Hooks = hooks
 
-	c.setupSSEResponse()
+	// SSE headers are committed lazily on the first event write
+	// (see writeSingleEvent), so a synchronous error from
+	// RunInBashSession can still be surfaced as a structured JSON error.
 	err := codeRunner.RunInBashSession(ctx, runReq)
 	if err != nil {
 		recordExecution("failure")

components/execd/pkg/web/controller/codeinterpreting_test.go

Lines changed: 88 additions & 0 deletions
@@ -17,6 +17,7 @@ package controller
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"net/http"
 	"testing"
 	"time"
@@ -333,3 +334,90 @@ func TestRunInSessionReturnsBeforeGracefulShutdownTimeoutAfterImmediateError(t *
 	require.Equal(t, http.StatusOK, w.Code)
 	require.Less(t, elapsed, flag.ApiGracefulShutdownTimeout/2)
 }
+
+// TestRunCodeSyncErrorEmitsJSONNotSSE guards against regression of the bug
+// where Execute returning a synchronous error after setupSSEResponse caused
+// the client to receive a text/event-stream response with a JSON body, which
+// SDKs parsed as zero events ("empty sse stream"). Headers must stay
+// uncommitted until the first event so RespondError can produce a proper
+// application/json error response.
+func TestRunCodeSyncErrorEmitsJSONNotSSE(t *testing.T) {
+	previousRunner := codeRunner
+	codeRunner = &fakeCodeRunner{
+		execute: func(_ *runtime.ExecuteCodeRequest) error {
+			return errors.New("synchronous runtime failure")
+		},
+	}
+	t.Cleanup(func() { codeRunner = previousRunner })
+
+	body := []byte(`{"code":"print(1)","context":{"id":"ctx-1","language":"python"}}`)
+	ctx, w := newTestContext(http.MethodPost, "/code/run", body)
+	ctrl := NewCodeInterpretingController(ctx)
+
+	ctrl.RunCode()
+
+	require.Equal(t, http.StatusInternalServerError, w.Code)
+	contentType := w.Header().Get("Content-Type")
+	require.Contains(t, contentType, "application/json", "should not commit text/event-stream when no event fires")
+
+	var resp model.ErrorResponse
+	require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp))
+	require.Equal(t, model.ErrorCodeRuntimeError, resp.Code)
+	require.Contains(t, resp.Message, "synchronous runtime failure")
+}
+
+// TestRunInSessionSyncErrorEmitsJSONNotSSE — see TestRunCodeSyncErrorEmitsJSONNotSSE.
+func TestRunInSessionSyncErrorEmitsJSONNotSSE(t *testing.T) {
+	previousRunner := codeRunner
+	codeRunner = &fakeCodeRunner{
+		runInBashSession: func(_ context.Context, _ *runtime.ExecuteCodeRequest) error {
+			return errors.New("synchronous session failure")
+		},
+	}
+	t.Cleanup(func() { codeRunner = previousRunner })
+
+	body := []byte(`{"command":"echo hi","timeout":0}`)
+	ctx, w := newTestContext(http.MethodPost, "/sessions/session-1/run", body)
+	ctx.Params = append(ctx.Params, gin.Param{Key: "sessionId", Value: "session-1"})
+	ctrl := NewCodeInterpretingController(ctx)
+
+	ctrl.RunInSession()
+
+	require.Equal(t, http.StatusInternalServerError, w.Code)
+	contentType := w.Header().Get("Content-Type")
+	require.Contains(t, contentType, "application/json", "should not commit text/event-stream when no event fires")
+
+	var resp model.ErrorResponse
+	require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp))
+	require.Equal(t, model.ErrorCodeRuntimeError, resp.Code)
+	require.Contains(t, resp.Message, "synchronous session failure")
+}
+
+// TestRunCodeSuccessStillEmitsSSE confirms the lazy header path still produces
+// a text/event-stream response when at least one event fires.
+func TestRunCodeSuccessStillEmitsSSE(t *testing.T) {
+	previousRunner := codeRunner
+	previousTimeout := flag.ApiGracefulShutdownTimeout
+	codeRunner = &fakeCodeRunner{
+		execute: func(request *runtime.ExecuteCodeRequest) error {
+			request.Hooks.OnExecuteInit("session-1")
+			request.Hooks.OnExecuteComplete(time.Millisecond)
+			return nil
+		},
+	}
+	flag.ApiGracefulShutdownTimeout = 50 * time.Millisecond
+	t.Cleanup(func() {
+		codeRunner = previousRunner
+		flag.ApiGracefulShutdownTimeout = previousTimeout
+	})
+
+	body := []byte(`{"code":"print(1)","context":{"id":"ctx-1","language":"python"}}`)
+	ctx, w := newTestContext(http.MethodPost, "/code/run", body)
+	ctrl := NewCodeInterpretingController(ctx)
+
+	ctrl.RunCode()
+
+	require.Equal(t, http.StatusOK, w.Code)
+	require.Contains(t, w.Header().Get("Content-Type"), "text/event-stream")
+	require.NotEmpty(t, w.Body.Bytes(), "successful run should write SSE events")
+}

components/execd/pkg/web/controller/command.go

Lines changed: 3 additions & 1 deletion
@@ -80,7 +80,9 @@ func (c *CodeInterpretingController) RunCommand() {
 	}
 	runCodeRequest.Hooks = eventsHandler
 
-	c.setupSSEResponse()
+	// SSE headers are committed lazily on the first event write
+	// (see writeSingleEvent), so a synchronous error from Execute below can
+	// still be surfaced as a structured JSON error response.
 	err = codeRunner.Execute(runCodeRequest)
 	if err != nil {
 		recordExecution("failure")

components/execd/pkg/web/controller/sse.go

Lines changed: 18 additions & 6 deletions
@@ -36,13 +36,21 @@ var sseHeaders = map[string]string{
 	"X-Accel-Buffering": "no",
 }
 
+// setupSSEResponse is idempotent: once headers are committed, subsequent calls
+// no-op. Callers that need the headers up front (e.g. long-running streaming
+// endpoints with no early-error path) can call it explicitly. Endpoints that
+// may fail synchronously before any event fires should leave header commit to
+// the lazy path inside writeSingleEvent so pre-execution errors can return a
+// proper JSON body instead of a half-formed text/event-stream response.
 func (c *basicController) setupSSEResponse() {
-	for key, value := range sseHeaders {
-		c.ctx.Writer.Header().Set(key, value)
-	}
-	if flusher, ok := c.ctx.Writer.(http.Flusher); ok {
-		flusher.Flush()
-	}
+	c.sseSetupOnce.Do(func() {
+		for key, value := range sseHeaders {
+			c.ctx.Writer.Header().Set(key, value)
+		}
+		if flusher, ok := c.ctx.Writer.(http.Flusher); ok {
+			flusher.Flush()
+		}
+	})
 }
 
 // setServerEventsHandler adapts runtime callbacks to SSE events.
@@ -167,6 +175,10 @@ func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byt
 
 	c.chunkWriter.Lock()
 	defer c.chunkWriter.Unlock()
+	// Lazily commit SSE response headers on the first event. This lets the
+	// surrounding handler return a proper JSON error via RespondError if the
+	// runtime fails synchronously before any event fires.
+	c.setupSSEResponse()
 	defer func() {
 		if flusher, ok := c.ctx.Writer.(http.Flusher); ok {
 			flusher.Flush()
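The lazy-commit pattern in this file can be sketched outside gin with only the standard library. Everything named here (`lazyStream`, `handle`, `observeLazy`, `errSync`, the JSON shape) is a hypothetical stand-in rather than execd's real types; the sketch assumes, as the commit does, that a synchronous failure happens before any event is written. It does not cover an error that arrives after events have already been streamed.

```go
package main

import (
	"encoding/json"
	"errors"
	"net/http"
	"net/http/httptest"
	"sync"
)

// errSync is a hypothetical synchronous runtime failure.
var errSync = errors.New("synchronous runtime failure")

// lazyStream commits SSE headers at most once, and only when the first
// event is actually written.
type lazyStream struct {
	w    http.ResponseWriter
	once sync.Once
}

func (s *lazyStream) commitHeaders() {
	s.once.Do(func() {
		s.w.Header().Set("Content-Type", "text/event-stream")
		if f, ok := s.w.(http.Flusher); ok {
			f.Flush()
		}
	})
}

func (s *lazyStream) writeEvent(data string) {
	s.commitHeaders() // no-op after the first event
	_, _ = s.w.Write([]byte("data: " + data + "\n\n"))
}

// handle runs fn, which may emit events or fail before emitting any.
func handle(w http.ResponseWriter, fn func(emit func(string)) error) {
	s := &lazyStream{w: w}
	if err := fn(s.writeEvent); err != nil {
		// Headers are still uncommitted when no event fired, so a
		// structured JSON error with a 500 status is still possible.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusInternalServerError)
		_ = json.NewEncoder(w).Encode(map[string]string{"message": err.Error()})
	}
}

// observeLazy returns the status and Content-Type a client would see.
func observeLazy(fn func(emit func(string)) error) (int, string) {
	rec := httptest.NewRecorder()
	handle(rec, fn)
	res := rec.Result()
	defer res.Body.Close()
	return res.StatusCode, res.Header.Get("Content-Type")
}
```

With this shape, a callback that returns `errSync` immediately is observed as a 500 `application/json` response, while a callback that emits at least one event is observed as a 200 `text/event-stream` response, mirroring what the regression tests in this commit assert.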

tests/csharp/OpenSandbox.E2ETests/SandboxE2ETests.cs

Lines changed: 33 additions & 4 deletions
@@ -163,7 +163,7 @@ public async Task Sandbox_Create_With_NetworkPolicy_Get_And_Patch_Egress()
 
         try
         {
-            await Task.Delay(5000);
+            await WaitUntilEgressBlocksAsync(policySandbox, "https://www.github.com", TimeSpan.FromSeconds(30));
 
             var initialPolicy = await policySandbox.GetEgressPolicyAsync();
             Assert.NotNull(initialPolicy);
@@ -184,7 +184,7 @@ await policySandbox.PatchEgressRulesAsync(new List<NetworkRule>
                 new() { Action = NetworkRuleAction.Allow, Target = "www.github.com" },
                 new() { Action = NetworkRuleAction.Deny, Target = "pypi.org" }
             });
-            await Task.Delay(2000);
+            await WaitUntilEgressBlocksAsync(policySandbox, "https://pypi.org", TimeSpan.FromSeconds(30));
 
             var patchedPolicy = await policySandbox.GetEgressPolicyAsync();
             Assert.NotNull(patchedPolicy.Egress);
@@ -233,7 +233,7 @@ public async Task Sandbox_Create_With_NetworkPolicy_Get_And_Patch_Egress_Via_Ser
 
         try
         {
-            await Task.Delay(5000);
+            await WaitUntilEgressBlocksAsync(policySandbox, "https://www.github.com", TimeSpan.FromSeconds(30));
 
             var egressEndpoint = await policySandbox.GetEndpointAsync(Constants.DefaultEgressPort);
             Assert.Contains(
@@ -259,7 +259,7 @@ await policySandbox.PatchEgressRulesAsync(new List<NetworkRule>
                 new() { Action = NetworkRuleAction.Allow, Target = "www.github.com" },
                 new() { Action = NetworkRuleAction.Deny, Target = "pypi.org" }
             });
-            await Task.Delay(2000);
+            await WaitUntilEgressBlocksAsync(policySandbox, "https://pypi.org", TimeSpan.FromSeconds(30));
 
             var patchedPolicy = await policySandbox.GetEgressPolicyAsync();
             Assert.NotNull(patchedPolicy.Egress);
@@ -1112,6 +1112,35 @@ private static async Task<Execution> RunWithRetryAsync(Sandbox sandbox, string c
         }
         return result!;
     }
+
+    /// <summary>
+    /// Polls curl against <paramref name="url"/> until the egress sidecar blocks
+    /// it (Execution.Error becomes non-null), or the timeout elapses. NetworkPolicy
+    /// sidecars sometimes accept connections before iptables/proxy rules apply,
+    /// so a fixed sleep is flaky.
+    /// </summary>
+    private static async Task WaitUntilEgressBlocksAsync(Sandbox sandbox, string url, TimeSpan timeout)
+    {
+        var deadline = DateTime.UtcNow + timeout;
+        Execution? last = null;
+        while (DateTime.UtcNow < deadline)
+        {
+            try
+            {
+                last = await sandbox.Commands.RunAsync($"curl -I {url}");
+                if (last?.Error != null)
+                {
+                    return;
+                }
+            }
+            catch
+            {
+                // Transient SDK/SSE errors during sidecar warmup — keep polling.
+            }
+            await Task.Delay(500);
+        }
+        Assert.Fail($"Egress policy did not block {url} within {timeout} (last error={last?.Error?.ToString() ?? "null"})");
+    }
 }
 
 public sealed class SandboxE2ETestFixture : IAsyncLifetime

tests/go/e2e_test.go

Lines changed: 10 additions & 3 deletions
@@ -117,8 +117,16 @@ func TestE2E_FullLifecycle(t *testing.T) {
 	}
 	execClient := opensandbox.NewExecdClient(execdURL, execToken)
 
-	err = execClient.Ping(ctx)
-	require.NoError(t, err)
+	// This test bypasses the SDK's high-level CreateSandbox helper (which calls
+	// WaitUntilReady) and pings execd directly through the server-side proxy.
+	// The state-Running flag is satisfied as soon as the container is up, but
+	// execd's HTTP routes may register a few ms later and the proxy can drop
+	// the very first connection it sees ("connection reset by peer"). Poll
+	// until ping succeeds — real users go through CreateSandbox which already
+	// handles this.
+	require.Eventually(t, func() bool {
+		return execClient.Ping(ctx) == nil
+	}, 30*time.Second, 500*time.Millisecond, "execd ping never succeeded")
 	t.Log("Execd ping: OK")
 
 	// 6. Test Execd — run a command with SSE streaming
@@ -131,7 +139,6 @@ func TestE2E_FullLifecycle(t *testing.T) {
 		return nil
 	})
 	require.NoError(t, err)
-	t.Logf("Command raw output (%d bytes): %q", output.Len(), output.String())
 
 	// 7. Test Execd — file operations
 	fileInfoMap, err := execClient.GetFileInfo(ctx, "/etc/os-release")
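The e2e changes in this commit replace fixed sleeps with polling that reports the last observation on failure. A minimal standard-library sketch of that idea follows; `pollUntil` and `simulateWarmup` are hypothetical names, not helpers from the OpenSandbox repository, and the probe here is a counter rather than a real `Ping` or `curl` call.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntil calls probe immediately and then once per interval until it
// reports done, the context is cancelled, or the timeout elapses. The last
// observation is included in the error so a failure explains what was seen.
func pollUntil(ctx context.Context, timeout, interval time.Duration, probe func() (done bool, observed string)) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var last string
	for {
		done, observed := probe()
		if done {
			return nil
		}
		last = observed
		select {
		case <-ctx.Done():
			return fmt.Errorf("condition not met within %s (last observation: %s)", timeout, last)
		case <-ticker.C:
		}
	}
}

// simulateWarmup models a dependency that becomes ready on the
// readyAfter-th probe and returns how many probes were needed.
func simulateWarmup(readyAfter int, timeoutMs int) (attempts int, err error) {
	timeout := time.Duration(timeoutMs) * time.Millisecond
	err = pollUntil(context.Background(), timeout, time.Millisecond, func() (bool, string) {
		attempts++
		return attempts >= readyAfter, "egress still allowed"
	})
	return attempts, err
}
```

For example, `simulateWarmup(3, 1000)` returns after three probes with a nil error, while a probe that never succeeds within the timeout yields an error carrying the last observation. Compared with a fixed sleep, the wait ends as soon as the condition holds and the upper bound is explicit.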

tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java

Lines changed: 39 additions & 17 deletions
@@ -270,11 +270,10 @@ void testSandboxCreateWithNetworkPolicy() {
                         .readyTimeout(Duration.ofSeconds(60))
                         .networkPolicy(networkPolicy)
                         .build();
-        // Wait for NetworkPolicy sidecar to be fully initialized
-        try {
-            Thread.sleep(2000);
-        } catch (InterruptedException ignored) {
-        }
+        // Wait for NetworkPolicy sidecar to be fully initialized.
+        // The sidecar may accept the sandbox before iptables/proxy rules apply,
+        // so poll a denied target until the policy actually blocks it.
+        waitUntilEgressBlocks(policySandbox, "https://www.github.com", Duration.ofSeconds(30));
 
         try {
             NetworkPolicy initialPolicy = policySandbox.getEgressPolicy();
@@ -319,10 +318,8 @@ void testSandboxCreateWithNetworkPolicy() {
                                     .target("pypi.org")
                                     .build()));
 
-            try {
-                Thread.sleep(2000);
-            } catch (InterruptedException ignored) {
-            }
+            // Poll until the patched rule takes effect (pypi now blocked).
+            waitUntilEgressBlocks(policySandbox, "https://pypi.org", Duration.ofSeconds(30));
 
             NetworkPolicy patchedPolicy = policySandbox.getEgressPolicy();
             assertNotNull(patchedPolicy);
@@ -393,10 +390,8 @@ void testSandboxCreateWithNetworkPolicyViaServerProxy() {
                         .readyTimeout(Duration.ofSeconds(60))
                         .networkPolicy(networkPolicy)
                         .build();
-        try {
-            Thread.sleep(2000);
-        } catch (InterruptedException ignored) {
-        }
+        // Wait for NetworkPolicy sidecar/iptables rules to be active.
+        waitUntilEgressBlocks(policySandbox, "https://www.github.com", Duration.ofSeconds(30));
 
         try {
             SandboxEndpoint egressEndpoint = policySandbox.getEndpoint(18080);
@@ -447,10 +442,8 @@ void testSandboxCreateWithNetworkPolicyViaServerProxy() {
                                     .target("pypi.org")
                                     .build()));
 
-            try {
-                Thread.sleep(2000);
-            } catch (InterruptedException ignored) {
-            }
+            // Poll until patched rule applied (pypi now blocked).
+            waitUntilEgressBlocks(policySandbox, "https://pypi.org", Duration.ofSeconds(30));
 
             NetworkPolicy patchedPolicy = policySandbox.getEgressPolicy();
             assertNotNull(patchedPolicy.getEgress());
@@ -1597,4 +1590,33 @@ private Execution runWithRetry(Sandbox sandbox, String command, int maxAttempts,
         }
         return result;
     }
+
+    /**
+     * Polls the sandbox running curl until the given URL is blocked by the
+     * network policy. Returns once curl reports an error (egress active), or
+     * fails the test if the timeout elapses.
+     */
+    private void waitUntilEgressBlocks(Sandbox sandbox, String url, Duration timeout) {
+        long deadline = System.currentTimeMillis() + timeout.toMillis();
+        Execution last = null;
+        while (System.currentTimeMillis() < deadline) {
+            try {
+                last = sandbox.commands().run(
+                        RunCommandRequest.builder().command("curl -I " + url).build());
+                if (last != null && last.getError() != null) {
+                    return;
+                }
+            } catch (Exception ignored) {
+                // Transient SDK/SSE errors during sidecar warmup — keep polling.
+            }
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        fail("Egress policy did not block " + url + " within " + timeout
+                + " (last execution error=" + (last == null ? "null" : last.getError()) + ")");
+    }
 }
