Skip to content

Commit 83261c0

Browse files
fix: release lock before streaming and add kernel interrupt support
Fixes #213 — asyncio.Lock in messaging.py not released on client disconnect, causing cascading timeouts. Changes: - Narrow lock scope in ContextWebSocket.execute() to only cover the prepare+send phase (Phase A), releasing it before result streaming (Phase B). This prevents orphaned locks on client disconnect. - Schedule env var cleanup task under the lock (before release) to avoid the race condition flagged in PRs #234/#235. - Add POST /contexts/{id}/interrupt endpoint that calls Jupyter's kernel interrupt API, allowing clients to stop long-running code without restarting the kernel (preserves state). - Add interrupt_code_context/interruptCodeContext to Python and JS SDKs. Co-Authored-By: vasek <vasek.mlejnsky@gmail.com>
1 parent 6d703a4 commit 83261c0

File tree

5 files changed

+163
-8
lines changed

5 files changed

+163
-8
lines changed

js/src/sandbox.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,4 +436,47 @@ export class Sandbox extends BaseSandbox {
436436
throw formatRequestTimeoutError(error)
437437
}
438438
}
439+
440+
/**
 * Interrupt the execution that is currently running in a context.
 *
 * Sends a POST request to the sandbox's `/contexts/{id}/interrupt` endpoint.
 * The interrupt signal stops the running code without restarting the Jupyter
 * kernel, so previously defined variables, imports, and state are kept.
 *
 * Typical use: stop long-running code after a timeout without losing
 * kernel state.
 *
 * @param context context to interrupt, given either as a `Context` object or as a context ID string.
 *
 * @returns void.
 */
async interruptCodeContext(context: Context | string): Promise<void> {
  try {
    // Accept both a Context object and a raw context ID.
    const contextId = typeof context === 'string' ? context : context.id

    // The traffic access token header is only attached when a token is set.
    const requestHeaders: Record<string, string> = {
      'Content-Type': 'application/json',
      ...(this.trafficAccessToken
        ? { 'E2B-Traffic-Access-Token': this.trafficAccessToken }
        : {}),
    }

    const endpoint = `${this.jupyterUrl}/contexts/${contextId}/interrupt`
    const timeoutSignal = this.connectionConfig.getSignal(
      this.connectionConfig.requestTimeoutMs
    )

    const response = await fetch(endpoint, {
      method: 'POST',
      headers: requestHeaders,
      keepalive: true,
      signal: timeoutSignal,
    })

    // Surface any error reported in the response to the caller.
    const failure = await extractError(response)
    if (failure) {
      throw failure
    }
  } catch (err) {
    // Every error raised above is passed through the timeout formatter.
    throw formatRequestTimeoutError(err)
  }
}
439482
}

python/e2b_code_interpreter/code_interpreter_async.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,3 +369,41 @@ async def restart_code_context(
369369
raise err
370370
except httpx.TimeoutException:
371371
raise format_request_timeout_error()
372+
373+
async def interrupt_code_context(
    self,
    context: Union[Context, str],
) -> None:
    """
    Interrupt the execution that is currently running in a context.

    Sends a POST request to the sandbox's ``/contexts/{context_id}/interrupt``
    endpoint. The interrupt signal stops the running code without restarting
    the Jupyter kernel, so previously defined variables, imports, and state
    are kept.

    Typical use: stop long-running code after a timeout without losing
    kernel state.

    :param context: Context to interrupt. Can be a Context object or a context ID string.

    :return: None
    :raises: the exception returned by ``aextract_exception`` when the
        response reports an error, or the formatted request-timeout error
        when the HTTP call times out.
    """
    # Accept both a Context object and a raw context ID.
    if isinstance(context, Context):
        context_id = context.id
    else:
        context_id = context

    # NOTE(review): the sync client's interrupt_code_context also forwards
    # an "X-Access-Token" header -- confirm whether this async variant
    # should do the same.
    request_headers = {"Content-Type": "application/json"}
    if self.traffic_access_token:
        request_headers["E2B-Traffic-Access-Token"] = self.traffic_access_token

    endpoint = f"{self._jupyter_url}/contexts/{context_id}/interrupt"

    try:
        response = await self._client.post(
            endpoint,
            headers=request_headers,
            timeout=self.connection_config.request_timeout,
        )

        failure = await aextract_exception(response)
        if failure:
            raise failure
    except httpx.TimeoutException:
        raise format_request_timeout_error()

python/e2b_code_interpreter/code_interpreter_sync.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,3 +366,42 @@ def restart_code_context(
366366
raise err
367367
except httpx.TimeoutException:
368368
raise format_request_timeout_error()
369+
370+
def interrupt_code_context(
    self,
    context: Union[Context, str],
) -> None:
    """
    Interrupt the execution that is currently running in a context.

    Sends a POST request to the sandbox's ``/contexts/{context_id}/interrupt``
    endpoint. The interrupt signal stops the running code without restarting
    the Jupyter kernel, so previously defined variables, imports, and state
    are kept.

    Typical use: stop long-running code after a timeout without losing
    kernel state.

    :param context: Context to interrupt. Can be a Context object or a context ID string.

    :return: None
    :raises: the exception returned by ``extract_exception`` when the
        response reports an error, or the formatted request-timeout error
        when the HTTP call times out.
    """
    # Accept both a Context object and a raw context ID.
    if isinstance(context, Context):
        context_id = context.id
    else:
        context_id = context

    # Optional auth headers are attached only when the matching token is set.
    request_headers: Dict[str, str] = {"Content-Type": "application/json"}
    if self._envd_access_token:
        request_headers["X-Access-Token"] = self._envd_access_token
    if self.traffic_access_token:
        request_headers["E2B-Traffic-Access-Token"] = self.traffic_access_token

    endpoint = f"{self._jupyter_url}/contexts/{context_id}/interrupt"

    try:
        response = self._client.post(
            endpoint,
            headers=request_headers,
            timeout=self.connection_config.request_timeout,
        )

        failure = extract_exception(response)
        if failure:
            raise failure
    except httpx.TimeoutException:
        raise format_request_timeout_error()

template/server/main.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,27 @@ async def restart_context(context_id: str) -> None:
183183
websockets[context_id] = ws
184184

185185

186+
@app.post("/contexts/{context_id}/interrupt")
async def interrupt_context(context_id: str) -> None:
    """
    Interrupt the kernel that backs the given context.

    Looks up the context in ``websockets`` and forwards the request to the
    ``/api/kernels/{id}/interrupt`` endpoint under ``JUPYTER_BASE_URL``.

    :param context_id: ID of the context whose kernel should be interrupted.

    :return: None on success; a 404 ``PlainTextResponse`` when the context
        is unknown; a 500 ``PlainTextResponse`` when the upstream interrupt
        request does not succeed.
    """
    logger.info(f"Interrupting context {context_id}")

    ws = websockets.get(context_id, None)
    if not ws:
        return PlainTextResponse(
            f"Context {context_id} not found",
            status_code=404,
        )

    response = await client.post(
        f"{JUPYTER_BASE_URL}/api/kernels/{ws.context_id}/interrupt"
    )
    if not response.is_success:
        # Record the upstream status so the failure can be diagnosed from
        # the server logs; the client only receives a generic 500.
        logger.error(
            f"Failed to interrupt context {context_id}: "
            f"upstream responded with status {response.status_code}"
        )
        return PlainTextResponse(
            f"Failed to interrupt context {context_id}",
            status_code=500,
        )
205+
206+
186207
@app.delete("/contexts/{context_id}")
187208
async def remove_context(context_id: str) -> None:
188209
logger.info(f"Removing context {context_id}")

template/server/messaging.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ async def execute(
294294
if self._ws is None:
295295
raise Exception("WebSocket not connected")
296296

297+
# Phase A (under lock): prepare env vars, send request to kernel,
298+
# and schedule env var cleanup. The lock serializes sends to the
299+
# Jupyter kernel WebSocket and protects shared state like
300+
# _global_env_vars and _cleanup_task.
301+
message_id = str(uuid.uuid4())
297302
async with self._lock:
298303
# Wait for any pending cleanup task to complete
299304
if self._cleanup_task and not self._cleanup_task.done():
@@ -327,7 +332,6 @@ async def execute(
327332
)
328333
complete_code = f"{indented_env_code}\n{complete_code}"
329334

330-
message_id = str(uuid.uuid4())
331335
execution = Execution()
332336
self._executions[message_id] = execution
333337

@@ -362,18 +366,28 @@ async def execute(
362366
)
363367
await execution.queue.put(UnexpectedEndOfExecution())
364368

365-
# Stream the results
366-
async for item in self._wait_for_result(message_id):
367-
yield item
368-
369-
del self._executions[message_id]
370-
371-
# Clean up env vars in a separate request after the main code has run
369+
# Schedule env var cleanup while still holding the lock.
370+
# The cleanup sends a background execute_request to the kernel,
371+
# which will be processed after the main code finishes (kernel
372+
# serializes execution). The next execute() call will await
373+
# this task in Phase A before proceeding.
372374
if env_vars:
373375
self._cleanup_task = asyncio.create_task(
374376
self._cleanup_env_vars(env_vars)
375377
)
376378

379+
# Phase B (no lock held): stream results back to client.
380+
# Results are routed by unique message_id in _process_message(),
381+
# so no shared state is accessed during streaming. If the client
382+
# disconnects (SDK timeout), the generator is abandoned but the
383+
# lock is already released — no orphaned lock.
384+
try:
385+
async for item in self._wait_for_result(message_id):
386+
yield item
387+
finally:
388+
# Clean up execution entry even if generator is abandoned
389+
self._executions.pop(message_id, None)
390+
377391
async def _receive_message(self):
378392
if not self._ws:
379393
logger.error("No WebSocket connection")

0 commit comments

Comments
 (0)