Skip to content

Commit f326249

Browse files
authored
Release lock for interruptable client requests (#237)
* release lock on client disconnect * move back * fmt * move clean up back to original place * added changeset * format * delete ineffective timeout tests * added server tests * undo server tests * interrupt * format * bugbot * addressed comments
1 parent 52b5767 commit f326249

File tree

5 files changed

+135
-6
lines changed

5 files changed

+135
-6
lines changed

.changeset/clever-bears-accept.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@e2b/code-interpreter-template': patch
3+
---
4+
5+
interrupt kernel execution on client disconnect

js/tests/interrupt.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { expect } from 'vitest'
2+
3+
import { sandboxTest, wait } from './setup'
4+
5+
sandboxTest(
6+
'subsequent execution works after client timeout',
7+
async ({ sandbox }) => {
8+
// Start a long-running execution with a short timeout.
9+
// This simulates a client disconnect: the SDK aborts the connection,
10+
// which should trigger the server to interrupt the kernel (#213).
11+
await expect(
12+
sandbox.runCode('import time; time.sleep(300)', { timeoutMs: 3_000 })
13+
).rejects.toThrow()
14+
15+
// Wait for the server to detect the disconnect (via keepalive write
16+
// failure) and interrupt the kernel.
17+
await wait(5_000)
18+
19+
// Run a simple execution. Without the kernel interrupt fix, this would
20+
// block behind the still-running sleep(300) and time out.
21+
const result = await sandbox.runCode('1 + 1', { timeoutMs: 10_000 })
22+
expect(result.text).toEqual('2')
23+
},
24+
60_000
25+
)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from e2b import TimeoutException
6+
from e2b_code_interpreter.code_interpreter_async import AsyncSandbox
7+
8+
9+
async def test_subsequent_execution_works_after_client_timeout(
10+
async_sandbox: AsyncSandbox,
11+
):
12+
# Start a long-running execution with a short timeout.
13+
# This simulates a client disconnect: the SDK closes the connection,
14+
# which should trigger the server to interrupt the kernel (#213).
15+
with pytest.raises(TimeoutException):
16+
await async_sandbox.run_code("import time; time.sleep(300)", timeout=3)
17+
18+
# Wait for the server to detect the disconnect (via keepalive write
19+
# failure) and interrupt the kernel.
20+
await asyncio.sleep(5)
21+
22+
# Run a simple execution. Without the kernel interrupt fix, this would
23+
# block behind the still-running sleep(300) and time out.
24+
result = await async_sandbox.run_code("1 + 1", timeout=10)
25+
assert result.text == "2"
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import time
2+
3+
import pytest
4+
5+
from e2b import TimeoutException
6+
from e2b_code_interpreter.code_interpreter_sync import Sandbox
7+
8+
9+
def test_subsequent_execution_works_after_client_timeout(sandbox: Sandbox):
10+
# Start a long-running execution with a short timeout.
11+
# This simulates a client disconnect: the SDK closes the connection,
12+
# which should trigger the server to interrupt the kernel (#213).
13+
with pytest.raises(TimeoutException):
14+
sandbox.run_code("import time; time.sleep(300)", timeout=3)
15+
16+
# Wait for the server to detect the disconnect (via keepalive write
17+
# failure) and interrupt the kernel.
18+
time.sleep(5)
19+
20+
# Run a simple execution. Without the kernel interrupt fix, this would
21+
# block behind the still-running sleep(300) and time out.
22+
result = sandbox.run_code("1 + 1", timeout=10)
23+
assert result.text == "2"

template/server/messaging.py

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import uuid
55
import asyncio
66

7+
import httpx
8+
79
from asyncio import Queue
810
from typing import (
911
Dict,
@@ -26,13 +28,15 @@
2628
OutputType,
2729
UnexpectedEndOfExecution,
2830
)
31+
from consts import JUPYTER_BASE_URL
2932
from errors import ExecutionError
3033
from envs import get_envs
3134

3235
logger = logging.getLogger(__name__)
3336

3437
MAX_RECONNECT_RETRIES = 3
3538
PING_TIMEOUT = 30
39+
KEEPALIVE_INTERVAL = 5 # seconds between keepalive pings during streaming
3640

3741

3842
class Execution:
@@ -97,6 +101,22 @@ async def connect(self):
97101
name="receive_message",
98102
)
99103

104+
async def interrupt(self):
105+
"""Interrupt the current kernel execution via the Jupyter REST API."""
106+
try:
107+
async with httpx.AsyncClient() as client:
108+
response = await client.post(
109+
f"{JUPYTER_BASE_URL}/api/kernels/{self.context_id}/interrupt"
110+
)
111+
if response.is_success:
112+
logger.info(f"Kernel {self.context_id} interrupted successfully")
113+
else:
114+
logger.error(
115+
f"Failed to interrupt kernel {self.context_id}: {response.status_code}"
116+
)
117+
except Exception as e:
118+
logger.error(f"Error interrupting kernel {self.context_id}: {e}")
119+
100120
def _get_execute_request(
101121
self, msg_id: str, code: Union[str, StrictStr], background: bool
102122
) -> str:
@@ -238,8 +258,24 @@ async def _cleanup_env_vars(self, env_vars: Dict[StrictStr, str]):
238258
async def _wait_for_result(self, message_id: str):
239259
queue = self._executions[message_id].queue
240260

261+
# Use a timeout on queue.get() to periodically send keepalives.
262+
# Without keepalives, the generator blocks indefinitely waiting for
263+
# kernel output. If the client silently disappears (e.g. network
264+
# failure), uvicorn can only detect the broken connection when it
265+
# tries to write — so we force a write every KEEPALIVE_INTERVAL
266+
# seconds. This ensures timely disconnect detection and kernel
267+
# interrupt for abandoned executions (see #213).
241268
while True:
242-
output = await queue.get()
269+
try:
270+
output = await asyncio.wait_for(queue.get(), timeout=KEEPALIVE_INTERVAL)
271+
except asyncio.TimeoutError:
272+
# Yield a keepalive so Starlette writes to the socket.
273+
# If the client has disconnected, the write fails and
274+
# uvicorn delivers http.disconnect, which cancels this
275+
# generator via CancelledError.
276+
yield {"type": "keepalive"}
277+
continue
278+
243279
if output.type == OutputType.END_OF_EXECUTION:
244280
break
245281

@@ -362,11 +398,26 @@ async def execute(
362398
)
363399
await execution.queue.put(UnexpectedEndOfExecution())
364400

365-
# Stream the results
366-
async for item in self._wait_for_result(message_id):
367-
yield item
368-
369-
del self._executions[message_id]
401+
# Stream the results.
402+
# If the client disconnects (Starlette cancels the task), we
403+
# interrupt the kernel so the next execution isn't blocked (#213).
404+
try:
405+
async for item in self._wait_for_result(message_id):
406+
yield item
407+
except (asyncio.CancelledError, GeneratorExit):
408+
logger.warning(
409+
f"Client disconnected during execution ({message_id}), interrupting kernel"
410+
)
411+
# Shield the interrupt from the ongoing cancellation so
412+
# the HTTP request to the kernel actually completes.
413+
try:
414+
await asyncio.shield(self.interrupt())
415+
except asyncio.CancelledError:
416+
pass
417+
raise
418+
finally:
419+
if message_id in self._executions:
420+
del self._executions[message_id]
370421

371422
# Clean up env vars in a separate request after the main code has run
372423
if env_vars:

0 commit comments

Comments
 (0)