-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathtest_concurrent_execute.py
More file actions
93 lines (74 loc) · 3.35 KB
/
test_concurrent_execute.py
File metadata and controls
93 lines (74 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""Reproduce: concurrent /v1/execute requests fail with "Handshake status 200 OK".
The KubernetesExecutor uses a single CoreV1Api client for both REST and
streaming operations. stream.stream() temporarily monkey-patches the
shared api_client.request to use WebSocket. Under concurrent load, a
REST call from one request can land during another request's patch window,
causing a WebSocket handshake against a non-WebSocket endpoint.
This test fires multiple concurrent requests at the code interpreter and
asserts that all succeed. With the current bug, at least one will fail
with an error containing "Handshake status" or a 500 status.
After the fix (separate ApiClient instances for REST vs streaming), all
requests should succeed.
"""
from __future__ import annotations
import concurrent.futures
from typing import Any, Final
import httpx
import pytest
# Base URL of the code interpreter service that the requests in this module target.
BASE_URL: Final[str] = "http://localhost:8000"
# Number of concurrent requests — enough to reliably trigger the race.
CONCURRENCY: Final[int] = 5
def _execute_request(index: int) -> dict[str, Any]:
    """POST one trivial snippet to /v1/execute and return the decoded reply.

    The snippet just prints ``request <index>`` so each call is cheap and
    identifiable. The return value is a dict with two keys: ``"index"``
    (the argument passed in) and ``"result"`` (the JSON-decoded response
    body).

    Any exception raised by the POST itself, or by ``raise_for_status()``
    for an error response, propagates to the caller so it can be recorded
    as a failure.
    """
    payload = {
        "code": f"print('request {index}')",
        "timeout_ms": 30000,
    }
    # Generous overall budget (60 s), but fail fast (10 s) if we cannot connect.
    limits = httpx.Timeout(60.0, connect=10.0)
    # A dedicated client per call: nothing is shared between worker threads.
    with httpx.Client(base_url=BASE_URL, timeout=limits) as http:
        reply = http.post("/v1/execute", json=payload)
        reply.raise_for_status()
        body = reply.json()
    return {"index": index, "result": body}
def test_concurrent_execute_requests_all_succeed() -> None:
    """Fire CONCURRENCY simultaneous /v1/execute requests; all must succeed.

    With the shared-client bug, overlapping stream.stream() calls cause
    REST calls to be routed through the WebSocket path, producing errors
    like "Handshake status 200 OK".

    The test first checks ``/health`` so an unreachable service is reported
    as such rather than as N request failures. It then submits the requests
    through a thread pool, collects every failure (instead of stopping at
    the first), and finally requires ``exit_code == 0`` on every response.
    """
    # Verify the service is reachable first
    timeout = httpx.Timeout(10.0, connect=5.0)
    with httpx.Client(base_url=BASE_URL, timeout=timeout) as client:
        try:
            health = client.get("/health")
        except httpx.TransportError as exc:
            pytest.fail(f"Code interpreter not reachable at {BASE_URL}: {exc}")
        assert health.status_code == 200 and health.json()["status"] == "ok"

    # Fire concurrent requests
    results: list[dict[str, Any]] = []
    errors: list[str] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
        futures = {pool.submit(_execute_request, i): i for i in range(CONCURRENCY)}
        for future in concurrent.futures.as_completed(futures):
            idx = futures[future]
            try:
                results.append(future.result())
            except httpx.HTTPStatusError as exc:
                # str(exc) only carries the status line and URL. Include the
                # response body as well: that is where a server-side message
                # (e.g. "Handshake status ...") would show up, if the service
                # returns one.
                errors.append(
                    f"request {idx}: {exc}\nresponse body: {exc.response.text}"
                )
            except Exception as exc:
                # Deliberately broad: every kind of failure is collected so
                # they can all be reported in a single assertion below.
                errors.append(f"request {idx}: {exc}")

    # Report all failures together for easier debugging
    assert not errors, f"{len(errors)}/{CONCURRENCY} concurrent requests failed:\n" + "\n".join(
        errors
    )

    # Every successful response should have exit_code == 0
    for r in results:
        result = r["result"]
        assert result["exit_code"] == 0, (
            f"request {r['index']} failed: "
            f"stdout={result.get('stdout')!r} "
            f"stderr={result.get('stderr')!r}"
        )