From 519b8297086338905e26f99464b0b8abe95642d0 Mon Sep 17 00:00:00 2001
From: Adam Gohain <68021524+akgohain@users.noreply.github.com>
Date: Mon, 13 Apr 2026 16:02:06 -0400
Subject: [PATCH] Add failure hotspot workflow proposal action

---
 server_api/workflow/__init__.py         |   5 +
 server_api/workflow/proposals.py        | 212 ++++++++++++++++++++++++
 tests/test_workflow_hotspot_proposal.py |  67 ++++++++
 3 files changed, 284 insertions(+)
 create mode 100644 server_api/workflow/__init__.py
 create mode 100644 server_api/workflow/proposals.py
 create mode 100644 tests/test_workflow_hotspot_proposal.py

diff --git a/server_api/workflow/__init__.py b/server_api/workflow/__init__.py
new file mode 100644
index 0000000..a20538e
--- /dev/null
+++ b/server_api/workflow/__init__.py
@@ -0,0 +1,5 @@
+"""Workflow proposal APIs."""
+
+from .proposals import PRIORITIZE_FAILURE_HOTSPOTS, propose_failure_hotspots
+
+__all__ = ["PRIORITIZE_FAILURE_HOTSPOTS", "propose_failure_hotspots"]
diff --git a/server_api/workflow/proposals.py b/server_api/workflow/proposals.py
new file mode 100644
index 0000000..ac924e5
--- /dev/null
+++ b/server_api/workflow/proposals.py
@@ -0,0 +1,212 @@
+"""Workflow-agent proposal helpers.
+
+This module currently exposes a single approval-gated, read-only proposal action
+for prioritizing failure hotspots from recent inference/proofreading events.
+"""
+
+from __future__ import annotations
+
+from collections import defaultdict
+from typing import Any, Dict, Iterable, List
+
+PRIORITIZE_FAILURE_HOTSPOTS = "prioritize_failure_hotspots"
+
+_EVENT_TYPE_FIELDS = (
+    "event_type",
+    "type",
+    "name",
+)
+_CANDIDATE_ID_FIELDS = (
+    "region_id",
+    "item_id",
+    "instance_id",
+    "tile_id",
+    "fov_id",
+    "object_id",
+)
+# Substrings of a normalized event type that mark the event as a failure.
+_FAILURE_TOKENS = ("fail", "error", "reject", "undo")
+
+
+def _normalized_event_type(event: Dict[str, Any]) -> str:
+    """Return the event's type as a stripped, lower-cased string.
+
+    The fields in ``_EVENT_TYPE_FIELDS`` are tried in order. A value that is
+    falsy, or blank once stripped, is skipped so that a later field can still
+    supply the type. Returns ``""`` when no field yields a usable value.
+    """
+    for field in _EVENT_TYPE_FIELDS:
+        value = event.get(field)
+        if not value:
+            continue
+        normalized = str(value).strip().lower()
+        if normalized:
+            return normalized
+    return ""
+
+
+def _has_failure_token(event_type: str) -> bool:
+    """Return True when *event_type* contains any of ``_FAILURE_TOKENS``."""
+    return any(token in event_type for token in _FAILURE_TOKENS)
+
+
+def _candidate_key(event: Dict[str, Any]) -> str:
+    """Return the grouping key that identifies the item an event refers to.
+
+    Precedence: the first non-empty field of ``_CANDIDATE_ID_FIELDS``, then a
+    ``location`` dict carrying ``x``, ``y`` and ``z``, then a truthy
+    ``target``. Events matching none of these share the key ``"unknown"``.
+    """
+    for field in _CANDIDATE_ID_FIELDS:
+        value = event.get(field)
+        # Only None and "" count as missing, so an ID of 0 is still used.
+        if value not in (None, ""):
+            return f"{field}:{value}"
+
+    location = event.get("location")
+    if isinstance(location, dict):
+        x = location.get("x")
+        y = location.get("y")
+        z = location.get("z")
+        if x is not None and y is not None and z is not None:
+            return f"location:{x},{y},{z}"
+
+    if event.get("target"):
+        return f"target:{event['target']}"
+
+    return "unknown"
+
+
+def _event_weight(event: Dict[str, Any], event_type: str) -> int:
+    """Return the heuristic score one event adds to its candidate.
+
+    Starts at 1, then adds 2 for a proofreading event, 2 for an inference
+    event, 3 for a failure token in *event_type*, 2 for ``high``/``critical``
+    severity (1 for ``medium``), and 1 when ``requires_rework`` is truthy.
+    """
+    score = 1
+
+    if "proofread" in event_type:
+        score += 2
+    if "infer" in event_type:
+        score += 2
+    if _has_failure_token(event_type):
+        score += 3
+
+    severity = str(event.get("severity", "")).lower().strip()
+    if severity in {"high", "critical"}:
+        score += 2
+    elif severity == "medium":
+        score += 1
+
+    if event.get("requires_rework"):
+        score += 1
+
+    return score
+
+
+def propose_failure_hotspots(
+    events: Iterable[Dict[str, Any]],
+    *,
+    top_k: int = 3,
+) -> Dict[str, Any]:
+    """Create a read-only hotspot proposal from recent events.
+
+    Heuristic-only ranking is used; no model inference or state mutation occurs.
+
+    Only events whose type mentions proofreading or inference and that carry a
+    failure signal (a failure token in the type, or a truthy
+    ``requires_rework``) are counted. They are grouped by candidate key and
+    ranked by score, then failure-event count, then event count.
+
+    Args:
+        events: Event dicts to inspect; the iterable is consumed once.
+        top_k: Maximum number of candidates to return. Values below 1 are
+            treated as 1.
+
+    Returns:
+        A proposal dict with ``proposal_type``, ``requires_approval``,
+        ``mutates_state``, ``action`` and ``explanation``. The explanation
+        holds the ranked ``candidates``; when there are none it also holds a
+        ``fallback_recommendation``.
+    """
+
+    ranked = defaultdict(
+        lambda: {
+            "item": None,
+            "score": 0,
+            "event_count": 0,
+            "proofreading_events": 0,
+            "inference_events": 0,
+            "failure_events": 0,
+        }
+    )
+
+    for event in events:
+        event_type = _normalized_event_type(event)
+        if not event_type:
+            continue
+
+        touches_relevant_flow = "proofread" in event_type or "infer" in event_type
+        if not touches_relevant_flow:
+            continue
+
+        # Computed once: it drives both the filter and the failure counter.
+        has_failure_token = _has_failure_token(event_type)
+        if not (has_failure_token or event.get("requires_rework")):
+            continue
+
+        item = _candidate_key(event)
+        group = ranked[item]
+        group["item"] = item
+        group["score"] += _event_weight(event, event_type)
+        group["event_count"] += 1
+        group["proofreading_events"] += int("proofread" in event_type)
+        group["inference_events"] += int("infer" in event_type)
+        group["failure_events"] += int(has_failure_token)
+
+    # The sort is stable, so tied candidates keep first-seen order.
+    ranked_items = sorted(
+        ranked.values(),
+        key=lambda x: (x["score"], x["failure_events"], x["event_count"]),
+        reverse=True,
+    )[: max(1, top_k)]
+
+    if ranked_items:
+        candidates: List[Dict[str, Any]] = []
+        for index, item in enumerate(ranked_items, start=1):
+            reason = (
+                f"Observed {item['event_count']} failure-linked events "
+                f"({item['proofreading_events']} proofreading, "
+                f"{item['inference_events']} inference)."
+            )
+            candidates.append(
+                {
+                    "rank": index,
+                    "item": item["item"],
+                    "score": item["score"],
+                    "reason": reason,
+                }
+            )
+
+        explanation = {
+            "summary": "Ranked hotspot candidates from recent inference/proofreading failures.",
+            "candidates": candidates,
+        }
+    else:
+        explanation = {
+            "summary": "Insufficient failure-linked events to rank specific hotspots.",
+            "fallback_recommendation": (
+                "Start with the most recently edited proofreading region and the latest "
+                "inference output tile, then collect additional failure annotations."
+            ),
+            "candidates": [],
+        }
+
+    return {
+        "proposal_type": PRIORITIZE_FAILURE_HOTSPOTS,
+        "requires_approval": True,
+        "mutates_state": False,
+        "action": "review_ranked_hotspots",
+        "explanation": explanation,
+    }
diff --git a/tests/test_workflow_hotspot_proposal.py b/tests/test_workflow_hotspot_proposal.py
new file mode 100644
index 0000000..60dc1d1
--- /dev/null
+++ b/tests/test_workflow_hotspot_proposal.py
@@ -0,0 +1,67 @@
+import unittest
+
+from server_api.workflow.proposals import (
+    PRIORITIZE_FAILURE_HOTSPOTS,
+    propose_failure_hotspots,
+)
+
+
+class WorkflowHotspotProposalTests(unittest.TestCase):
+    def test_generates_ranked_candidates_and_reasons(self):
+        events = [
+            {"event_type": "proofreading_error_marked", "region_id": "r-1"},
+            {"event_type": "inference_failure_detected", "region_id": "r-1"},
+            {
+                "event_type": "proofreading_reject",
+                "region_id": "r-2",
+                "severity": "high",
+            },
+            {
+                "event_type": "inference_error",
+                "region_id": "r-2",
+                "requires_rework": True,
+            },
+        ]
+
+        proposal = propose_failure_hotspots(events, top_k=2)
+
+        self.assertEqual(proposal["proposal_type"], PRIORITIZE_FAILURE_HOTSPOTS)
+        self.assertTrue(proposal["requires_approval"])
+        self.assertFalse(proposal["mutates_state"])
+
+        candidates = proposal["explanation"]["candidates"]
+        self.assertEqual(len(candidates), 2)
+        self.assertEqual(candidates[0]["item"], "region_id:r-2")
+        self.assertIn("failure-linked events", candidates[0]["reason"])
+
+    def test_fallback_with_limited_events(self):
+        events = [{"event_type": "proofreading_viewed", "region_id": "r-1"}]
+
+        proposal = propose_failure_hotspots(events)
+
+        explanation = proposal["explanation"]
+        self.assertEqual(explanation["candidates"], [])
+        self.assertIn("fallback_recommendation", explanation)
+        self.assertIn("Insufficient failure-linked events", explanation["summary"])
+
+    def test_handles_empty_event_stream_without_mutation(self):
+        events = []
+
+        proposal = propose_failure_hotspots(events)
+
+        self.assertEqual(proposal["proposal_type"], PRIORITIZE_FAILURE_HOTSPOTS)
+        self.assertFalse(proposal["mutates_state"])
+        self.assertTrue(proposal["requires_approval"])
+
+    def test_blank_event_type_falls_back_to_next_type_field(self):
+        events = [{"event_type": "  ", "type": "inference_error", "tile_id": "t-9"}]
+
+        proposal = propose_failure_hotspots(events)
+
+        candidates = proposal["explanation"]["candidates"]
+        self.assertEqual([c["item"] for c in candidates], ["tile_id:t-9"])
+        self.assertEqual(candidates[0]["score"], 6)
+
+
+if __name__ == "__main__":
+    unittest.main()