-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathexec.py
More file actions
187 lines (163 loc) · 6.42 KB
/
exec.py
File metadata and controls
187 lines (163 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Code execution API endpoint compatible with LibreChat API.
This is a thin endpoint that delegates to ExecutionOrchestrator for
the actual execution workflow logic.
Uses a streaming response with keepalive whitespace to prevent client
socket timeouts (Node.js 20 defaults to 5s) during long-running
executions like large file operations or cold sandbox starts.
"""
import asyncio

import structlog
from fastapi import APIRouter, Request
from fastapi.responses import Response, StreamingResponse

from ..models import ExecRequest, ExecResponse
from ..models.errors import (
    CodeInterpreterException,
    ErrorResponse,
    ErrorType,
    ValidationError,
    ServiceUnavailableError,
)
from ..services.orchestrator import ExecutionOrchestrator
from ..dependencies.services import (
    SessionServiceDep,
    FileServiceDep,
    ExecutionServiceDep,
    StateServiceDep,
    StateArchivalServiceDep,
)
from ..utils.id_generator import generate_request_id
# Router carrying this module's endpoints.
router = APIRouter()

# Structured logger bound to this module's name.
logger = structlog.get_logger(__name__)

# Seconds between keepalive bytes while a streamed /exec response is waiting
# on a long execution. Node.js 20 clients drop an idle socket after 5 seconds
# by default, so this has to stay below that.
_KEEPALIVE_INTERVAL = 3
def _serialize_exec_response(response: ExecResponse) -> bytes:
    """Encode an execution result as the JSON bytes sent to the client.

    ``None`` fields are omitted, as ``response_model_exclude_none=True`` on the
    route asks for. Both the immediate and the streamed response paths use
    this, so clients see the same body shape however long execution took.
    """
    return response.model_dump_json(exclude_none=True).encode()


def _log_execution_completed(request_id, response: ExecResponse) -> None:
    """Log successful completion of the request identified by ``request_id``."""
    logger.info(
        "Code execution completed",
        request_id=request_id,
        session_id=response.session_id,
    )


@router.post(
    "/exec",
    responses={200: {"model": ExecResponse}},
    response_model_exclude_none=True,
)
async def execute_code(
    request: ExecRequest,
    http_request: Request,
    session_service: SessionServiceDep,
    file_service: FileServiceDep,
    execution_service: ExecutionServiceDep,
    state_service: StateServiceDep,
    state_archival_service: StateArchivalServiceDep,
):
    """Execute code with specified language and parameters.

    This endpoint is compatible with LibreChat's Code Interpreter API.
    It supports 12 programming languages: py, js, ts, go, java, c, cpp, php, rs, r, f90, d

    Python sessions are stateful when a session is reused. Variables and
    functions defined in one execution are available in subsequent executions
    within the same session, whether the caller supplies `session_id` directly
    or the orchestrator reuses a session through same-user file references or
    `entity_id` continuity. State is stored in Redis (2 hour TTL) with
    automatic archival to S3 for long-term storage (configurable TTL).

    Executions that finish within the first keepalive interval are returned
    as a plain JSON response. Longer executions switch to a streaming
    response that sends keepalive whitespace before the JSON body to prevent
    client socket timeouts; JSON parsers ignore leading whitespace, so this
    is transparent to clients. Once streaming has started the headers are
    already sent, so a failure is reported as an ``ErrorResponse`` JSON body
    rather than as an error status code.
    """
    request_id = generate_request_id()[:8]

    # Get API key info from request state (set by SecurityMiddleware)
    api_key_hash = getattr(http_request.state, "api_key_hash", None)
    is_env_key = getattr(http_request.state, "is_env_key", False)

    logger.info(
        "Code execution request",
        request_id=request_id,
        language=request.lang,
        code_length=len(request.code),
        entity_id=request.entity_id,
        user_id=request.user_id,
        api_key_hash=api_key_hash[:8] if api_key_hash else "unknown",
    )

    # Create orchestrator with injected services
    orchestrator = ExecutionOrchestrator(
        session_service=session_service,
        file_service=file_service,
        execution_service=execution_service,
        state_service=state_service,
        state_archival_service=state_archival_service,
    )

    async def _execute() -> ExecResponse:
        return await orchestrator.execute(
            request,
            request_id,
            api_key_hash=api_key_hash,
            is_env_key=is_env_key,
        )

    # Run the execution as its own task and only ever wait on it through
    # asyncio.shield(), so a keepalive timeout never cancels the work itself.
    execution_task = asyncio.create_task(_execute())

    async def _stream_response():
        """Yield keepalive spaces until execution finishes, then the JSON body.

        A single space is sent roughly every ``_KEEPALIVE_INTERVAL`` seconds
        while ``execution_task`` runs; afterwards the serialized
        ``ExecResponse`` is sent, or a serialized ``ErrorResponse`` if the
        execution raised.
        """
        # The endpoint already spent one interval deciding whether to switch to
        # streaming. Emit a first keepalive immediately so long-running
        # requests stay under client-side socket timeout thresholds.
        if not execution_task.done():
            yield b" "

        # NOTE(review): if the client disconnects while this generator is
        # suspended, the shield keeps execution_task running to completion
        # with nobody awaiting its result — confirm that is intended (e.g. so
        # session state still gets persisted).
        while not execution_task.done():
            try:
                await asyncio.wait_for(
                    asyncio.shield(execution_task), timeout=_KEEPALIVE_INTERVAL
                )
            except asyncio.TimeoutError:
                # Execution still running — send keepalive space
                if not execution_task.done():
                    yield b" "
            except Exception:
                # The task failed; its exception is re-raised and reported below.
                break

        try:
            response = await execution_task
        except Exception as err:
            # Headers are already sent, so report the failure as a JSON error
            # payload instead of raising. Log it here: unlike the non-streamed
            # path, this error never reaches the framework's exception handling.
            if isinstance(err, CodeInterpreterException):
                error_type = err.error_type
                logger.warning(
                    "Code execution failed after streaming started",
                    request_id=request_id,
                    error_type=error_type,
                    error=str(err),
                )
            else:
                error_type = ErrorType.INTERNAL_SERVER
                logger.exception(
                    "Code execution failed after streaming started",
                    request_id=request_id,
                )
            error_resp = ErrorResponse(
                error=str(err),
                error_type=error_type,
            )
            yield error_resp.model_dump_json().encode()
            return

        _log_execution_completed(request_id, response)
        yield _serialize_exec_response(response)

    try:
        response = await asyncio.wait_for(
            asyncio.shield(execution_task), timeout=_KEEPALIVE_INTERVAL
        )
    except asyncio.TimeoutError:
        if not execution_task.done():
            # Genuinely long-running work: switch to streamed keepalives.
            return StreamingResponse(
                _stream_response(),
                media_type="application/json",
            )
        # The task finished in the same loop iteration as the timeout, or the
        # orchestrator itself raised TimeoutError; awaiting the finished task
        # returns its result or re-raises its own exception.
        response = await execution_task
    except (ValidationError, ServiceUnavailableError):
        # Request-level errors propagate unchanged (as does every other
        # exception from the task here) so FastAPI's exception handling can
        # produce an HTTP error response before any body byte is sent.
        raise

    _log_execution_completed(request_id, response)
    # Serialize explicitly: the route declares no ``response_model`` (and no
    # return annotation), so FastAPI would ignore
    # ``response_model_exclude_none`` for a returned model and emit ``null``
    # fields that the streamed path omits.
    return Response(
        content=_serialize_exec_response(response),
        media_type="application/json",
    )