Skip to content

Commit a3ab5b2

Browse files
fix: retry on empty SSE stream to handle premature load balancer disconnects
1 parent 1cb71a5 commit a3ab5b2

1 file changed

Lines changed: 44 additions & 55 deletions

File tree

packages/opencode/src/skill/validate/batch_validate.py

Lines changed: 44 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -120,26 +120,47 @@ def _parse_sse_stream(response):
120120
# ---------------------------------------------------------------------------
121121
# Backend API calls
122122
# ---------------------------------------------------------------------------
123+
_EMPTY_STREAM_RETRIES = 3
124+
_EMPTY_STREAM_BACKOFF = 2 # seconds
125+
126+
127+
def _stream_post(url, payload, timeout):
128+
"""POST to url, stream SSE response. Returns list of trace_result/trace_error events."""
129+
import time
130+
for attempt in range(1, _EMPTY_STREAM_RETRIES + 1):
131+
results = []
132+
try:
133+
resp = _SESSION.post(url, json=payload, headers=_HEADERS, stream=True, timeout=timeout)
134+
resp.raise_for_status()
135+
for event in _parse_sse_stream(resp):
136+
_log_event(event)
137+
if event.get("event") in ("trace_result", "trace_error"):
138+
results.append(event)
139+
except Exception as e:
140+
print(f" ERROR: {e}", file=sys.stderr)
141+
return results
142+
143+
if results:
144+
return results
145+
146+
if attempt < _EMPTY_STREAM_RETRIES:
147+
print(
148+
f" Warning: stream returned no events (attempt {attempt}/{_EMPTY_STREAM_RETRIES}), "
149+
f"retrying in {_EMPTY_STREAM_BACKOFF}s...",
150+
file=sys.stderr,
151+
)
152+
time.sleep(_EMPTY_STREAM_BACKOFF)
153+
154+
print(f" Warning: stream returned no events after {_EMPTY_STREAM_RETRIES} attempts.", file=sys.stderr)
155+
return []
156+
157+
123158
def validate_single_trace(trace_id):
124159
"""Call POST /validate for a single trace. Returns list of result dicts."""
125160
print(f"Validating trace: {trace_id}...", file=sys.stderr)
126-
results = []
127-
try:
128-
resp = _SESSION.post(
129-
f"{BASE_URL}/validate",
130-
json={"trace_id": trace_id},
131-
headers=_HEADERS,
132-
stream=True,
133-
timeout=300,
134-
)
135-
resp.raise_for_status()
136-
for event in _parse_sse_stream(resp):
137-
_log_event(event)
138-
if event.get("event") in ("trace_result", "trace_error"):
139-
results.append(event)
140-
except Exception as e:
141-
print(f" ERROR: {e}", file=sys.stderr)
142-
results.append({"event": "trace_error", "trace_id": trace_id, "error": str(e)})
161+
results = _stream_post(f"{BASE_URL}/validate", {"trace_id": trace_id}, timeout=300)
162+
if not results:
163+
results.append({"event": "trace_error", "trace_id": trace_id, "error": "Stream returned no events"})
143164
return results
144165

145166

@@ -149,49 +170,17 @@ def validate_date_range(user_id, from_datetime, to_datetime):
149170
f"Validating traces for user '{user_id}' from {from_datetime} to {to_datetime}...",
150171
file=sys.stderr,
151172
)
152-
results = []
153-
try:
154-
resp = _SESSION.post(
155-
f"{BASE_URL}/validate/date-range",
156-
json={
157-
"user_id": user_id,
158-
"from_datetime": from_datetime,
159-
"to_datetime": to_datetime,
160-
},
161-
headers=_HEADERS,
162-
stream=True,
163-
timeout=600,
164-
)
165-
resp.raise_for_status()
166-
for event in _parse_sse_stream(resp):
167-
_log_event(event)
168-
if event.get("event") in ("trace_result", "trace_error"):
169-
results.append(event)
170-
except Exception as e:
171-
print(f" ERROR: {e}", file=sys.stderr)
172-
return results
173+
return _stream_post(
174+
f"{BASE_URL}/validate/date-range",
175+
{"user_id": user_id, "from_datetime": from_datetime, "to_datetime": to_datetime},
176+
timeout=600,
177+
)
173178

174179

175180
def validate_session(session_id):
176181
"""Call POST /validate/session. Returns list of result dicts."""
177182
print(f"Validating all traces in session: {session_id}...", file=sys.stderr)
178-
results = []
179-
try:
180-
resp = _SESSION.post(
181-
f"{BASE_URL}/validate/session",
182-
json={"session_id": session_id},
183-
headers=_HEADERS,
184-
stream=True,
185-
timeout=600,
186-
)
187-
resp.raise_for_status()
188-
for event in _parse_sse_stream(resp):
189-
_log_event(event)
190-
if event.get("event") in ("trace_result", "trace_error"):
191-
results.append(event)
192-
except Exception as e:
193-
print(f" ERROR: {e}", file=sys.stderr)
194-
return results
183+
return _stream_post(f"{BASE_URL}/validate/session", {"session_id": session_id}, timeout=600)
195184

196185

197186
def _log_event(event):

0 commit comments

Comments
 (0)