Skip to content

Commit 977a1e4

Browse files
authored
Merge pull request #29 from onyx-dot-app/dane/ll-session-refactor
refactor(executors): extract reusable primitives
2 parents 54b9123 + 59fafdb commit 977a1e4

2 files changed

Lines changed: 134 additions & 124 deletions

File tree

code-interpreter/app/services/executor_docker.py

Lines changed: 70 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@
88
import tarfile
99
import time
1010
import uuid
11-
from collections.abc import Generator, Sequence
11+
from collections.abc import Generator, Mapping, Sequence
1212
from contextlib import contextmanager, suppress
1313
from dataclasses import dataclass
1414
from pathlib import Path
@@ -134,58 +134,51 @@ def _validate_relative_path(self, path_str: str) -> Path:
134134

135135
def _create_tar_archive(
136136
self,
137-
code: str,
137+
code: str | None = None,
138138
files: Sequence[tuple[str, bytes]] | None = None,
139139
last_line_interactive: bool = True,
140140
) -> bytes:
141-
"""Create a tar archive containing the code and any additional files.
141+
"""Create a tar archive optionally containing an entrypoint and files.
142142
143143
Args:
144-
last_line_interactive: If True, wrap code so the last line prints its value
145-
if it's a bare expression (only the last line is affected).
144+
code: If provided, written as ``__main__.py`` at the archive root.
145+
last_line_interactive: If True and code is provided, wrap the code so
146+
the last line prints its value if it's a bare expression.
146147
"""
147148
tar_buffer = io.BytesIO()
148149
with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
149-
# Add __main__.py - optionally wrap in last-line-interactive mode
150-
code_to_execute = code
151-
if last_line_interactive:
152-
# Wrap to make the last expression value print to stdout like Jupyter/REPL
153-
code_to_execute = wrap_last_line_interactive(code)
154-
155-
code_bytes = code_to_execute.encode("utf-8")
156-
code_info = tarfile.TarInfo(name="__main__.py")
157-
code_info.size = len(code_bytes)
158-
code_info.mode = 0o644
159-
tar.addfile(code_info, io.BytesIO(code_bytes))
160-
161-
# Track directories we've created
162-
created_dirs = set()
163-
164-
# Add any additional files
165-
if files:
166-
for file_path, content in files:
167-
# Validate the path
168-
validated_path = self._validate_relative_path(file_path)
169-
if validated_path == Path("__main__.py"):
170-
raise ValueError(
171-
"File path '__main__.py' is reserved for the execution entrypoint."
172-
)
173-
174-
# Create parent directories if needed
175-
parent_parts = validated_path.parts[:-1]
176-
for i in range(len(parent_parts)):
177-
dir_path = "/".join(parent_parts[: i + 1])
178-
if dir_path not in created_dirs:
179-
dir_info = tarfile.TarInfo(name=dir_path + "/")
180-
dir_info.type = tarfile.DIRTYPE
181-
dir_info.mode = 0o755
182-
tar.addfile(dir_info)
183-
created_dirs.add(dir_path)
184-
185-
file_info = tarfile.TarInfo(name=validated_path.as_posix())
186-
file_info.size = len(content)
187-
file_info.mode = 0o644
188-
tar.addfile(file_info, io.BytesIO(content))
150+
if code is not None:
151+
code_to_execute = (
152+
wrap_last_line_interactive(code) if last_line_interactive else code
153+
)
154+
code_bytes = code_to_execute.encode("utf-8")
155+
code_info = tarfile.TarInfo(name="__main__.py")
156+
code_info.size = len(code_bytes)
157+
code_info.mode = 0o644
158+
tar.addfile(code_info, io.BytesIO(code_bytes))
159+
160+
created_dirs: set[str] = set()
161+
for file_path, content in files or ():
162+
validated_path = self._validate_relative_path(file_path)
163+
if code is not None and validated_path == Path("__main__.py"):
164+
raise ValueError(
165+
"File path '__main__.py' is reserved for the execution entrypoint."
166+
)
167+
168+
parent_parts = validated_path.parts[:-1]
169+
for i in range(len(parent_parts)):
170+
dir_path = "/".join(parent_parts[: i + 1])
171+
if dir_path not in created_dirs:
172+
dir_info = tarfile.TarInfo(name=dir_path + "/")
173+
dir_info.type = tarfile.DIRTYPE
174+
dir_info.mode = 0o755
175+
tar.addfile(dir_info)
176+
created_dirs.add(dir_path)
177+
178+
file_info = tarfile.TarInfo(name=validated_path.as_posix())
179+
file_info.size = len(content)
180+
file_info.mode = 0o644
181+
tar.addfile(file_info, io.BytesIO(content))
189182

190183
return tar_buffer.getvalue()
191184

@@ -245,11 +238,16 @@ def _build_run_command(
245238
container_name: str,
246239
cpu_time_limit_sec: int | None,
247240
memory_limit_mb: int | None,
248-
timeout_ms: int,
241+
sleep_seconds: int,
242+
labels: Mapping[str, str] | None = None,
249243
) -> list[str]:
250-
"""Build the ``docker run`` command for an ephemeral container."""
251-
# Start the container in detached mode
252-
# We need CAP_CHOWN to set up the workspace, but we'll drop privileges for execution
244+
"""Build a detached ``docker run`` command.
245+
246+
``sleep_seconds`` controls how long the container's idle ``sleep`` lasts;
247+
callers must ensure it exceeds their work duration. ``labels`` are
248+
attached for later filtering (e.g. by the session reaper).
249+
"""
250+
# We need CAP_CHOWN to set up the workspace, but drop privileges for execution
253251
cmd: list[str] = [
254252
self.docker_binary,
255253
"run",
@@ -267,7 +265,6 @@ def _build_run_command(
267265
"64",
268266
"--security-opt",
269267
"no-new-privileges",
270-
# Keep CAP_CHOWN to allow setting up workspace permissions
271268
"--cap-drop",
272269
"ALL",
273270
"--cap-add",
@@ -288,31 +285,26 @@ def _build_run_command(
288285
"MPLCONFIGDIR=/tmp/matplotlib",
289286
]
290287

288+
for key, value in (labels or {}).items():
289+
cmd.extend(["--label", f"{key}={value}"])
290+
291291
if cpu_time_limit_sec is not None:
292-
cpu_limit = max(int(cpu_time_limit_sec), 1)
292+
cpu_limit = max(cpu_time_limit_sec, 1)
293293
cmd.extend(["--ulimit", f"cpu={cpu_limit}:{cpu_limit}"])
294294

295295
if memory_limit_mb is not None:
296-
memory_limit = max(int(memory_limit_mb), 16)
296+
memory_limit = max(memory_limit_mb, 16)
297297
mem_flag = f"{memory_limit}m"
298298
cmd.extend(["--memory", mem_flag, "--memory-swap", mem_flag])
299299

300300
if self.run_args:
301301
cmd.extend(shlex.split(self.run_args))
302302

303-
# Just sleep - workspace is already created as tmpfs with correct ownership
304-
cmd.extend([self.image, "sleep", str((timeout_ms * 1000) + 10)])
303+
cmd.extend([self.image, "sleep", str(sleep_seconds)])
305304
return cmd
306305

307-
def _stage_files_in_container(
308-
self,
309-
container_name: str,
310-
code: str,
311-
files: Sequence[tuple[str, bytes]] | None,
312-
last_line_interactive: bool,
313-
) -> None:
314-
"""Create a tar archive and stream it into the container workspace."""
315-
tar_archive = self._create_tar_archive(code, files, last_line_interactive)
306+
def _upload_tar_to_container(self, container_name: str, tar_archive: bytes) -> None:
307+
"""Stream a tar archive into the container workspace."""
316308
tar_cmd = [
317309
self.docker_binary,
318310
"exec",
@@ -331,6 +323,17 @@ def _stage_files_in_container(
331323
f"Failed to extract files: {tar_proc.stderr.decode('utf-8', errors='replace')}"
332324
)
333325

326+
def _stage_files_in_container(
327+
self,
328+
container_name: str,
329+
code: str,
330+
files: Sequence[tuple[str, bytes]] | None,
331+
last_line_interactive: bool,
332+
) -> None:
333+
"""Create a tar archive and stream it into the container workspace."""
334+
tar_archive = self._create_tar_archive(code, files, last_line_interactive)
335+
self._upload_tar_to_container(container_name, tar_archive)
336+
334337
@contextmanager
335338
def _run_in_container(
336339
self,
@@ -351,7 +354,10 @@ def _run_in_container(
351354
container_name = f"code-exec-{uuid.uuid4().hex}"
352355

353356
cmd = self._build_run_command(
354-
container_name, cpu_time_limit_sec, memory_limit_mb, timeout_ms
357+
container_name=container_name,
358+
cpu_time_limit_sec=cpu_time_limit_sec,
359+
memory_limit_mb=memory_limit_mb,
360+
sleep_seconds=(timeout_ms * 1000) + 10,
355361
)
356362
start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603
357363
if start_proc.returncode != 0:

code-interpreter/app/services/executor_kubernetes.py

Lines changed: 64 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import tarfile
77
import time
88
import uuid
9-
from collections.abc import Generator, Sequence
9+
from collections.abc import Generator, Mapping, Sequence
1010
from contextlib import contextmanager
1111
from dataclasses import dataclass
1212
from pathlib import Path
@@ -131,30 +131,36 @@ def check_health(self) -> HealthCheck:
131131
def _create_pod_manifest(
132132
self,
133133
pod_name: str,
134-
memory_limit_mb: int | None,
135-
cpu_time_limit_sec: int | None,
134+
*,
135+
command: Sequence[str],
136+
labels: Mapping[str, str],
137+
annotations: Mapping[str, str] | None = None,
138+
active_deadline_seconds: int | None = None,
139+
memory_limit_mb: int | None = None,
140+
cpu_time_limit_sec: int | None = None,
136141
) -> V1Pod:
137-
"""Create a Kubernetes pod manifest for code execution."""
142+
"""Build a Kubernetes pod manifest for an isolated executor container.
138143
139-
resources: dict[str, dict[str, Any]] = {
140-
"limits": {},
141-
"requests": {},
142-
}
144+
``command`` is the executor container's command (e.g. ``["sleep", "3600"]``).
145+
``active_deadline_seconds``, when set, instructs kubelet to stop the pod
146+
at that age — used by sessions to enforce TTL even if the API is down.
147+
"""
148+
resources: dict[str, dict[str, Any]] = {"limits": {}, "requests": {}}
143149

144150
if memory_limit_mb is not None:
145-
memory_limit = max(int(memory_limit_mb), 16)
151+
memory_limit = max(memory_limit_mb, 16)
146152
resources["limits"]["memory"] = f"{memory_limit}Mi"
147153
resources["requests"]["memory"] = f"{min(memory_limit, 64)}Mi"
148154

149155
if cpu_time_limit_sec is not None:
150-
cpu_limit = max(int(cpu_time_limit_sec), 1)
156+
cpu_limit = max(cpu_time_limit_sec, 1)
151157
resources["limits"]["cpu"] = str(cpu_limit)
152158
resources["requests"]["cpu"] = "100m"
153159

154160
container = V1Container(
155161
name="executor",
156162
image=self.image,
157-
command=["sleep", "3600"],
163+
command=list(command),
158164
working_dir="/workspace",
159165
resources=resources if resources["limits"] else None,
160166
security_context={
@@ -204,6 +210,7 @@ def _create_pod_manifest(
204210
init_containers=[network_lockdown_container],
205211
containers=[container],
206212
restart_policy="Never",
213+
active_deadline_seconds=active_deadline_seconds,
207214
service_account_name=self.service_account if self.service_account else None,
208215
volumes=[
209216
{"name": "workspace", "emptyDir": {"sizeLimit": "100Mi"}},
@@ -218,70 +225,65 @@ def _create_pod_manifest(
218225
metadata = V1ObjectMeta(
219226
name=pod_name,
220227
namespace=self.namespace,
221-
labels={
222-
"app": "code-interpreter",
223-
"component": "executor",
224-
},
228+
labels=dict(labels),
229+
annotations=dict(annotations) if annotations else None,
225230
)
226231

227232
return V1Pod(api_version="v1", kind="Pod", metadata=metadata, spec=spec)
228233

229234
def _create_tar_archive(
230235
self,
231-
code: str,
236+
code: str | None = None,
232237
files: Sequence[tuple[str, bytes]] | None = None,
233238
last_line_interactive: bool = True,
234239
) -> bytes:
235-
"""Create a tar archive containing the code and any additional files.
240+
"""Create a tar archive optionally containing an entrypoint and files.
236241
237242
Args:
238-
last_line_interactive: If True, wrap code so the last line prints its value
239-
if it's a bare expression (only the last line is affected).
243+
code: If provided, written as ``__main__.py`` at the archive root.
244+
last_line_interactive: If True and code is provided, wrap the code so
245+
the last line prints its value if it's a bare expression.
240246
"""
241247
tar_buffer = io.BytesIO()
242248
with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
243-
# Add __main__.py - optionally wrap in last-line-interactive mode
244-
code_to_execute = code
245-
if last_line_interactive:
246-
# Wrap to make the last expression value print to stdout like Jupyter/REPL
247-
code_to_execute = wrap_last_line_interactive(code)
248-
249-
code_bytes = code_to_execute.encode("utf-8")
250-
code_info = tarfile.TarInfo(name="__main__.py")
251-
code_info.size = len(code_bytes)
252-
code_info.mode = 0o644
253-
code_info.uid = 65532
254-
code_info.gid = 65532
255-
tar.addfile(code_info, io.BytesIO(code_bytes))
256-
257-
created_dirs = set()
258-
259-
if files:
260-
for file_path, content in files:
261-
validated_path = self._validate_relative_path(file_path)
262-
if validated_path == Path("__main__.py"):
263-
raise ValueError(
264-
"File path '__main__.py' is reserved for the execution entrypoint."
265-
)
249+
if code is not None:
250+
code_to_execute = (
251+
wrap_last_line_interactive(code) if last_line_interactive else code
252+
)
253+
code_bytes = code_to_execute.encode("utf-8")
254+
code_info = tarfile.TarInfo(name="__main__.py")
255+
code_info.size = len(code_bytes)
256+
code_info.mode = 0o644
257+
code_info.uid = 65532
258+
code_info.gid = 65532
259+
tar.addfile(code_info, io.BytesIO(code_bytes))
260+
261+
created_dirs: set[str] = set()
262+
for file_path, content in files or ():
263+
validated_path = self._validate_relative_path(file_path)
264+
if code is not None and validated_path == Path("__main__.py"):
265+
raise ValueError(
266+
"File path '__main__.py' is reserved for the execution entrypoint."
267+
)
266268

267-
parent_parts = validated_path.parts[:-1]
268-
for i in range(len(parent_parts)):
269-
dir_path = "/".join(parent_parts[: i + 1])
270-
if dir_path not in created_dirs:
271-
dir_info = tarfile.TarInfo(name=dir_path + "/")
272-
dir_info.type = tarfile.DIRTYPE
273-
dir_info.mode = 0o755
274-
dir_info.uid = 65532
275-
dir_info.gid = 65532
276-
tar.addfile(dir_info)
277-
created_dirs.add(dir_path)
278-
279-
file_info = tarfile.TarInfo(name=validated_path.as_posix())
280-
file_info.size = len(content)
281-
file_info.mode = 0o644
282-
file_info.uid = 65532
283-
file_info.gid = 65532
284-
tar.addfile(file_info, io.BytesIO(content))
269+
parent_parts = validated_path.parts[:-1]
270+
for i in range(len(parent_parts)):
271+
dir_path = "/".join(parent_parts[: i + 1])
272+
if dir_path not in created_dirs:
273+
dir_info = tarfile.TarInfo(name=dir_path + "/")
274+
dir_info.type = tarfile.DIRTYPE
275+
dir_info.mode = 0o755
276+
dir_info.uid = 65532
277+
dir_info.gid = 65532
278+
tar.addfile(dir_info)
279+
created_dirs.add(dir_path)
280+
281+
file_info = tarfile.TarInfo(name=validated_path.as_posix())
282+
file_info.size = len(content)
283+
file_info.mode = 0o644
284+
file_info.uid = 65532
285+
file_info.gid = 65532
286+
tar.addfile(file_info, io.BytesIO(content))
285287

286288
return tar_buffer.getvalue()
287289

@@ -404,6 +406,8 @@ def _run_in_pod(
404406

405407
pod_manifest = self._create_pod_manifest(
406408
pod_name=pod_name,
409+
command=["sleep", "3600"],
410+
labels={"app": "code-interpreter", "component": "executor"},
407411
memory_limit_mb=memory_limit_mb,
408412
cpu_time_limit_sec=cpu_time_limit_sec,
409413
)

0 commit comments

Comments
 (0)