Skip to content

Commit e6c4af8

Browse files
authored
Merge pull request #12 from onyx-dot-app/docker-exec-stream
feat: stream code execution
2 parents 26c3cb6 + 6fe88e7 commit e6c4af8

6 files changed

Lines changed: 494 additions & 7 deletions

File tree

code-interpreter/app/api/routes.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
from __future__ import annotations
22

3+
from collections.abc import Iterator
34
from pathlib import Path
45

56
from fastapi import APIRouter, File, HTTPException, UploadFile, status
6-
from fastapi.responses import Response
7+
from fastapi.responses import Response, StreamingResponse
78

89
from app.app_configs import get_settings
910
from app.models.schemas import (
1011
ExecuteRequest,
1112
ExecuteResponse,
1213
FileMetadataResponse,
1314
ListFilesResponse,
15+
StreamErrorEvent,
16+
StreamOutputEvent,
17+
StreamResultEvent,
1418
UploadFileResponse,
1519
WorkspaceFile,
1620
)
17-
from app.services.executor_base import EntryKind, WorkspaceEntry
18-
from app.services.executor_factory import execute_python
21+
from app.services.executor_base import EntryKind, StreamChunk, StreamResult, WorkspaceEntry
22+
from app.services.executor_factory import execute_python, execute_python_streaming
1923
from app.services.file_storage import FileStorageService
2024

2125
router = APIRouter()
@@ -118,6 +122,51 @@ def execute(req: ExecuteRequest) -> ExecuteResponse:
118122
)
119123

120124

125+
@router.post("/execute/stream")
def execute_stream(req: ExecuteRequest) -> StreamingResponse:
    """Execute Python code with streaming output via Server-Sent Events.

    Validates the request, stages any input files, then returns a
    StreamingResponse whose generator forwards executor events as SSE
    frames: 'output' chunks while the code runs, one final 'result', or
    an 'error' frame if the executor raises.
    """
    _validate_timeout(req)
    settings = get_settings()
    storage = get_file_storage()
    # Staging happens before the response starts so upload errors surface
    # as normal HTTP errors rather than mid-stream SSE errors.
    staged_files, input_files_map = _stage_request_files(req, storage)

    def event_stream() -> Iterator[str]:
        try:
            events = execute_python_streaming(
                code=req.code,
                stdin=req.stdin,
                timeout_ms=req.timeout_ms,
                max_output_bytes=settings.max_output_bytes,
                cpu_time_limit_sec=settings.cpu_time_limit_sec,
                memory_limit_mb=settings.memory_limit_mb,
                files=staged_files,
                last_line_interactive=req.last_line_interactive,
            )
            for event in events:
                if isinstance(event, StreamChunk):
                    yield StreamOutputEvent(stream=event.stream, data=event.data).to_sse()
                elif isinstance(event, StreamResult):
                    saved = _save_workspace_files(event.files, input_files_map, storage)
                    yield StreamResultEvent(
                        exit_code=event.exit_code,
                        timed_out=event.timed_out,
                        duration_ms=event.duration_ms,
                        files=saved,
                    ).to_sse()
        except Exception as exc:
            # Headers are already sent at this point, so failures must be
            # reported in-band as an SSE 'error' event.
            yield StreamErrorEvent(message=str(exc)).to_sse()

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Tell nginx-style proxies not to buffer the event stream.
            "X-Accel-Buffering": "no",
        },
    )
168+
169+
121170
@router.post("/files", response_model=UploadFileResponse, status_code=status.HTTP_201_CREATED)
122171
async def upload_file(file: UploadFile = File(...)) -> UploadFileResponse: # noqa: B008
123172
"""Upload a file for later use in code execution."""

code-interpreter/app/models/schemas.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
from typing import ClassVar, Literal
4+
35
from pydantic import BaseModel, Field, StrictInt, StrictStr
46

57
from app.services.executor_base import EntryKind
@@ -50,6 +52,51 @@ class ExecuteResponse(BaseModel):
5052
)
5153

5254

55+
class SSEModel(BaseModel):
    """Common base class for Server-Sent Event payload models.

    A subclass sets the ``sse_event`` ClassVar to its SSE event name and
    inherits ``to_sse()``, which renders the model as one complete
    ``event:`` / ``data:`` wire frame.
    """

    sse_event: ClassVar[str]

    def to_sse(self) -> str:
        """Serialize this model into a fully-formatted SSE frame."""
        # JSON escapes control characters, so the payload never contains a
        # raw newline and a single `data:` line is always a valid frame.
        payload = self.model_dump_json()
        return f"event: {self.sse_event}\ndata: {payload}\n\n"


class StreamOutputEvent(SSEModel):
    """An incremental chunk of program output (the 'output' SSE event)."""

    sse_event: ClassVar[str] = "output"

    # Which pipe the chunk came from.
    stream: Literal["stdout", "stderr"]
    # Decoded text content of the chunk.
    data: StrictStr


class StreamResultEvent(SSEModel):
    """Terminal summary of the run (the final 'result' SSE event)."""

    sse_event: ClassVar[str] = "result"

    # None when execution was cut off by the timeout.
    exit_code: int | None
    timed_out: bool
    duration_ms: StrictInt
    files: list[WorkspaceFile] = Field(
        default_factory=list,
        description="Snapshot of the execution workspace after completion.",
    )


class StreamErrorEvent(SSEModel):
    """Unexpected failure while streaming (the 'error' SSE event)."""

    sse_event: ClassVar[str] = "error"

    message: StrictStr
98+
99+
53100
class UploadFileResponse(BaseModel):
54101
file_id: StrictStr = Field(..., description="Unique identifier for the uploaded file.")
55102
filename: StrictStr = Field(..., description="Original filename as provided during upload.")

code-interpreter/app/services/executor_base.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from __future__ import annotations
22

33
from abc import ABC, abstractmethod
4-
from collections.abc import Sequence
4+
from collections.abc import Generator, Sequence
55
from dataclasses import dataclass
66
from enum import StrEnum
7-
from typing import Protocol
7+
from typing import Literal, Protocol
88

99

1010
def wrap_last_line_interactive(code: str) -> str:
@@ -77,6 +77,27 @@ class WorkspaceEntry:
7777
content: bytes | None = None
7878

7979

80+
@dataclass(frozen=True, slots=True)
81+
class StreamChunk:
82+
"""A chunk of output from the execution."""
83+
84+
stream: Literal["stdout", "stderr"]
85+
data: str
86+
87+
88+
@dataclass(frozen=True, slots=True)
89+
class StreamResult:
90+
"""Final execution result emitted at end of stream."""
91+
92+
exit_code: int | None
93+
timed_out: bool
94+
duration_ms: int
95+
files: tuple[WorkspaceEntry, ...]
96+
97+
98+
StreamEvent = StreamChunk | StreamResult
99+
100+
80101
class ExecutorProtocol(Protocol):
81102
def execute_python(
82103
self,
@@ -113,6 +134,25 @@ def execute_python(
113134
if it's a bare expression (only the last line is affected).
114135
"""
115136

137+
def execute_python_streaming(
    self,
    *,
    code: str,
    stdin: str | None,
    timeout_ms: int,
    max_output_bytes: int,
    cpu_time_limit_sec: int | None = None,
    memory_limit_mb: int | None = None,
    files: Sequence[tuple[str, bytes]] | None = None,
    last_line_interactive: bool = True,
) -> Generator[StreamEvent, None, None]:
    """Stream execution output incrementally; optional for subclasses.

    Implementations yield StreamChunk events while the program runs and
    finish with exactly one StreamResult. This default body contains no
    ``yield``, so calling it raises immediately rather than returning a
    lazy generator — executors without streaming support fail fast.

    Raises:
        NotImplementedError: always, in this default implementation.
    """
    message = f"{type(self).__name__} does not support streaming execution"
    raise NotImplementedError(message)
155+
116156
@staticmethod
117157
def truncate_output(stream: bytes, max_bytes: int) -> str:
118158
if len(stream) <= max_bytes:

code-interpreter/app/services/executor_docker.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import codecs
12
import io
23
import logging
4+
import os
5+
import selectors
36
import shlex
47
import subprocess
58
import tarfile
@@ -10,6 +13,7 @@
1013
from dataclasses import dataclass
1114
from pathlib import Path
1215
from shutil import which
16+
from typing import Literal
1317

1418
from app.app_configs import (
1519
PYTHON_EXECUTOR_DOCKER_BIN,
@@ -20,6 +24,9 @@
2024
BaseExecutor,
2125
EntryKind,
2226
ExecutionResult,
27+
StreamChunk,
28+
StreamEvent,
29+
StreamResult,
2330
WorkspaceEntry,
2431
wrap_last_line_interactive,
2532
)
@@ -406,3 +413,159 @@ def execute_python(
406413
duration_ms=duration_ms,
407414
files=workspace_snapshot,
408415
)
416+
417+
def _terminate_process(self, ctx: _ExecContext, timed_out: bool) -> None:
    """Kill the process on timeout or wait for normal exit.

    Args:
        ctx: Execution context holding the local ``docker exec`` Popen
            handle and the container name.
        timed_out: True when the stream reader hit its deadline before
            the process finished on its own.
    """
    if timed_out:
        # Kill the Python process inside the container first; killing only
        # the local `docker exec` client would leave the containerized
        # process running.
        subprocess.run(
            [
                self.docker_binary,
                "exec",
                ctx.container_name,
                "pkill",
                "-9",
                "python",
            ],
            # Best effort: swallow pkill's output; a non-zero exit (e.g.
            # the process already died) is intentionally ignored.
            capture_output=True,
        )
        ctx.proc.kill()
    # Reap the child so returncode is populated and no zombie is left.
    ctx.proc.wait()
433+
434+
def execute_python_streaming(
    self,
    *,
    code: str,
    stdin: str | None,
    timeout_ms: int,
    max_output_bytes: int,
    cpu_time_limit_sec: int | None = None,
    memory_limit_mb: int | None = None,
    files: Sequence[tuple[str, bytes]] | None = None,
    last_line_interactive: bool = True,
) -> Generator[StreamEvent, None, None]:
    """Execute Python code and yield output chunks as they arrive via SSE.

    Yields StreamChunk events during execution, then a single StreamResult
    at the end containing exit_code, timing, and workspace files.
    """
    with self._run_in_container(
        code=code,
        cpu_time_limit_sec=cpu_time_limit_sec,
        memory_limit_mb=memory_limit_mb,
        timeout_ms=timeout_ms,
        files=files,
        last_line_interactive=last_line_interactive,
    ) as ctx:
        # Feed stdin (if any) and close the pipe so the child sees EOF.
        _write_stdin(ctx.proc, stdin)

        deadline = time.monotonic() + (timeout_ms / 1000.0)
        # Delegate chunk production to the sub-generator; its return value
        # reports whether the deadline was hit.
        timed_out = yield from _stream_process_output(ctx.proc, deadline, max_output_bytes)

        self._terminate_process(ctx, timed_out)
        # Snapshot workspace files before the container context is torn down.
        workspace_snapshot = self._extract_workspace_snapshot(ctx.container_name)

        # NOTE(review): ctx.start appears to be a time.perf_counter()
        # timestamp set by _run_in_container — confirm against that helper.
        duration_ms = int((time.perf_counter() - ctx.start) * 1000)
        # A timed-out run has no meaningful exit status.
        exit_code = None if timed_out else ctx.proc.returncode

        yield StreamResult(
            exit_code=exit_code,
            timed_out=timed_out,
            duration_ms=duration_ms,
            files=workspace_snapshot,
        )
476+
477+
478+
def _write_stdin(proc: subprocess.Popen[bytes], stdin: str | None) -> None:
479+
"""Write optional stdin data and close the pipe."""
480+
if proc.stdin is None:
481+
raise RuntimeError("Failed to open subprocess stdin pipe")
482+
if stdin is not None:
483+
proc.stdin.write(stdin.encode("utf-8"))
484+
proc.stdin.close()
485+
486+
487+
class _StreamTracker:
    """Per-stream state for incremental decoding with truncation.

    Tracks one pipe (stdout or stderr): raw bytes are counted against
    ``max_bytes``, and only bytes inside the budget are fed to an
    incremental UTF-8 decoder (errors replaced), so multibyte characters
    split across reads decode correctly.
    """

    __slots__ = ("stream", "decoder", "bytes_sent", "max_bytes")

    def __init__(self, stream: Literal["stdout", "stderr"], max_bytes: int) -> None:
        self.stream = stream
        self.decoder = codecs.getincrementaldecoder("utf-8")("replace")
        self.bytes_sent = 0
        self.max_bytes = max_bytes

    def decode_chunk(self, data: bytes) -> StreamChunk | None:
        """Decode a raw chunk and return a ``StreamChunk`` if within limits."""
        result: StreamChunk | None = None
        remaining = self.max_bytes - self.bytes_sent
        if remaining > 0:
            # Only the portion inside the output budget is decoded;
            # the rest is silently dropped (truncation).
            text = self.decoder.decode(data[:remaining], False)
            if text:
                result = StreamChunk(stream=self.stream, data=text)
        # Count raw bytes received so the budget check stays byte-accurate.
        self.bytes_sent += len(data)
        return result

    def flush(self) -> StreamChunk | None:
        """Flush the decoder and return a final chunk if any bytes remain."""
        tail = self.decoder.decode(b"", True)
        return StreamChunk(stream=self.stream, data=tail) if tail else None
515+
516+
517+
def _stream_process_output(
    proc: subprocess.Popen[bytes],
    deadline: float,
    max_output_bytes: int,
) -> Generator[StreamChunk, None, bool]:
    """Read stdout/stderr incrementally and yield ``StreamChunk`` events.

    Returns ``True`` if the process timed out, ``False`` otherwise.

    Both pipes are multiplexed with a selector so neither can block the
    other; each stream is truncated independently at ``max_output_bytes``.
    """
    if proc.stdout is None or proc.stderr is None:
        raise RuntimeError("Failed to open subprocess output pipes")

    sel = selectors.DefaultSelector()
    # The stream name rides along as selector key data for dispatch below.
    sel.register(proc.stdout, selectors.EVENT_READ, "stdout")
    sel.register(proc.stderr, selectors.EVENT_READ, "stderr")

    trackers: dict[str, _StreamTracker] = {
        "stdout": _StreamTracker("stdout", max_output_bytes),
        "stderr": _StreamTracker("stderr", max_output_bytes),
    }
    fds: dict[str, int] = {
        "stdout": proc.stdout.fileno(),
        "stderr": proc.stderr.fileno(),
    }
    timed_out = False
    chunk_size = 4096

    try:
        # Loop until both pipes hit EOF (and were unregistered) or the
        # deadline passes.
        while sel.get_map():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                timed_out = True
                break

            # Cap the wait so the deadline is re-checked at least every 5s
            # even if the process produces no output.
            events = sel.select(timeout=min(remaining, 5.0))

            for key, _ in events:
                stream_name: str = key.data
                # os.read on the raw fd returns whatever is available
                # (up to chunk_size) without waiting for a full buffer.
                data = os.read(fds[stream_name], chunk_size)
                if not data:
                    # EOF on this pipe: stop watching it.
                    sel.unregister(key.fileobj)
                    continue

                # May return None when output is truncated or the decoder
                # is holding an incomplete multibyte sequence.
                chunk = trackers[stream_name].decode_chunk(data)
                if chunk is not None:
                    yield chunk
    finally:
        sel.close()

    # Emit any text still buffered in the incremental decoders.
    for tracker in trackers.values():
        chunk = tracker.flush()
        if chunk is not None:
            yield chunk

    return timed_out

0 commit comments

Comments
 (0)