Skip to content

Commit 74eb494

Browse files
authored
Merge pull request #31 from onyx-dot-app/dane/ll-session-ttl
feat(sessions): TTL enforcement and crash-safe cleanup
2 parents 2c2207f + 352cb5b commit 74eb494

9 files changed

Lines changed: 416 additions & 43 deletions

File tree

code-interpreter/app/api/routes.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ def _stage_request_files(
7676
req: ExecuteRequest,
7777
storage: FileStorageService,
7878
) -> tuple[list[tuple[str, bytes]], dict[str, bytes]]:
79-
"""Resolve uploaded file IDs into content for the executor."""
79+
"""Resolve uploaded file IDs into content for the executor.
80+
81+
Returns (staged_files, input_files_map).
82+
"""
8083
return _resolve_uploaded_files(req.files, storage)
8184

8285

@@ -267,16 +270,18 @@ def delete_file(file_id: str) -> Response:
267270
status_code=status.HTTP_201_CREATED,
268271
)
269272
def create_session(req: CreateSessionRequest) -> CreateSessionResponse:
270-
"""Create a long-lived code-executor pod.
273+
"""Create a long-lived code-executor pod with the given TTL.
271274
272-
The session must be torn down explicitly via DELETE /v1/sessions/{id}.
275+
The pod is guaranteed to be torn down at or before the TTL expires, even
276+
if the API service crashes and restarts.
273277
"""
274278
settings = get_settings()
275279
storage = get_file_storage()
276280
staged_files, _ = _resolve_uploaded_files(req.files, storage)
277281

278282
try:
279283
info = get_executor().create_session(
284+
ttl_seconds=req.ttl_seconds,
280285
files=staged_files,
281286
cpu_time_limit_sec=settings.cpu_time_limit_sec,
282287
memory_limit_mb=settings.memory_limit_mb,
@@ -292,7 +297,10 @@ def create_session(req: CreateSessionRequest) -> CreateSessionResponse:
292297
detail=str(exc),
293298
) from exc
294299

295-
return CreateSessionResponse(session_id=info.session_id)
300+
return CreateSessionResponse(
301+
session_id=info.session_id,
302+
expires_at=info.expires_at,
303+
)
296304

297305

298306
@router.delete("/sessions/{session_id}", status_code=status.HTTP_204_NO_CONTENT)

code-interpreter/app/main.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import logging
45
import subprocess
56
from collections.abc import AsyncGenerator
6-
from contextlib import asynccontextmanager
7+
from contextlib import asynccontextmanager, suppress
78
from shutil import which
89
from typing import Final
910

@@ -14,6 +15,8 @@
1415
from app.models.schemas import HealthResponse
1516
from app.services.executor_factory import get_executor
1617

18+
SESSION_REAPER_INTERVAL_SEC = 30
19+
1720
# Configure logging
1821
logging.basicConfig(
1922
level=logging.INFO,
@@ -78,6 +81,24 @@ def _ensure_docker_image_available() -> None:
7881
) from e
7982

8083

84+
async def _reap_expired_sessions_once() -> None:
85+
"""Run a single reap pass via the configured executor."""
86+
try:
87+
count = await asyncio.to_thread(get_executor().reap_expired_sessions)
88+
except Exception:
89+
logger.warning("Session reaper pass failed", exc_info=True)
90+
return
91+
if count > 0:
92+
logger.info("Reaped %d expired session(s)", count)
93+
94+
95+
async def _session_reaper_loop() -> None:
96+
"""Periodically delete sessions whose TTL has elapsed."""
97+
while True:
98+
await asyncio.sleep(SESSION_REAPER_INTERVAL_SEC)
99+
await _reap_expired_sessions_once()
100+
101+
81102
@asynccontextmanager
82103
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
83104
"""Manage application lifespan events."""
@@ -87,9 +108,16 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
87108
_ensure_docker_image_available()
88109
logger.info("Docker executor image is ready")
89110

90-
yield
111+
# Reap any sessions whose TTL elapsed while the service was down.
112+
await _reap_expired_sessions_once()
113+
reaper_task = asyncio.create_task(_session_reaper_loop())
91114

92-
# Shutdown: Add any cleanup logic here if needed in the future
115+
try:
116+
yield
117+
finally:
118+
reaper_task.cancel()
119+
with suppress(asyncio.CancelledError):
120+
await reaper_task
93121

94122

95123
def create_app() -> FastAPI:

code-interpreter/app/models/schemas.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,28 @@ class HealthResponse(BaseModel):
122122
message: StrictStr | None = None
123123

124124

125+
DEFAULT_SESSION_TTL_SEC = 15 * 60
126+
MAX_SESSION_TTL_SEC = 24 * 60 * 60
127+
128+
125129
class CreateSessionRequest(BaseModel):
126130
files: list[ExecuteFile] = Field(
127131
default_factory=list,
128132
description="Files to stage in the session workspace at create time.",
129133
)
134+
ttl_seconds: StrictInt = Field(
135+
DEFAULT_SESSION_TTL_SEC,
136+
ge=1,
137+
le=MAX_SESSION_TTL_SEC,
138+
description=(
139+
"Session lifetime in seconds. The session pod is automatically "
140+
"destroyed after this duration even if the API service crashes."
141+
),
142+
)
130143

131144

132145
class CreateSessionResponse(BaseModel):
133146
session_id: StrictStr = Field(..., description="Identifier for the session pod/container.")
147+
expires_at: float = Field(
148+
..., description="Unix timestamp when the session is scheduled to expire."
149+
)

code-interpreter/app/services/executor_base.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,13 @@ class SessionInfo:
111111
"""Identifying information for a long-lived session."""
112112

113113
session_id: str
114+
expires_at: float
114115

115116

116117
SESSION_NAME_PREFIX = "code-session-"
117118
SESSION_APP_LABEL = "code-interpreter"
118119
SESSION_COMPONENT_LABEL = "session"
120+
SESSION_EXPIRES_AT_KEY = "code-interpreter.expires-at"
119121

120122

121123
class ExecutorProtocol(Protocol):
@@ -183,21 +185,27 @@ def execute_python_streaming(
183185
def create_session(
184186
self,
185187
*,
188+
ttl_seconds: int,
186189
files: Sequence[tuple[str, bytes]] | None = None,
187190
cpu_time_limit_sec: int | None = None,
188191
memory_limit_mb: int | None = None,
189192
) -> SessionInfo:
190193
"""Create a long-lived execution environment.
191194
192-
Returns identifying information for the session. The caller is
193-
responsible for invoking ``delete_session`` when finished.
195+
Returns identifying information for the session. The session is
196+
guaranteed to be torn down at or before ``expires_at`` even if this
197+
process crashes.
194198
"""
195199
raise NotImplementedError(f"{type(self).__name__} does not support sessions")
196200

197201
def delete_session(self, session_id: str) -> bool:
198202
"""Tear down a session by ID. Returns True if found and deleted."""
199203
raise NotImplementedError(f"{type(self).__name__} does not support sessions")
200204

205+
def reap_expired_sessions(self) -> int:
206+
"""Delete sessions whose TTL has elapsed. Returns number reaped."""
207+
return 0
208+
201209
@staticmethod
202210
def truncate_output(stream: bytes, max_bytes: int) -> str:
203211
if len(stream) <= max_bytes:

code-interpreter/app/services/executor_docker.py

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from app.services.executor_base import (
2424
SESSION_APP_LABEL,
2525
SESSION_COMPONENT_LABEL,
26+
SESSION_EXPIRES_AT_KEY,
2627
SESSION_NAME_PREFIX,
2728
BaseExecutor,
2829
EntryKind,
@@ -38,10 +39,6 @@
3839

3940
logger = logging.getLogger(__name__)
4041

41-
# Sessions keep their idle container alive for at most this many seconds; a
42-
# follow-up PR replaces this with a per-session TTL plus a reaper.
43-
SESSION_MAX_LIFETIME_SECONDS = 24 * 60 * 60
44-
4542

4643
@dataclass
4744
class _ExecContext:
@@ -405,20 +402,23 @@ def _run_in_container(
405402
def create_session(
406403
self,
407404
*,
405+
ttl_seconds: int,
408406
files: Sequence[tuple[str, bytes]] | None = None,
409407
cpu_time_limit_sec: int | None = None,
410408
memory_limit_mb: int | None = None,
411409
) -> SessionInfo:
412410
container_name = f"{SESSION_NAME_PREFIX}{uuid.uuid4().hex}"
411+
expires_at = time.time() + ttl_seconds
413412

414413
cmd = self._build_run_command(
415414
container_name=container_name,
416415
cpu_time_limit_sec=cpu_time_limit_sec,
417416
memory_limit_mb=memory_limit_mb,
418-
sleep_seconds=SESSION_MAX_LIFETIME_SECONDS,
417+
sleep_seconds=ttl_seconds,
419418
labels={
420419
"app": SESSION_APP_LABEL,
421420
"component": SESSION_COMPONENT_LABEL,
421+
SESSION_EXPIRES_AT_KEY: str(expires_at),
422422
},
423423
)
424424
start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603
@@ -433,8 +433,8 @@ def create_session(
433433
self._kill_container(container_name)
434434
raise
435435

436-
logger.info("Created session container %s", container_name)
437-
return SessionInfo(session_id=container_name)
436+
logger.info("Created session container %s (expires at %s)", container_name, expires_at)
437+
return SessionInfo(session_id=container_name, expires_at=expires_at)
438438

439439
def delete_session(self, session_id: str) -> bool:
440440
if not session_id.startswith(SESSION_NAME_PREFIX):
@@ -444,15 +444,64 @@ def delete_session(self, session_id: str) -> bool:
444444
capture_output=True,
445445
text=True,
446446
)
447-
# `docker rm -f <missing>` exits 0 on modern Docker, so check stderr
448-
# for the "not found" message regardless of exit code.
447+
if result.returncode == 0:
448+
return True
449+
# docker rm -f exits non-zero only when the container does not exist.
449450
stderr = (result.stderr or "").lower()
450451
if "no such container" in stderr or "not found" in stderr:
451452
return False
452-
if result.returncode == 0:
453-
return True
454453
raise RuntimeError(f"Failed to delete session {session_id}: {result.stderr}")
455454

455+
def reap_expired_sessions(self) -> int:
456+
list_cmd = [
457+
self.docker_binary,
458+
"ps",
459+
"-a",
460+
"--filter",
461+
f"label=app={SESSION_APP_LABEL}",
462+
"--filter",
463+
f"label=component={SESSION_COMPONENT_LABEL}",
464+
"--format",
465+
f'{{{{.Names}}}}\t{{{{.Label "{SESSION_EXPIRES_AT_KEY}"}}}}',
466+
]
467+
try:
468+
list_result = subprocess.run( # nosec B603
469+
list_cmd, capture_output=True, text=True, timeout=10
470+
)
471+
except subprocess.TimeoutExpired:
472+
logger.warning("Timed out listing session containers for reap")
473+
return 0
474+
475+
if list_result.returncode != 0:
476+
logger.warning("Failed to list session containers: %s", list_result.stderr)
477+
return 0
478+
479+
now = time.time()
480+
reaped = 0
481+
for line in list_result.stdout.splitlines():
482+
name, _, expires_str = line.partition("\t")
483+
name = name.strip()
484+
expires_str = expires_str.strip()
485+
if not name or not expires_str:
486+
continue
487+
try:
488+
expires_at = float(expires_str)
489+
except ValueError:
490+
continue
491+
if expires_at >= now:
492+
continue
493+
rm_result = subprocess.run( # nosec B603
494+
[self.docker_binary, "rm", "-f", name],
495+
capture_output=True,
496+
text=True,
497+
)
498+
if rm_result.returncode == 0:
499+
reaped += 1
500+
logger.info("Reaped expired session container %s", name)
501+
else:
502+
logger.warning("Failed to reap session container %s: %s", name, rm_result.stderr)
503+
return reaped
504+
456505
def execute_python(
457506
self,
458507
*,

0 commit comments

Comments
 (0)