Skip to content

Commit 5a5043f

Browse files
authored
Merge pull request opensandbox-group#990 from Pangjiping/feat/sdk-skip-accumulation
feat(sdk): add skipAccumulation to prevent OutputMessage OOM
2 parents d980b3b + 642a5e3 commit 5a5043f

11 files changed

Lines changed: 82 additions & 14 deletions

File tree

sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,10 @@ private async Task HandleStdoutAsync(ServerStreamEvent ev, long timestamp)
9595
IsError = false
9696
};
9797

98-
_execution.Logs.Stdout.Add(msg);
98+
if (_handlers is not { SkipAccumulation: true })
99+
{
100+
_execution.Logs.Stdout.Add(msg);
101+
}
99102

100103
if (_handlers?.OnStdout != null)
101104
{
@@ -112,7 +115,10 @@ private async Task HandleStderrAsync(ServerStreamEvent ev, long timestamp)
112115
IsError = true
113116
};
114117

115-
_execution.Logs.Stderr.Add(msg);
118+
if (_handlers is not { SkipAccumulation: true })
119+
{
120+
_execution.Logs.Stderr.Add(msg);
121+
}
116122

117123
if (_handlers?.OnStderr != null)
118124
{

sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,4 +207,11 @@ public class ExecutionHandlers
207207
/// Gets or sets the handler for execution initialization.
208208
/// </summary>
209209
public Func<ExecutionInit, Task>? OnInit { get; set; }
210+
211+
/// <summary>
212+
/// When true, stdout/stderr messages are only delivered to handlers without
213+
/// being accumulated in <see cref="ExecutionLogs"/>. Use for long-running
214+
/// executions to prevent unbounded memory growth.
215+
/// </summary>
216+
public bool SkipAccumulation { get; set; }
210217
}

sdks/sandbox/go/execution.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ type ExecutionHandlers struct {
104104
OnResult func(ExecutionResult) error
105105
OnComplete func(ExecutionComplete) error
106106
OnError func(ExecutionError) error
107+
108+
// SkipAccumulation, when true, prevents stdout/stderr messages from being
109+
// accumulated in the Execution struct. Messages are still delivered to handlers.
110+
// Use for long-running executions to prevent unbounded memory growth.
111+
SkipAccumulation bool
107112
}
108113

109114
// sseErrorPayload is the nested error object in a ServerStreamEvent.
@@ -146,7 +151,9 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH
146151
if err := json.Unmarshal([]byte(data), &ev); err != nil {
147152
// Not JSON — treat as raw stdout
148153
msg := OutputMessage{Text: data}
149-
exec.Stdout = append(exec.Stdout, msg)
154+
if handlers == nil || !handlers.SkipAccumulation {
155+
exec.Stdout = append(exec.Stdout, msg)
156+
}
150157
if handlers != nil && handlers.OnStdout != nil {
151158
return handlers.OnStdout(msg)
152159
}
@@ -163,14 +170,18 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH
163170

164171
case "stdout":
165172
msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp}
166-
exec.Stdout = append(exec.Stdout, msg)
173+
if handlers == nil || !handlers.SkipAccumulation {
174+
exec.Stdout = append(exec.Stdout, msg)
175+
}
167176
if handlers != nil && handlers.OnStdout != nil {
168177
return handlers.OnStdout(msg)
169178
}
170179

171180
case "stderr":
172181
msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp}
173-
exec.Stderr = append(exec.Stderr, msg)
182+
if handlers == nil || !handlers.SkipAccumulation {
183+
exec.Stderr = append(exec.Stderr, msg)
184+
}
174185
if handlers != nil && handlers.OnStderr != nil {
175186
return handlers.OnStderr(msg)
176187
}
@@ -239,7 +250,9 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH
239250
// Unknown event type — treat as stdout
240251
if ev.Text != "" {
241252
msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp}
242-
exec.Stdout = append(exec.Stdout, msg)
253+
if handlers == nil || !handlers.SkipAccumulation {
254+
exec.Stdout = append(exec.Stdout, msg)
255+
}
243256
if handlers != nil && handlers.OnStdout != nil {
244257
return handlers.OnStdout(msg)
245258
}

sdks/sandbox/javascript/src/models/execution.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,10 @@ export interface ExecutionHandlers {
6969
onExecutionComplete?: (c: ExecutionComplete) => void | Promise<void>;
7070
onError?: (err: ExecutionError) => void | Promise<void>;
7171
onInit?: (init: ExecutionInit) => void | Promise<void>;
72+
/**
73+
* When true, stdout/stderr messages are only delivered to handlers without
74+
* being accumulated in the execution logs. Use for long-running executions
75+
* to prevent unbounded memory growth.
76+
*/
77+
skipAccumulation?: boolean;
7278
}

sdks/sandbox/javascript/src/models/executionEventDispatcher.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@ export class ExecutionEventDispatcher {
4848
}
4949
case "stdout": {
5050
const msg: OutputMessage = { text: ev.text ?? "", timestamp: ts, isError: false };
51-
this.execution.logs.stdout.push(msg);
51+
if (!this.handlers?.skipAccumulation) {
52+
this.execution.logs.stdout.push(msg);
53+
}
5254
await this.handlers?.onStdout?.(msg);
5355
return;
5456
}
5557
case "stderr": {
5658
const msg: OutputMessage = { text: ev.text ?? "", timestamp: ts, isError: true };
57-
this.execution.logs.stderr.push(msg);
59+
if (!this.handlers?.skipAccumulation) {
60+
this.execution.logs.stderr.push(msg);
61+
}
5862
await this.handlers?.onStderr?.(msg);
5963
return;
6064
}

sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,12 @@ class ExecutionHandlers private constructor(
216216
* Called when code execution starts.
217217
*/
218218
val onInit: OutputHandler<ExecutionInit>? = null,
219+
/**
220+
* When true, stdout/stderr messages are only delivered to handlers without
221+
* being accumulated in [ExecutionLogs]. Use this for long-running executions
222+
* to prevent unbounded memory growth.
223+
*/
224+
val skipAccumulation: Boolean = false,
219225
) {
220226
companion object {
221227
@JvmStatic
@@ -229,6 +235,7 @@ class ExecutionHandlers private constructor(
229235
private var onExecutionComplete: OutputHandler<ExecutionComplete>? = null
230236
private var onError: OutputHandler<ExecutionError>? = null
231237
private var onInit: OutputHandler<ExecutionInit>? = null
238+
private var skipAccumulation: Boolean = false
232239

233240
fun onStdout(handler: OutputHandler<OutputMessage>): Builder {
234241
this.onStdout = handler
@@ -260,6 +267,11 @@ class ExecutionHandlers private constructor(
260267
return this
261268
}
262269

270+
fun skipAccumulation(skip: Boolean): Builder {
271+
this.skipAccumulation = skip
272+
return this
273+
}
274+
263275
fun build(): ExecutionHandlers {
264276
return ExecutionHandlers(
265277
onStdout = onStdout,
@@ -268,6 +280,7 @@ class ExecutionHandlers private constructor(
268280
onExecutionComplete = onExecutionComplete,
269281
onError = onError,
270282
onInit = onInit,
283+
skipAccumulation = skipAccumulation,
271284
)
272285
}
273286
}

sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ class ExecutionEventDispatcher(
6262
) {
6363
val stdoutText = eventNode.text ?: ""
6464
val stdoutMessage = OutputMessage(stdoutText, timestamp, false)
65-
execution.logs.addStdout(stdoutMessage)
65+
if (handlers?.skipAccumulation != true) {
66+
execution.logs.addStdout(stdoutMessage)
67+
}
6668
handlers?.onStdout?.handle(stdoutMessage)
6769
}
6870

@@ -72,7 +74,9 @@ class ExecutionEventDispatcher(
7274
) {
7375
val stderrText = eventNode.text ?: ""
7476
val stderrMessage = OutputMessage(stderrText, timestamp, true)
75-
execution.logs.addStderr(stderrMessage)
77+
if (handlers?.skipAccumulation != true) {
78+
execution.logs.addStderr(stderrMessage)
79+
}
7680
handlers?.onStderr?.handle(stderrMessage)
7781
}
7882

sdks/sandbox/python/src/opensandbox/adapters/converter/execution_event_dispatcher.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ async def _handle_stdout(self, event_node: EventNode, timestamp: int) -> None:
8080
timestamp=timestamp,
8181
is_error=False,
8282
)
83-
self.execution.logs.add_stdout(message)
83+
if not (self.handlers and self.handlers.skip_accumulation):
84+
self.execution.logs.add_stdout(message)
8485
if self.handlers and self.handlers.on_stdout:
8586
await self.handlers.on_stdout(message)
8687

@@ -91,7 +92,8 @@ async def _handle_stderr(self, event_node: EventNode, timestamp: int) -> None:
9192
timestamp=timestamp,
9293
is_error=True,
9394
)
94-
self.execution.logs.add_stderr(message)
95+
if not (self.handlers and self.handlers.skip_accumulation):
96+
self.execution.logs.add_stderr(message)
9597
if self.handlers and self.handlers.on_stderr:
9698
await self.handlers.on_stderr(message)
9799

sdks/sandbox/python/src/opensandbox/models/execd.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,12 @@ async def handle_stdout(msg: OutputMessage):
260260
on_init: AsyncOutputHandler | None = Field(
261261
default=None, description="Async handler for execution init"
262262
)
263+
skip_accumulation: bool = Field(
264+
default=False,
265+
description="When True, stdout/stderr messages are only delivered to handlers "
266+
"without being accumulated in ExecutionLogs. Use for long-running "
267+
"executions to prevent unbounded memory growth.",
268+
)
263269

264270
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
265271

sdks/sandbox/python/src/opensandbox/models/execd_sync.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,10 @@ class ExecutionHandlersSync(BaseModel):
3939
on_execution_complete: SyncOutputHandler | None = Field(default=None, alias="on_execution_complete")
4040
on_error: SyncOutputHandler | None = Field(default=None)
4141
on_init: SyncOutputHandler | None = Field(default=None)
42+
skip_accumulation: bool = Field(
43+
default=False,
44+
description="When True, stdout/stderr messages are only delivered to handlers "
45+
"without being accumulated in ExecutionLogs.",
46+
)
4247

4348
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)

0 commit comments

Comments
 (0)