Skip to content

Commit ef0ee43

Browse files
committed
Fix concurrent
1 parent bbe245c commit ef0ee43

File tree

2 files changed

+104
-5
lines changed

2 files changed

+104
-5
lines changed

code-interpreter/app/services/executor_kubernetes.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,12 @@ def __init__(self) -> None:
7575
except config.ConfigException:
7676
config.load_kube_config()
7777

78-
self.v1 = client.CoreV1Api()
78+
# Use separate ApiClient instances for REST vs streaming operations.
79+
# stream.stream() monkey-patches ApiClient.request to use WebSocket.
80+
# Sharing an ApiClient causes REST calls to be routed through the
81+
# WebSocket path ("Handshake status 200 OK" errors).
82+
self.v1 = client.CoreV1Api(api_client=client.ApiClient())
83+
self.v1_streaming = client.CoreV1Api(api_client=client.ApiClient())
7984
self.namespace = KUBERNETES_EXECUTOR_NAMESPACE
8085
self.image = KUBERNETES_EXECUTOR_IMAGE
8186
self.service_account = KUBERNETES_EXECUTOR_SERVICE_ACCOUNT
@@ -268,7 +273,7 @@ def _upload_tar_to_pod(self, pod_name: str, tar_archive: bytes) -> None:
268273
logger.info(f"Uploading tar archive ({len(tar_archive)} bytes) to pod {pod_name}")
269274
exec_command = ["tar", "-x", "-C", "/workspace"]
270275
resp = stream.stream(
271-
self.v1.connect_get_namespaced_pod_exec,
276+
self.v1_streaming.connect_get_namespaced_pod_exec,
272277
pod_name,
273278
self.namespace,
274279
command=exec_command,
@@ -316,7 +321,7 @@ def _kill_python_process(self, pod_name: str) -> None:
316321
"""Kill the Python process running in the pod."""
317322
with suppress(Exception):
318323
stream.stream(
319-
self.v1.connect_get_namespaced_pod_exec,
324+
self.v1_streaming.connect_get_namespaced_pod_exec,
320325
pod_name,
321326
self.namespace,
322327
command=["pkill", "-9", "python"],
@@ -370,7 +375,7 @@ def _run_in_pod(
370375
exec_command = ["python", "/workspace/__main__.py"]
371376

372377
exec_resp = stream.stream(
373-
self.v1.connect_get_namespaced_pod_exec,
378+
self.v1_streaming.connect_get_namespaced_pod_exec,
374379
pod_name,
375380
self.namespace,
376381
command=exec_command,
@@ -410,7 +415,7 @@ def _extract_workspace_snapshot(self, pod_name: str) -> tuple[WorkspaceEntry, ..
410415

411416
logger.info(f"Starting tar extraction from pod {pod_name}")
412417
resp = stream.stream(
413-
self.v1.connect_get_namespaced_pod_exec,
418+
self.v1_streaming.connect_get_namespaced_pod_exec,
414419
pod_name,
415420
self.namespace,
416421
command=exec_command,
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""Reproduce: concurrent /v1/execute requests fail with "Handshake status 200 OK".
2+
3+
The KubernetesExecutor uses a single CoreV1Api client for both REST and
4+
streaming operations. stream.stream() temporarily monkey-patches the
5+
shared api_client.request to use WebSocket. Under concurrent load, a
6+
REST call from one request can land during another request's patch window,
7+
causing a WebSocket handshake against a non-WebSocket endpoint.
8+
9+
This test fires multiple concurrent requests at the code interpreter and
10+
asserts that all succeed. With the current bug, at least one will fail
11+
with an error containing "Handshake status" or a 500 status.
12+
13+
After the fix (separate ApiClient instances for REST vs streaming), all
14+
requests should succeed.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
import concurrent.futures
20+
from typing import Any, Final
21+
22+
import httpx
23+
import pytest
24+
25+
BASE_URL: Final[str] = "http://localhost:8000"
26+
# Number of concurrent requests — enough to reliably trigger the race.
27+
CONCURRENCY: Final[int] = 5
28+
29+
30+
def _execute_request(index: int) -> dict[str, Any]:
31+
"""Send a single /v1/execute request and return the parsed result.
32+
33+
Raises on transport errors or non-200 status so the caller can
34+
collect failures.
35+
"""
36+
timeout = httpx.Timeout(60.0, connect=10.0)
37+
with httpx.Client(base_url=BASE_URL, timeout=timeout) as client:
38+
response = client.post(
39+
"/v1/execute",
40+
json={
41+
"code": f"print('request {index}')",
42+
"timeout_ms": 30000,
43+
},
44+
)
45+
response.raise_for_status()
46+
return {"index": index, "result": response.json()}
47+
48+
49+
def test_concurrent_execute_requests_all_succeed() -> None:
50+
"""Fire N concurrent /v1/execute requests.
51+
52+
With the shared-client bug, overlapping stream.stream() calls cause
53+
REST calls to be routed through the WebSocket path, producing errors
54+
like "Handshake status 200 OK".
55+
56+
All N requests must return exit_code == 0 for this test to pass.
57+
"""
58+
# Verify the service is reachable first
59+
timeout = httpx.Timeout(10.0, connect=5.0)
60+
with httpx.Client(base_url=BASE_URL, timeout=timeout) as client:
61+
try:
62+
health = client.get("/health")
63+
except httpx.TransportError as exc:
64+
pytest.fail(f"Code interpreter not reachable at {BASE_URL}: {exc}")
65+
assert health.status_code == 200 and health.json()["status"] == "ok"
66+
67+
# Fire concurrent requests
68+
results: list[dict[str, Any]] = []
69+
errors: list[str] = []
70+
71+
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
72+
futures = {pool.submit(_execute_request, i): i for i in range(CONCURRENCY)}
73+
74+
for future in concurrent.futures.as_completed(futures):
75+
idx = futures[future]
76+
try:
77+
results.append(future.result())
78+
except Exception as exc:
79+
errors.append(f"request {idx}: {exc}")
80+
81+
# Report all failures together for easier debugging
82+
assert not errors, (
83+
f"{len(errors)}/{CONCURRENCY} concurrent requests failed:\n"
84+
+ "\n".join(errors)
85+
)
86+
87+
# Every successful response should have exit_code == 0
88+
for r in results:
89+
result = r["result"]
90+
assert result["exit_code"] == 0, (
91+
f"request {r['index']} failed: "
92+
f"stdout={result.get('stdout')!r} "
93+
f"stderr={result.get('stderr')!r}"
94+
)

0 commit comments

Comments
 (0)