Skip to content

Commit 12b808a

Browse files
Merge pull request #20 from onyx-dot-app/fix/k8s-executor-cleanup
fix(k8s): Kubernetes executor pod cleanup
2 parents 53792b4 + 7377167 commit 12b808a

2 files changed

Lines changed: 163 additions & 26 deletions

File tree

code-interpreter/app/services/executor_kubernetes.py

Lines changed: 98 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import time
88
import uuid
99
from collections.abc import Generator, Sequence
10-
from contextlib import contextmanager, suppress
10+
from contextlib import contextmanager
1111
from dataclasses import dataclass
1212
from pathlib import Path
1313
from typing import Any
@@ -41,6 +41,10 @@
4141

4242
logger = logging.getLogger(__name__)
4343

44+
POD_DELETE_RETRIES = 3
45+
POD_DELETE_RETRY_DELAY_SECONDS = 0.2
46+
POD_DELETE_CONFIRM_TIMEOUT_SECONDS = 2.0
47+
4448

4549
def _parse_exit_code(error: str) -> int | None:
4650
"""Parse the exit code from a Kubernetes exec error channel message."""
@@ -75,7 +79,11 @@ def __init__(self) -> None:
7579
except config.ConfigException:
7680
config.load_kube_config()
7781

78-
self.v1 = client.CoreV1Api()
82+
# Keep REST calls on a dedicated ApiClient. kubernetes.stream.stream mutates
83+
# the ApiClient request path for websocket exec calls, so mixing CRUD and
84+
# exec traffic on one client can leave later REST calls in a broken state.
85+
self._rest_api_client = client.ApiClient()
86+
self.v1 = client.CoreV1Api(api_client=self._rest_api_client)
7987
self.namespace = KUBERNETES_EXECUTOR_NAMESPACE
8088
self.image = KUBERNETES_EXECUTOR_IMAGE
8189
self.service_account = KUBERNETES_EXECUTOR_SERVICE_ACCOUNT
@@ -263,20 +271,42 @@ def _wait_for_pod_ready(self, pod_name: str, timeout_sec: int = 30) -> None:
263271
time.sleep(0.1)
264272
raise RuntimeError(f"Pod {pod_name} did not become ready in {timeout_sec} seconds")
265273

274+
def _stream_pod_exec(
275+
self,
276+
pod_name: str,
277+
command: list[str],
278+
*,
279+
stderr: bool,
280+
stdin: bool,
281+
stdout: bool,
282+
tty: bool,
283+
preload_content: bool = False,
284+
) -> ws_client.WSClient:
285+
"""Run a websocket exec call using an isolated ApiClient instance."""
286+
stream_api = client.CoreV1Api(api_client=client.ApiClient())
287+
return stream.stream(
288+
stream_api.connect_get_namespaced_pod_exec,
289+
pod_name,
290+
self.namespace,
291+
command=command,
292+
stderr=stderr,
293+
stdin=stdin,
294+
stdout=stdout,
295+
tty=tty,
296+
_preload_content=preload_content,
297+
)
298+
266299
def _upload_tar_to_pod(self, pod_name: str, tar_archive: bytes) -> None:
267300
"""Upload and extract a tar archive into the pod's workspace."""
268301
logger.info(f"Uploading tar archive ({len(tar_archive)} bytes) to pod {pod_name}")
269302
exec_command = ["tar", "-x", "-C", "/workspace"]
270-
resp = stream.stream(
271-
self.v1.connect_get_namespaced_pod_exec,
303+
resp = self._stream_pod_exec(
272304
pod_name,
273-
self.namespace,
274305
command=exec_command,
275306
stderr=True,
276307
stdin=True,
277308
stdout=True,
278309
tty=False,
279-
_preload_content=False,
280310
)
281311

282312
resp.write_stdin(tar_archive)
@@ -314,17 +344,17 @@ def _upload_tar_to_pod(self, pod_name: str, tar_archive: bytes) -> None:
314344

315345
def _kill_python_process(self, pod_name: str) -> None:
316346
"""Kill the Python process running in the pod."""
317-
with suppress(Exception):
318-
stream.stream(
319-
self.v1.connect_get_namespaced_pod_exec,
347+
try:
348+
self._stream_pod_exec(
320349
pod_name,
321-
self.namespace,
322350
command=["pkill", "-9", "python"],
323351
stderr=False,
324352
stdin=False,
325353
stdout=False,
326354
tty=False,
327355
)
356+
except Exception:
357+
logger.warning("Failed to kill Python process in pod %s", pod_name, exc_info=True)
328358

329359
@contextmanager
330360
def _run_in_pod(
@@ -369,16 +399,13 @@ def _run_in_pod(
369399
start = time.perf_counter()
370400
exec_command = ["python", "/workspace/__main__.py"]
371401

372-
exec_resp = stream.stream(
373-
self.v1.connect_get_namespaced_pod_exec,
402+
exec_resp = self._stream_pod_exec(
374403
pod_name,
375-
self.namespace,
376404
command=exec_command,
377405
stderr=True,
378406
stdin=True,
379407
stdout=True,
380408
tty=False,
381-
_preload_content=False,
382409
)
383410

384411
yield _KubeExecContext(
@@ -409,16 +436,13 @@ def _extract_workspace_snapshot(self, pod_name: str) -> tuple[WorkspaceEntry, ..
409436
]
410437

411438
logger.info(f"Starting tar extraction from pod {pod_name}")
412-
resp = stream.stream(
413-
self.v1.connect_get_namespaced_pod_exec,
439+
resp = self._stream_pod_exec(
414440
pod_name,
415-
self.namespace,
416441
command=exec_command,
417442
stderr=True,
418443
stdin=False,
419444
stdout=True,
420445
tty=False,
421-
_preload_content=False,
422446
)
423447

424448
base64_data = ""
@@ -485,14 +509,63 @@ def _extract_workspace_snapshot(self, pod_name: str) -> tuple[WorkspaceEntry, ..
485509
logger.error(f"Failed to extract workspace snapshot: {e}", exc_info=True)
486510
return tuple()
487511

512+
def _wait_for_pod_deleted(self, pod_name: str, timeout_sec: float) -> bool:
513+
deadline = time.time() + timeout_sec
514+
while time.time() < deadline:
515+
try:
516+
self.v1.read_namespaced_pod(pod_name, self.namespace)
517+
except ApiException as e:
518+
if e.status == 404:
519+
return True
520+
logger.warning(
521+
"Error while checking pod deletion for %s in namespace %s: %s",
522+
pod_name,
523+
self.namespace,
524+
e,
525+
)
526+
return False
527+
time.sleep(0.1)
528+
return False
529+
488530
def _cleanup_pod(self, pod_name: str) -> None:
489-
"""Delete a pod and wait for cleanup."""
490-
with suppress(ApiException):
491-
self.v1.delete_namespaced_pod(
492-
name=pod_name,
493-
namespace=self.namespace,
494-
body=client.V1DeleteOptions(grace_period_seconds=0),
495-
)
531+
"""Delete a pod and log any cleanup failures."""
532+
for attempt in range(1, POD_DELETE_RETRIES + 1):
533+
try:
534+
self.v1.delete_namespaced_pod(
535+
name=pod_name,
536+
namespace=self.namespace,
537+
body=client.V1DeleteOptions(grace_period_seconds=0),
538+
)
539+
except ApiException as e:
540+
if e.status == 404:
541+
return
542+
logger.warning(
543+
"Failed to delete pod %s in namespace %s on attempt %s/%s: %s",
544+
pod_name,
545+
self.namespace,
546+
attempt,
547+
POD_DELETE_RETRIES,
548+
e,
549+
)
550+
else:
551+
if self._wait_for_pod_deleted(pod_name, POD_DELETE_CONFIRM_TIMEOUT_SECONDS):
552+
return
553+
logger.warning(
554+
"Pod %s still exists after delete request on attempt %s/%s",
555+
pod_name,
556+
attempt,
557+
POD_DELETE_RETRIES,
558+
)
559+
560+
if attempt < POD_DELETE_RETRIES:
561+
time.sleep(POD_DELETE_RETRY_DELAY_SECONDS * attempt)
562+
563+
logger.error(
564+
"Failed to confirm deletion of pod %s in namespace %s after %s attempts",
565+
pod_name,
566+
self.namespace,
567+
POD_DELETE_RETRIES,
568+
)
496569

497570
def execute_python(
498571
self,

code-interpreter/tests/integration_tests/test_kubernetes_streaming.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88

99
import base64
1010
import io
11+
import logging
1112
import tarfile
1213
from typing import Any
1314
from unittest.mock import MagicMock, patch
1415

1516
import pytest
17+
from kubernetes.client.exceptions import ApiException # type: ignore[import-untyped]
1618

1719
from app.services.executor_base import StreamChunk, StreamEvent, StreamResult
1820
from app.services.executor_kubernetes import KubernetesExecutor
@@ -32,7 +34,13 @@ def executor() -> KubernetesExecutor:
3234
inst.service_account = ""
3335
pod_mock = MagicMock()
3436
pod_mock.status.phase = "Running"
35-
inst.v1.read_namespaced_pod.return_value = pod_mock
37+
38+
def _read_namespaced_pod(*args: object, **kwargs: object) -> MagicMock:
39+
if inst.v1.delete_namespaced_pod.called:
40+
raise ApiException(status=404)
41+
return pod_mock
42+
43+
inst.v1.read_namespaced_pod.side_effect = _read_namespaced_pod
3644
return inst
3745

3846

@@ -327,6 +335,62 @@ def test_streaming_cleans_up_pod(executor: KubernetesExecutor) -> None:
327335
executor.v1.delete_namespaced_pod.assert_called_once()
328336

329337

338+
def test_cleanup_retries_delete_failures(
339+
executor: KubernetesExecutor, caplog: pytest.LogCaptureFixture
340+
) -> None:
341+
executor.v1.delete_namespaced_pod.side_effect = [
342+
ApiException(status=500, reason="boom"),
343+
None,
344+
]
345+
executor.v1.read_namespaced_pod.side_effect = [
346+
MagicMock(),
347+
ApiException(status=404),
348+
]
349+
350+
with caplog.at_level(logging.WARNING):
351+
executor._cleanup_pod("code-exec-test")
352+
353+
assert executor.v1.delete_namespaced_pod.call_count == 2
354+
assert "Failed to delete pod code-exec-test" in caplog.text
355+
356+
357+
def test_cleanup_logs_when_delete_never_succeeds(
358+
executor: KubernetesExecutor, caplog: pytest.LogCaptureFixture
359+
) -> None:
360+
executor.v1.delete_namespaced_pod.side_effect = ApiException(status=500, reason="boom")
361+
362+
with caplog.at_level(logging.WARNING):
363+
executor._cleanup_pod("code-exec-test")
364+
365+
assert executor.v1.delete_namespaced_pod.call_count == 3
366+
assert "Failed to delete pod code-exec-test" in caplog.text
367+
assert "Failed to confirm deletion of pod code-exec-test" in caplog.text
368+
369+
370+
def test_stream_exec_uses_fresh_api_client(executor: KubernetesExecutor) -> None:
371+
with (
372+
patch("app.services.executor_kubernetes.client.ApiClient") as api_client_cls,
373+
patch("app.services.executor_kubernetes.client.CoreV1Api") as core_v1_cls,
374+
patch("app.services.executor_kubernetes.stream.stream") as mock_stream,
375+
):
376+
stream_api = MagicMock()
377+
core_v1_cls.return_value = stream_api
378+
379+
executor._stream_pod_exec(
380+
"code-exec-test",
381+
["python", "/workspace/__main__.py"],
382+
stderr=True,
383+
stdin=True,
384+
stdout=True,
385+
tty=False,
386+
)
387+
388+
api_client_cls.assert_called_once()
389+
core_v1_cls.assert_called_once_with(api_client=api_client_cls.return_value)
390+
assert mock_stream.call_args.args[0] is stream_api.connect_get_namespaced_pod_exec
391+
assert mock_stream.call_args.kwargs["_preload_content"] is False
392+
393+
330394
def test_streaming_empty_output(executor: KubernetesExecutor) -> None:
331395
events = _run_streaming(executor, FakeExecResp())
332396

0 commit comments

Comments
 (0)