Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3925eb1
fix(execd): fix mismatched image tag
Pangjiping Mar 25, 2026
6893583
feat(execd): span event-id(eid) by sse statement
Pangjiping Mar 25, 2026
97b6d8c
feat(execd): add command/session resume api
Pangjiping Mar 25, 2026
4098580
feat(execd): extract stream buffer (sbuf) package for ring buffer queue
Pangjiping Mar 25, 2026
441e3b5
feat(execd): extract stream buffer (sbuf) package for ring buffer queue
Pangjiping Mar 27, 2026
49e3494
fix(execd): flush resume buffer tail after SSE attach
Pangjiping Mar 28, 2026
7ac7afd
fix(execd): serialize streamHub holder access under h.mu
Pangjiping Mar 28, 2026
e3c6a7d
Merge remote-tracking branch 'origin/main' into feat/sse-resumption
Pangjiping Mar 28, 2026
a13bf02
chore(execd): update README for sse stream resume
Pangjiping Mar 30, 2026
3005a68
Merge remote-tracking branch 'origin/main' into feat/sse-resumption
Pangjiping Mar 30, 2026
44b7b64
Merge remote-tracking branch 'origin/main' into feat/sse-resumption
Pangjiping May 2, 2026
788c3d3
fix(execd): resolve merge conflicts and fix test compilation
Pangjiping May 2, 2026
f604769
fix(execd): review fixes for SSE stream resume
Pangjiping May 2, 2026
2473b33
docs(execd): update SSE resume spec to reflect buffered terminal events
Pangjiping May 2, 2026
dd67b75
feat(sdk): auto-resume SSE stream on connection drop
Pangjiping May 2, 2026
c9be8ef
test(sdk): add unit tests for automatic SSE resume on disconnect
Pangjiping May 2, 2026
7cf90b1
chore(sdk): fix ruff B904 — raise ... from None in test error streams
Pangjiping May 3, 2026
4ae1344
test(e2e): add SSE resume e2e test with disconnect injection
Pangjiping May 3, 2026
ce06aa5
fix(e2e): fix __anext__ call in DisconnectInjectStream
Pangjiping May 3, 2026
171a6e2
fix(e2e): get real async iterator via __aiter__ in DisconnectInjectSt…
Pangjiping May 3, 2026
f9b1b10
test(e2e): remove session resume test — execd does not support it yet
Pangjiping May 3, 2026
b7b5dff
feat(sdk): add SSE stream auto-resume across all sandbox SDKs
Pangjiping May 3, 2026
ac2382e
fix(sdk): break long line in Kotlin CommandsAdapterTest to satisfy kt…
Pangjiping May 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/execd-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ jobs:

sleep 5
python3 tests/smoke_api.py
python3 tests/command_resume_test.py

- name: SIGTERM forward test
if: matrix.os == 'ubuntu-latest'
working-directory: components/execd
Expand Down
32 changes: 16 additions & 16 deletions components/execd/RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ Thanks to these contributors ❤️
- @csdbianhua

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.8
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.8

# components/execd 1.0.7

Expand Down Expand Up @@ -144,8 +144,8 @@ Thanks to these contributors ❤️
- @dependabot

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.7
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.7

# components/execd 1.0.6

Expand All @@ -167,8 +167,8 @@ Thanks to these contributors ❤️
- @dependabot

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.6
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.6

# components/execd 1.0.5

Expand All @@ -185,8 +185,8 @@ Thanks to these contributors ❤️
- @Pangjiping

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.5
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.5

# components/execd 1.0.4

Expand All @@ -209,8 +209,8 @@ Thanks to these contributors ❤️
- @ninan-nn

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.4
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.4

# components/execd 1.0.3

Expand All @@ -233,8 +233,8 @@ Thanks to these contributors ❤️
- @jwx0925

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.3
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.3

# components/execd 1.0.2

Expand Down Expand Up @@ -262,8 +262,8 @@ Thanks to these contributors ❤️
- @ninan-nn

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.2
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.2

# components/execd 1.0.1

Expand Down Expand Up @@ -295,8 +295,8 @@ Thanks to these contributors ❤️
- @jwx0925

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.1
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.1

# components/execd 1.0.0

Expand Down
2 changes: 1 addition & 1 deletion components/execd/pkg/runtime/bash_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (s *bashSession) run(ctx context.Context, request *ExecuteCodeRequest) erro
continue
}
if request.Hooks.OnExecuteStdout != nil {
request.Hooks.OnExecuteStdout(line)
request.Hooks.OnExecuteStdout(request.NextEventID(), line)
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions components/execd/pkg/runtime/bash_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestBashSession_NonZeroExitEmitsError(t *testing.T) {
Timeout: 5 * time.Second,
Hooks: ExecuteResultHook{
OnExecuteInit: func(s string) { sessionID = s },
OnExecuteStdout: func(s string) { stdoutLine = s },
OnExecuteStdout: func(_ int64, s string) { stdoutLine = s },
OnExecuteError: func(err *execute.ErrorOutput) { errCh <- err },
OnExecuteComplete: func(_ time.Duration) {
completeCh <- struct{}{}
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestBashSession_envAndExitCode(t *testing.T) {
require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit")
initCalls++
},
OnExecuteStdout: func(text string) {
OnExecuteStdout: func(_ int64, text string) {
t.Log(text)
stdoutLines = append(stdoutLines, text)
},
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestBashSession_envLargeOutputChained(t *testing.T) {
require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit")
initCalls++
},
OnExecuteStdout: func(text string) {
OnExecuteStdout: func(_ int64, text string) {
t.Log(text)
stdoutLines = append(stdoutLines, text)
},
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestBashSession_cwdPersistsWithoutOverride(t *testing.T) {
targetDir := t.TempDir()
var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestBashSession_requestCwdOverridesAfterCd(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestBashSession_envDumpNotLeakedWhenNoTrailingNewline(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestBashSession_envDumpNotLeakedWhenNoOutput(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestBashSession_heredoc(t *testing.T) {
t.Cleanup(func() { _ = controller.DeleteBashSession(sessionID) })

hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
fmt.Printf("[stdout] %s\n", line)
},
OnExecuteComplete: func(d time.Duration) {
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestBashSession_execReplacesShell(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestBashSession_complexExec(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down
4 changes: 2 additions & 2 deletions components/execd/pkg/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest
wg.Add(2)
safego.Go(func() {
defer wg.Done()
c.tailStdPipe(stdoutPath, request.Hooks.OnExecuteStdout, done)
c.tailStdPipe(stdoutPath, request.wrapStdoutPipeHook(), done)
})
safego.Go(func() {
defer wg.Done()
c.tailStdPipe(stderrPath, request.Hooks.OnExecuteStderr, done)
c.tailStdPipe(stderrPath, request.wrapStderrPipeHook(), done)
})

err = cmd.Start()
Expand Down
16 changes: 8 additions & 8 deletions components/execd/pkg/runtime/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func TestRunCommand_Echo(t *testing.T) {
Timeout: 5 * time.Second,
Hooks: ExecuteResultHook{
OnExecuteInit: func(s string) { sessionID = s },
OnExecuteStdout: func(s string) {
OnExecuteStdout: func(_ int64, s string) {
stdoutLines = append(stdoutLines, s)
},
OnExecuteStderr: func(s string) {
OnExecuteStderr: func(_ int64, s string) {
stderrLines = append(stderrLines, s)
},
OnExecuteError: func(err *execute.ErrorOutput) {
Expand Down Expand Up @@ -188,8 +188,8 @@ func TestRunCommand_Error(t *testing.T) {
Timeout: 5 * time.Second,
Hooks: ExecuteResultHook{
OnExecuteInit: func(s string) { sessionID = s },
OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(s string) { stderrLines = append(stderrLines, s) },
OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(_ int64, s string) { stderrLines = append(stderrLines, s) },
OnExecuteError: func(err *execute.ErrorOutput) {
gotErr = err
completeCh <- struct{}{}
Expand Down Expand Up @@ -241,8 +241,8 @@ func TestRunCommand_ExpandsHomeInCwd(t *testing.T) {
Timeout: 5 * time.Second,
Hooks: ExecuteResultHook{
OnExecuteInit: func(_ string) {},
OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(_ string) {},
OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(_ int64, _ string) {},
OnExecuteError: func(err *execute.ErrorOutput) {
require.Failf(t, "unexpected error hook", "%+v", err)
},
Expand Down Expand Up @@ -303,8 +303,8 @@ func TestRunCommand_ExpandsCwdFromRequestEnvWithHigherPriority(t *testing.T) {
},
Hooks: ExecuteResultHook{
OnExecuteInit: func(_ string) {},
OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(_ string) {},
OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(_ int64, _ string) {},
OnExecuteError: func(err *execute.ErrorOutput) {
gotErr = err
},
Expand Down
4 changes: 2 additions & 2 deletions components/execd/pkg/runtime/command_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest

done := make(chan struct{}, 1)
safego.Go(func() {
c.tailStdPipe(c.stdoutFileName(session), request.Hooks.OnExecuteStdout, done)
c.tailStdPipe(c.stdoutFileName(session), request.wrapStdoutPipeHook(), done)
})
safego.Go(func() {
c.tailStdPipe(c.stderrFileName(session), request.Hooks.OnExecuteStderr, done)
c.tailStdPipe(c.stderrFileName(session), request.wrapStderrPipeHook(), done)
})

err = cmd.Start()
Expand Down
11 changes: 9 additions & 2 deletions components/execd/pkg/runtime/jupyter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,16 @@ func dispatchExecutionResultHooks(request *ExecuteCodeRequest, result *execute.E
for _, stream := range result.Stream {
switch stream.Name {
case execute.StreamStdout:
request.Hooks.OnExecuteStdout(stream.Text)
if stream.Text != "" && request.Hooks.OnExecuteStdout != nil {
eid := request.NextEventID()
request.Hooks.OnExecuteStdout(eid, stream.Text)
}
case execute.StreamStderr:
request.Hooks.OnExecuteStderr(stream.Text)
if stream.Text != "" && request.Hooks.OnExecuteStderr != nil {
eid := request.NextEventID()
request.Hooks.OnExecuteStderr(eid, stream.Text)
}
default:
}
}
}
Expand Down
38 changes: 34 additions & 4 deletions components/execd/pkg/runtime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package runtime
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/alibaba/opensandbox/execd/pkg/jupyter/execute"
Expand All @@ -27,8 +28,8 @@ type ExecuteResultHook struct {
OnExecuteInit func(context string)
OnExecuteResult func(result map[string]any, count int)
OnExecuteStatus func(status string)
OnExecuteStdout func(stdout string) //nolint:predeclared
OnExecuteStderr func(stderr string) //nolint:predeclared
OnExecuteStdout func(eid int64, stdout string) //nolint:predeclared
OnExecuteStderr func(eid int64, stderr string) //nolint:predeclared
OnExecuteError func(err *execute.ErrorOutput)
OnExecuteComplete func(executionTime time.Duration)
}
Expand All @@ -44,6 +45,35 @@ type ExecuteCodeRequest struct {
Uid *uint32 `json:"uid,omitempty"`
Gid *uint32 `json:"gid,omitempty"`
Hooks ExecuteResultHook

eventSeq atomic.Uint64
}

func (req *ExecuteCodeRequest) NextEventID() int64 {
if req == nil {
return 0
}
return int64(req.eventSeq.Add(1))
}

func (req *ExecuteCodeRequest) wrapStdoutPipeHook() func(string) {
return func(text string) {
if text == "" || req.Hooks.OnExecuteStdout == nil {
return
}
eid := req.NextEventID()
req.Hooks.OnExecuteStdout(eid, text)
}
}

func (req *ExecuteCodeRequest) wrapStderrPipeHook() func(string) {
return func(text string) {
if text == "" || req.Hooks.OnExecuteStderr == nil {
return
}
eid := req.NextEventID()
req.Hooks.OnExecuteStderr(eid, text)
}
}

// SetDefaultHooks installs stdout logging fallbacks for unset hooks.
Expand All @@ -55,10 +85,10 @@ func (req *ExecuteCodeRequest) SetDefaultHooks() {
req.Hooks.OnExecuteStatus = func(status string) { fmt.Printf("OnExecuteStatus: %s\n", status) }
}
if req.Hooks.OnExecuteStdout == nil {
req.Hooks.OnExecuteStdout = func(stdout string) { fmt.Printf("OnExecuteStdout: %s\n", stdout) }
req.Hooks.OnExecuteStdout = func(eid int64, stdout string) { fmt.Printf("OnExecuteStdout: eid=%d %s\n", eid, stdout) }
}
if req.Hooks.OnExecuteStderr == nil {
req.Hooks.OnExecuteStderr = func(stderr string) { fmt.Printf("OnExecuteStderr: %s\n", stderr) }
req.Hooks.OnExecuteStderr = func(eid int64, stderr string) { fmt.Printf("OnExecuteStderr: eid=%d %s\n", eid, stderr) }
}
if req.Hooks.OnExecuteError == nil {
req.Hooks.OnExecuteError = func(err *execute.ErrorOutput) { fmt.Printf("OnExecuteError: %++v\n", err) }
Expand Down
37 changes: 37 additions & 0 deletions components/execd/pkg/sbuf/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2026 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sbuf

// Config controls per-stream bounds and append policy.
type Config struct {
// MaxEvents is the maximum number of events retained per stream. Oldest events are dropped when exceeded.
// Zero defaults to DefaultMaxEvents.
MaxEvents int
// MaxBytes is the approximate upper bound on total payload bytes per stream (sum of len(Payload)).
// Oldest events are dropped until under the limit. Zero means no byte limit.
MaxBytes int64
// StrictMonotonic rejects Append when eid <= last eid for that stream. Recommended for execd SSE eids.
StrictMonotonic bool
}

const DefaultMaxEvents = 1024

func (c *Config) normalized() Config {
out := *c
if out.MaxEvents <= 0 {
out.MaxEvents = DefaultMaxEvents
}
return out
}
Loading
Loading