diff --git a/.github/workflows/execd-test.yml b/.github/workflows/execd-test.yml index 729e2d0f4..0db55542f 100644 --- a/.github/workflows/execd-test.yml +++ b/.github/workflows/execd-test.yml @@ -109,6 +109,8 @@ jobs: sleep 5 python3 tests/smoke_api.py + python3 tests/command_resume_test.py + - name: SIGTERM forward test if: matrix.os == 'ubuntu-latest' working-directory: components/execd diff --git a/components/execd/RELEASE_NOTES.md b/components/execd/RELEASE_NOTES.md index e732438a1..2b7895efb 100644 --- a/components/execd/RELEASE_NOTES.md +++ b/components/execd/RELEASE_NOTES.md @@ -109,8 +109,8 @@ Thanks to these contributors ❤️ - @csdbianhua --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.8 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.8 # components/execd 1.0.7 @@ -144,8 +144,8 @@ Thanks to these contributors ❤️ - @dependabot --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.7 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.7 # components/execd 1.0.6 @@ -167,8 +167,8 @@ Thanks to these contributors ❤️ - @dependabot --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.6 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.6 # components/execd 1.0.5 @@ -185,8 +185,8 @@ Thanks to these contributors ❤️ - @Pangjiping --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.5 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.5 # components/execd 1.0.4 @@ -209,8 +209,8 @@ Thanks to these contributors ❤️ - @ninan-nn --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.4 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.4 # components/execd 1.0.3 @@ -233,8 +233,8 @@ Thanks to these contributors ❤️ - @jwx0925 --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.3 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.3 # components/execd 1.0.2 @@ -262,8 +262,8 @@ Thanks to these contributors ❤️ - @ninan-nn --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.2 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.2 # components/execd 1.0.1 @@ -295,8 +295,8 @@ Thanks to these contributors ❤️ - @jwx0925 --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.1 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.1 # components/execd 1.0.0 diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index 98f7afd48..94b89a34e 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -252,7 +252,7 @@ func (s *bashSession) run(ctx context.Context, request *ExecuteCodeRequest) erro continue } if request.Hooks.OnExecuteStdout != nil { - request.Hooks.OnExecuteStdout(line) + request.Hooks.OnExecuteStdout(request.NextEventID(), line) } } } diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index d054f0fa5..eb1884ef4 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -56,7 +56,7 @@ func TestBashSession_NonZeroExitEmitsError(t *testing.T) { Timeout: 5 * time.Second, Hooks: ExecuteResultHook{ OnExecuteInit: func(s string) { sessionID = s }, - OnExecuteStdout: func(s string) { stdoutLine = s }, + OnExecuteStdout: func(_ int64, s string) { stdoutLine = s }, OnExecuteError: func(err *execute.ErrorOutput) { errCh <- err }, OnExecuteComplete: func(_ time.Duration) { completeCh <- struct{}{} @@ -105,7 +105,7 @@ func TestBashSession_envAndExitCode(t *testing.T) { require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit") initCalls++ }, - OnExecuteStdout: func(text string) { + OnExecuteStdout: func(_ int64, text string) { t.Log(text) stdoutLines = append(stdoutLines, text) }, @@ -178,7 +178,7 @@ func TestBashSession_envLargeOutputChained(t *testing.T) { require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit") initCalls++ }, - OnExecuteStdout: func(text string) { + OnExecuteStdout: func(_ int64, text string) { t.Log(text) stdoutLines = append(stdoutLines, text) }, @@ -223,7 +223,7 @@ func TestBashSession_cwdPersistsWithoutOverride(t *testing.T) { targetDir := t.TempDir() var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -265,7 +265,7 @@ func TestBashSession_requestCwdOverridesAfterCd(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -307,7 +307,7 @@ func TestBashSession_envDumpNotLeakedWhenNoTrailingNewline(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -335,7 +335,7 @@ func TestBashSession_envDumpNotLeakedWhenNoOutput(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -366,7 +366,7 @@ func TestBashSession_heredoc(t *testing.T) { t.Cleanup(func() { _ = controller.DeleteBashSession(sessionID) }) hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { fmt.Printf("[stdout] %s\n", line) }, OnExecuteComplete: func(d time.Duration) { @@ -419,7 +419,7 @@ func TestBashSession_execReplacesShell(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -459,7 +459,7 @@ func TestBashSession_complexExec(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } diff --git a/components/execd/pkg/runtime/command.go b/components/execd/pkg/runtime/command.go index 893956366..55ab6e4fd 100644 --- a/components/execd/pkg/runtime/command.go +++ b/components/execd/pkg/runtime/command.go @@ -134,11 +134,11 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest wg.Add(2) safego.Go(func() { defer wg.Done() - c.tailStdPipe(stdoutPath, request.Hooks.OnExecuteStdout, done) + c.tailStdPipe(stdoutPath, request.wrapStdoutPipeHook(), done) }) safego.Go(func() { defer wg.Done() - c.tailStdPipe(stderrPath, request.Hooks.OnExecuteStderr, done) + c.tailStdPipe(stderrPath, request.wrapStderrPipeHook(), done) }) err = cmd.Start() diff --git a/components/execd/pkg/runtime/command_test.go b/components/execd/pkg/runtime/command_test.go index a6207fc3b..1eab21509 100644 --- a/components/execd/pkg/runtime/command_test.go +++ b/components/execd/pkg/runtime/command_test.go @@ -133,10 +133,10 @@ func TestRunCommand_Echo(t *testing.T) { Timeout: 5 * time.Second, Hooks: ExecuteResultHook{ OnExecuteInit: func(s string) { sessionID = s }, - OnExecuteStdout: func(s string) { + OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) }, - OnExecuteStderr: func(s string) { + OnExecuteStderr: func(_ int64, s string) { stderrLines = append(stderrLines, s) }, OnExecuteError: func(err *execute.ErrorOutput) { @@ -188,8 +188,8 @@ func TestRunCommand_Error(t *testing.T) { Timeout: 5 * time.Second, Hooks: ExecuteResultHook{ OnExecuteInit: func(s string) { sessionID = s }, - OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) }, - OnExecuteStderr: func(s string) { stderrLines = append(stderrLines, s) }, + OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) }, + OnExecuteStderr: func(_ int64, s string) { stderrLines = append(stderrLines, s) }, OnExecuteError: func(err *execute.ErrorOutput) { gotErr = err completeCh <- struct{}{} @@ -241,8 +241,8 @@ func TestRunCommand_ExpandsHomeInCwd(t *testing.T) { Timeout: 5 * time.Second, Hooks: ExecuteResultHook{ OnExecuteInit: func(_ string) {}, - OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) }, - OnExecuteStderr: func(_ string) {}, + OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) }, + OnExecuteStderr: func(_ int64, _ string) {}, OnExecuteError: func(err *execute.ErrorOutput) { require.Failf(t, "unexpected error hook", "%+v", err) }, @@ -303,8 +303,8 @@ func TestRunCommand_ExpandsCwdFromRequestEnvWithHigherPriority(t *testing.T) { }, Hooks: ExecuteResultHook{ OnExecuteInit: func(_ string) {}, - OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) }, - OnExecuteStderr: func(_ string) {}, + OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) }, + OnExecuteStderr: func(_ int64, _ string) {}, OnExecuteError: func(err *execute.ErrorOutput) { gotErr = err }, diff --git a/components/execd/pkg/runtime/command_windows.go b/components/execd/pkg/runtime/command_windows.go index a3e418fb4..cf07bdd3a 100644 --- a/components/execd/pkg/runtime/command_windows.go +++ b/components/execd/pkg/runtime/command_windows.go @@ -58,10 +58,10 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest done := make(chan struct{}, 1) safego.Go(func() { - c.tailStdPipe(c.stdoutFileName(session), request.Hooks.OnExecuteStdout, done) + c.tailStdPipe(c.stdoutFileName(session), request.wrapStdoutPipeHook(), done) }) safego.Go(func() { - c.tailStdPipe(c.stderrFileName(session), request.Hooks.OnExecuteStderr, done) + c.tailStdPipe(c.stderrFileName(session), request.wrapStderrPipeHook(), done) }) err = cmd.Start() diff --git a/components/execd/pkg/runtime/jupyter.go b/components/execd/pkg/runtime/jupyter.go index f4732d515..1dff6a9e2 100644 --- a/components/execd/pkg/runtime/jupyter.go +++ b/components/execd/pkg/runtime/jupyter.go @@ -122,9 +122,16 @@ func dispatchExecutionResultHooks(request *ExecuteCodeRequest, result *execute.E for _, stream := range result.Stream { switch stream.Name { case execute.StreamStdout: - request.Hooks.OnExecuteStdout(stream.Text) + if stream.Text != "" && request.Hooks.OnExecuteStdout != nil { + eid := request.NextEventID() + request.Hooks.OnExecuteStdout(eid, stream.Text) + } case execute.StreamStderr: - request.Hooks.OnExecuteStderr(stream.Text) + if stream.Text != "" && request.Hooks.OnExecuteStderr != nil { + eid := request.NextEventID() + request.Hooks.OnExecuteStderr(eid, stream.Text) + } + default: } } } diff --git a/components/execd/pkg/runtime/types.go b/components/execd/pkg/runtime/types.go index cd0615c63..a5ce7cb95 100644 --- a/components/execd/pkg/runtime/types.go +++ b/components/execd/pkg/runtime/types.go @@ -17,6 +17,7 @@ package runtime import ( "fmt" "sync" + "sync/atomic" "time" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" @@ -27,8 +28,8 @@ type ExecuteResultHook struct { OnExecuteInit func(context string) OnExecuteResult func(result map[string]any, count int) OnExecuteStatus func(status string) - OnExecuteStdout func(stdout string) //nolint:predeclared - OnExecuteStderr func(stderr string) //nolint:predeclared + OnExecuteStdout func(eid int64, stdout string) //nolint:predeclared + OnExecuteStderr func(eid int64, stderr string) //nolint:predeclared OnExecuteError func(err *execute.ErrorOutput) OnExecuteComplete func(executionTime time.Duration) } @@ -44,6 +45,35 @@ type ExecuteCodeRequest struct { Uid *uint32 `json:"uid,omitempty"` Gid *uint32 `json:"gid,omitempty"` Hooks ExecuteResultHook + + eventSeq atomic.Uint64 +} + +func (req *ExecuteCodeRequest) NextEventID() int64 { + if req == nil { + return 0 + } + return int64(req.eventSeq.Add(1)) +} + +func (req *ExecuteCodeRequest) wrapStdoutPipeHook() func(string) { + return func(text string) { + if text == "" || req.Hooks.OnExecuteStdout == nil { + return + } + eid := req.NextEventID() + req.Hooks.OnExecuteStdout(eid, text) + } +} + +func (req *ExecuteCodeRequest) wrapStderrPipeHook() func(string) { + return func(text string) { + if text == "" || req.Hooks.OnExecuteStderr == nil { + return + } + eid := req.NextEventID() + req.Hooks.OnExecuteStderr(eid, text) + } } // SetDefaultHooks installs stdout logging fallbacks for unset hooks. @@ -55,10 +85,10 @@ func (req *ExecuteCodeRequest) SetDefaultHooks() { req.Hooks.OnExecuteStatus = func(status string) { fmt.Printf("OnExecuteStatus: %s\n", status) } } if req.Hooks.OnExecuteStdout == nil { - req.Hooks.OnExecuteStdout = func(stdout string) { fmt.Printf("OnExecuteStdout: %s\n", stdout) } + req.Hooks.OnExecuteStdout = func(eid int64, stdout string) { fmt.Printf("OnExecuteStdout: eid=%d %s\n", eid, stdout) } } if req.Hooks.OnExecuteStderr == nil { - req.Hooks.OnExecuteStderr = func(stderr string) { fmt.Printf("OnExecuteStderr: %s\n", stderr) } + req.Hooks.OnExecuteStderr = func(eid int64, stderr string) { fmt.Printf("OnExecuteStderr: eid=%d %s\n", eid, stderr) } } if req.Hooks.OnExecuteError == nil { req.Hooks.OnExecuteError = func(err *execute.ErrorOutput) { fmt.Printf("OnExecuteError: %++v\n", err) } diff --git a/components/execd/pkg/sbuf/config.go b/components/execd/pkg/sbuf/config.go new file mode 100644 index 000000000..12dda926d --- /dev/null +++ b/components/execd/pkg/sbuf/config.go @@ -0,0 +1,37 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +// Config controls per-stream bounds and append policy. +type Config struct { + // MaxEvents is the maximum number of events retained per stream. Oldest events are dropped when exceeded. + // Zero defaults to DefaultMaxEvents. + MaxEvents int + // MaxBytes is the approximate upper bound on total payload bytes per stream (sum of len(Payload)). + // Oldest events are dropped until under the limit. Zero means no byte limit. + MaxBytes int64 + // StrictMonotonic rejects Append when eid <= last eid for that stream. Recommended for execd SSE eids. + StrictMonotonic bool +} + +const DefaultMaxEvents = 1024 + +func (c *Config) normalized() Config { + out := *c + if out.MaxEvents <= 0 { + out.MaxEvents = DefaultMaxEvents + } + return out +} diff --git a/components/execd/pkg/sbuf/errors.go b/components/execd/pkg/sbuf/errors.go new file mode 100644 index 000000000..8a37ac6e5 --- /dev/null +++ b/components/execd/pkg/sbuf/errors.go @@ -0,0 +1,23 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +import "errors" + +var ( + // ErrOutOfOrder is returned when StrictMonotonic is enabled and eid is not greater than the last appended eid. + ErrOutOfOrder = errors.New("sbuf: eid out of order for stream") + ErrEmptyStreamID = errors.New("sbuf: empty stream id") +) diff --git a/components/execd/pkg/sbuf/event.go b/components/execd/pkg/sbuf/event.go new file mode 100644 index 000000000..d65219e83 --- /dev/null +++ b/components/execd/pkg/sbuf/event.go @@ -0,0 +1,21 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +// Event is one stored record (e.g. a single SSE JSON line body). Payload is owned by the buffer after Append. +type Event struct { + EID int64 + Payload []byte +} diff --git a/components/execd/pkg/sbuf/ring.go b/components/execd/pkg/sbuf/ring.go new file mode 100644 index 000000000..37426328a --- /dev/null +++ b/components/execd/pkg/sbuf/ring.go @@ -0,0 +1,91 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +// ring is a FIFO queue with a fixed max length; push drops oldest when full. +type ring struct { + maxLen int + slots []eventSlot + head int + n int + bytes int64 +} + +type eventSlot struct { + eid int64 + payload []byte +} + +func newRing(maxLen int) *ring { + if maxLen < 1 { + maxLen = 1 + } + return &ring{ + maxLen: maxLen, + slots: make([]eventSlot, maxLen), + } +} + +func (r *ring) push(eid int64, payload []byte, maxBytes int64) { + pld := append([]byte(nil), payload...) + size := int64(len(pld)) + + if r.n == r.maxLen { + r.evictHead() + } + idx := (r.head + r.n) % r.maxLen + r.slots[idx] = eventSlot{eid: eid, payload: pld} + r.n++ + r.bytes += size + + if maxBytes > 0 { + for r.bytes > maxBytes && r.n > 0 { + r.evictHead() + } + } +} + +func (r *ring) evictHead() { + if r.n == 0 { + return + } + old := r.slots[r.head] + r.bytes -= int64(len(old.payload)) + r.slots[r.head] = eventSlot{} + r.head = (r.head + 1) % r.maxLen + r.n-- +} + +func (r *ring) iterAfter(afterEid int64, fn func(eid int64, payload []byte)) { + for i := range r.n { + idx := (r.head + i) % r.maxLen + s := r.slots[idx] + if s.eid > afterEid { + fn(s.eid, s.payload) + } + } +} + +// snapshotAfter returns a copy slice for safe iteration outside the ring lock. +func (r *ring) snapshotAfter(afterEid int64) []Event { + var out []Event + r.iterAfter(afterEid, func(eid int64, payload []byte) { + out = append(out, Event{ + EID: eid, + Payload: append([]byte(nil), payload...), + }) + }) + return out +} diff --git a/components/execd/pkg/sbuf/store.go b/components/execd/pkg/sbuf/store.go new file mode 100644 index 000000000..ead4666a4 --- /dev/null +++ b/components/execd/pkg/sbuf/store.go @@ -0,0 +1,108 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package sbuf provides bounded, per-stream FIFO buffers for SSE (or similar) events keyed by eid, +// used to serve disconnect resume (catch-up by event id). +// It is storage-only: callers assign eids and decide when to delete a stream. +package sbuf + +import ( + "sync" +) + +// Store holds bounded event rings keyed by caller-defined stream IDs (e.g. command execution id). +type Store struct { + cfg Config + mu sync.Mutex + streams map[string]*streamBuf +} + +type streamBuf struct { + mu sync.Mutex + lastEid int64 + ring *ring + maxBytes int64 +} + +// NewStore creates an empty store. cfg is copied after normalization. +func NewStore(cfg Config) *Store { + cfg = cfg.normalized() + return &Store{ + cfg: cfg, + streams: make(map[string]*streamBuf), + } +} + +// Append adds one event to the stream's ring. Payload is copied. +// With StrictMonotonic, returns ErrOutOfOrder if eid <= previous eid for this stream. +func (s *Store) Append(streamID string, eid int64, payload []byte) error { + if streamID == "" { + return ErrEmptyStreamID + } + sb := s.getOrCreate(streamID) + sb.mu.Lock() + defer sb.mu.Unlock() + + if s.cfg.StrictMonotonic { + if eid <= sb.lastEid { + return ErrOutOfOrder + } + } + sb.lastEid = eid + sb.ring.push(eid, payload, sb.maxBytes) + return nil +} + +func (s *Store) getOrCreate(streamID string) *streamBuf { + s.mu.Lock() + defer s.mu.Unlock() + if sb, ok := s.streams[streamID]; ok { + return sb + } + sb := &streamBuf{ + ring: newRing(s.cfg.MaxEvents), + maxBytes: s.cfg.MaxBytes, + } + s.streams[streamID] = sb + return sb +} + +// EventsAfter returns a snapshot of events with EID > afterEid in order. +// If the stream does not exist, ok is false and events is nil. +func (s *Store) EventsAfter(streamID string, afterEid int64) (events []Event, ok bool) { + s.mu.Lock() + sb, found := s.streams[streamID] + s.mu.Unlock() + if !found { + return nil, false + } + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.ring.snapshotAfter(afterEid), true +} + +// Delete removes a stream buffer. No-op if missing. +func (s *Store) Delete(streamID string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.streams, streamID) +} + +// Has reports whether a stream currently exists. +func (s *Store) Has(streamID string) bool { + s.mu.Lock() + defer s.mu.Unlock() + _, ok := s.streams[streamID] + return ok +} diff --git a/components/execd/pkg/sbuf/store_benchmark_test.go b/components/execd/pkg/sbuf/store_benchmark_test.go new file mode 100644 index 000000000..e769db981 --- /dev/null +++ b/components/execd/pkg/sbuf/store_benchmark_test.go @@ -0,0 +1,136 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +import ( + "fmt" + "sync/atomic" + "testing" +) + +// payload used across benchmarks (typical SSE JSON line order of magnitude). +var benchPayload = []byte(`{"type":"stdout","eid":1,"text":"hello","timestamp":0}`) + +// BenchmarkRing_pushNoEvict measures ring.push when the ring is not full (no evictHead). +func BenchmarkRing_pushNoEvict(b *testing.B) { + r := newRing(1 << 20) + b.SetBytes(int64(len(benchPayload))) + var eid atomic.Int64 + b.ResetTimer() + for i := 0; i < b.N; i++ { + r.push(eid.Add(1), benchPayload, 0) + } +} + +// BenchmarkRing_pushWithEvict measures push when the ring stays at capacity (each push may evict oldest). +func BenchmarkRing_pushWithEvict(b *testing.B) { + const cap = 64 + r := newRing(cap) + // Fill ring so every subsequent push evicts one slot. + for i := int64(1); i <= cap; i++ { + r.push(i, benchPayload, 0) + } + b.SetBytes(int64(len(benchPayload))) + var eid atomic.Int64 + eid.Store(cap) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r.push(eid.Add(1), benchPayload, 0) + } +} + +// BenchmarkStore_Append_noEvict is Append on a warm stream with a large MaxEvents (no ring eviction). +func BenchmarkStore_Append_noEvict(b *testing.B) { + s := NewStore(Config{MaxEvents: 1 << 20, StrictMonotonic: true}) + if err := s.Append("s", 1, benchPayload); err != nil { + b.Fatal(err) + } + var eid int64 = 1 + b.SetBytes(int64(len(benchPayload))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eid++ + if err := s.Append("s", eid, benchPayload); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStore_Append_evicting keeps a small ring so almost every Append evicts the oldest event. +func BenchmarkStore_Append_evicting(b *testing.B) { + s := NewStore(Config{MaxEvents: 64, StrictMonotonic: true}) + if err := s.Append("s", 1, benchPayload); err != nil { + b.Fatal(err) + } + var eid int64 = 1 + b.SetBytes(int64(len(benchPayload))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eid++ + if err := s.Append("s", eid, benchPayload); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStore_EventsAfter measures snapshot copy cost after many appends. +func BenchmarkStore_EventsAfter(b *testing.B) { + const n = 1000 + s := NewStore(Config{MaxEvents: n + 10, StrictMonotonic: true}) + for i := int64(1); i <= n; i++ { + if err := s.Append("s", i, benchPayload); err != nil { + b.Fatal(err) + } + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = s.EventsAfter("s", 0) + } +} + +// BenchmarkStore_Append_ParallelDifferentStreams: one stream per goroutine (minimal lock contention on streamBuf). +func BenchmarkStore_Append_ParallelDifferentStreams(b *testing.B) { + s := NewStore(Config{MaxEvents: 1 << 16, StrictMonotonic: true}) + b.SetBytes(int64(len(benchPayload))) + var id atomic.Int64 + b.RunParallel(func(pb *testing.PB) { + // Unique stream id per goroutine iteration batch. + my := id.Add(1) + sid := fmt.Sprintf("s-%d", my) + var e int64 + for pb.Next() { + e++ + if err := s.Append(sid, e, benchPayload); err != nil { + b.Fatal(err) + } + } + }) +} + +// BenchmarkStore_Append_ParallelSameStream: all goroutines append to one stream (serialized on streamBuf.mu). +// StrictMonotonic is off: parallel workers would observe eids out of arrival order if enforced. +func BenchmarkStore_Append_ParallelSameStream(b *testing.B) { + s := NewStore(Config{MaxEvents: 1 << 20, StrictMonotonic: false}) + var eid atomic.Int64 + b.SetBytes(int64(len(benchPayload))) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n := eid.Add(1) + if err := s.Append("s", n, benchPayload); err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/components/execd/pkg/sbuf/store_test.go b/components/execd/pkg/sbuf/store_test.go new file mode 100644 index 000000000..aeb1554d9 --- /dev/null +++ b/components/execd/pkg/sbuf/store_test.go @@ -0,0 +1,80 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStore_EventsAfter_NotFound(t *testing.T) { + s := NewStore(Config{MaxEvents: 8, StrictMonotonic: true}) + ev, ok := s.EventsAfter("missing", 0) + require.False(t, ok) + require.Nil(t, ev) +} + +func TestStore_AppendStrictMonotonic(t *testing.T) { + s := NewStore(Config{MaxEvents: 8, StrictMonotonic: true}) + require.NoError(t, s.Append("stream-a", 1, []byte(`{"a":1}`))) + require.ErrorIs(t, s.Append("stream-a", 1, []byte(`dup`)), ErrOutOfOrder) + require.NoError(t, s.Append("stream-a", 2, []byte(`{"a":2}`))) + + ev, ok := s.EventsAfter("stream-a", 0) + require.True(t, ok) + require.Len(t, ev, 2) + require.Equal(t, int64(1), ev[0].EID) + require.Equal(t, `{"a":1}`, string(ev[0].Payload)) + + ev2, _ := s.EventsAfter("stream-a", 1) + require.Len(t, ev2, 1) + require.Equal(t, int64(2), ev2[0].EID) +} + +func TestStore_MaxEventsEvictsOldest(t *testing.T) { + s := NewStore(Config{MaxEvents: 3, StrictMonotonic: true}) + for i := int64(1); i <= 5; i++ { + require.NoError(t, s.Append("s", i, []byte{byte(i)})) + } + ev, ok := s.EventsAfter("s", 0) + require.True(t, ok) + require.Len(t, ev, 3) + require.Equal(t, int64(3), ev[0].EID) + require.Equal(t, byte(3), ev[0].Payload[0]) +} + +func TestStore_MaxBytesEvicts(t *testing.T) { + s := NewStore(Config{MaxEvents: 100, MaxBytes: 10, StrictMonotonic: true}) + require.NoError(t, s.Append("s", 1, []byte("1234567890"))) + require.NoError(t, s.Append("s", 2, []byte("1234567890"))) + ev, ok := s.EventsAfter("s", 0) + require.True(t, ok) + require.Len(t, ev, 1) + require.Equal(t, int64(2), ev[0].EID) +} + +func TestStore_Delete(t *testing.T) { + s := NewStore(Config{MaxEvents: 8, StrictMonotonic: true}) + require.NoError(t, s.Append("x", 1, []byte("a"))) + require.True(t, s.Has("x")) + s.Delete("x") + require.False(t, s.Has("x")) +} + +func TestStore_EmptyStreamID(t *testing.T) { + s := NewStore(Config{}) + require.ErrorIs(t, s.Append("", 1, nil), ErrEmptyStreamID) +} diff --git a/components/execd/pkg/web/controller/codeinterpreting.go b/components/execd/pkg/web/controller/codeinterpreting.go index 93680e3f4..2c955a8a1 100644 --- a/components/execd/pkg/web/controller/codeinterpreting.go +++ b/components/execd/pkg/web/controller/codeinterpreting.go @@ -21,6 +21,7 @@ import ( "io" "net/http" "sync" + "sync/atomic" "time" "github.com/gin-gonic/gin" @@ -44,6 +45,11 @@ type CodeInterpretingController struct { // chunkWriter serializes SSE event writes to prevent interleaved output. chunkWriter sync.Mutex + + resumeStreamMu sync.Mutex + resumeStreamID string + // resumeEnabled opts into disconnect resume (event buffer + live hub) for RunCommand. + resumeEnabled atomic.Bool } type codeExecutionRunner interface { @@ -145,7 +151,7 @@ func (c *CodeInterpretingController) RunCode() { }) } runCodeRequest := c.buildExecuteCodeRequest(request) - eventsHandler := c.setServerEventsHandler(ctx) + eventsHandler := c.setServerEventsHandler(ctx, runCodeRequest) // completeCh is closed when OnExecuteComplete fires, meaning the final SSE // event has been written and flushed. We only wait for this callback as a @@ -385,7 +391,7 @@ func (c *CodeInterpretingController) RunInSession() { close(completeCh) }) } - hooks := c.setServerEventsHandler(ctx) + hooks := c.setServerEventsHandler(ctx, runReq) origComplete := hooks.OnExecuteComplete hooks.OnExecuteComplete = func(executionTime time.Duration) { origComplete(executionTime) diff --git a/components/execd/pkg/web/controller/command.go b/components/execd/pkg/web/controller/command.go index f89e570f4..3a99d8487 100644 --- a/components/execd/pkg/web/controller/command.go +++ b/components/execd/pkg/web/controller/command.go @@ -16,6 +16,7 @@ package controller import ( "context" + "errors" "fmt" "net/http" "strconv" @@ -24,6 +25,7 @@ import ( "github.com/alibaba/opensandbox/execd/pkg/flag" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" + "github.com/alibaba/opensandbox/execd/pkg/log" "github.com/alibaba/opensandbox/execd/pkg/runtime" "github.com/alibaba/opensandbox/execd/pkg/telemetry" "github.com/alibaba/opensandbox/execd/pkg/web/model" @@ -53,6 +55,11 @@ func (c *CodeInterpretingController) RunCommand() { ctx, cancel := context.WithCancel(c.ctx.Request.Context()) defer cancel() + c.resumeEnabled.Store(true) + defer func() { + deferResumeCleanup(c) + c.resumeEnabled.Store(false) + }() execStart := time.Now() var recordOnce sync.Once recordExecution := func(result string) { @@ -67,7 +74,7 @@ func (c *CodeInterpretingController) RunCommand() { } runCodeRequest := c.buildExecuteCommandRequest(request) - eventsHandler := c.setServerEventsHandler(ctx) + eventsHandler := c.setServerEventsHandler(ctx, runCodeRequest) origComplete := eventsHandler.OnExecuteComplete eventsHandler.OnExecuteComplete = func(executionTime time.Duration) { origComplete(executionTime) @@ -151,6 +158,80 @@ func (c *CodeInterpretingController) GetBackgroundCommandOutput() { c.ctx.String(http.StatusOK, "%s", output) } +// ResumeCommandStream sends buffered events after after_eid, then if the command is still running +// and no other client holds the live slot, streams further events until completion or client disconnect. +func (c *CodeInterpretingController) ResumeCommandStream() { + commandID := c.ctx.Param("id") + if commandID == "" { + c.RespondError(http.StatusBadRequest, model.ErrorCodeInvalidRequest, "missing command execution id") + return + } + afterEid := c.QueryInt64(c.ctx.Query(model.CommandResumeAfterEidQuery), 0) + + hub := commandStreams.getHub(commandID) + st, errSt := codeRunner.GetCommandStatus(commandID) + if errSt != nil && hub == nil { + c.RespondError(http.StatusNotFound, model.ErrorCodeInvalidRequest, errSt.Error()) + return + } + + events, bufferOK := resumeBuffer.EventsAfter(commandID, afterEid) + if !bufferOK && hub == nil { + c.RespondError(http.StatusNotFound, model.ErrorCodeInvalidRequest, "command stream resume buffer not available") + return + } + + if st != nil && st.Running && hub != nil && hub.isHolderAlive() { + c.RespondError( + http.StatusConflict, + model.ErrorCodeInvalidRequest, + "primary SSE stream is still active; disconnect it before resuming", + ) + return + } + + c.setupSSEResponse() + lastReplayMaxEid := afterEid + for _, ev := range events { + c.writeSingleEvent("ResumeBuffer", ev.Payload, false, fmt.Sprintf("buffer eid=%d", ev.EID), 0) + if ev.EID > lastReplayMaxEid { + lastReplayMaxEid = ev.EID + } + } + + st2, _ := codeRunner.GetCommandStatus(commandID) + if st2 == nil || !st2.Running { + if len(events) > 0 { + log.Info("resume stream: command_id=%s after_eid=%d snapshot_events=%d (replay only)", + commandID, afterEid, len(events)) + } + return + } + + hub = commandStreams.getHub(commandID) + if hub == nil { + return + } + + h, err := commandStreams.tryAttachResume(commandID, c.ctx.Writer, c.ctx.Request.Context()) + if err != nil { + if errors.Is(err, errLiveStreamPrimaryActive) { + log.Error("ResumeCommandStream: attach conflict after buffered history (another client may have attached)") + } + return + } + + // Catch up events appended while the snapshot slice was replayed (holder still nil); same mutex as writeFrame. + tailN := h.flushResumeTail(commandID, lastReplayMaxEid) + log.Info("resume stream: command_id=%s after_eid=%d snapshot_events=%d post_attach_tail=%d (live)", + commandID, afterEid, len(events), tailN) + + select { + case <-h.waitDone(): + case <-c.ctx.Request.Context().Done(): + } +} + func (c *CodeInterpretingController) buildExecuteCommandRequest(request model.RunCommandRequest) *runtime.ExecuteCodeRequest { timeout := time.Duration(request.TimeoutMs) * time.Millisecond if request.Background { diff --git a/components/execd/pkg/web/controller/command_stream.go b/components/execd/pkg/web/controller/command_stream.go new file mode 100644 index 000000000..6ec07d7d0 --- /dev/null +++ b/components/execd/pkg/web/controller/command_stream.go @@ -0,0 +1,237 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Bounded event buffer (sbuf) plus at-most-one live SSE writer per command id for disconnect resume; +// GET /command/:id/resume sends buffered events then may take over as the sole live consumer. + +package controller + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "sync" + + "github.com/alibaba/opensandbox/execd/pkg/log" + "github.com/alibaba/opensandbox/execd/pkg/sbuf" +) + +var ( + resumeBuffer *sbuf.Store + + errLiveStreamPrimaryActive = errors.New("primary SSE stream is still active") + errLiveHubNotFound = errors.New("command live hub not found") +) + +func init() { + resumeBuffer = sbuf.NewStore(sbuf.Config{StrictMonotonic: true}) +} + +func deferResumeCleanup(c *CodeInterpretingController) { + c.resumeStreamMu.Lock() + id := c.resumeStreamID + c.resumeStreamID = "" + c.resumeStreamMu.Unlock() + if id == "" { + return + } + commandStreams.closeAndRemove(id) + resumeBuffer.Delete(id) + log.Info("command stream: hub and resume buffer cleaned up id=%s", id) +} + +// --- live SSE routing (mutually exclusive main vs resume) --- + +type streamRegistry struct { + mu sync.Mutex + m map[string]*streamHub +} + +var commandStreams = &streamRegistry{m: make(map[string]*streamHub)} + +type streamHub struct { + streamID string + mu sync.Mutex + holder *streamHolder + done chan struct{} +} + +type streamHolder struct { + writer http.ResponseWriter + ctx context.Context +} + +func (r *streamRegistry) registerPrimary(id string, w http.ResponseWriter, ctx context.Context) { + r.mu.Lock() + h := &streamHub{ + streamID: id, + done: make(chan struct{}), + } + r.m[id] = h + h.mu.Lock() + h.holder = &streamHolder{writer: w, ctx: ctx} + h.mu.Unlock() + r.mu.Unlock() + + log.Info("command stream: primary hub registered id=%s", id) + watchHolderRelease(h, ctx) +} + +// watchHolderRelease clears h.holder when ctx is cancelled. All holder mutations use h.mu only +// (see tryAttachResume, registerPrimary, writeFrame) so r.mu and h.mu are not split across h.holder. +func watchHolderRelease(h *streamHub, ctx context.Context) { + go func() { + <-ctx.Done() + h.mu.Lock() + defer h.mu.Unlock() + if h.holder != nil && h.holder.ctx == ctx { + h.holder = nil + } + }() +} + +func (r *streamRegistry) getHub(id string) *streamHub { + r.mu.Lock() + defer r.mu.Unlock() + return r.m[id] +} + +func (r *streamRegistry) closeAndRemove(id string) { + r.mu.Lock() + h := r.m[id] + delete(r.m, id) + r.mu.Unlock() + if h != nil { + h.closeDone() + } +} + +func (h *streamHub) closeDone() { + h.mu.Lock() + defer h.mu.Unlock() + select { + case <-h.done: + default: + close(h.done) + } +} + +func (h *streamHub) waitDone() <-chan struct{} { + return h.done +} + +func (h *streamHub) isHolderAlive() bool { + if h == nil { + return false + } + h.mu.Lock() + defer h.mu.Unlock() + return h.holder != nil && h.holder.ctx.Err() == nil +} + +func (r *streamRegistry) tryAttachResume(id string, w http.ResponseWriter, ctx context.Context) (*streamHub, error) { + r.mu.Lock() + h := r.m[id] + if h == nil { + r.mu.Unlock() + return nil, errLiveHubNotFound + } + h.mu.Lock() + if h.holder != nil && h.holder.ctx.Err() == nil { + h.mu.Unlock() + r.mu.Unlock() + return nil, errLiveStreamPrimaryActive + } + h.holder = &streamHolder{writer: w, ctx: ctx} + h.mu.Unlock() + r.mu.Unlock() + + watchHolderRelease(h, ctx) + return h, nil +} + +func (r *streamRegistry) writeSSE(id string, data []byte, bufEid int64, handler, summary string) { + r.mu.Lock() + h := r.m[id] + r.mu.Unlock() + if h == nil { + if bufEid > 0 { + _ = resumeBuffer.Append(id, bufEid, bytes.Clone(data)) + } + return + } + h.writeFrame(data, bufEid, handler, summary) +} + +// flushResumeTail writes all buffered events with EID > afterEid to the current holder while holding h.mu. +// Live writeFrame calls block on the same mutex, so chunks appended only to the ring during the initial +// snapshot replay cannot be missed on this connection (see ResumeCommandStream). +// Returns how many extra events were written after the initial snapshot replay. +func (h *streamHub) flushResumeTail(commandID string, afterEid int64) int { + if h == nil { + return 0 + } + h.mu.Lock() + defer h.mu.Unlock() + if h.holder == nil { + return 0 + } + + tail, ok := resumeBuffer.EventsAfter(commandID, afterEid) + if !ok || len(tail) == 0 { + return 0 + } + writer := h.holder.writer + written := 0 + for _, ev := range tail { + payload := append(append([]byte(nil), ev.Payload...), '\n', '\n') + nw, err := writer.Write(payload) + if err == nil && nw != len(payload) { + err = io.ErrShortWrite + } + if err != nil { + log.Error("flushResumeTail: write eid=%d: %v", ev.EID, err) + return written + } + if flusher, ok := writer.(http.Flusher); ok { + flusher.Flush() + } + written++ + } + return written +} + +func (h *streamHub) writeFrame(data []byte, bufEid int64, handler, summary string) { + h.mu.Lock() + defer h.mu.Unlock() + + payload := append(data, '\n', '\n') + if h.holder != nil { + n, err := h.holder.writer.Write(payload) + if err == nil && n != len(payload) { + err = io.ErrShortWrite + } + if err != nil { + log.Error("StreamEvent.%s write data %s error: %v", handler, summary, err) + } else if flusher, ok := h.holder.writer.(http.Flusher); ok { + flusher.Flush() + } + } + + if bufEid > 0 { + _ = resumeBuffer.Append(h.streamID, bufEid, bytes.Clone(data)) + } +} diff --git a/components/execd/pkg/web/controller/command_stream_test.go b/components/execd/pkg/web/controller/command_stream_test.go new file mode 100644 index 000000000..72d913388 --- /dev/null +++ b/components/execd/pkg/web/controller/command_stream_test.go @@ -0,0 +1,49 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFlushResumeTail_NilHubNoPanic(t *testing.T) { + var h *streamHub + h.flushResumeTail("any", 0) +} + +func TestFlushResumeTail_WritesBufferedEvents(t *testing.T) { + cmdID := "flush-resume-test-cmd" + payload := []byte(`{"type":"stdout","eid":1}`) + require.NoError(t, resumeBuffer.Append(cmdID, 1, payload)) + + w := httptest.NewRecorder() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h := &streamHub{ + streamID: cmdID, + done: make(chan struct{}), + holder: &streamHolder{writer: w, ctx: ctx}, + } + h.flushResumeTail(cmdID, 0) + + body := w.Body.String() + require.Contains(t, body, "stdout") + require.Contains(t, body, `"eid":1`) +} diff --git a/components/execd/pkg/web/controller/sse.go b/components/execd/pkg/web/controller/sse.go index 573315260..211c88bcb 100644 --- a/components/execd/pkg/web/controller/sse.go +++ b/components/execd/pkg/web/controller/sse.go @@ -46,16 +46,23 @@ func (c *basicController) setupSSEResponse() { } // setServerEventsHandler adapts runtime callbacks to SSE events. -func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) runtime.ExecuteResultHook { +func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context, req *runtime.ExecuteCodeRequest) runtime.ExecuteResultHook { return runtime.ExecuteResultHook{ OnExecuteInit: func(session string) { + if c.resumeEnabled.Load() { + c.resumeStreamMu.Lock() + c.resumeStreamID = session + c.resumeStreamMu.Unlock() + commandStreams.registerPrimary(session, c.ctx.Writer, c.ctx.Request.Context()) + } + event := model.ServerStreamEvent{ Type: model.StreamEventTypeInit, Text: session, Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteInit", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteInit", payload, true, event.Summary(), 0) safego.Go(func() { c.ping(ctx) }) }, @@ -80,7 +87,7 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary(), 0) } if len(mutated) > 0 { event := model.ServerStreamEvent{ @@ -89,30 +96,34 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary(), 0) } }, OnExecuteComplete: func(executionTime time.Duration) { + eid := req.NextEventID() event := model.ServerStreamEvent{ + Eid: eid, Type: model.StreamEventTypeComplete, ExecutionTime: executionTime.Milliseconds(), Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteComplete", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteComplete", payload, true, event.Summary(), eid) }, OnExecuteError: func(err *execute.ErrorOutput) { if err == nil { return } + eid := req.NextEventID() event := model.ServerStreamEvent{ + Eid: eid, Type: model.StreamEventTypeError, Error: err, Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteError", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteError", payload, true, event.Summary(), eid) }, OnExecuteStatus: func(status string) { event := model.ServerStreamEvent{ @@ -121,43 +132,60 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteStatus", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteStatus", payload, true, event.Summary(), 0) }, - OnExecuteStdout: func(text string) { + OnExecuteStdout: func(eid int64, text string) { if text == "" { return } event := model.ServerStreamEvent{ + Eid: eid, Type: model.StreamEventTypeStdout, Text: text, Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteStdout", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteStdout", payload, true, event.Summary(), eid) }, - OnExecuteStderr: func(text string) { + OnExecuteStderr: func(eid int64, text string) { if text == "" { return } event := model.ServerStreamEvent{ + Eid: eid, Type: model.StreamEventTypeStderr, Text: text, Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteStderr", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteStderr", payload, true, event.Summary(), eid) }, } } -// writeSingleEvent serializes one SSE frame. -func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byte, verbose bool, summary string) { +// writeSingleEvent serializes one SSE frame. When resumeStreamID is set, writes go through commandStreams (live hub + buffer). +// bufEid is stdout/stderr event id for the resume buffer; 0 skips Append (control events, resume catch-up frames). +func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byte, verbose bool, summary string, bufEid int64) { if c == nil || c.ctx == nil || c.ctx.Writer == nil { return } + var streamID string + if c.resumeEnabled.Load() { + c.resumeStreamMu.Lock() + streamID = c.resumeStreamID + c.resumeStreamMu.Unlock() + } + if streamID != "" { + commandStreams.writeSSE(streamID, data, bufEid, handler, summary) + if verbose { + log.Info("StreamEvent.%s write data %s", handler, summary) + } + return + } + select { case <-c.ctx.Request.Context().Done(): log.Error("StreamEvent.%s: client disconnected", handler) @@ -167,11 +195,6 @@ func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byt c.chunkWriter.Lock() defer c.chunkWriter.Unlock() - defer func() { - if flusher, ok := c.ctx.Writer.(http.Flusher); ok { - flusher.Flush() - } - }() payload := append(data, '\n', '\n') n, err := c.ctx.Writer.Write(payload) @@ -181,10 +204,15 @@ func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byt if err != nil { log.Error("StreamEvent.%s write data %s error: %v", handler, summary, err) - } else { - if verbose { - log.Info("StreamEvent.%s write data %s", handler, summary) - } + return + } + + if flusher, ok := c.ctx.Writer.(http.Flusher); ok { + flusher.Flush() + } + + if verbose { + log.Info("StreamEvent.%s write data %s", handler, summary) } } @@ -200,6 +228,6 @@ func (c *CodeInterpretingController) ping(ctx context.Context) { Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("Ping", payload, false, event.Summary()) + c.writeSingleEvent("Ping", payload, false, event.Summary(), 0) }, 3*time.Second, ctx.Done()) } diff --git a/components/execd/pkg/web/model/codeinterpreting.go b/components/execd/pkg/web/model/codeinterpreting.go index c45382a17..c473b94e4 100644 --- a/components/execd/pkg/web/model/codeinterpreting.go +++ b/components/execd/pkg/web/model/codeinterpreting.go @@ -16,14 +16,12 @@ package model import ( "encoding/json" - "errors" "fmt" "strings" "github.com/go-playground/validator/v10" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" - "github.com/alibaba/opensandbox/execd/pkg/runtime" ) // RunCodeRequest represents a code execution request. @@ -48,30 +46,6 @@ type CodeContextRequest struct { Cwd string `json:"cwd,omitempty"` } -// RunCommandRequest represents a shell command execution request. -type RunCommandRequest struct { - Command string `json:"command" validate:"required"` - Cwd string `json:"cwd,omitempty"` - Background bool `json:"background,omitempty"` - // TimeoutMs caps execution duration; 0 uses server default. - TimeoutMs int64 `json:"timeout,omitempty" validate:"omitempty,gte=1"` - - Uid *uint32 `json:"uid,omitempty"` - Gid *uint32 `json:"gid,omitempty"` - Envs map[string]string `json:"envs,omitempty"` -} - -func (r *RunCommandRequest) Validate() error { - validate := validator.New() - if err := validate.Struct(r); err != nil { - return err - } - if r.Gid != nil && r.Uid == nil { - return errors.New("uid is required when gid is provided") - } - return runtime.ValidateWorkingDir(r.Cwd) -} - type ServerStreamEventType string const ( @@ -88,6 +62,7 @@ const ( // ServerStreamEvent is emitted to clients over SSE. type ServerStreamEvent struct { + Eid int64 `json:"eid,omitempty"` Type ServerStreamEventType `json:"type,omitempty"` Text string `json:"text,omitempty"` ExecutionCount int `json:"execution_count,omitempty"` @@ -106,6 +81,9 @@ func (s ServerStreamEvent) ToJSON() []byte { // Summary renders a lightweight, log-friendly string without JSON. func (s ServerStreamEvent) Summary() string { parts := []string{fmt.Sprintf("type=%s", s.Type)} + if s.Eid > 0 { + parts = append(parts, fmt.Sprintf("eid=%d", s.Eid)) + } if s.Text != "" { parts = append(parts, fmt.Sprintf("text=%s", truncateString(s.Text, 100))) } diff --git a/components/execd/pkg/web/model/codeinterpreting_test.go b/components/execd/pkg/web/model/codeinterpreting_test.go index f90536b9e..b7f3db0c3 100644 --- a/components/execd/pkg/web/model/codeinterpreting_test.go +++ b/components/execd/pkg/web/model/codeinterpreting_test.go @@ -55,10 +55,11 @@ func TestRunCommandRequestValidateCwd(t *testing.T) { req := RunCommandRequest{Command: "ls", Cwd: tmp} require.NoError(t, req.Validate()) + // Cwd is not validated for existence — the runtime resolves it. Only + // structural constraints (non-empty command, non-negative timeout, uid/gid) + // are enforced at the API layer. req.Cwd = filepath.Join(tmp, "missing-subdir") - err := req.Validate() - require.Error(t, err) - require.Contains(t, err.Error(), "working directory") + require.NoError(t, req.Validate()) } func ptr32(v uint32) *uint32 { return &v } @@ -79,6 +80,7 @@ func TestRunCommandRequestValidateUidGid(t *testing.T) { func TestServerStreamEventToJSON(t *testing.T) { event := ServerStreamEvent{ + Eid: 42, Type: StreamEventTypeStdout, Text: "hello", ExecutionCount: 3, @@ -87,6 +89,7 @@ func TestServerStreamEventToJSON(t *testing.T) { data := event.ToJSON() var decoded ServerStreamEvent require.NoError(t, json.Unmarshal(data, &decoded)) + require.Equal(t, event.Eid, decoded.Eid) require.Equal(t, event.Type, decoded.Type) require.Equal(t, event.Text, decoded.Text) require.Equal(t, event.ExecutionCount, decoded.ExecutionCount) @@ -102,11 +105,12 @@ func TestServerStreamEventSummary(t *testing.T) { { name: "basic stdout", event: ServerStreamEvent{ + Eid: 7, Type: StreamEventTypeStdout, Text: "hello", ExecutionCount: 2, }, - contains: []string{"type=stdout", "text=hello"}, + contains: []string{"type=stdout", "eid=7", "text=hello"}, }, { name: "truncated text and error", diff --git a/components/execd/pkg/web/model/command.go b/components/execd/pkg/web/model/command.go index 0d35aa823..8876eaa06 100644 --- a/components/execd/pkg/web/model/command.go +++ b/components/execd/pkg/web/model/command.go @@ -14,7 +14,38 @@ package model -import "time" +import ( + "errors" + "time" + + "github.com/go-playground/validator/v10" +) + +const CommandResumeAfterEidQuery = "after_eid" + +// RunCommandRequest represents a shell command execution request. +type RunCommandRequest struct { + Command string `json:"command" validate:"required"` + Cwd string `json:"cwd,omitempty"` + Background bool `json:"background,omitempty"` + // TimeoutMs caps execution duration; 0 uses server default. + TimeoutMs int64 `json:"timeout,omitempty" validate:"omitempty,gte=1"` + + Uid *uint32 `json:"uid,omitempty"` + Gid *uint32 `json:"gid,omitempty"` + Envs map[string]string `json:"envs,omitempty"` +} + +func (r *RunCommandRequest) Validate() error { + validate := validator.New() + if err := validate.Struct(r); err != nil { + return err + } + if r.Gid != nil && r.Uid == nil { + return errors.New("uid is required when gid is provided") + } + return nil +} // CommandStatusResponse represents command status for REST APIs. type CommandStatusResponse struct { diff --git a/components/execd/pkg/web/router.go b/components/execd/pkg/web/router.go index 73163cbc5..03280aa97 100644 --- a/components/execd/pkg/web/router.go +++ b/components/execd/pkg/web/router.go @@ -74,6 +74,7 @@ func NewRouter(accessToken string) *gin.Engine { command.POST("", withCode(func(c *controller.CodeInterpretingController) { c.RunCommand() })) command.DELETE("", withCode(func(c *controller.CodeInterpretingController) { c.InterruptCommand() })) command.GET("/status/:id", withCode(func(c *controller.CodeInterpretingController) { c.GetCommandStatus() })) + command.GET("/:id/resume", withCode(func(c *controller.CodeInterpretingController) { c.ResumeCommandStream() })) command.GET("/:id/logs", withCode(func(c *controller.CodeInterpretingController) { c.GetBackgroundCommandOutput() })) } diff --git a/components/execd/tests/command_resume_test.py b/components/execd/tests/command_resume_test.py new file mode 100644 index 000000000..4bbc2d74a --- /dev/null +++ b/components/execd/tests/command_resume_test.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +# Copyright 2026 Alibaba Group Holding Ltd. +# +# Manual local test: POST /command (streaming output) -> disconnect -> GET /resume (catch-up + live tail), +# repeat at least 3 disconnect/resume rounds, then read until execution_complete on the last connection. +# +# Configure EXECD_URL (and optional EXECD_TOKEN). Examples: +# EXECD_URL=localhost:44772 +# EXECD_URL=https://remote.example +# python3 components/execd/tests/command_resume_test.py + +from __future__ import annotations + +import http.client +import json +import os +import ssl +import sys +import urllib.parse +from typing import Any + +API_ACCESS_TOKEN_HEADER = "X-EXECD-ACCESS-TOKEN" + + +class RunCollector: + """Aggregates stdout and execution_complete across connections for final assertions.""" + + def __init__(self) -> None: + self.stdout_by_eid: dict[int, str] = {} + self.primary_stdout_lines = 0 + self.resume_stdout_lines = 0 + self.saw_complete = False + + def record(self, tag: str, ev: dict[str, Any]) -> None: + t = ev.get("type") + if t == "execution_complete": + self.saw_complete = True + return + if t != "stdout": + return + eid = int(ev.get("eid") or 0) + txt = (ev.get("text") or "").strip() + if eid in self.stdout_by_eid: + assert self.stdout_by_eid[eid] == txt, ( + f"duplicate eid {eid} with different text: {self.stdout_by_eid[eid]!r} vs {txt!r}" + ) + else: + self.stdout_by_eid[eid] = txt + if tag == "primary": + self.primary_stdout_lines += 1 + elif tag.startswith("resume"): + self.resume_stdout_lines += 1 + + def assert_ok(self) -> None: + assert self.saw_complete, "expected execution_complete" + assert self.resume_stdout_lines > 0, ( + "resume delivered no stdout lines; disconnect resume may not be working (check 409, STDOUT_PER_CHOP)" + ) + assert len(self.stdout_by_eid) == OUTPUT_LINES, ( + f"expected {OUTPUT_LINES} stdout lines, got distinct eid count={len(self.stdout_by_eid)}" + ) + for n in range(1, OUTPUT_LINES + 1): + assert n in self.stdout_by_eid, f"missing eid={n}" + assert self.stdout_by_eid[n] == f"tick{n}", ( + f"eid={n} text should be tick{n}, got {self.stdout_by_eid[n]!r}" + ) + assert self.primary_stdout_lines >= 1, "primary connection should receive at least one stdout line" + assert self.primary_stdout_lines + self.resume_stdout_lines == OUTPUT_LINES, ( + "primary + resume stdout line counts should equal total stdout lines (each line counted once): " + f"primary={self.primary_stdout_lines} resume={self.resume_stdout_lines} expected={OUTPUT_LINES}" + ) + print( + "ASSERT ok: execution_complete + resume delivered output + tick1..tick" + + str(OUTPUT_LINES) + + " with eid 1.." + + str(OUTPUT_LINES) + + " complete", + flush=True, + ) + +# Execd base URL (host:port is ok; http:// is prepended if missing). +EXECD_URL = os.environ.get("EXECD_URL", "http://127.0.0.1:44772") +if "://" not in EXECD_URL: + EXECD_URL = "http://" + EXECD_URL + +TOKEN = os.environ.get("EXECD_TOKEN", "") + +# Close each connection after this many stdout lines (three disconnect/resume rounds before the final read). +STDOUT_PER_CHOP = 15 + +# One primary disconnect plus (RESUME_CHOPS - 1) partial resume disconnects; last resume reads until complete. +RESUME_CHOPS = 3 + +# Bounded output: sleep 0.1s between lines, OUTPUT_LINES total; wall time ~ OUTPUT_LINES * 0.1s. +OUTPUT_LINES = 200 + +TIMEOUT_MS = 300_000 + +COMMAND = ( + "sh -c 'n=0; while [ \"$n\" -lt " + + str(OUTPUT_LINES) + + " ]; do n=$((n+1)); echo tick$n; sleep 0.1; done'" +) + + +def parse_frames(buf: bytes) -> tuple[list[dict[str, Any]], bytes]: + out: list[dict[str, Any]] = [] + while True: + i = buf.find(b"\n\n") + if i < 0: + return out, buf + raw = buf[:i].strip() + buf = buf[i + 2 :] + if not raw: + continue + try: + out.append(json.loads(raw.decode("utf-8"))) + except (json.JSONDecodeError, UnicodeDecodeError): + pass + + +def connect(scheme: str, host: str, port: int) -> http.client.HTTPConnection: + if scheme == "https": + return http.client.HTTPSConnection( + host, port, timeout=600, context=ssl.create_default_context() + ) + return http.client.HTTPConnection(host, port, timeout=600) + + +def parse_url(base: str) -> tuple[str, str, int, str]: + u = urllib.parse.urlparse(base.rstrip("/")) + scheme = (u.scheme or "http").lower() + host = u.hostname or "127.0.0.1" + port = u.port or (443 if scheme == "https" else 80) + return scheme, host, port, u.path or "" + + +def path_join(prefix: str, p: str) -> str: + if not prefix: + return p if p.startswith("/") else "/" + p + return prefix.rstrip("/") + (p if p.startswith("/") else "/" + p) + + +def headers() -> dict[str, str]: + h: dict[str, str] = { + "Content-Type": "application/json", + "Accept": "text/event-stream", + } + if TOKEN: + h[API_ACCESS_TOKEN_HEADER] = TOKEN + return h + + +def pump( + resp: http.client.HTTPResponse, + tag: str, + max_eid: int, + *, + stop_after_stdout: int | None, + collector: RunCollector | None = None, +) -> tuple[int, bool, str | None]: + """Read SSE; update max_eid. If stop_after_stdout is a number, stop after that many stdout lines; if None, read until execution_complete.""" + buf = b"" + cmd_id: str | None = None + stdout_n = 0 + complete = False + while True: + chunk = resp.read(8192) + if not chunk: + break + buf += chunk + frames, buf = parse_frames(buf) + for ev in frames: + if collector is not None: + collector.record(tag, ev) + t = ev.get("type") + if t == "init": + cmd_id = ev.get("text") + print(f"[{tag}] init id={cmd_id}") + elif t in ("stdout", "stderr"): + eid = int(ev.get("eid") or 0) + max_eid = max(max_eid, eid) + txt = ev.get("text", "") + print(f"[{tag}] {t} eid={eid} {txt!r}") + if t == "stdout": + stdout_n += 1 + elif t == "execution_complete": + complete = True + print(f"[{tag}] execution_complete ms={ev.get('execution_time')}") + elif t != "ping": + print(f"[{tag}] {t}") + + if complete: + return max_eid, True, cmd_id + if stop_after_stdout is not None and stdout_n >= stop_after_stdout: + return max_eid, False, cmd_id + return max_eid, complete, cmd_id + + +def main() -> int: + scheme, host, port, prefix = parse_url(EXECD_URL) + h = headers() + cmd_path = path_join(prefix, "/command") + resume_tmpl = path_join(prefix, "/command/{id}/resume") + collector = RunCollector() + + body = json.dumps({"command": COMMAND, "timeout": TIMEOUT_MS}) + conn = connect(scheme, host, port) + conn.request("POST", cmd_path, body.encode("utf-8"), h) + r = conn.getresponse() + if r.status != 200: + print(f"POST /command HTTP {r.status}", r.read().decode("utf-8", "replace"), file=sys.stderr) + conn.close() + return 1 + + max_eid = 0 + max_eid, done, cid = pump( + r, + "primary", + max_eid, + stop_after_stdout=STDOUT_PER_CHOP, + collector=collector, + ) + conn.close() + if not cid: + print("no init", file=sys.stderr) + return 1 + if done: + print("command finished on primary connection (unexpected)", file=sys.stderr) + return 0 + + for round_i in range(RESUME_CHOPS): + path = resume_tmpl.format(id=cid) + f"?after_eid={max_eid}" + tag = f"resume{round_i + 1}" + c2 = connect(scheme, host, port) + c2.request("GET", path, headers=h) + r2 = c2.getresponse() + if r2.status == 409: + print( + f"{tag} HTTP 409: primary SSE still active; retry later or increase STDOUT_PER_CHOP", + file=sys.stderr, + ) + print(r2.read().decode("utf-8", "replace"), file=sys.stderr) + c2.close() + return 1 + if r2.status != 200: + print(f"{tag} HTTP {r2.status}", r2.read().decode("utf-8", "replace"), file=sys.stderr) + c2.close() + return 1 + + last = round_i == RESUME_CHOPS - 1 + max_eid, done, _ = pump( + r2, + tag, + max_eid, + stop_after_stdout=None if last else STDOUT_PER_CHOP, + collector=collector, + ) + c2.close() + if done: + try: + collector.assert_ok() + except AssertionError as e: + print(f"ASSERT failed: {e}", file=sys.stderr) + return 1 + print("done.") + return 0 + + print("done (unexpected: should have completed in last resume)", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs b/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs index 8a6f68e1d..eb41037ab 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.IO; using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; @@ -73,6 +74,8 @@ public async IAsyncEnumerable RunStreamAsync( } } + private const int MaxResumeRetries = 3; + public async Task RunAsync( string command, RunCommandOptions? options = null, @@ -80,11 +83,78 @@ public async Task RunAsync( CancellationToken cancellationToken = default) { _logger.LogDebug("Running command (commandLength={CommandLength})", command.Length); - return await ConsumeExecutionAsync( - RunStreamAsync(command, options, cancellationToken), - handlers, - inferExitCode: !(options?.Background ?? false), - cancellationToken).ConfigureAwait(false); + + var execution = new Execution(); + string? commandId = null; + var inferExitCode = !(options?.Background ?? false); + + for (int attempt = 0; attempt <= MaxResumeRetries; attempt++) + { + try + { + var stream = attempt == 0 + ? RunStreamAsync(command, options, cancellationToken) + : ResumeStreamAsync(commandId!, execution.LastEid, cancellationToken); + + await ConsumeExecutionIntoAsync(stream, execution, handlers, cancellationToken).ConfigureAwait(false); + + if (inferExitCode) + { + execution.ExitCode = InferForegroundExitCode(execution); + } + return execution; + } + catch (Exception ex) + { + if (execution.Id != null) commandId = execution.Id; + + if (ex is SandboxApiException apiEx) + { + if (apiEx.StatusCode == 409 && attempt < MaxResumeRetries) + { + await Task.Delay(1000, cancellationToken).ConfigureAwait(false); + continue; + } + if (apiEx.StatusCode == 404 && attempt > 0) + { + return execution; + } + _logger.LogError(ex, "Failed to run command (length={CommandLength})", command.Length); + throw; + } + + if (commandId == null || attempt >= MaxResumeRetries || !IsNetworkError(ex)) + { + _logger.LogError(ex, "Failed to run command (length={CommandLength})", command.Length); + throw; + } + } + } + + return execution; + } + + private async IAsyncEnumerable ResumeStreamAsync( + string commandId, + long afterEid, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var url = $"{_baseUrl}/command/{Uri.EscapeDataString(commandId)}/resume?after_eid={afterEid}"; + using var request = new HttpRequestMessage(HttpMethod.Get, url); + + request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream")); + + foreach (var header in _headers) + { + request.Headers.TryAddWithoutValidation(header.Key, header.Value); + } + + using var response = await _sseHttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); + + await foreach (var ev in SseParser.ParseJsonEventStreamAsync(response, "Resume command failed", cancellationToken).ConfigureAwait(false)) + { + yield return ev; + } } public async Task InterruptAsync(string sessionId, CancellationToken cancellationToken = default) @@ -318,6 +388,20 @@ private async Task ConsumeExecutionAsync( CancellationToken cancellationToken) { var execution = new Execution(); + await ConsumeExecutionIntoAsync(stream, execution, handlers, cancellationToken).ConfigureAwait(false); + if (inferExitCode) + { + execution.ExitCode = InferForegroundExitCode(execution); + } + return execution; + } + + private static async Task ConsumeExecutionIntoAsync( + IAsyncEnumerable stream, + Execution execution, + ExecutionHandlers? handlers, + CancellationToken cancellationToken) + { var dispatcher = new ExecutionEventDispatcher(execution, handlers); await foreach (var ev in stream.WithCancellation(cancellationToken).ConfigureAwait(false)) @@ -325,13 +409,21 @@ private async Task ConsumeExecutionAsync( PreserveLegacyInitId(ev, execution); await dispatcher.DispatchAsync(ev).ConfigureAwait(false); } + } - if (inferExitCode) + private static bool IsNetworkError(Exception ex) + { + if (ex is OperationCanceledException) return false; + if (ex is IOException) return true; + if (ex is System.Net.Http.HttpRequestException) return true; + + var inner = ex.InnerException; + while (inner != null) { - execution.ExitCode = InferForegroundExitCode(execution); + if (inner is IOException) return true; + inner = inner.InnerException; } - - return execution; + return false; } private sealed record StreamingRequestSpec(string Url, object Body, string ErrorMessage); diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs b/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs index 25886f2b2..9325f48e0 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs @@ -32,6 +32,11 @@ public ExecutionEventDispatcher(Execution execution, ExecutionHandlers? handlers public async Task DispatchAsync(ServerStreamEvent ev) { + if (ev.Eid.HasValue && ev.Eid.Value > _execution.LastEid) + { + _execution.LastEid = ev.Eid.Value; + } + var timestamp = ev.Timestamp ?? DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); switch (ev.Type) diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs index 91a19d529..2b2c2009b 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs @@ -62,6 +62,12 @@ public class ServerStreamEvent /// [JsonPropertyName("execution_time")] public long? ExecutionTime { get; set; } + + /// + /// Gets or sets the monotonic event ID for SSE resume. + /// + [JsonPropertyName("eid")] + public long? Eid { get; set; } } /// diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs index cfe34a85c..f98915db6 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs @@ -171,6 +171,11 @@ public class Execution /// Gets or sets the command exit code when available. /// public int? ExitCode { get; set; } + + /// + /// Gets or sets the highest event ID seen, used for SSE resume. + /// + public long LastEid { get; set; } } /// diff --git a/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs b/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs index d6db30db3..2a2444d37 100644 --- a/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs +++ b/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.IO; using System.Net; using System.Text; using System.Text.Json; @@ -441,6 +442,62 @@ await act.Should().ThrowAsync() .WithMessage("*sessionId*"); } + [Fact] + public async Task RunAsync_ShouldAutoResumeOnDisconnect() + { + const int totalLines = 20; + var allEvents = new StringBuilder(); + allEvents.AppendLine("""{"type":"init","text":"cmd-resume","timestamp":1,"eid":1}"""); + for (int i = 1; i <= totalLines; i++) + { + allEvents.AppendLine($$"""{"type":"stdout","text":"line{{i}}","timestamp":1,"eid":{{i + 1}}}"""); + } + allEvents.AppendLine($$"""{"type":"execution_complete","execution_time":100,"timestamp":1,"eid":{{totalLines + 2}}}"""); + + var handler = new StubHttpMessageHandler((request, _) => + { + if (request.Method == HttpMethod.Post && request.RequestUri!.AbsolutePath == "/command") + { + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new TruncatedStreamContent(allEvents.ToString(), maxLines: 5) + }); + } + + if (request.Method == HttpMethod.Get && request.RequestUri!.AbsolutePath.Contains("/resume")) + { + var query = request.RequestUri!.Query; + var afterEid = long.Parse(query.Split('=')[1]); + var remaining = new StringBuilder(); + for (long eid = afterEid + 1; eid <= totalLines + 1; eid++) + { + remaining.AppendLine($$"""{"type":"stdout","text":"line{{eid - 1}}","timestamp":1,"eid":{{eid}}}"""); + } + remaining.AppendLine($$"""{"type":"execution_complete","execution_time":100,"timestamp":1,"eid":{{totalLines + 2}}}"""); + + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent(remaining.ToString(), Encoding.UTF8, "text/event-stream") + }); + } + + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound)); + }); + + var adapter = CreateAdapter(handler); + var execution = await adapter.RunAsync("test command"); + + execution.Id.Should().Be("cmd-resume"); + execution.ExitCode.Should().Be(0); + execution.Complete.Should().NotBeNull(); + execution.Logs.Stdout.Should().HaveCount(totalLines); + for (int i = 1; i <= totalLines; i++) + { + execution.Logs.Stdout.Should().Contain(m => m.Text == $"line{i}"); + } + execution.LastEid.Should().BeGreaterOrEqualTo(totalLines + 1); + } + private static CommandsAdapter CreateAdapter(HttpMessageHandler httpHandler) { var baseUrl = "http://execd.local"; @@ -468,4 +525,33 @@ protected override async Task SendAsync(HttpRequestMessage return await _handler(request, cancellationToken).ConfigureAwait(false); } } + + private sealed class TruncatedStreamContent : HttpContent + { + private readonly string _content; + private readonly int _maxLines; + + public TruncatedStreamContent(string content, int maxLines) + { + _content = content; + _maxLines = maxLines; + Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("text/event-stream"); + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) + { + var lines = _content.Split('\n'); + var truncated = string.Join("\n", lines.Take(_maxLines)) + "\n"; + var bytes = Encoding.UTF8.GetBytes(truncated); + await stream.WriteAsync(bytes).ConfigureAwait(false); + await stream.FlushAsync().ConfigureAwait(false); + throw new IOException("Simulated disconnect for SSE resume test"); + } + + protected override bool TryComputeLength(out long length) + { + length = -1; + return false; + } + } } diff --git a/sdks/sandbox/go/execd.go b/sdks/sandbox/go/execd.go index 8a196f7f5..6643588e5 100644 --- a/sdks/sandbox/go/execd.go +++ b/sdks/sandbox/go/execd.go @@ -128,6 +128,13 @@ func (e *ExecdClient) RunCommand(ctx context.Context, req RunCommandRequest, han return e.client.doStreamRequest(ctx, http.MethodPost, "/command", req, handler) } +// ResumeCommand resumes an interrupted SSE stream for the given command +// starting from the specified event ID (exclusive). +func (e *ExecdClient) ResumeCommand(ctx context.Context, commandID string, afterEid int64, handler EventHandler) error { + path := "/command/" + url.PathEscape(commandID) + "/resume?after_eid=" + strconv.FormatInt(afterEid, 10) + return e.client.doStreamRequest(ctx, http.MethodGet, path, nil, handler) +} + // InterruptCommand interrupts the currently running command execution. func (e *ExecdClient) InterruptCommand(ctx context.Context, sessionID string) error { path := "/command?id=" + url.QueryEscape(sessionID) diff --git a/sdks/sandbox/go/execution.go b/sdks/sandbox/go/execution.go index 93692dd40..f0cf7f38e 100644 --- a/sdks/sandbox/go/execution.go +++ b/sdks/sandbox/go/execution.go @@ -81,6 +81,9 @@ type Execution struct { // ExitCode is the process exit code. Nil if not available. ExitCode *int + + // LastEid is the highest event ID seen so far, used for SSE resume. + LastEid int64 } // Text returns the combined stdout text. @@ -120,6 +123,7 @@ type sseEvent struct { Type string `json:"type"` Text string `json:"text"` Timestamp int64 `json:"timestamp"` + Eid int64 `json:"eid,omitempty"` ExitCode *int `json:"exit_code,omitempty"` ExecutionTime int64 `json:"execution_time,omitempty"` @@ -153,6 +157,10 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH return nil } + if ev.Eid > exec.LastEid { + exec.LastEid = ev.Eid + } + switch ev.Type { case "init": initEvent := ExecutionInit{ID: ev.Text, Timestamp: ev.Timestamp} diff --git a/sdks/sandbox/go/opensandbox_test.go b/sdks/sandbox/go/opensandbox_test.go index 0fee93b42..25a8e9fd6 100644 --- a/sdks/sandbox/go/opensandbox_test.go +++ b/sdks/sandbox/go/opensandbox_test.go @@ -17,11 +17,14 @@ package opensandbox import ( "context" "encoding/json" + "errors" "fmt" "io" + "net" "net/http" "net/http/httptest" "os" + "strconv" "strings" "sync" "testing" @@ -2276,3 +2279,188 @@ func TestCreateSandbox_WithVolumes(t *testing.T) { }) require.NoErrorf(t, err, "CreateSandbox with Volumes") } + +// errorReader is an io.Reader that always returns a net.Error, simulating a +// TCP connection reset after a cleanly truncated response body. +type errorReader struct{} + +func (errorReader) Read([]byte) (int, error) { + return 0, &net.OpError{ + Op: "read", + Net: "tcp", + Err: errors.New("connection reset by peer"), + } +} + +// disconnectInjectTransport wraps a real transport. On POST /command it buffers +// the full SSE response, truncates it after maxEvents complete NDJSON events, +// then appends an errorReader so the client sees a mid-stream disconnect. +// On GET /resume it tracks the after_eid query parameter. +type disconnectInjectTransport struct { + real http.RoundTripper + maxEvents int + postCount *int + resumeCount *int + lastAfterEid *int64 + mu *sync.Mutex +} + +func (rt *disconnectInjectTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := rt.real.RoundTrip(req) + if err != nil { + return nil, err + } + + rt.mu.Lock() + defer rt.mu.Unlock() + + if req.Method == http.MethodPost && req.URL.Path == "/command" { + *rt.postCount++ + + full, readErr := io.ReadAll(resp.Body) + resp.Body.Close() + if readErr != nil || len(full) == 0 { + resp.Body = io.NopCloser(errorReader{}) + return resp, nil + } + + // Truncate after maxEvents complete events. Each NDJSON event is + // one line followed by a blank line, i.e. two consecutive \n chars. + maxNewlines := rt.maxEvents * 2 + nl := 0 + cut := len(full) + for i, b := range full { + if b == '\n' { + nl++ + if nl >= maxNewlines { + cut = i + 1 + break + } + } + } + + if cut < len(full) { + resp.Body = io.NopCloser(io.MultiReader( + strings.NewReader(string(full[:cut])), + errorReader{}, + )) + } else { + resp.Body = io.NopCloser(strings.NewReader(string(full))) + } + } else if req.Method == http.MethodGet && strings.Contains(req.URL.Path, "/resume") { + *rt.resumeCount++ + if q := req.URL.Query().Get("after_eid"); q != "" { + if eid, err := strconv.ParseInt(q, 10, 64); err == nil { + *rt.lastAfterEid = eid + } + } + } + + return resp, nil +} + +func TestRunCommand_AutoResumeOnDisconnect(t *testing.T) { + var mu sync.Mutex + var postCount, resumeCount int + var lastAfterEid int64 + + // writeEvents sends SSE events for eid range [first, last]. + // eid=1 is init, 2..20 are line1..line19, 21 is execution_complete. + writeEvents := func(w io.Writer, first, last int64) { + flusher, _ := w.(http.Flusher) + for eid := first; eid <= last; eid++ { + switch { + case eid == 1: + fmt.Fprintf(w, `{"type":"init","text":"cmd-r1","eid":1,"timestamp":1}`+"\n\n") + case eid >= 2 && eid <= 20: + lineNum := eid - 1 + fmt.Fprintf(w, `{"type":"stdout","text":"line%d","eid":%d,"timestamp":%d}`+"\n\n", lineNum, eid, lineNum*10) + case eid == 21: + fmt.Fprintf(w, `{"type":"execution_complete","eid":21,"timestamp":200,"execution_time":50}`+"\n\n") + } + } + if flusher != nil { + flusher.Flush() + } + } + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + + path := r.URL.Path + if r.Method == http.MethodPost && path == "/command" { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + writeEvents(w, 1, 21) + } else if r.Method == http.MethodGet && strings.Contains(path, "/resume") { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + afterEidStr := r.URL.Query().Get("after_eid") + afterEid, _ := strconv.ParseInt(afterEidStr, 10, 64) + firstEid := afterEid + 1 + if firstEid < 1 { + firstEid = 1 + } + writeEvents(w, firstEid, 21) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer srv.Close() + + transport := &disconnectInjectTransport{ + real: srv.Client().Transport, + maxEvents: 4, // disconnect after init + 3 stdout lines + postCount: &postCount, + resumeCount: &resumeCount, + lastAfterEid: &lastAfterEid, + mu: &mu, + } + + execd := NewExecdClient(srv.URL, "test-token", + WithHTTPClient(&http.Client{Transport: transport}), + ) + + sb := &Sandbox{ + id: "test-sandbox", + config: &ConnectionConfig{Domain: srv.URL}, + execd: execd, + } + + execution, err := sb.RunCommand(context.Background(), "echo test", nil) + require.NoErrorf(t, err, "RunCommand") + + if postCount != 1 { + assert.Fail(t, fmt.Sprintf("postCount = %d, want 1", postCount)) + } + if resumeCount < 1 { + assert.Fail(t, fmt.Sprintf("resumeCount = %d, want >= 1", resumeCount)) + } + if lastAfterEid < 2 { + assert.Fail(t, fmt.Sprintf("lastAfterEid = %d, want >= 2", lastAfterEid)) + } + + if len(execution.Stdout) != 19 { + assert.Fail(t, fmt.Sprintf("len(Stdout) = %d, want 19", len(execution.Stdout))) + } + for i, msg := range execution.Stdout { + expected := fmt.Sprintf("line%d", i+1) + if msg.Text != expected { + assert.Fail(t, fmt.Sprintf("Stdout[%d].Text = %q, want %q", i, msg.Text, expected)) + } + } + + if execution.Complete == nil { + assert.Fail(t, "expected Complete to be set") + } + if execution.ExitCode == nil || *execution.ExitCode != 0 { + assert.Fail(t, fmt.Sprintf("ExitCode = %v, want 0", execution.ExitCode)) + } + if execution.LastEid < 10 { + assert.Fail(t, fmt.Sprintf("LastEid = %d, want >= 10", execution.LastEid)) + } + + t.Logf("resume test: post=%d resume=%d afterEid=%d stdout=%d lastEid=%d", + postCount, resumeCount, lastAfterEid, len(execution.Stdout), execution.LastEid) +} diff --git a/sdks/sandbox/go/sandbox_exec.go b/sdks/sandbox/go/sandbox_exec.go index 599c7c88b..ae87ee1f4 100644 --- a/sdks/sandbox/go/sandbox_exec.go +++ b/sdks/sandbox/go/sandbox_exec.go @@ -16,7 +16,11 @@ package opensandbox import ( "context" + "errors" "fmt" + "io" + "net" + "time" ) // RunCommand executes a shell command and returns the structured result. @@ -24,19 +28,83 @@ func (s *Sandbox) RunCommand(ctx context.Context, command string, handlers *Exec return s.RunCommandWithOpts(ctx, RunCommandRequest{Command: command}, handlers) } +const maxResumeRetries = 3 + +// isNetworkError reports whether err is a transient network error that should +// trigger SSE resume. Context cancellation and deadline errors are not retryable. +func isNetworkError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + for { + var netErr net.Error + if errors.As(err, &netErr) { + return true + } + if errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + unwrapped := errors.Unwrap(err) + if unwrapped == nil { + break + } + err = unwrapped + } + return false +} + // RunCommandWithOpts executes a command with full options. +// Automatically resumes the SSE stream on transient network disconnects. func (s *Sandbox) RunCommandWithOpts(ctx context.Context, req RunCommandRequest, handlers *ExecutionHandlers) (*Execution, error) { if s.execd == nil { return nil, fmt.Errorf("opensandbox: execd client not initialized") } exec := &Execution{} - err := s.execd.RunCommand(ctx, req, func(event StreamEvent) error { - return processStreamEvent(exec, event, handlers) - }) - if err != nil { - return exec, err + var commandID string + + for attempt := 0; attempt <= maxResumeRetries; attempt++ { + var streamErr error + if attempt == 0 { + streamErr = s.execd.RunCommand(ctx, req, func(event StreamEvent) error { + return processStreamEvent(exec, event, handlers) + }) + } else { + streamErr = s.execd.ResumeCommand(ctx, commandID, exec.LastEid, func(event StreamEvent) error { + return processStreamEvent(exec, event, handlers) + }) + } + + if streamErr == nil { + return exec, nil + } + + if exec.ID != "" { + commandID = exec.ID + } + + var apiErr *APIError + if errors.As(streamErr, &apiErr) { + if apiErr.StatusCode == 409 && attempt < maxResumeRetries { + if err := retrySleep(ctx, 1*time.Second); err != nil { + return exec, err + } + continue + } + if apiErr.StatusCode == 404 && attempt > 0 { + return exec, nil + } + return exec, streamErr + } + + if commandID == "" || attempt >= maxResumeRetries || !isNetworkError(streamErr) { + return exec, streamErr + } } + return exec, nil } diff --git a/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts b/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts index 602723f75..c94fbb9ac 100644 --- a/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts +++ b/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts @@ -15,6 +15,7 @@ import type { ExecdClient } from "../openapi/execdClient.js"; import { throwOnOpenApiFetchError } from "./openapiError.js"; import { parseJsonEventStream } from "./sse.js"; +import { SandboxApiException } from "../core/exceptions.js"; import type { paths as ExecdPaths } from "../api/execd.js"; import type { CommandExecution, @@ -105,12 +106,26 @@ function inferForegroundExitCode(execution: CommandExecution): number | null { : null; } +const MAX_RESUME_RETRIES = 3; + function assertNonBlank(value: string, field: string): void { if (!value.trim()) { throw new Error(`${field} cannot be empty`); } } +function isNetworkError(err: unknown): boolean { + if (err instanceof DOMException && err.name === "AbortError") return false; + if (err instanceof TypeError) return true; + if (err instanceof Error) { + const msg = err.message.toLowerCase(); + if (msg.includes("fetch failed") || msg.includes("econnreset") || + msg.includes("socket") || msg.includes("connect") || + msg.includes("network")) return true; + } + return false; +} + function parseOptionalDate(value: unknown, field: string): Date | undefined { if (value == null) return undefined; if (value instanceof Date) return value; @@ -192,6 +207,30 @@ export class CommandsAdapter implements ExecdCommands { } } + private async *resumeStream( + commandId: string, + afterEid: number, + signal?: AbortSignal, + ): AsyncIterable { + const url = joinUrl( + this.opts.baseUrl, + `/command/${encodeURIComponent(commandId)}/resume?after_eid=${afterEid}`, + ); + const res = await this.fetch(url, { + method: "GET", + headers: { + accept: "text/event-stream", + ...(this.opts.headers ?? {}), + }, + signal, + }); + for await (const ev of parseJsonEventStream(res, { + fallbackErrorMessage: "Resume command failed", + })) { + yield ev; + } + } + private async consumeExecutionStream( stream: AsyncIterable, handlers?: ExecutionHandlers, @@ -201,6 +240,18 @@ export class CommandsAdapter implements ExecdCommands { logs: { stdout: [], stderr: [] }, result: [], }; + await this.consumeExecutionInto(stream, execution, handlers); + if (inferExitCode) { + execution.exitCode = inferForegroundExitCode(execution); + } + return execution; + } + + private async consumeExecutionInto( + stream: AsyncIterable, + execution: CommandExecution, + handlers?: ExecutionHandlers, + ): Promise { const dispatcher = new ExecutionEventDispatcher(execution, handlers); for await (const ev of stream) { if (ev.type === "init" && (ev.text ?? "") === "" && execution.id) { @@ -208,12 +259,6 @@ export class CommandsAdapter implements ExecdCommands { } await dispatcher.dispatch(ev as any); } - - if (inferExitCode) { - execution.exitCode = inferForegroundExitCode(execution); - } - - return execution; } async interrupt(sessionId: string): Promise { @@ -280,11 +325,46 @@ export class CommandsAdapter implements ExecdCommands { handlers?: ExecutionHandlers, signal?: AbortSignal, ): Promise { - return this.consumeExecutionStream( - this.runStream(command, opts, signal), - handlers, - !opts?.background, - ); + const inferExitCode = !opts?.background; + const execution: CommandExecution = { + logs: { stdout: [], stderr: [] }, + result: [], + }; + let commandId: string | undefined; + + for (let attempt = 0; attempt <= MAX_RESUME_RETRIES; attempt++) { + try { + const stream = attempt === 0 + ? this.runStream(command, opts, signal) + : this.resumeStream(commandId!, execution.lastEid ?? 0, signal); + + await this.consumeExecutionInto(stream, execution, handlers); + + if (inferExitCode) { + execution.exitCode = inferForegroundExitCode(execution); + } + return execution; + } catch (err) { + if (execution.id) commandId = execution.id; + + if (err instanceof SandboxApiException) { + if (err.statusCode === 409 && attempt < MAX_RESUME_RETRIES) { + await new Promise(resolve => setTimeout(resolve, 1000)); + continue; + } + if (err.statusCode === 404 && attempt > 0) { + return execution; + } + throw err; + } + + if (!commandId || attempt >= MAX_RESUME_RETRIES || !isNetworkError(err)) { + throw err; + } + } + } + + return execution; } async createSession(options?: { workingDirectory?: string }): Promise { diff --git a/sdks/sandbox/javascript/src/api/execd.ts b/sdks/sandbox/javascript/src/api/execd.ts index 03fb65517..475ad7cf1 100644 --- a/sdks/sandbox/javascript/src/api/execd.ts +++ b/sdks/sandbox/javascript/src/api/execd.ts @@ -266,6 +266,33 @@ export interface paths { patch?: never; trace?: never; }; + "/command/{id}/resume": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** + * Resume command SSE stream (replay and optional live tail) + * @description Replays buffered events (stdout, stderr, execution_complete, execution_error) from + * the server-side ring buffer for events with `eid` strictly greater than + * `after_eid`, then—if the command is still running and no other client holds the + * primary SSE slot—continues streaming live events until completion or client + * disconnect. Event shape matches `POST /command` (`ServerStreamEvent`). + * + * This endpoint is mutually exclusive with the primary `POST /command` SSE: if that + * connection is still active, the server responds with 409 Conflict. + */ + get: operations["resumeCommandStream"]; + put?: never; + post?: never; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/command/{id}/logs": { parameters: { query?: never; @@ -948,6 +975,21 @@ export interface components { "application/json": components["schemas"]["ErrorResponse"]; }; }; + /** @description Request conflicts with current server state (e.g. resource in use) */ + Conflict: { + headers: { + [name: string]: unknown; + }; + content: { + /** + * @example { + * "code": "INVALID_REQUEST_BODY", + * "message": "primary SSE stream is still active; disconnect it before resuming" + * } + */ + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; /** @description Runtime server error during operation */ InternalServerError: { headers: { @@ -1342,6 +1384,46 @@ export interface operations { 500: components["responses"]["InternalServerError"]; }; }; + resumeCommandStream: { + parameters: { + query?: { + /** + * @description Only events with `eid` greater than this value are replayed. All event types + * (stdout, stderr, execution_complete, execution_error) carry monotonically + * increasing `eid` values. Omit or use `0` to replay from the oldest buffered + * events. After replay, if the command is still running and the primary SSE slot + * is free, live events continue to stream. + * @example 42 + */ + after_eid?: number; + }; + header?: never; + path: { + /** + * @description Command ID returned by RunCommand + * @example cmd-abc123 + */ + id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Stream of command execution events (replay then optional live continuation) */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "text/event-stream": components["schemas"]["ServerStreamEvent"]; + }; + }; + 400: components["responses"]["BadRequest"]; + 404: components["responses"]["NotFound"]; + 409: components["responses"]["Conflict"]; + 500: components["responses"]["InternalServerError"]; + }; + }; getBackgroundCommandLogs: { parameters: { query?: { diff --git a/sdks/sandbox/javascript/src/models/execd.ts b/sdks/sandbox/javascript/src/models/execd.ts index 992dd8941..4f5930924 100644 --- a/sdks/sandbox/javascript/src/models/execd.ts +++ b/sdks/sandbox/javascript/src/models/execd.ts @@ -35,6 +35,8 @@ export interface ServerStreamEvent extends Record { text?: string; results?: Record; error?: Record; + /** Monotonic event ID for SSE resume. */ + eid?: number; } export interface CodeContextRequest extends Record { diff --git a/sdks/sandbox/javascript/src/models/execution.ts b/sdks/sandbox/javascript/src/models/execution.ts index 144236dc3..a527270e5 100644 --- a/sdks/sandbox/javascript/src/models/execution.ts +++ b/sdks/sandbox/javascript/src/models/execution.ts @@ -55,6 +55,8 @@ export interface Execution { error?: ExecutionError; complete?: ExecutionComplete; exitCode?: number | null; + /** Highest event ID seen, used for SSE resume. */ + lastEid?: number; } export interface ExecutionHandlers { diff --git a/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts b/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts index 303fdcc04..4ad887268 100644 --- a/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts +++ b/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts @@ -35,6 +35,10 @@ export class ExecutionEventDispatcher { ) {} async dispatch(ev: ServerStreamEvent): Promise { + if (ev.eid != null && ev.eid > (this.execution.lastEid ?? 0)) { + this.execution.lastEid = ev.eid; + } + await this.handlers?.onEvent?.(ev); const ts = ev.timestamp ?? Date.now(); diff --git a/sdks/sandbox/javascript/tests/commands.run.test.mjs b/sdks/sandbox/javascript/tests/commands.run.test.mjs index e05ef98d7..d55624e82 100644 --- a/sdks/sandbox/javascript/tests/commands.run.test.mjs +++ b/sdks/sandbox/javascript/tests/commands.run.test.mjs @@ -113,6 +113,62 @@ test("CommandsAdapter.runInSession sends command and timeout fields", async () = assert.equal(execution.exitCode, 0); }); +test("CommandsAdapter.run auto-resumes on SSE disconnect", async () => { + const initialEvents = [ + { type: "init", text: "cmd-resume", timestamp: 1, eid: 1 }, + { type: "stdout", text: "before-disconnect", timestamp: 2, eid: 2 }, + ]; + const resumeEvents = [ + { type: "stdout", text: "after-resume", timestamp: 3, eid: 3 }, + { type: "execution_complete", timestamp: 4, execution_time: 10, eid: 4 }, + ]; + + let callCount = 0; + const fetchImpl = async (url, init) => { + callCount++; + if (callCount === 1) { + const body = initialEvents.map(e => `data: ${JSON.stringify(e)}`).join("\n") + "\n"; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(body)); + }, + pull(controller) { + controller.error(new TypeError("fetch failed")); + }, + }); + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + } + // Resume call + assert.ok(url.includes("/resume?after_eid=2"), `unexpected resume URL: ${url}`); + const body = resumeEvents.map(e => `data: ${JSON.stringify(e)}`).join("\n") + "\n\n"; + return new Response(body, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + }; + + const adapter = new CommandsAdapter( + {}, + { + baseUrl: "http://127.0.0.1:8080", + fetch: fetchImpl, + }, + ); + + const execution = await adapter.run("echo test"); + + assert.equal(execution.id, "cmd-resume"); + assert.equal(execution.logs.stdout.length, 2); + assert.equal(execution.logs.stdout[0].text, "before-disconnect"); + assert.equal(execution.logs.stdout[1].text, "after-resume"); + assert.equal(execution.lastEid, 4); + assert.equal(execution.exitCode, 0); + assert.equal(callCount, 2); +}); + test("CommandsAdapter.runInSession infers non-zero exitCode from final error state", async () => { const adapter = createAdapter( [ diff --git a/sdks/sandbox/kotlin/sandbox-api/src/main/kotlin/com/alibaba/opensandbox/sandbox/api/models/execd/ExecutionModels.kt b/sdks/sandbox/kotlin/sandbox-api/src/main/kotlin/com/alibaba/opensandbox/sandbox/api/models/execd/ExecutionModels.kt index 91efc4ada..0051fa9a2 100644 --- a/sdks/sandbox/kotlin/sandbox-api/src/main/kotlin/com/alibaba/opensandbox/sandbox/api/models/execd/ExecutionModels.kt +++ b/sdks/sandbox/kotlin/sandbox-api/src/main/kotlin/com/alibaba/opensandbox/sandbox/api/models/execd/ExecutionModels.kt @@ -33,6 +33,8 @@ data class EventNode( @SerialName("execution_count") val executionCount: Long? = null, val error: ErrorData? = null, + @SerialName("eid") + val eid: Long? = null, ) @Serializable diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/HttpClientProvider.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/HttpClientProvider.kt index 93f9e5391..0f04ed981 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/HttpClientProvider.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/HttpClientProvider.kt @@ -81,12 +81,15 @@ class HttpClientProvider( .writeTimeout(config.requestTimeout.toMillis(), TimeUnit.MILLISECONDS) .callTimeout(0, TimeUnit.MILLISECONDS) .addInterceptor(ExtraHeadersInterceptor(getSseHeaders())) + .apply { sseInterceptors.forEach { addInterceptor(it) } } .addLoggingInterceptor() .build() } val sseClient: OkHttpClient by sseClientLazy + internal val sseInterceptors: MutableList = mutableListOf() + // --- Helper Extensions --- private fun OkHttpClient.Builder.applyStandardTimeouts(): OkHttpClient.Builder { diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt index a46b0938f..9c93ec8ea 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt @@ -39,6 +39,7 @@ class Execution( var complete: ExecutionComplete? = null, var exitCode: Int? = null, val logs: ExecutionLogs = ExecutionLogs(), + var lastEid: Long = 0L, ) { /** * Adds a new execution result to this execution. diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt index 0bef96ef7..daca42d04 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt @@ -30,6 +30,11 @@ class ExecutionEventDispatcher( private val handlers: ExecutionHandlers? = null, ) { fun dispatch(eventNode: EventNode) { + eventNode.eid?.let { eid -> + if (eid > execution.lastEid) { + execution.lastEid = eid + } + } val type = eventNode.type val timestamp = eventNode.timestamp when (type) { diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt index 76d2b249e..416959971 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt @@ -50,6 +50,7 @@ import okhttp3.Request import okhttp3.RequestBody.Companion.toRequestBody import okhttp3.Response import org.slf4j.LoggerFactory +import java.io.IOException import com.alibaba.opensandbox.sandbox.api.models.execd.CreateSessionRequest as CreateSessionRequestApi import com.alibaba.opensandbox.sandbox.api.models.execd.RunInSessionRequest as RunInSessionRequestApi @@ -64,6 +65,8 @@ internal class CommandsAdapter( companion object { private const val RUN_COMMAND_PATH = "/command" private const val SESSION_PATH_SEGMENT = "session" + private const val RESUME_PATH = "/command/%s/resume" + private const val MAX_RESUME_RETRIES = 3 } private val logger = LoggerFactory.getLogger(CommandsAdapter::class.java) @@ -88,28 +91,68 @@ internal class CommandsAdapter( if (request.command.isEmpty()) { throw InvalidArgumentException("Command cannot be empty") } - try { - val httpRequest = - Request.Builder() - .url("$execdBaseUrl$RUN_COMMAND_PATH") - .post( - jsonParser.encodeToString(request.toApiRunCommandRequest()).toRequestBody("application/json".toMediaType()), - ) - .headers(execdEndpoint.headers.toHeaders()) - .build() - return executeStreamingRequest( - httpRequest = httpRequest, - handlers = request.handlers, - inferExitCode = !request.background, - failureMessage = { statusCode, errorBody -> - "Failed to run commands. Status code: $statusCode, Body: $errorBody" - }, - ) - } catch (e: Exception) { - logger.error("Failed to run command (length: {})", request.command.length, e) - throw e.toSandboxException() + val execution = Execution() + var commandID: String? = null + + for (attempt in 0..MAX_RESUME_RETRIES) { + try { + val httpRequest = if (attempt == 0) { + Request.Builder() + .url("$execdBaseUrl$RUN_COMMAND_PATH") + .post( + jsonParser.encodeToString(request.toApiRunCommandRequest()) + .toRequestBody("application/json".toMediaType()), + ) + .headers(execdEndpoint.headers.toHeaders()) + .build() + } else { + val resumeUrl = RESUME_PATH.format(commandID!!) + Request.Builder() + .url("$execdBaseUrl$resumeUrl?after_eid=${execution.lastEid}") + .get() + .headers(execdEndpoint.headers.toHeaders()) + .build() + } + + streamEvents( + httpRequest = httpRequest, + execution = execution, + handlers = request.handlers, + failureMessage = { statusCode, errorBody -> + "Failed to run commands. Status code: $statusCode, Body: $errorBody" + }, + ) + + if (!request.background) { + execution.exitCode = inferForegroundExitCode(execution) + } + return execution + } catch (e: Exception) { + if (execution.id != null) { + commandID = execution.id + } + + if (e is SandboxApiException) { + if (e.statusCode == 409 && attempt < MAX_RESUME_RETRIES) { + retrySleep(1000L) + continue + } + if (e.statusCode == 404 && attempt > 0) { + return execution + } + logger.error("Failed to run command (length: {})", request.command.length, e) + throw e + } + + if (commandID == null || attempt >= MAX_RESUME_RETRIES || !isNetworkError(e)) { + logger.error("Failed to run command (length: {})", request.command.length, e) + throw e.toSandboxException() + } + } } + + return execution } override fun interrupt(executionId: String) { @@ -216,14 +259,17 @@ internal class CommandsAdapter( .headers(execdEndpoint.headers.toHeaders()) .build() - return executeStreamingRequest( + val execution = Execution() + streamEvents( httpRequest = httpRequest, + execution = execution, handlers = request.handlers, - inferExitCode = true, failureMessage = { statusCode, errorBody -> "run_in_session failed. Status: $statusCode, Body: $errorBody" }, ) + execution.exitCode = inferForegroundExitCode(execution) + return execution } catch (e: Exception) { logger.error("Failed to run in session", e) throw e.toSandboxException() @@ -242,14 +288,12 @@ internal class CommandsAdapter( } } - private fun executeStreamingRequest( + private fun streamEvents( httpRequest: Request, + execution: Execution, handlers: ExecutionHandlers?, - inferExitCode: Boolean, failureMessage: (Int, String?) -> String, - ): Execution { - val execution = Execution() - + ) { httpClientProvider.sseClient.newCall(httpRequest).execute().use { response -> ensureSuccessfulStreamingResponse(response, failureMessage) @@ -266,11 +310,6 @@ internal class CommandsAdapter( } } } - - if (inferExitCode) { - execution.exitCode = inferForegroundExitCode(execution) - } - return execution } private fun ensureSuccessfulStreamingResponse( @@ -325,4 +364,27 @@ internal class CommandsAdapter( if (execution.complete != null) 0 else null } } + + private fun isNetworkError(e: Exception): Boolean { + if (e is IOException) return true + var cause: Throwable? = e.cause + while (cause != null) { + if (cause is IOException) return true + cause = cause.cause + } + return false + } + + private fun retrySleep(millis: Long) { + try { + Thread.sleep(millis) + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + throw SandboxApiException( + message = "Interrupted during SSE resume retry sleep", + statusCode = 0, + error = SandboxError(UNEXPECTED_RESPONSE), + ) + } + } } diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt index 0985dbe3b..c69f7e176 100644 --- a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt +++ b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt @@ -32,8 +32,10 @@ import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonPrimitive import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer +import okhttp3.mockwebserver.SocketPolicy import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertThrows import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach @@ -412,4 +414,84 @@ data: {"type":"execution_complete","execution_time":100,"timestamp":167253120100 val ex = assertThrows(InvalidArgumentException::class.java) { commandsAdapter.deleteSession(" ") } assertEquals("session_id cannot be empty", ex.message) } + + // ----------------------------------------------------------------------- + // SSE resume test + // ----------------------------------------------------------------------- + + @Test + fun `run should auto-resume on SSE disconnect`() { + val totalLines = 40 + val allEvents = buildString { + appendLine("""{"type":"init","text":"cmd-resume","timestamp":1672531200000,"eid":1}""") + for (i in 1..totalLines) { + appendLine("""{"type":"stdout","text":"line$i","timestamp":1672531200000,"eid":${i + 1}}""") + } + appendLine("""{"type":"execution_complete","execution_time":100,"timestamp":1672531201000,"eid":${totalLines + 2}}""") + } + + var resumeCalled = false + var lastAfterEid: Long? = null + var postRequestCount = 0 + + mockWebServer.dispatcher = object : okhttp3.mockwebserver.Dispatcher() { + override fun dispatch(request: okhttp3.mockwebserver.RecordedRequest): MockResponse { + return when { + request.method == "POST" && request.path == "/command" -> { + postRequestCount++ + MockResponse() + .setResponseCode(200) + .setBody(allEvents) + .setSocketPolicy(SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY) + } + request.method == "GET" && request.path!!.contains("/resume") -> { + resumeCalled = true + lastAfterEid = request.requestUrl!!.queryParameter("after_eid")!!.toLong() + val startEid = lastAfterEid!! + 1 + val remaining = buildString { + for (eid in startEid..(totalLines + 1)) { + appendLine("""{"type":"stdout","text":"line${eid - 1}","timestamp":1672531200000,"eid":$eid}""") + } + appendLine( + """{"type":"execution_complete","execution_time":100,"timestamp":1672531201000,"eid":${totalLines + 2}}""", + ) + } + MockResponse() + .setResponseCode(200) + .setBody(remaining) + } + else -> MockResponse().setResponseCode(404) + } + } + } + + val receivedLines = mutableSetOf() + val latch = CountDownLatch(1) + val handlers = ExecutionHandlers.builder() + .onStdout { msg -> receivedLines.add(msg.text) } + .onExecutionComplete { latch.countDown() } + .build() + + val request = RunCommandRequest.builder() + .command("for i in 1..40; do echo line\$i; done") + .handlers(handlers) + .build() + + val execution = commandsAdapter.run(request) + + assertTrue(resumeCalled, "Expected resume to be called after disconnect") + assertTrue(latch.await(5, TimeUnit.SECONDS), "Timed out waiting for completion") + assertEquals(totalLines, receivedLines.size, + "Expected all $totalLines lines, got ${receivedLines.size}: $receivedLines") + for (i in 1..totalLines) { + assertTrue(receivedLines.contains("line$i"), "Missing line$i") + } + assertEquals("cmd-resume", execution.id) + assertEquals(0, execution.exitCode) + assertNotNull(execution.complete) + assertEquals(1, postRequestCount, "Should send exactly one POST /command") + assertNotNull(lastAfterEid) + assertTrue(lastAfterEid!! >= 1, "after_eid should be >= 1, got $lastAfterEid") + assertTrue(execution.lastEid >= totalLines + 1) + } } diff --git a/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py b/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py index e8874b04c..71ce7ea68 100644 --- a/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py +++ b/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py @@ -21,6 +21,7 @@ synchronous and streaming execution modes with proper session management. """ +import asyncio import json import logging from datetime import timedelta @@ -137,6 +138,8 @@ class CommandsAdapter(Commands): INTERRUPT_COMMAND_PATH = "/command/{execution_id}/interrupt" SESSION_PATH = "/session" RUN_IN_SESSION_PATH = "/session/{session_id}/run" + _RESUME_PATH = "/command/{id}/resume" + _MAX_RESUME_RETRIES = 3 def __init__( self, @@ -225,30 +228,83 @@ async def _execute_streaming_request( result=[], error=None, ) - client = await self._get_sse_client() - - async with client.stream("POST", url, json=json_body) as response: - if response.status_code != 200: - await response.aread() - error_body = response.text - logger.error( - "%s. Status: %s, Body: %s", - failure_message, - response.status_code, - error_body, + command_id: str | None = None + last_eid: int = 0 + + for attempt in range(self._MAX_RESUME_RETRIES + 1): + try: + if attempt == 0: + client = await self._get_sse_client() + response_ctx = client.stream("POST", url, json=json_body) + else: + resume_url = self._get_execd_url( + self._RESUME_PATH.format(id=command_id) + + f"?after_eid={last_eid}" + ) + logger.info( + "SSE resume attempt %d/%d: command_id=%s after_eid=%d", + attempt, + self._MAX_RESUME_RETRIES, + command_id, + last_eid, + ) + client = await self._get_sse_client() + response_ctx = client.stream("GET", resume_url) + + async with response_ctx as response: + if response.status_code == 409: + await response.aread() + if attempt < self._MAX_RESUME_RETRIES: + await asyncio.sleep(1) + continue + logger.warning("SSE resume: 409 conflict, primary still active after retries") + break + if response.status_code == 404 and attempt > 0: + logger.info("SSE resume: 404, command finished or buffer expired") + break + if response.status_code != 200: + await response.aread() + error_body = response.text + logger.error( + "%s. Status: %s, Body: %s", + failure_message, + response.status_code, + error_body, + ) + raise SandboxApiException( + message=f"{failure_message}. Status code: {response.status_code}", + status_code=response.status_code, + request_id=extract_request_id(response.headers), + ) + + dispatcher = ExecutionEventDispatcher(execution, handlers) + async for line in response.aiter_lines(): + event_node = _decode_sse_event_line(line) + if event_node is None: + continue + if event_node.type == "init" and not command_id: + command_id = event_node.text + if event_node.eid: + last_eid = max(last_eid, event_node.eid) + await dispatcher.dispatch(event_node) + + # Stream completed normally + break + + except ( + httpx.ReadError, + httpx.RemoteProtocolError, + httpx.ConnectError, + httpx.ConnectTimeout, + ) as e: + if not command_id or attempt >= self._MAX_RESUME_RETRIES: + raise + logger.warning( + "SSE stream disconnected (attempt %d/%d): %s", + attempt + 1, + self._MAX_RESUME_RETRIES + 1, + e, ) - raise SandboxApiException( - message=f"{failure_message}. Status code: {response.status_code}", - status_code=response.status_code, - request_id=extract_request_id(response.headers), - ) - - dispatcher = ExecutionEventDispatcher(execution, handlers) - async for line in response.aiter_lines(): - event_node = _decode_sse_event_line(line) - if event_node is None: - continue - await dispatcher.dispatch(event_node) if infer_exit_code: execution.exit_code = _infer_foreground_exit_code(execution) diff --git a/sdks/sandbox/python/src/opensandbox/adapters/converter/event_node.py b/sdks/sandbox/python/src/opensandbox/adapters/converter/event_node.py index 9de7ffb40..47aa47769 100644 --- a/sdks/sandbox/python/src/opensandbox/adapters/converter/event_node.py +++ b/sdks/sandbox/python/src/opensandbox/adapters/converter/event_node.py @@ -60,6 +60,7 @@ class EventNode(BaseModel): Corresponds to ServerStreamEvent in OpenAPI spec. """ + eid: int | None = None type: str text: str | None = None execution_count: int | None = Field(default=None, alias="execution_count") diff --git a/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py b/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py index e33bdffd1..ef7a1c1cd 100644 --- a/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py +++ b/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py @@ -19,6 +19,7 @@ import json import logging +import time from datetime import timedelta import httpx @@ -127,6 +128,8 @@ class CommandsAdapterSync(CommandsSync): RUN_COMMAND_PATH = "/command" SESSION_PATH = "/session" RUN_IN_SESSION_PATH = "/session/{session_id}/run" + _RESUME_PATH = "/command/{id}/resume" + _MAX_RESUME_RETRIES = 3 def __init__(self, connection_config: ConnectionConfigSync, execd_endpoint: SandboxEndpoint) -> None: """ @@ -192,23 +195,75 @@ def _execute_streaming_request( failure_message: str, ) -> Execution: execution = Execution(id=None, execution_count=None, result=[], error=None) - dispatcher = ExecutionEventDispatcherSync(execution, handlers) - - with self._sse_client.stream("POST", url, json=json_body) as response: - if response.status_code != 200: - response.read() - raise SandboxApiException( - message=f"{failure_message}. Status code: {response.status_code}", - status_code=response.status_code, - request_id=extract_request_id(response.headers), + command_id: str | None = None + last_eid: int = 0 + + for attempt in range(self._MAX_RESUME_RETRIES + 1): + try: + if attempt == 0: + response_ctx = self._sse_client.stream("POST", url, json=json_body) + else: + resume_url = self._get_execd_url( + self._RESUME_PATH.format(id=command_id) + + f"?after_eid={last_eid}" + ) + logger.info( + "SSE resume attempt %d/%d: command_id=%s after_eid=%d", + attempt, + self._MAX_RESUME_RETRIES, + command_id, + last_eid, + ) + response_ctx = self._sse_client.stream("GET", resume_url) + + with response_ctx as response: + if response.status_code == 409: + response.read() + if attempt < self._MAX_RESUME_RETRIES: + time.sleep(1) + continue + logger.warning("SSE resume: 409 conflict, primary still active after retries") + break + if response.status_code == 404 and attempt > 0: + logger.info("SSE resume: 404, command finished or buffer expired") + break + if response.status_code != 200: + response.read() + raise SandboxApiException( + message=f"{failure_message}. Status code: {response.status_code}", + status_code=response.status_code, + request_id=extract_request_id(response.headers), + ) + + dispatcher = ExecutionEventDispatcherSync(execution, handlers) + for line in response.iter_lines(): + event_node = _decode_sse_event_line(line) + if event_node is None: + continue + if event_node.type == "init" and not command_id: + command_id = event_node.text + if event_node.eid: + last_eid = max(last_eid, event_node.eid) + dispatcher.dispatch(event_node) + + # Stream completed normally + break + + except ( + httpx.ReadError, + httpx.RemoteProtocolError, + httpx.ConnectError, + httpx.ConnectTimeout, + ) as e: + if not command_id or attempt >= self._MAX_RESUME_RETRIES: + raise + logger.warning( + "SSE stream disconnected (attempt %d/%d): %s", + attempt + 1, + self._MAX_RESUME_RETRIES + 1, + e, ) - for line in response.iter_lines(): - event_node = _decode_sse_event_line(line) - if event_node is None: - continue - dispatcher.dispatch(event_node) - if infer_exit_code: execution.exit_code = _infer_foreground_exit_code(execution) diff --git a/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py b/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py index eae23a8f8..1dbc7ec72 100644 --- a/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py +++ b/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py @@ -27,6 +27,68 @@ from opensandbox.models.sandboxes import SandboxEndpoint +class _ErrorAfterAsyncStream(httpx.AsyncByteStream): + """Async byte stream that yields data then raises ReadError.""" + + def __init__(self, *chunks: bytes) -> None: + self._chunks = iter(chunks) + + def __aiter__(self) -> httpx.AsyncByteStream: + return self + + async def __anext__(self) -> bytes: + try: + chunk = next(self._chunks) + except StopIteration: + raise httpx.ReadError("simulated disconnect") from None + if chunk is None: + raise httpx.ReadError("simulated disconnect") + return chunk + + +class _ResumeTransport(httpx.AsyncBaseTransport): + """Simulates SSE disconnect then resume on GET /command/:id/resume.""" + + def __init__(self) -> None: + self.post_count = 0 + self.resume_count = 0 + self.last_resume_eid: int | None = None + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/command": + self.post_count += 1 + partial = ( + b'data: {"type":"init","text":"cmd-resume-1","timestamp":100}\n\n', + b'data: {"type":"stdout","eid":1,"text":"line1","timestamp":101}\n\n', + b'data: {"type":"stdout","eid":2,"text":"line2","timestamp":102}\n\n', + None, # sentinel: disconnect after line2 + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + stream=_ErrorAfterAsyncStream(*partial), + request=request, + ) + + if request.method == "GET" and "/resume" in request.url.path: + self.resume_count += 1 + qp = request.url.params.get("after_eid") + if qp: + self.last_resume_eid = int(qp) + remaining = ( + b'data: {"type":"stdout","eid":3,"text":"line3","timestamp":103}\n\n' + b'data: {"type":"execution_complete","eid":4,"execution_time":42,"timestamp":104}\n\n' + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + content=remaining, + request=request, + ) + + return httpx.Response(500, content=b"unexpected", request=request) + + class _SseTransport(httpx.AsyncBaseTransport): def __init__(self) -> None: self.last_request: httpx.Request | None = None @@ -217,3 +279,27 @@ async def test_run_in_session_non_zero_exit_updates_exit_code() -> None: assert execution.error.value == "7" assert execution.complete is None assert execution.exit_code == 7 + + +@pytest.mark.asyncio +async def test_run_command_auto_resume_on_sse_disconnect() -> None: + """SSE drops after first two stdout lines; resume replays the rest transparently.""" + transport = _ResumeTransport() + cfg = ConnectionConfig(protocol="http", transport=transport) + endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) + adapter = CommandsAdapter(cfg, endpoint) + + execution = await adapter.run("echo lines") + + assert transport.post_count == 1, "should send POST /command" + assert transport.resume_count == 1, "should send GET /command/:id/resume" + assert transport.last_resume_eid == 2, "resume after_eid should match last received eid" + + assert execution.id == "cmd-resume-1" + assert len(execution.logs.stdout) == 3 + assert execution.logs.stdout[0].text == "line1" + assert execution.logs.stdout[1].text == "line2" + assert execution.logs.stdout[2].text == "line3" + assert execution.complete is not None + assert execution.complete.execution_time_in_millis == 42 + assert execution.exit_code == 0 diff --git a/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py b/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py index a92e174f6..9f5cdfdeb 100644 --- a/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py +++ b/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py @@ -25,6 +25,68 @@ from opensandbox.sync.adapters.command_adapter import CommandsAdapterSync +class _SyncErrorAfterStream(httpx.SyncByteStream): + """Sync byte stream that yields data then raises ReadError.""" + + def __init__(self, *chunks: bytes) -> None: + self._chunks = iter(chunks) + + def __iter__(self): + return self + + def __next__(self) -> bytes: + try: + chunk = next(self._chunks) + except StopIteration: + raise httpx.ReadError("simulated disconnect") from None + if chunk is None: + raise httpx.ReadError("simulated disconnect") + return chunk + + +class _SyncResumeTransport(httpx.BaseTransport): + """Simulates SSE disconnect then resume on GET /command/:id/resume (sync).""" + + def __init__(self) -> None: + self.post_count = 0 + self.resume_count = 0 + self.last_resume_eid: int | None = None + + def handle_request(self, request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/command": + self.post_count += 1 + partial = ( + b'data: {"type":"init","text":"cmd-resume-1","timestamp":100}\n\n', + b'data: {"type":"stdout","eid":1,"text":"line1","timestamp":101}\n\n', + b'data: {"type":"stdout","eid":2,"text":"line2","timestamp":102}\n\n', + None, # sentinel: disconnect after line2 + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + stream=_SyncErrorAfterStream(*partial), + request=request, + ) + + if request.method == "GET" and "/resume" in request.url.path: + self.resume_count += 1 + qp = request.url.params.get("after_eid") + if qp: + self.last_resume_eid = int(qp) + remaining = ( + b'data: {"type":"stdout","eid":3,"text":"line3","timestamp":103}\n\n' + b'data: {"type":"execution_complete","eid":4,"execution_time":42,"timestamp":104}\n\n' + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + content=remaining, + request=request, + ) + + return httpx.Response(500, content=b"unexpected", request=request) + + class _SseTransport(httpx.BaseTransport): def handle_request(self, request: httpx.Request) -> httpx.Response: body = request.content.decode("utf-8") if isinstance(request.content, (bytes, bytearray)) else "" @@ -164,3 +226,26 @@ def test_sync_run_in_session_non_zero_exit_updates_exit_code() -> None: assert execution.error.value == "7" assert execution.complete is None assert execution.exit_code == 7 + + +def test_sync_run_command_auto_resume_on_sse_disconnect() -> None: + """SSE drops after first two stdout lines; resume replays the rest transparently.""" + transport = _SyncResumeTransport() + cfg = ConnectionConfigSync(protocol="http", transport=transport) + endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) + adapter = CommandsAdapterSync(cfg, endpoint) + + execution = adapter.run("echo lines") + + assert transport.post_count == 1, "should send POST /command" + assert transport.resume_count == 1, "should send GET /command/:id/resume" + assert transport.last_resume_eid == 2, "resume after_eid should match last received eid" + + assert execution.id == "cmd-resume-1" + assert len(execution.logs.stdout) == 3 + assert execution.logs.stdout[0].text == "line1" + assert execution.logs.stdout[1].text == "line2" + assert execution.logs.stdout[2].text == "line3" + assert execution.complete is not None + assert execution.complete.execution_time_in_millis == 42 + assert execution.exit_code == 0 diff --git a/specs/execd-api.yaml b/specs/execd-api.yaml index 2a46a484b..317d048cf 100644 --- a/specs/execd-api.yaml +++ b/specs/execd-api.yaml @@ -502,6 +502,60 @@ paths: "500": $ref: "#/components/responses/InternalServerError" + /command/{id}/resume: + get: + summary: Resume command SSE stream (replay and optional live tail) + description: | + Replays buffered events (stdout, stderr, execution_complete, execution_error) from + the server-side ring buffer for events with `eid` strictly greater than + `after_eid`, then—if the command is still running and no other client holds the + primary SSE slot—continues streaming live events until completion or client + disconnect. Event shape matches `POST /command` (`ServerStreamEvent`). + + This endpoint is mutually exclusive with the primary `POST /command` SSE: if that + connection is still active, the server responds with 409 Conflict. + operationId: resumeCommandStream + tags: + - Command + parameters: + - name: id + in: path + required: true + description: Command ID returned by RunCommand + schema: + type: string + example: cmd-abc123 + - name: after_eid + in: query + required: false + description: | + Only events with `eid` greater than this value are replayed. All event types + (stdout, stderr, execution_complete, execution_error) carry monotonically + increasing `eid` values. Omit or use `0` to replay from the oldest buffered + events. After replay, if the command is still running and the primary SSE slot + is free, live events continue to stream. + schema: + type: integer + format: int64 + minimum: 0 + default: 0 + example: 42 + responses: + "200": + description: Stream of command execution events (replay then optional live continuation) + content: + text/event-stream: + schema: + $ref: "#/components/schemas/ServerStreamEvent" + "400": + $ref: "#/components/responses/BadRequest" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + "500": + $ref: "#/components/responses/InternalServerError" + /command/{id}/logs: get: summary: Get background command stdout/stderr (non-streamed) @@ -1396,6 +1450,16 @@ components: code: FILE_NOT_FOUND message: "file not found" + Conflict: + description: Request conflicts with current server state (e.g. resource in use) + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" + example: + code: INVALID_REQUEST_BODY + message: "primary SSE stream is still active; disconnect it before resuming" + InternalServerError: description: Runtime server error during operation content: diff --git a/tests/python/tests/test_command_resume_e2e.py b/tests/python/tests/test_command_resume_e2e.py new file mode 100644 index 000000000..1851560e5 --- /dev/null +++ b/tests/python/tests/test_command_resume_e2e.py @@ -0,0 +1,247 @@ +# +# Copyright 2026 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +E2E tests for SSE stream disconnect and automatic resume. + +Creates a dedicated sandbox, injects a mid-stream disconnect via a custom +httpx transport, and verifies the SDK transparently resumes via the real +execd resume endpoint. +""" + +from __future__ import annotations + +import logging +from datetime import timedelta + +import httpx +import pytest +from opensandbox.adapters.command_adapter import CommandsAdapter +from opensandbox.config import ConnectionConfig +from opensandbox.constants import DEFAULT_EXECD_PORT +from opensandbox.models.sandboxes import Host, SandboxEndpoint, SandboxImageSpec, Volume +from opensandbox.sandbox import Sandbox + +from tests.base_e2e_test import ( + TEST_API_KEY, + TEST_DOMAIN, + TEST_PROTOCOL, + create_connection_config, + get_e2e_sandbox_resource, + get_sandbox_image, + should_use_server_proxy, +) + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Custom async transport: injects disconnect mid-stream on POST /command +# --------------------------------------------------------------------------- + + +class _DisconnectInjectStream(httpx.AsyncByteStream): + """Wraps real byte stream; raises ReadError after N chunks yielded.""" + + def __init__(self, real_stream, disconnect_after_chunks: int = 4) -> None: + self._real = real_stream + self._disconnect_after = disconnect_after_chunks + self._chunk_count = 0 + self._real_iter = None + + def __aiter__(self): + self._real_iter = self._real.__aiter__() + return self + + async def __anext__(self) -> bytes: + if self._chunk_count >= self._disconnect_after: + raise httpx.ReadError("simulated disconnect for e2e test") + try: + chunk = await self._real_iter.__anext__() + except StopAsyncIteration: + raise + self._chunk_count += 1 + return chunk + + async def aclose(self) -> None: + if self._real_iter is not None and hasattr(self._real_iter, "aclose"): + await self._real_iter.aclose() + elif hasattr(self._real, "aclose"): + await self._real.aclose() + + +class _DisconnectInjectTransport(httpx.AsyncHTTPTransport): + """Wraps real transport; injects stream disconnect on POST command endpoints.""" + + def __init__(self) -> None: + self._real = httpx.AsyncHTTPTransport() + self.post_count: int = 0 + self.resume_count: int = 0 + self.last_resume_eid: int | None = None + + @staticmethod + def _is_command_post(path: str) -> bool: + """Match POST /command or POST /session/:id/run (with optional proxy prefix).""" + return path == "/command" or path.endswith("/command") or path.endswith("/run") + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + response = await self._real.handle_async_request(request) + + if request.method == "POST" and self._is_command_post(request.url.path): + self.post_count += 1 + response.stream = _DisconnectInjectStream( + response.stream, disconnect_after_chunks=4 + ) + + elif request.method == "GET" and "/resume" in request.url.path: + self.resume_count += 1 + qp = request.url.params.get("after_eid") + if qp: + self.last_resume_eid = int(qp) + + return response + + +# --------------------------------------------------------------------------- +# Shared sandbox fixture — created once per class, killed on teardown +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestCommandResumeE2E: + """E2E tests: SSE disconnect mid-stream triggers transparent resume.""" + + sandbox: Sandbox | None = None + execd_endpoint: SandboxEndpoint | None = None + + @pytest.fixture(scope="class", autouse=True) + async def _sandbox_lifecycle(self, request: pytest.FixtureRequest) -> None: + """Create a dedicated sandbox and ALWAYS clean it up.""" + logger.info("=" * 80) + logger.info("SETUP: Creating sandbox for SSE resume E2E") + logger.info("=" * 80) + + connection_config = create_connection_config() + + sandbox = await Sandbox.create( + image=SandboxImageSpec(get_sandbox_image()), + entrypoint=["/opt/opensandbox/code-interpreter.sh"], + connection_config=connection_config, + resource=get_e2e_sandbox_resource(), + timeout=timedelta(minutes=15), + ready_timeout=timedelta(seconds=60), + metadata={"tag": "e2e-command-resume"}, + env={ + "E2E_TEST": "true", + "EXECD_LOG_FILE": "/tmp/opensandbox-e2e/logs/execd-resume.log", + "EXECD_API_GRACE_SHUTDOWN": "3s", + }, + health_check_polling_interval=timedelta(milliseconds=500), + volumes=[ + Volume( + name="execd-log", + host=Host(path="/tmp/opensandbox-e2e/logs"), + mountPath="/tmp/opensandbox-e2e/logs", + readOnly=False, + ), + ], + ) + + endpoint_obj = await sandbox.get_endpoint(DEFAULT_EXECD_PORT) + assert endpoint_obj is not None + assert endpoint_obj.endpoint + + request.cls.sandbox = sandbox + request.cls.execd_endpoint = endpoint_obj + + logger.info("Sandbox ready: %s execd endpoint: %s", sandbox.id, endpoint_obj.endpoint) + + try: + yield + finally: + if sandbox is not None: + try: + await sandbox.kill() + except Exception as e: + logger.warning("Teardown: sandbox.kill() failed: %s", e, exc_info=True) + try: + await sandbox.close() + except Exception as e: + logger.warning("Teardown: sandbox.close() failed: %s", e, exc_info=True) + + # ----------------------------------------------------------------------- + # Test: standalone command with injected disconnect + # ----------------------------------------------------------------------- + + @pytest.mark.timeout(120) + async def test_run_command_auto_resume_on_disconnect(self) -> None: + """Inject disconnect mid-stream; verify resume fires and all events arrive.""" + transport = _DisconnectInjectTransport() + cfg = ConnectionConfig( + domain=TEST_DOMAIN, + api_key=TEST_API_KEY, + transport=transport, + protocol=TEST_PROTOCOL, + request_timeout=timedelta(minutes=3), + use_server_proxy=should_use_server_proxy(), + ) + + adapter = CommandsAdapter(cfg, self.execd_endpoint) + + # Long-ish command with sleeps so events flush in separate chunks + cmd = ( + "for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; " + "do echo \"line$i\"; sleep 0.2; done" + ) + + execution = await adapter.run(cmd) + + # --- assertions: resume actually happened --- + assert transport.post_count == 1, "should send exactly one POST /command" + assert transport.resume_count >= 1, ( + f"should resume at least once, got resume_count={transport.resume_count}" + ) + assert transport.last_resume_eid is not None, "resume should include after_eid" + assert transport.last_resume_eid >= 1, ( + f"after_eid should be >=1, got {transport.last_resume_eid}" + ) + + # --- assertions: all output received --- + assert len(execution.logs.stdout) == 20, ( + f"expected 20 stdout lines, got {len(execution.logs.stdout)}" + ) + for i, msg in enumerate(execution.logs.stdout): + expected = f"line{i + 1}" + actual = msg.text.strip() + assert actual == expected, f"stdout[{i}]: expected {expected!r}, got {actual!r}" + + assert execution.complete is not None, "should have completion event" + assert execution.complete.execution_time_in_millis >= 0 + assert execution.exit_code == 0 + + logger.info( + "Resume e2e: post=%d resume=%d after_eid=%d lines=%d", + transport.post_count, + transport.resume_count, + transport.last_resume_eid, + len(execution.logs.stdout), + ) + + # NOTE: run_in_session resume is intentionally NOT tested here. + # execd does not currently support resume for session-scoped commands: + # - No resume route under /session group + # - GetCommandStatus only looks in commandClientMap (sessions use bashSessionClientMap) + # - resumeEnabled is never set for RunInSession + # The SDK will attempt resume on disconnect but execd returns 404.