Skip to content

Commit 2c267d1

Browse files
authored
Merge pull request #21 from onyx-dot-app/fix-concurrent-streaming
fix(kubernetes): Concurrent request failures from shared ApiClient
2 parents 4dbcaa2 + ac3300b commit 2c267d1

File tree

1 file changed

+93
-0
lines changed

1 file changed

+93
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""Reproduce: concurrent /v1/execute requests fail with "Handshake status 200 OK".
2+
3+
The KubernetesExecutor uses a single CoreV1Api client for both REST and
4+
streaming operations. stream.stream() temporarily monkey-patches the
5+
shared api_client.request to use WebSocket. Under concurrent load, a
6+
REST call from one request can land during another request's patch window,
7+
causing a WebSocket handshake against a non-WebSocket endpoint.
8+
9+
This test fires multiple concurrent requests at the code interpreter and
10+
asserts that all succeed. With the current bug, at least one will fail
11+
with an error containing "Handshake status" or a 500 status.
12+
13+
After the fix (separate ApiClient instances for REST vs streaming), all
14+
requests should succeed.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
import concurrent.futures
20+
from typing import Any, Final
21+
22+
import httpx
23+
import pytest
24+
25+
BASE_URL: Final[str] = "http://localhost:8000"
26+
# Number of concurrent requests — enough to reliably trigger the race.
27+
CONCURRENCY: Final[int] = 5
28+
29+
30+
def _execute_request(index: int) -> dict[str, Any]:
31+
"""Send a single /v1/execute request and return the parsed result.
32+
33+
Raises on transport errors or non-200 status so the caller can
34+
collect failures.
35+
"""
36+
timeout = httpx.Timeout(60.0, connect=10.0)
37+
with httpx.Client(base_url=BASE_URL, timeout=timeout) as client:
38+
response = client.post(
39+
"/v1/execute",
40+
json={
41+
"code": f"print('request {index}')",
42+
"timeout_ms": 30000,
43+
},
44+
)
45+
response.raise_for_status()
46+
return {"index": index, "result": response.json()}
47+
48+
49+
def test_concurrent_execute_requests_all_succeed() -> None:
50+
"""Fire N concurrent /v1/execute requests.
51+
52+
With the shared-client bug, overlapping stream.stream() calls cause
53+
REST calls to be routed through the WebSocket path, producing errors
54+
like "Handshake status 200 OK".
55+
56+
All N requests must return exit_code == 0 for this test to pass.
57+
"""
58+
# Verify the service is reachable first
59+
timeout = httpx.Timeout(10.0, connect=5.0)
60+
with httpx.Client(base_url=BASE_URL, timeout=timeout) as client:
61+
try:
62+
health = client.get("/health")
63+
except httpx.TransportError as exc:
64+
pytest.fail(f"Code interpreter not reachable at {BASE_URL}: {exc}")
65+
assert health.status_code == 200 and health.json()["status"] == "ok"
66+
67+
# Fire concurrent requests
68+
results: list[dict[str, Any]] = []
69+
errors: list[str] = []
70+
71+
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
72+
futures = {pool.submit(_execute_request, i): i for i in range(CONCURRENCY)}
73+
74+
for future in concurrent.futures.as_completed(futures):
75+
idx = futures[future]
76+
try:
77+
results.append(future.result())
78+
except Exception as exc:
79+
errors.append(f"request {idx}: {exc}")
80+
81+
# Report all failures together for easier debugging
82+
assert not errors, f"{len(errors)}/{CONCURRENCY} concurrent requests failed:\n" + "\n".join(
83+
errors
84+
)
85+
86+
# Every successful response should have exit_code == 0
87+
for r in results:
88+
result = r["result"]
89+
assert result["exit_code"] == 0, (
90+
f"request {r['index']} failed: "
91+
f"stdout={result.get('stdout')!r} "
92+
f"stderr={result.get('stderr')!r}"
93+
)

0 commit comments

Comments
 (0)