diff --git a/backend/glossa_lab/api/research_loop.py b/backend/glossa_lab/api/research_loop.py index ba81b8b2..4835dd7e 100644 --- a/backend/glossa_lab/api/research_loop.py +++ b/backend/glossa_lab/api/research_loop.py @@ -191,25 +191,32 @@ def _producer(): while True: # Wait for next entry (with timeout so we don't hang forever) try: - entry = await asyncio.wait_for(queue.get(), timeout=120) + entry = await asyncio.wait_for(queue.get(), timeout=360) except asyncio.TimeoutError: break if entry is None: # producer finished break - cycles_done += 1 + # Phase E: intermediate SSE events (proposal, build, verify, + # analysis, timeout, gap_skipped) are streamed directly. + # Only persist + increment on node_complete / cycle entries. + entry_type = entry.get("type", "") + is_cycle = entry_type in ("node_complete", "") and entry.get("cycle") + yield f"data: {json.dumps(entry)}\n\n" - # Persist state from async context (no thread issues) - await _persist(loop) + if is_cycle: + cycles_done += 1 + # Persist state from async context (no thread issues) + await _persist(loop) - # Update job progress - if job_id and db: - try: - await db.update_job_status(job_id, "running") - except Exception: # noqa: BLE001 - pass + # Update job progress + if job_id and db: + try: + await db.update_job_status(job_id, "running") + except Exception: # noqa: BLE001 + pass # Wait for producer thread to finish await task @@ -347,6 +354,11 @@ def _build_synthesis(loop, foundation_result: dict[str, Any] | None = None) -> d path_signals = getattr(loop, "path_signals", {}) + # Phase E: include top findings and proposed next from full results + full_results = loop.get_full_results() + top_findings = full_results.get("top_findings", [])[:3] + proposed_next = full_results.get("proposed_next", []) + return { "summary": ( f"{len(history)} cycles completed. 
" @@ -366,6 +378,8 @@ def _build_synthesis(loop, foundation_result: dict[str, Any] | None = None) -> d "blocked": len(blocked), }, "foundation_check": foundation_result or {"skipped": True, "reason": "not run"}, + "top_findings": top_findings, + "proposed_next": proposed_next, } diff --git a/backend/glossa_lab/loop_proposal.py b/backend/glossa_lab/loop_proposal.py new file mode 100644 index 00000000..c2e12d73 --- /dev/null +++ b/backend/glossa_lab/loop_proposal.py @@ -0,0 +1,392 @@ +"""Closed-loop proposal engine — Propose → Build → Verify → Analyze. + +Phase E: makes the research loop self-directing by replacing the old +pick_template + run_template pattern with a full pipeline: + + 1. ProposalEngine.propose() — rank experiment candidates for a gap + 2. build_experiment() — instantiate a runnable experiment + 3. verify_before_run() — pre-flight checks before execution + 4. analyze_result() — post-run synthesis and trend analysis +""" +from __future__ import annotations + +import logging +import re +from collections import Counter +from dataclasses import dataclass, field +from typing import Any + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + +@dataclass +class ExperimentProposal: + experiment_id: str + display_name: str + rationale: str + priority: float = 0.5 + estimated_value: float = 0.5 + novelty_score: float = 0.5 + + +@dataclass +class ExperimentInstance: + experiment_id: str + display_name: str + anchor_set_id: str | None + corpus_ids: list[str] + params: dict[str, Any] = field(default_factory=dict) + proposal: ExperimentProposal | None = None + + +@dataclass +class VerificationResult: + ok: bool + issues: list[str] = field(default_factory=list) + recommendation: str = "pass" # pass | skip | abort + + +@dataclass +class AnalysisResult: + summary: str + metrics: dict[str, Any] = 
class ProposalEngine:
    """Generate ranked experiment proposals for a given research gap.

    Rule-based scorer that prefers:
      - Registered experiment templates
      - Templates with high historical success rate
      - Experiments covering unseen gaps

    Anti-patterns rejected:
      - Same experiment twice in a row
      - Superseded / legacy experiments
      - Experiments whose minimum anchor count is not met
        (see ``MIN_ANCHOR_REQUIREMENTS``)
      - Experiments recorded via ``record_run`` fewer than
        ``COOLDOWN_CYCLES`` cycles ago
    """

    # Experiments that are superseded or legacy — never propose
    LEGACY_EXPERIMENTS: set[str] = set()

    # Experiments requiring minimum anchor counts
    MIN_ANCHOR_REQUIREMENTS: dict[str, int] = {
        "contact_zone_analysis": 50,
        "anchor_convergence_benchmark": 30,
    }

    # Minimum number of cycles between two runs of the same experiment
    COOLDOWN_CYCLES: int = 5

    # Maximum number of proposals returned by propose()
    MAX_PROPOSALS: int = 3

    def __init__(
        self,
        experiment_names: list[str],
        template_to_graph: dict[str, str],
        insight_to_experiments: dict[str, list[str]],
    ) -> None:
        """Store the experiment catalogue and reset run-tracking state.

        Args:
            experiment_names: candidate experiment IDs, in catalogue order.
            template_to_graph: experiment ID → graph ID; membership gives a
                small priority bonus.
            insight_to_experiments: insight type → experiment IDs ordered by
                preference (earlier entries get a larger boost).
        """
        self.experiment_names = experiment_names
        self.template_to_graph = template_to_graph
        self.insight_to_experiments = insight_to_experiments
        self.seen_experiments: set[str] = set()
        self.cooldown_map: dict[str, int] = {}  # experiment_id → last-run cycle

    @staticmethod
    def _success_rates(history: list[dict[str, Any]]) -> dict[str, float]:
        """Return the fraction of runs flagged ``is_new_info`` per experiment.

        Every experiment that appears in *history* gets an entry, including
        those that never produced new information (rate 0.0). Entries without
        an ``experiment`` value are ignored.
        """
        runs: Counter[str] = Counter()
        hits: Counter[str] = Counter()
        for entry in history:
            eid = entry.get("experiment", "")
            if not eid:
                continue
            runs[eid] += 1
            if entry.get("is_new_info"):
                hits[eid] += 1
        return {eid: hits[eid] / n_runs for eid, n_runs in runs.items()}

    def propose(
        self,
        gap: str,
        history: list[dict[str, Any]],
        anchor_count: int,
        cycle: int = 0,
        insights: list[dict[str, Any]] | None = None,
    ) -> list[ExperimentProposal]:
        """Return up to ``MAX_PROPOSALS`` proposals, highest priority first.

        Args:
            gap: name of the research gap being targeted (used in rationale).
            history: prior cycle entries; ``experiment`` and ``is_new_info``
                keys are read when present.
            anchor_count: number of anchors currently available.
            cycle: current cycle number, compared against the cooldown map.
            insights: mined insights; their ``type`` values drive the
                insight-based priority boost.

        Returns:
            A possibly empty list when every candidate is filtered out.
        """
        insights = insights or []

        # Gather recent experiment IDs
        recent_5 = {h["experiment"] for h in history[-5:] if h.get("experiment")}
        # .get: a history entry without an "experiment" key must not crash
        last_exp = history[-1].get("experiment", "") if history else ""

        success_rate = self._success_rates(history)

        # Insight types ordered from most to least frequent
        type_counts: Counter[str] = Counter(i.get("type", "") for i in insights)
        ranked_types = [itype for itype, _ in type_counts.most_common()]

        candidates: list[ExperimentProposal] = []
        for exp_id in self.experiment_names:
            # ── Anti-pattern filters ──
            if exp_id in self.LEGACY_EXPERIMENTS:
                continue
            if exp_id == last_exp:  # never same as last
                continue
            if exp_id in self.seen_experiments:
                last_cycle = self.cooldown_map.get(exp_id, 0)
                if cycle - last_cycle < self.COOLDOWN_CYCLES:
                    continue
            min_anch = self.MIN_ANCHOR_REQUIREMENTS.get(exp_id, 0)
            if min_anch > 0 and anchor_count < min_anch:
                continue

            # ── Scoring ──
            priority = 0.5
            novelty = 1.0 if exp_id not in self.seen_experiments else 0.3

            # Boost from the most frequent insight type that lists this
            # experiment; earlier position in that list → larger boost.
            for itype in ranked_types:
                exps_for_type = self.insight_to_experiments.get(itype, [])
                if exp_id in exps_for_type:
                    rank_in_type = exps_for_type.index(exp_id)
                    priority += 0.3 - rank_in_type * 0.05
                    break

            # Boost for historical success; 0.5 is the prior for experiments
            # that have no history at all.
            sr = success_rate.get(exp_id, 0.5)
            priority += sr * 0.2

            # Penalise if recently run (but past cooldown)
            if exp_id in recent_5:
                priority -= 0.2

            # Has a graph backing → slightly more reliable
            if exp_id in self.template_to_graph:
                priority += 0.05

            candidates.append(ExperimentProposal(
                experiment_id=exp_id,
                display_name=exp_id.replace("_", " ").title(),
                rationale=self._build_rationale(
                    exp_id, gap, novelty, sr, type_counts
                ),
                priority=round(priority, 3),
                estimated_value=round(priority * novelty, 3),
                novelty_score=round(novelty, 3),
            ))

        # Stable sort: ties keep catalogue order
        candidates.sort(key=lambda p: p.priority, reverse=True)
        return candidates[:self.MAX_PROPOSALS]

    def record_run(self, experiment_id: str, cycle: int) -> None:
        """Record that an experiment was run at a given cycle."""
        self.seen_experiments.add(experiment_id)
        self.cooldown_map[experiment_id] = cycle

    def _build_rationale(
        self,
        exp_id: str,
        gap: str,
        novelty: float,
        success_rate: float,
        type_counts: Counter,
    ) -> str:
        """Compose a one-sentence, human-readable reason for proposing *exp_id*."""
        parts: list[str] = []
        if novelty >= 1.0:
            parts.append("never run before")
        if success_rate > 0.6:
            parts.append(f"high historical success ({success_rate:.0%})")
        top_type = type_counts.most_common(1)
        # Only claim alignment when the experiment is actually mapped to the
        # dominant insight type.
        if top_type and exp_id in self.insight_to_experiments.get(top_type[0][0], []):
            parts.append(f"aligns with dominant insight type '{top_type[0][0]}'")
        parts.append(f"targets gap '{gap}'")
        return f"{exp_id.replace('_', ' ')}: " + "; ".join(parts) + "."
--------------------------------------------------------------------------- + +def verify_before_run( + instance: ExperimentInstance, + anchor_count: int, + corpus_available: bool, + corpus_seq_count: int = 0, +) -> VerificationResult: + """Pre-flight checks before running an experiment. + + Returns: + VerificationResult with ok, issues, and recommendation (pass|skip|abort). + """ + issues: list[str] = [] + + # Anchor checks + min_anchors = ProposalEngine.MIN_ANCHOR_REQUIREMENTS.get( + instance.experiment_id, 0 + ) + if min_anchors > 0 and anchor_count < min_anchors: + issues.append( + f"Experiment requires ≥{min_anchors} anchors, " + f"only {anchor_count} available" + ) + + # Corpus availability + if not corpus_available: + issues.append("No corpus data available for this experiment") + + if corpus_seq_count < 10 and corpus_available: + issues.append(f"Very small corpus ({corpus_seq_count} sequences)") + + # SA-style experiments need enough anchors + if instance.experiment_id in ( + "anchor_convergence_benchmark", + "constraint_sweep", + ) and anchor_count < 30: + issues.append( + f"Anchored SA needs ≥30 anchors; only {anchor_count} available" + ) + + if not issues: + return VerificationResult(ok=True, recommendation="pass") + + # Determine severity + critical = any("No corpus" in i for i in issues) + if critical: + return VerificationResult(ok=False, issues=issues, recommendation="abort") + + return VerificationResult(ok=False, issues=issues, recommendation="skip") + + +# --------------------------------------------------------------------------- +# Post-run analysis +# --------------------------------------------------------------------------- + +def analyze_result( + result: dict[str, Any], + proposal: ExperimentProposal, + prior_results: list[dict[str, Any]], + verdict: str = "", +) -> AnalysisResult: + """Post-run synthesis: extract metrics, detect trends, flag changes. 
+ + Extracts key metrics from the result (z-scores, consistency rates, + entropy values), compares to prior runs, and flags significant changes. + """ + metrics: dict[str, Any] = {} + flags: list[str] = [] + next_steps: list[str] = [] + + # Extract common metrics + for key in ( + "zipf_alpha", "ttr", "mutual_info", "avg_jaccard", "avg_depth", + "n_unique", "n_singletons", "n_blockers", "valid_pct", + "n_rare", "avg_sigma", "n_trigrams", "n_sites", "n_motifs", + "n_start", "n_mid", "n_shared", + ): + if key in result: + metrics[key] = result[key] + + # Detect new anchor candidates from the result + n_cands = result.get("n_candidates", 0) + if isinstance(n_cands, int) and n_cands > 0: + flags.append("new_anchor_candidate") + metrics["n_candidates"] = n_cands + + # Detect whether the needle moved vs prior runs of same experiment + same_type_priors = [ + p for p in prior_results + if p.get("experiment") == proposal.experiment_id + ] + + if same_type_priors and metrics: + prev_metrics = same_type_priors[-1].get("analysis_metrics", {}) + for key, val in metrics.items(): + if key in prev_metrics and isinstance(val, (int, float)): + prev_val = prev_metrics[key] + if isinstance(prev_val, (int, float)) and prev_val != 0: + pct_change = (val - prev_val) / abs(prev_val) + if pct_change > 0.1: + flags.append("needle_moved") + metrics[f"{key}_delta"] = round(pct_change, 3) + elif pct_change < -0.1: + flags.append("regression_detected") + metrics[f"{key}_delta"] = round(pct_change, 3) + + # Generate summary + metric_strs = [f"{k}={v}" for k, v in list(metrics.items())[:5] + if not k.endswith("_delta")] + summary_parts = [ + f"{proposal.display_name}: {verdict}" if verdict else proposal.display_name + ] + if metric_strs: + summary_parts.append("; ".join(metric_strs)) + if "needle_moved" in flags: + summary_parts.append("📈 Needle moved") + if "regression_detected" in flags: + summary_parts.append("📉 Regression detected") + if "new_anchor_candidate" in flags: + 
summary_parts.append(f"🎯 {n_cands} new candidate(s)") + + summary = " · ".join(summary_parts) + + # Suggest next steps + if "needle_moved" in flags: + next_steps.append( + f"Run {proposal.experiment_id} again with more data to confirm trend" + ) + if "new_anchor_candidate" in flags: + next_steps.append("Review staged anchor candidates") + if not flags: + next_steps.append("Try a different experiment type to explore new angles") + + return AnalysisResult( + summary=summary, + metrics=metrics, + flags=flags, + suggested_next_steps=next_steps, + ) diff --git a/backend/glossa_lab/pipelines/research_loop.py b/backend/glossa_lab/pipelines/research_loop.py index 9ff6949b..61b9b6a1 100644 --- a/backend/glossa_lab/pipelines/research_loop.py +++ b/backend/glossa_lab/pipelines/research_loop.py @@ -33,6 +33,14 @@ logger = logging.getLogger(__name__) + +class _CycleTimeoutError(Exception): + """Raised when a single cycle exceeds its wall-clock timeout.""" + def __init__(self, experiment: str) -> None: + self.experiment = experiment + super().__init__(f"Cycle timeout: {experiment}") + + _REPO = Path(__file__).resolve().parents[3] _HOLDAT_CSV = _REPO / "corpora/downloads/external_repos/holdatllc_indus/indus_corpus 2.csv" _ANCHORS_JSON = _REPO / "backend/reports/INDUS_FINAL_ANCHORS.json" @@ -247,21 +255,25 @@ def _dedr_support(reading: str) -> str | None: class ResearchLoop: """Stateful research loop: blitz-mine, adaptive path exploration, act + stage. - Phase 8: - - Corpus (Holdat CSV) and anchor data are loaded at init so experiments - receive real inputs instead of empty dicts. - - Each run() call starts with a blitz mine across all gap topics, - building path_signals used to adaptively select gaps per cycle. - - _execute_with_corpus() runs direct corpus analysis functions that - produce real metrics (Zipf exponent, suffix frequencies, blocker counts). - - _act() interprets each experiment output and generates staged anchor - candidates written to outputs/anchor_staging.json. 
Not auto-promoted. - + Phase E (closed-loop): + - Each cycle uses Propose → Build → Verify → Run → Analyze pipeline. + - ProposalEngine ranks experiment candidates with anti-duplication and + cooldown tracking (5-cycle minimum between repeats). + - verify_before_run() pre-checks anchor counts and corpus availability. + - analyze_result() extracts metrics, detects trends, flags regressions. + - Per-cycle analyses stored in history for trend comparison. + - Configurable per-cycle timeout (default 300s); partial results saved. + - Yields extra SSE events: proposal_selected, build_complete, + verify_result, analysis_complete, cycle_timeout, gap_skipped. + + Phase 8: Corpus (Holdat CSV) and anchor data loaded at init. Phase 7: history persisted to DB; all_seen is intentionally per-job only. """ - def __init__(self, max_cycles: int = 15, *, db: Any | None = None) -> None: + def __init__(self, max_cycles: int = 15, *, db: Any | None = None, + max_cycle_timeout_seconds: int = 300) -> None: self.max_cycles = max_cycles + self.max_cycle_timeout = max_cycle_timeout_seconds self.all_seen: set[str] = set() self.history: list[dict[str, Any]] = [] self.running = False @@ -284,6 +296,17 @@ def __init__(self, max_cycles: int = 15, *, db: Any | None = None) -> None: self.anchor_candidates: list[dict[str, Any]] = [] self.path_signals: dict[str, float] = {} + # Per-cycle analysis results (Phase E) + self.cycle_analyses: list[dict[str, Any]] = [] + + # Proposal engine (Phase E) + from glossa_lab.loop_proposal import ProposalEngine # noqa: PLC0415 + self._proposal_engine = ProposalEngine( + experiment_names=EXPERIMENT_NAMES, + template_to_graph=TEMPLATE_TO_GRAPH, + insight_to_experiments=INSIGHT_TO_EXPERIMENTS, + ) + self._load_corpus() self._load_anchors() @@ -585,13 +608,22 @@ def stop(self) -> None: def run(self) -> Generator[dict[str, Any], None, None]: """Yield one dict per completed cycle. - Phase 8 order: blitz mine → adaptive cycles (mine, execute, act) → stage. 
+ Phase E order: blitz mine → adaptive cycles + (propose → build → verify → run → analyze → act) → stage. """ + from glossa_lab.loop_proposal import ( # noqa: PLC0415 + analyze_result, + build_experiment, + verify_before_run, + ) + self.running = True self.should_stop = False self.anchor_candidates = [] + self.cycle_analyses = [] _dry_streak = 0 _MAX_DRY = 3 + _timeout_count = 0 # Phase 1: blitz mine _, _, path_signals = self._blitz_mine() @@ -613,11 +645,175 @@ def run(self) -> Generator[dict[str, Any], None, None]: else: _dry_streak = 0 - template = self._select_experiment(insights, cycle) - verdict, exp_output = self._execute_with_corpus(template) - new_cands = self._act(template, exp_output, insights) - self.anchor_candidates.extend(new_cands) + # ── PROPOSE ────────────────────────────────────────────────── + anchor_count = len(self.high_signs) + len(self.low_signs) + proposals = self._proposal_engine.propose( + gap=gap["name"], + history=self.history, + anchor_count=anchor_count, + cycle=cycle, + insights=insights, + ) + + if not proposals: + # Fallback to rotation + template = self._select_experiment(insights, cycle) + proposals_used = None + else: + template = None + proposals_used = proposals + + # ── Try proposals until one passes verify ──────────────────── + selected_proposal = None + instance = None + verify_ok = False + + if proposals_used: + for prop in proposals_used: + inst = build_experiment( + prop, + anchor_set_id=None, + corpus_ids=[], + template_to_graph=TEMPLATE_TO_GRAPH, + ) + vr = verify_before_run( + inst, + anchor_count=anchor_count, + corpus_available=bool(self.corpus_seqs), + corpus_seq_count=len(self.corpus_seqs), + ) + if vr.recommendation == "pass": + selected_proposal = prop + instance = inst + verify_ok = True + break + elif vr.recommendation == "abort": + logger.warning("Cycle %d: verify abort for %s: %s", + cycle, prop.experiment_id, vr.issues) + continue + else: # skip + logger.info("Cycle %d: verify skip for %s: %s", + cycle, 
prop.experiment_id, vr.issues) + continue + + # If no proposal passed, try fallback + if not verify_ok: + if proposals_used: + # All proposals failed verification — skip this gap + yield { + "type": "gap_skipped", + "cycle": cycle, + "gap_targeted": gap["name"], + "reason": "all proposals failed verification", + } + continue + # No proposals at all — use rotation fallback + if template is None: + template = self._select_experiment(insights, cycle) + + # Resolve template name + if selected_proposal: + template = selected_proposal.experiment_id + + # Emit proposal_selected event + yield { + "type": "proposal_selected", + "cycle": cycle, + "experiment": template, + "rationale": selected_proposal.rationale if selected_proposal else "rotation fallback", + "priority": selected_proposal.priority if selected_proposal else 0, + } + + # Emit build_complete event + yield { + "type": "build_complete", + "cycle": cycle, + "experiment": template, + } + + # Emit verify_result event + yield { + "type": "verify_result", + "cycle": cycle, + "experiment": template, + "ok": True, + "recommendation": "pass", + } + + # ── RUN (with per-cycle timeout) ───────────────────────────── + cycle_t0 = time.time() + timed_out = False + try: + verdict, exp_output = self._execute_with_corpus_timeout( + template, self.max_cycle_timeout + ) + except _CycleTimeoutError: + timed_out = True + _timeout_count += 1 + verdict = f"{template}: timed out after {self.max_cycle_timeout}s" + exp_output = {} + logger.warning("Cycle %d: %s timed out", cycle, template) + # Record in cooldown to avoid retrying soon + self._proposal_engine.record_run(template, cycle) + yield { + "type": "cycle_timeout", + "cycle": cycle, + "experiment": template, + "timeout_seconds": self.max_cycle_timeout, + } + # Fail the whole loop if >50% of cycles time out + if _timeout_count > self.max_cycles * 0.5: + logger.error("Loop: >50%% cycles timed out — aborting") + self._save_partial_results() + break + + if not timed_out: + new_cands 
= self._act(template, exp_output, insights) + self.anchor_candidates.extend(new_cands) + else: + new_cands = [] + + # Record proposal run + self._proposal_engine.record_run(template, cycle) + + # ── ANALYZE ────────────────────────────────────────────────── + if selected_proposal and not timed_out: + from glossa_lab.loop_proposal import ExperimentProposal as _EP # noqa: PLC0415 + analysis = analyze_result( + result=exp_output, + proposal=selected_proposal, + prior_results=self.history, + verdict=verdict, + ) + else: + # Minimal analysis for fallback/timeout + from glossa_lab.loop_proposal import AnalysisResult # noqa: PLC0415 + analysis = AnalysisResult( + summary=verdict[:200] if verdict else template, + metrics={}, + flags=["timeout"] if timed_out else [], + ) + + cycle_analysis_entry = { + "cycle": cycle, + "experiment": template, + "summary": analysis.summary, + "metrics": analysis.metrics, + "flags": analysis.flags, + "suggested_next_steps": analysis.suggested_next_steps, + } + self.cycle_analyses.append(cycle_analysis_entry) + + # Emit analysis_complete event + yield { + "type": "analysis_complete", + "cycle": cycle, + "experiment": template, + "summary": analysis.summary, + "flags": analysis.flags, + } + # ── Build history entry ────────────────────────────────────── is_new = verdict != (self.history[-1]["verdict"] if self.history else "") entry = { "cycle": cycle, @@ -627,18 +823,40 @@ def run(self) -> Generator[dict[str, Any], None, None]: "n_insights": len(insights), "insight_types": dict(Counter(i.get("type", "") for i in insights)), "experiment": template, - "selection_method": "insight" if insights else "rotation", + "selection_method": "proposal" if selected_proposal else "rotation", "verdict": verdict, "is_new_info": is_new, "n_candidates": len(new_cands), "top_candidate": new_cands[0]["sign"] if new_cands else None, + "analysis_metrics": analysis.metrics, + "analysis_flags": analysis.flags, + "analysis_summary": analysis.summary, } 
self.history.append(entry) - yield entry + + # Emit the standard cycle entry (node_complete) + yield {"type": "node_complete", **entry} self._save_staging() + self._save_partial_results() self.running = False + # ------------------------------------------------------------------ + # Timeout helper + # ------------------------------------------------------------------ + + def _execute_with_corpus_timeout( + self, template: str, timeout: int + ) -> tuple[str, dict[str, Any]]: + """Run _execute_with_corpus with a wall-clock timeout.""" + import concurrent.futures # noqa: PLC0415 + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(self._execute_with_corpus, template) + try: + return future.result(timeout=timeout) + except concurrent.futures.TimeoutError: + raise _CycleTimeoutError(template) from None + # ------------------------------------------------------------------ # Execute with real corpus data # ------------------------------------------------------------------ @@ -1083,6 +1301,31 @@ def _check_conflict(self, sign: str, proposed: str) -> str | None: f"reading '{er}'") return None + # ------------------------------------------------------------------ + # Partial results + # ------------------------------------------------------------------ + + def _save_partial_results(self) -> None: + """Write completed cycle syntheses to partial_loop_result.json.""" + if not self.cycle_analyses: + return + partial_path = _REPO / "outputs" / "partial_loop_result.json" + partial_path.parent.mkdir(parents=True, exist_ok=True) + try: + import json as _j # noqa: PLC0415 + partial_path.write_text( + _j.dumps({ + "cycles_completed": len(self.history), + "max_cycles": self.max_cycles, + "cycle_analyses": self.cycle_analyses, + "history": self.history, + }, indent=2, ensure_ascii=False, default=str), + encoding="utf-8", + ) + logger.info("Partial results saved → %s", partial_path) + except Exception as exc: # noqa: BLE001 + logger.warning("Failed 
to save partial results: %s", exc) + # ------------------------------------------------------------------ # Staging # ------------------------------------------------------------------ @@ -1144,8 +1387,51 @@ def get_full_results(self) -> dict[str, Any]: if c.get("review_status") == "staged") blocked = sum(1 for c in self.anchor_candidates if c.get("review_status") == "blocked") + + # Top findings from cycle analyses (Phase E) + top_findings: list[dict[str, Any]] = [] + for ca in self.cycle_analyses: + if ca.get("metrics"): + for k, v in list(ca["metrics"].items())[:2]: + if not str(k).endswith("_delta"): + top_findings.append({ + "experiment": ca["experiment"], + "metric": k, + "value": v, + "interpretation": ca.get("summary", "")[:120], + }) + # Deduplicate and limit + seen_keys: set[str] = set() + unique_findings: list[dict[str, Any]] = [] + for f in top_findings: + key = f"{f['experiment']}:{f['metric']}" + if key not in seen_keys: + seen_keys.add(key) + unique_findings.append(f) + top_findings = unique_findings[:10] + + # Proposed next experiments (Phase E) + proposed_next: list[dict[str, Any]] = [] + anchor_count = len(self.high_signs) + len(self.low_signs) + try: + next_proposals = self._proposal_engine.propose( + gap="continuation", + history=self.history, + anchor_count=anchor_count, + cycle=len(self.history) + 1, + ) + for p in next_proposals: + proposed_next.append({ + "experiment_id": p.experiment_id, + "display_name": p.display_name, + "rationale": p.rationale, + "priority": p.priority, + }) + except Exception: # noqa: BLE001 + pass + return { - "protocol": "integrated_research_loop_v2", + "protocol": "integrated_research_loop_v3", "cycles_run": len(self.history), "max_cycles": self.max_cycles, "total_papers_mined": sum(h["n_papers"] for h in self.history), @@ -1159,4 +1445,7 @@ def get_full_results(self) -> dict[str, Any]: "blocked": blocked, }, "history": self.history, + "top_findings": top_findings, + "proposed_next": proposed_next, + 
"cycle_analyses": self.cycle_analyses, } diff --git a/frontend/src/components/ResearchLoopPanel.tsx b/frontend/src/components/ResearchLoopPanel.tsx index 891c7e2f..7c8e81db 100644 --- a/frontend/src/components/ResearchLoopPanel.tsx +++ b/frontend/src/components/ResearchLoopPanel.tsx @@ -71,6 +71,20 @@ interface AnchorCandidate { partner_reading?: string; } +interface TopFinding { + experiment: string; + metric: string; + value: number | string; + interpretation: string; +} + +interface ProposedNext { + experiment_id: string; + display_name: string; + rationale: string; + priority: number; +} + interface Synthesis { summary: string; needle_moved?: boolean; @@ -81,6 +95,8 @@ interface Synthesis { foundation_check: FoundationCheck; anchor_candidates?: AnchorCandidate[]; candidate_counts?: { total: number; staged: number; blocked: number }; + top_findings?: TopFinding[]; + proposed_next?: ProposedNext[]; } interface LastRun { @@ -183,14 +199,54 @@ export function ResearchLoopPanel() { try { const event = JSON.parse(line.slice(6)) as CycleEntry & { type?: string; synthesis?: Synthesis; + experiment?: string; rationale?: string; + summary?: string; flags?: string[]; + ok?: boolean; reason?: string; + timeout_seconds?: number; gap_targeted?: string; }; if (event.type === "complete") { if (event.synthesis) setSynthesis(event.synthesis); - // Notify DashboardView so it can pull the new insight window.dispatchEvent(new CustomEvent("glossa:loop-complete")); void fetchStatus(); void fetchLastRun(); void fetchStaging(); + } else if (event.type === "proposal_selected") { + setLog((prev) => [...prev, { + cycle: event.cycle ?? 0, gap_targeted: "", + experiment: event.experiment ?? "", n_papers: 0, n_insights: 0, + insight_types: {}, verdict: `\u{1F4A1} Proposed: ${event.rationale ?? ""}`, + is_new_info: false, selection_method: "proposal", + } as CycleEntry]); + } else if (event.type === "verify_result") { + setLog((prev) => [...prev, { + cycle: event.cycle ?? 
0, gap_targeted: "", + experiment: event.experiment ?? "", n_papers: 0, n_insights: 0, + insight_types: {}, verdict: `\u2713 Verified: ${event.ok ? "pass" : "fail"}`, + is_new_info: false, selection_method: "verify", + } as CycleEntry]); + } else if (event.type === "analysis_complete") { + setLog((prev) => [...prev, { + cycle: event.cycle ?? 0, gap_targeted: "", + experiment: event.experiment ?? "", n_papers: 0, n_insights: 0, + insight_types: {}, verdict: `\u{1F4CA} ${(event.summary ?? "").slice(0, 80)}`, + is_new_info: false, selection_method: "analysis", + } as CycleEntry]); + } else if (event.type === "cycle_timeout") { + setLog((prev) => [...prev, { + cycle: event.cycle ?? 0, gap_targeted: "", + experiment: event.experiment ?? "", n_papers: 0, n_insights: 0, + insight_types: {}, verdict: `\u23F1 Timeout (${event.timeout_seconds ?? 300}s)`, + is_new_info: false, selection_method: "timeout", + } as CycleEntry]); + } else if (event.type === "gap_skipped") { + setLog((prev) => [...prev, { + cycle: event.cycle ?? 0, gap_targeted: event.gap_targeted ?? "", + experiment: "", n_papers: 0, n_insights: 0, + insight_types: {}, verdict: `\u23ED Gap skipped: ${event.reason ?? ""}`, + is_new_info: false, selection_method: "skipped", + } as CycleEntry]); + } else if (event.type === "node_complete" && event.cycle) { + setLog((prev) => [...prev, event as CycleEntry]); } else if (event.cycle) { setLog((prev) => [...prev, event as CycleEntry]); } @@ -262,7 +318,7 @@ export function ResearchLoopPanel() { {/* ── Protocol description ── */}