Commit 1a4b134

fix: backfill rollout status fields from logs when polling completes
The lightweight `/status` endpoint on the tracing gateway only returns the status code; `Message`, `Details`, and `Extras` still live on the Logs table.

After PR #446 stopped reading from `/logs` on terminal status, the SDK was constructing `Status(code=..., message="", details=[])` for every completed rollout and `EvalProtocolError(message="")` for failures. That broke `tests/remote_server/test_remote_fireworks_propagate_status.py` (`assert row.rollout_status.message == "test error"`).

Restore the two-phase polling shape from the original PR: poll `/status` for the code, and on a terminal (non-RUNNING) code make one `async_search_logs` call to backfill `message`/`details`/`extras` from the matching log row.

This is still ~1000x cheaper on the Logs table than the pre-#446 polling loop, because the search runs once per rollout completion instead of once per poll interval.

Made-with: Cursor
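The two-phase shape described above can be sketched roughly as follows. This is a minimal illustration, not the SDK's code: `poll_then_backfill`, `fetch_status`, `search_logs`, and the string sentinel `"RUNNING"` are hypothetical stand-ins for the real `/status` poll, `async_search_logs`, and status codes.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical sentinel; the real SDK's status codes may be represented differently.
RUNNING = "RUNNING"


async def poll_then_backfill(
    fetch_status: Callable[[str], Awaitable[str]],
    search_logs: Callable[..., Awaitable[List[Dict[str, Any]]]],
    rollout_id: str,
    poll_interval: float = 1.0,
    max_polls: int = 100,
) -> Dict[str, Any]:
    """Phase 1: cheap status polls. Phase 2: one log search on a terminal code."""
    for _ in range(max_polls):
        code = await fetch_status(rollout_id)
        if code != RUNNING:
            # Terminal state reached: read the Logs table exactly once.
            logs = await search_logs(tags=[f"rollout_id:{rollout_id}"])
            message = ""
            for log in logs:
                sd = log.get("status")
                if isinstance(sd, dict) and "code" in sd:
                    message = sd.get("message", "") or ""
                    break
            return {"code": code, "message": message}
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"rollout {rollout_id} still running after {max_polls} polls")
```

With this shape the number of log searches depends only on how many rollouts complete, not on how many polls each one needs.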
1 parent 86a52a4 commit 1a4b134

1 file changed

Lines changed: 23 additions & 4 deletions

eval_protocol/pytest/remote_rollout_processor.py

@@ -139,8 +139,28 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
         status_code,
     )

-    status_message = status.get("message", "") or ""
-    status_details = status.get("details", []) or []
+    # /status is a point-read on the Status table that only
+    # carries the status code today. Backfill message/details/
+    # extras from the Logs table with a single read once the
+    # rollout has reached a terminal state.
+    status_message: str = ""
+    status_details: list = []
+    status_extras: dict = {}
+    completed_logs = await self._tracing_adapter.async_search_logs(
+        session, tags=[f"rollout_id:{row.execution_metadata.rollout_id}"]
+    )
+    for log in completed_logs:
+        sd = log.get("status")
+        if sd and isinstance(sd, dict) and "code" in sd:
+            status_message = sd.get("message", "") or ""
+            status_details = sd.get("details", []) or []
+            raw_extras = log.get("extras") or {}
+            status_extras = {
+                k: v
+                for k, v in raw_extras.items()
+                if k not in ("logger_name", "level", "timestamp")
+            }
+            break

     exception = exception_for_status_code(status_code, status_message)
     if exception is not None:
@@ -152,8 +172,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
         details=status_details,
     )

-    status_extras = (status_result or {}).get("extras")
-    if isinstance(status_extras, dict):
+    if status_extras:
         if row.execution_metadata.extra:
             row.execution_metadata.extra.update(status_extras)
         else:
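
The backfill loop added in the first hunk can also be read as a pure function, which makes the behavior easier to check in isolation. The sketch below assumes log rows are plain dicts shaped like the ones the diff reads; `extract_status_fields` and `RESERVED_EXTRA_KEYS` are hypothetical names, not part of the SDK.

```python
from typing import Any, Dict, Iterable, List, Tuple

# Logging metadata keys that the diff excludes from the status extras.
RESERVED_EXTRA_KEYS = ("logger_name", "level", "timestamp")


def extract_status_fields(
    logs: Iterable[Dict[str, Any]],
) -> Tuple[str, List[Any], Dict[str, Any]]:
    """Return (message, details, extras) from the first log row carrying a status dict."""
    for log in logs:
        sd = log.get("status")
        # Skip rows with no status, or a status that is not a dict with a code.
        if sd and isinstance(sd, dict) and "code" in sd:
            message = sd.get("message", "") or ""
            details = sd.get("details", []) or []
            raw_extras = log.get("extras") or {}
            extras = {k: v for k, v in raw_extras.items() if k not in RESERVED_EXTRA_KEYS}
            return message, details, extras
    # No matching row: same defaults the diff initializes before the loop.
    return "", [], {}
```

Because `None` values fall back through `or`, a row with `"details": None` yields an empty list, and only the first matching row is used, mirroring the `break` in the diff.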