Skip to content

Commit d0e02d7

Browse files
ChingEnLinclaude
andauthored
feat(argus): structured observers + live progress polling (#39)
* feat(argus): wire structured observers + live progress polling Bump QueryArgus to PR #2 merge (observability foundation) and consume the new RunObserver hooks across QueryPal. Backend - Attach JsonFormatter handler to the `queryargus.run` logger at app startup so structured events flow into the existing log pipeline. - Wire StructuredLogObserver (module-level singleton) and a fresh CostTracker per request into each Argus run. - Surface report.cost on the serialized response so the UI can render USD cost alongside token counts. - Add LiveEventBuffer (per-job, ring-buffered, thread-safe) as a third observer; expose `GET /argus/runs/{job_id}/events?cursor=N` with rolled-up aggregates for live progress polling. Same auth-scoping as get_run (404 leak-safe). - Emit finding trace as JSONL so the UI can render structured steps. Frontend - ArgusReport gains `cost: RunCost | null` and the report header now has a token<->USD toggle pill with per-model breakdown on hover. - TraceView component parses the JSONL trace into per-step blocks (iter chip, action, optional gate badge, reason); falls back to the legacy <pre> for older reports. - NotificationsContext polls the new events endpoint in parallel with the status check, maintains a per-job cursor, and exposes runProgress. - AppTopBar "In progress" rows show a live second line: `iter N · M findings · K tok · → last_tool` plus a red tool-error chip when any have failed. No SSE, no new tables, no async refactor. Existing `GET /argus/runs/{job_id}` contract unchanged. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(argus): defer log handler install to lifespan + black - Module-level handler install on `queryargus.run` set `propagate=False`, which broke the upstream observability tests that rely on `caplog` (collection-time imports of `main` would silence the root logger before pytest could capture records). Move into a lifespan helper so the handler only attaches when the app actually boots; tests are unaffected. - The helper is idempotent (handler tagged with `_querypal_argus`) so a double lifespan run during tests can't double-install. - Reformat with black to satisfy CI. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent dca0412 commit d0e02d7

8 files changed

Lines changed: 492 additions & 35 deletions

File tree

backend/main.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,37 @@
1+
import logging
12
import os
23
from contextlib import asynccontextmanager
34

45
import uvicorn
56
from fastapi import FastAPI
67
from fastapi.middleware.cors import CORSMiddleware
8+
from queryargus.observability.logging_observer import JsonFormatter
79
from routes import query, azure, system, user_queries, data_documents, audit, argus
810

911

12+
def _install_argus_log_handler() -> None:
13+
"""Route the ``queryargus.run`` logger to stderr as one JSON line per event.
14+
15+
Called from lifespan rather than at module import so test runners (which
16+
rely on ``caplog`` propagating records to the root logger) are unaffected
17+
when something imports ``main`` during collection.
18+
"""
19+
logger = logging.getLogger("queryargus.run")
20+
if any(getattr(h, "_querypal_argus", False) for h in logger.handlers):
21+
return # idempotent — lifespan may run twice under some test harnesses
22+
handler = logging.StreamHandler()
23+
handler.setFormatter(JsonFormatter())
24+
handler._querypal_argus = True # type: ignore[attr-defined]
25+
logger.addHandler(handler)
26+
logger.setLevel(logging.INFO)
27+
logger.propagate = False
28+
29+
1030
@asynccontextmanager
1131
async def lifespan(app: FastAPI):
1232
from services.argus_store import get_report_store
1333

34+
_install_argus_log_handler()
1435
get_report_store()
1536
yield
1637

backend/queryargus

Submodule queryargus updated 51 files

backend/routes/argus.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import json
45
import logging
56
import uuid
67
from datetime import datetime, timezone
@@ -22,6 +23,9 @@
2223
from queryargus.models.connection import CosmosConnection
2324
from queryargus.models.finding import Finding
2425
from queryargus.models.report import AuditReport
26+
from queryargus.observability.cost import CostTracker
27+
from queryargus.observability.logging_observer import StructuredLogObserver
28+
from services.argus_live_events import LiveEventBuffer
2529
from services.argus_profiles_service import (
2630
ProfileNameConflict,
2731
create_profile,
@@ -71,6 +75,11 @@
7175
_JOBS: dict[str, dict[str, Any]] = {}
7276
_MAX_JOBS = 50
7377

78+
# StructuredLogObserver carries no per-run state besides the current run_id
79+
# (re-set in on_run_start), so a module-level singleton is safe across requests.
80+
# CostTracker must be constructed per-request — it accumulates token buckets.
81+
_LOG_OBSERVER = StructuredLogObserver()
82+
7483

7584
class AuditRequest(BaseModel):
7685
account_id: str
@@ -97,17 +106,26 @@ def _summary(description: str) -> str:
97106

98107

99108
def _finding_trace(report: AuditReport, finding: Finding) -> str:
109+
"""One JSON object per relevant agent step (JSONL).
110+
111+
The frontend parses each line and renders a structured block. Falls back
112+
gracefully to plain-text display if a line fails to parse.
113+
"""
100114
field = finding.field
101115
lines: list[str] = []
102116
for i, action in enumerate(report.run_trace, start=1):
103117
inp_repr = repr(action.action_input)
104118
is_write = action.action == "write_finding" and field in inp_repr
105119
if field not in inp_repr and not is_write:
106120
continue
107-
lines.append(f"iter {i} · {action.action}")
108-
lines.append(f"reason: {action.reasoning}")
121+
entry: dict[str, Any] = {
122+
"iter": i,
123+
"action": action.action,
124+
"reason": action.reasoning,
125+
}
109126
if is_write:
110-
lines.append("finding gate: PASS")
127+
entry["gate"] = "PASS"
128+
lines.append(json.dumps(entry, ensure_ascii=False, default=str))
111129
return "\n".join(lines)
112130

113131

@@ -235,6 +253,9 @@ def _serialize_report(
235253
"counts": counts,
236254
"diff": diff_counts,
237255
"findings": findings,
256+
"cost": (
257+
report.cost.model_dump(mode="json") if report.cost is not None else None
258+
),
238259
"created_by": created_by,
239260
"history": None,
240261
}
@@ -362,11 +383,14 @@ async def _execute_job(
362383
else:
363384
judge_model_name = jm
364385
judge_llm = GeminiClient(model=judge_model_name)
386+
live = LiveEventBuffer()
387+
job["live"] = live
365388
agent = ArgusAgent.from_config(
366389
config=config,
367390
llm=llm,
368391
judge_llm=judge_llm,
369392
judge_model_name=judge_model_name,
393+
observers=[_LOG_OBSERVER, CostTracker(), live],
370394
)
371395

372396
history = None
@@ -500,6 +524,38 @@ async def get_run(job_id: str, authorization: str = Header(...)):
500524
)
501525

502526

527+
@router.get("/runs/{job_id}/events")
528+
async def get_run_events(
529+
job_id: str,
530+
authorization: str = Header(...),
531+
cursor: int = Query(default=0, ge=0),
532+
):
533+
"""Live event snapshot for a still-running job.
534+
535+
`cursor` is the value returned in the previous poll's `next_cursor`. The
536+
response also carries rolled-up aggregates (current_iter, findings_count,
537+
running token totals, last_action / last_tool) so the UI can render
538+
progress without re-folding the event stream.
539+
540+
Reuses the same caller-scoping rule as `get_run`: cross-tenant attempts
541+
return 404, not 403, to avoid leaking job existence.
542+
"""
543+
if not authorization.startswith("Bearer "):
544+
raise HTTPException(status_code=401, detail="Invalid token format")
545+
caller_email = extract_email_from_token(authorization[7:])
546+
job = _JOBS.get(job_id)
547+
if job is None:
548+
raise HTTPException(status_code=404, detail="Job not found")
549+
if job.get("created_by") and job["created_by"] != caller_email:
550+
raise HTTPException(status_code=404, detail="Job not found")
551+
live: Optional[LiveEventBuffer] = job.get("live")
552+
if live is None:
553+
# Run hasn't reached the observer-attach point yet (still waiting on
554+
# Azure auth / connection-string), or job pre-dates this feature.
555+
return JSONResponse(content={"events": [], "next_cursor": 0, "aggregates": {}})
556+
return JSONResponse(content=live.snapshot(since=cursor))
557+
558+
503559
@router.get("/runs")
504560
async def list_runs(
505561
authorization: str = Header(...),
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
"""Per-job in-memory buffer of structured agent events for live progress polling.
2+
3+
Attached as a third observer alongside StructuredLogObserver + CostTracker on
4+
each Argus run. Mirrors StructuredLogObserver's event shapes so the frontend
5+
can render one timeline regardless of source.
6+
7+
Thread-safe: ArgusAgent.run is invoked via run_in_threadpool, so the FastAPI
8+
request loop reads `snapshot()` from one thread while the worker thread writes
9+
events from another.
10+
11+
Bounded by a ring buffer so a runaway agent cannot OOM the process; the live
12+
view is for progress, not the system of record (the persisted AuditReport is).
13+
"""
14+
15+
from __future__ import annotations
16+
17+
from collections import deque
18+
from datetime import datetime, timezone
19+
from threading import Lock
20+
from typing import Any, Literal
21+
from uuid import UUID
22+
23+
from queryargus.llm.client import TokenUsage
24+
from queryargus.models.action import AgentAction
25+
from queryargus.models.finding import Finding
26+
from queryargus.models.report import AuditReport
27+
28+
_MAX_EVENTS_PER_JOB = 500
29+
30+
31+
def _ts() -> str:
32+
return datetime.now(timezone.utc).isoformat()
33+
34+
35+
class LiveEventBuffer:
36+
def __init__(self) -> None:
37+
self._events: deque[dict[str, Any]] = deque(maxlen=_MAX_EVENTS_PER_JOB)
38+
self._lock = Lock()
39+
self._run_id: str | None = None
40+
self.current_iter = 0
41+
self.findings_count = 0
42+
self.input_tokens = 0
43+
self.output_tokens = 0
44+
self.last_action: str | None = None
45+
self.last_tool: str | None = None
46+
self.tool_errors = 0
47+
48+
def _push(self, event: str, /, **extras: object) -> None:
49+
with self._lock:
50+
self._events.append(
51+
{"event": event, "run_id": self._run_id, "ts": _ts(), **extras}
52+
)
53+
54+
def snapshot(self, since: int = 0) -> dict[str, Any]:
55+
with self._lock:
56+
evs = list(self._events)
57+
aggregates = {
58+
"current_iter": self.current_iter,
59+
"findings_count": self.findings_count,
60+
"input_tokens": self.input_tokens,
61+
"output_tokens": self.output_tokens,
62+
"last_action": self.last_action,
63+
"last_tool": self.last_tool,
64+
"tool_errors": self.tool_errors,
65+
}
66+
tail = evs[since:] if since < len(evs) else []
67+
return {"events": tail, "next_cursor": len(evs), "aggregates": aggregates}
68+
69+
# RunObserver protocol -------------------------------------------------
70+
71+
def on_run_start(self, *, run_id: UUID, collection: str) -> None:
72+
self._run_id = str(run_id)
73+
self._push("run_start", collection=collection)
74+
75+
def on_iteration_start(self, *, iter: int) -> None:
76+
with self._lock:
77+
self.current_iter = iter
78+
self._push("iteration_start", iter=iter)
79+
80+
def on_llm_call(
81+
self,
82+
*,
83+
purpose: Literal["propose_action", "self_eval", "judge"],
84+
model: str,
85+
usage: TokenUsage,
86+
latency_ms: int,
87+
) -> None:
88+
with self._lock:
89+
self.input_tokens += usage.input_tokens
90+
self.output_tokens += usage.output_tokens
91+
self._push(
92+
"llm_call",
93+
purpose=purpose,
94+
model=model,
95+
input_tokens=usage.input_tokens,
96+
output_tokens=usage.output_tokens,
97+
latency_ms=latency_ms,
98+
)
99+
100+
def on_tool_call(
101+
self,
102+
*,
103+
name: str,
104+
args_summary: str,
105+
ok: bool,
106+
latency_ms: int,
107+
error: str | None,
108+
) -> None:
109+
with self._lock:
110+
self.last_tool = name
111+
if not ok:
112+
self.tool_errors += 1
113+
self._push(
114+
"tool_call",
115+
tool=name,
116+
args_summary=args_summary,
117+
ok=ok,
118+
latency_ms=latency_ms,
119+
error=error,
120+
)
121+
122+
def on_action(self, *, action: AgentAction) -> None:
123+
with self._lock:
124+
self.last_action = action.action
125+
self._push("action", action=action.action, confidence=action.confidence)
126+
127+
def on_finding(self, *, finding: Finding) -> None:
128+
with self._lock:
129+
self.findings_count += 1
130+
self._push(
131+
"finding",
132+
field=finding.field,
133+
category=finding.category,
134+
severity=str(finding.severity),
135+
)
136+
137+
def on_eval(
138+
self,
139+
*,
140+
target: Literal["action", "finding", "run"],
141+
verdict: str,
142+
score: float,
143+
evaluator: str,
144+
) -> None:
145+
self._push(
146+
"eval", target=target, verdict=verdict, score=score, evaluator=evaluator
147+
)
148+
149+
def on_run_complete(self, *, report: AuditReport) -> None:
150+
self._push(
151+
"run_complete",
152+
findings_count=len(report.findings),
153+
duration_ms=int(report.duration_seconds * 1000),
154+
usd_total=(report.cost.usd_total if report.cost is not None else None),
155+
)

0 commit comments

Comments
 (0)