diff --git a/src/workflow/bundled/deep_research.py b/src/workflow/bundled/deep_research.py index cd5bfbd3..5d2870b8 100644 --- a/src/workflow/bundled/deep_research.py +++ b/src/workflow/bundled/deep_research.py @@ -7,6 +7,10 @@ {"title": "Verify", "detail": "Cross-check each claim with independent agents"}, {"title": "Synthesize", "detail": "Write a cited report from surviving claims"}, ], + # #283: a verbose model burned ~888k tokens in Search+Verify with no + # ceiling. Applied by the engine when the caller set no budget; + # CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET overrides (0 disables). + "default_budget": 400000, } # Bundled deep-research workflow (port of the upstream /deep-research bundle). @@ -101,6 +105,39 @@ # ── Phase 2: cross-check every claim independently ─────────────────────────── phase("Verify") + +# Budget-aware degradation (#283): surface the Search spend, and only +# launch as many verifiers as the remaining budget affords, reserving +# headroom for Synthesize. Claims that can't be afforded pass through +# UNVERIFIED rather than being dropped (the cross-check default is +# "supported unless contradicted", and an unrun check contradicts +# nothing). With no budget set, everything is verified as before. +# Note: the estimate gates how many verifiers LAUNCH; spend within an +# already-launched wave is uncapped (the engine's ceiling only trips +# between calls), so the Synthesize step below re-checks the budget and +# falls back to the raw claims if the waves overshot. 
+SYNTH_RESERVE = 40000 +to_verify = claims +unverified = [] +if budget.total: + search_spent = budget.spent() + log(f"Search spent ~{search_spent:,} of the {budget.total:,}-token budget.") + per_verifier = max(2000, search_spent // max(1, len(ANGLES))) + affordable = int(max(0, (budget.remaining() - SYNTH_RESERVE) // per_verifier)) + if affordable < len(claims): + to_verify = claims[:affordable] + unverified = claims[affordable:] + if not to_verify: + log( + f"Token budget nearly exhausted ({budget.remaining():,.0f} left); " + f"skipping cross-check — all {len(claims)} claims pass through unverified." + ) + else: + log( + f"Token budget affords cross-checking {len(to_verify)} of {len(claims)} " + f"claims (~{per_verifier:,} tokens each); {len(unverified)} pass through unverified." + ) + verdicts = await parallel([ agent( f'Fact-check this claim about "{question}":\n\n "{c["claim"]}"\n\n' @@ -116,30 +153,59 @@ phase="Verify", schema=VERDICT_SCHEMA, ) - for c in claims -]) + for c in to_verify +]) if to_verify else [] # Keep claims the cross-check did not refute (supported, and unclear-but-not-wrong), # so the report covers the breadth of the topic rather than only claims that happened # to be independently re-confirmed. Refuted ("unsupported") claims are dropped. -survivors = [ - c for c, v in zip(claims, verdicts) - if v and v.get("verdict") in ("supported", "unclear") -] -log(f"{len(survivors)} of {len(claims)} claims survived cross-checking.") +# A None verdict (verifier failed or hit the budget ceiling) contradicts +# nothing — the claim passes through unverified instead of vanishing. 
+survivors = [] +failed_checks = 0 +for c, v in zip(to_verify, verdicts): + if v is None: + failed_checks += 1 + survivors.append(c) + elif v.get("verdict") in ("supported", "unclear"): + survivors.append(c) +survivors.extend(unverified) +log( + f"{len(survivors)} of {len(claims)} claims survived cross-checking" + + (f" ({failed_checks} verifier(s) did not complete; their claims pass through unverified)" if failed_checks else "") + + (f" ({len(unverified)} not checked: budget)" if unverified else "") + + "." +) +if budget.total: + log(f"Spend before Synthesize: ~{budget.spent():,} of {budget.total:,} tokens.") # ── Phase 3: synthesize a cited report ─────────────────────────────────────── phase("Synthesize") bullet_lines = "\n".join(f"- {c['claim']} (source: {c['source']})" for c in survivors) -report = await agent( - f'Write a clear, well-organized report answering: "{question}".\n\n' - f"You already have everything you need below — do NOT use any tools (no web search, " - f"no web fetch, no retrieving anything). Write the report DIRECTLY from these " - f"cross-checked claims, citing the source for each point:\n\n{bullet_lines}\n\n" - f"Structure it with a short summary followed by the details. Note any open questions.", - label="synthesize", - phase="Synthesize", -) +# Re-check the ceiling before the final call (#283): a verbose model can +# overshoot the estimate WITHIN the already-launched search/verify waves, +# and a tripped ceiling here would otherwise fail the whole run with no +# report after spending the entire budget. Better an un-synthesized +# claim list than expensive nothing. +if budget.total and budget.spent() >= budget.total: + log( + f"Budget exhausted before Synthesize (~{budget.spent():,} of {budget.total:,}); " + f"returning the raw surviving claims instead of a synthesized report." 
+ ) + report = ( + "(Token budget exhausted before synthesis — raw cross-checked claims:)\n\n" + + bullet_lines + ) +else: + report = await agent( + f'Write a clear, well-organized report answering: "{question}".\n\n' + f"You already have everything you need below — do NOT use any tools (no web search, " + f"no web fetch, no retrieving anything). Write the report DIRECTLY from these " + f"cross-checked claims, citing the source for each point:\n\n{bullet_lines}\n\n" + f"Structure it with a short summary followed by the details. Note any open questions.", + label="synthesize", + phase="Synthesize", + ) return { "question": question, diff --git a/src/workflow/runtime.py b/src/workflow/runtime.py index 25cefeef..e9d4c6ff 100644 --- a/src/workflow/runtime.py +++ b/src/workflow/runtime.py @@ -16,6 +16,8 @@ import asyncio import inspect +import os +import re from dataclasses import dataclass, field from typing import Any, Callable, Mapping, Optional @@ -313,6 +315,36 @@ def _close_coroutines(items) -> None: item.close() +def _resolve_default_budget(meta: WorkflowMeta) -> Optional[int]: + """The workflow's own token ceiling when the caller set none (#283). + + Resolution order: + + 1. ``CLAWCODEX_<NAME>_TOKEN_BUDGET`` env var (name uppercased, + non-alphanumerics → ``_`` — e.g. deep-research reads + ``CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET``). ``0`` disables the + default entirely; malformed values are ignored. + 2. ``meta.default_budget`` — a positive int literal in the meta block. 
+ """ + env_key = ( + "CLAWCODEX_" + + re.sub(r"[^A-Za-z0-9]+", "_", meta.name).upper().strip("_") + + "_TOKEN_BUDGET" + ) + raw_env = os.environ.get(env_key) + if raw_env is not None: + try: + value = int(raw_env) + except ValueError: + pass # malformed — fall through to the meta default + else: + return value if value > 0 else None + declared = meta.raw.get("default_budget") + if isinstance(declared, int) and not isinstance(declared, bool) and declared > 0: + return declared + return None + + async def run_workflow( source: str, *, @@ -340,6 +372,12 @@ async def run_workflow( meta = extract_meta(source) scheduler = scheduler if scheduler is not None else Scheduler(max_concurrent) + # A workflow may declare its own ceiling via ``meta.default_budget`` + # (#283: deep-research burned ~888k tokens on a verbose model with no + # cap). Applied only when the caller set no budget — an explicit + # budget_total or an inherited parent Budget (nested workflow()) wins. + if budget is None and budget_total is None: + budget_total = _resolve_default_budget(meta) budget = budget if budget is not None else Budget(budget_total) journal = Journal(resume) progress = WorkflowProgress(meta.phases, on_change=on_progress) diff --git a/tests/workflow/test_research_budget.py b/tests/workflow/test_research_budget.py new file mode 100644 index 00000000..835e599f --- /dev/null +++ b/tests/workflow/test_research_budget.py @@ -0,0 +1,208 @@ +"""#283 — deep-research token budget: meta default + graceful degradation. + +Covers the engine-side ``meta.default_budget`` resolution (with the +``CLAWCODEX_<NAME>_TOKEN_BUDGET`` env override) and the bundled +deep-research script's budget-aware Verify gating: when the remaining +budget can't afford the full verify fan-out, unaffordable claims pass +through unverified (logged), and Synthesize still runs. 
+""" + +from __future__ import annotations + +import pathlib +from types import SimpleNamespace + +import pytest + +from src.workflow.runtime import _resolve_default_budget, run_workflow +from src.workflow.sandbox import extract_meta +from src.workflow.types import AgentOutcome + +_DEEP_RESEARCH = ( + pathlib.Path(__file__).resolve().parents[2] + / "src" + / "workflow" + / "bundled" + / "deep_research.py" +).read_text() + + +@pytest.fixture(autouse=True) +def _no_ambient_budget_env(monkeypatch): + """A developer's exported override must not change test meaning.""" + monkeypatch.delenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", raising=False) + + +def _meta(name="deep-research", **raw): + return SimpleNamespace(name=name, raw=raw) + + +class TestResolveDefaultBudget: + def test_meta_default_applies(self): + assert _resolve_default_budget(_meta(default_budget=400000)) == 400000 + + def test_no_default_returns_none(self): + assert _resolve_default_budget(_meta()) is None + + def test_non_positive_or_bool_defaults_ignored(self): + assert _resolve_default_budget(_meta(default_budget=0)) is None + assert _resolve_default_budget(_meta(default_budget=-5)) is None + assert _resolve_default_budget(_meta(default_budget=True)) is None + assert _resolve_default_budget(_meta(default_budget="300000")) is None + + def test_env_override_wins(self, monkeypatch): + monkeypatch.setenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", "123456") + assert _resolve_default_budget(_meta(default_budget=400000)) == 123456 + + def test_env_zero_disables(self, monkeypatch): + monkeypatch.setenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", "0") + assert _resolve_default_budget(_meta(default_budget=400000)) is None + + def test_malformed_env_falls_back_to_meta(self, monkeypatch): + monkeypatch.setenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", "lots") + assert _resolve_default_budget(_meta(default_budget=400000)) == 400000 + + def test_bundled_deep_research_declares_a_default(self): + meta = 
extract_meta(_DEEP_RESEARCH) + assert _resolve_default_budget(meta) == 400000 + + +class TestMetaDefaultWiring: + async def test_meta_default_reaches_the_script(self, runner): + source = ( + 'meta = {"name": "t", "description": "d", "default_budget": 1234}\n' + "return {'total': budget.total}\n" + ) + result = await run_workflow(source, runner=runner) + assert result.error is None + assert result.value == {"total": 1234} + + async def test_explicit_budget_beats_meta_default(self, runner): + source = ( + 'meta = {"name": "t", "description": "d", "default_budget": 1234}\n' + "return {'total': budget.total}\n" + ) + result = await run_workflow(source, runner=runner, budget_total=99) + assert result.value == {"total": 99} + + +def _research_handler(search_tokens: int, claim_count: int = 3): + """A runner handler producing deterministic search/verify/synthesize + outcomes with controllable token spend.""" + + def handler(spec, index): + schema_props = (spec.schema or {}).get("properties", {}) + if "claims" in schema_props: + return AgentOutcome( + structured={ + "claims": [ + {"claim": f"claim {index}-{i}", "source": f"https://s/{index}/{i}"} + for i in range(claim_count) + ] + }, + tokens=search_tokens, + ) + if "verdict" in schema_props: + return AgentOutcome( + structured={"verdict": "supported", "reason": "checks out"}, + tokens=1000, + ) + return AgentOutcome(text="the report", tokens=1000) + + return handler + + +class TestDeepResearchDegradation: + async def test_tight_budget_skips_verification_but_synthesizes(self, make_runner): + # 4 searches x 4k = 16k spent of 20k; remaining 4k < the 40k + # synthesize reserve -> zero verifiers, everything passes through. 
+ runner = make_runner(handler=_research_handler(search_tokens=4000, claim_count=1)) + result = await run_workflow( + _DEEP_RESEARCH, + runner=runner, + args={"question": "what is up?"}, + budget_total=20000, + ) + assert result.error is None, result.error + assert result.value["report"] == "the report" + # 4 distinct claims (one per angle), none dropped. + assert result.value["claims_verified"] == result.value["claims_gathered"] == 4 + # 4 search agents + 1 synthesize; NO verify agents launched. + assert runner.call_count == 5 + + async def test_partial_budget_verifies_what_it_can_afford(self, make_runner): + # 4 searches x 10k = 40k spent of 100k; remaining 60k - 40k + # reserve = 20k at ~10k/verifier -> 2 of 4 claims verified, the + # other 2 pass through unverified. + runner = make_runner(handler=_research_handler(search_tokens=10000, claim_count=1)) + result = await run_workflow( + _DEEP_RESEARCH, + runner=runner, + args={"question": "what is up?"}, + budget_total=100000, + ) + assert result.error is None, result.error + assert result.value["claims_verified"] == 4 # 2 verified + 2 pass-through + # 4 search + 2 verify + 1 synthesize. + assert runner.call_count == 7 + + async def test_no_budget_verifies_everything(self, make_runner): + runner = make_runner(handler=_research_handler(search_tokens=10000, claim_count=1)) + result = await run_workflow( + _DEEP_RESEARCH, + runner=runner, + args={"question": "what is up?"}, + budget_total=None, + ) + # The meta default (400k) applies — far above this run's spend, + # so all 4 claims are verified. 
+ assert result.error is None, result.error + assert result.value["claims_verified"] == 4 + assert runner.call_count == 9 # 4 search + 4 verify + 1 synthesize + + async def test_failed_verifier_passes_claim_through(self, make_runner): + calls = {"verify": 0} + + def handler(spec, index): + schema_props = (spec.schema or {}).get("properties", {}) + if "claims" in schema_props: + return AgentOutcome( + structured={"claims": [{"claim": f"c{index}", "source": "https://s"}]}, + tokens=10, + ) + if "verdict" in schema_props: + calls["verify"] += 1 + if calls["verify"] == 1: + return AgentOutcome(error="verifier crashed") + return AgentOutcome( + structured={"verdict": "supported", "reason": "ok"}, tokens=10 + ) + return AgentOutcome(text="the report", tokens=10) + + runner = make_runner(handler=handler) + result = await run_workflow( + _DEEP_RESEARCH, + runner=runner, + args={"question": "q?"}, + ) + assert result.error is None, result.error + # The crashed verifier's claim is NOT dropped. + assert result.value["claims_verified"] == 4 + + async def test_wave_overshoot_falls_back_to_raw_claims(self, make_runner): + # The verbose-model incident profile: spend within the + # already-launched Search wave blows past the whole budget + # (4 x 120k > 400k). The run must still produce a report-shaped + # result — the raw claims — instead of failing after full spend. + runner = make_runner(handler=_research_handler(search_tokens=120000, claim_count=1)) + result = await run_workflow( + _DEEP_RESEARCH, + runner=runner, + args={"question": "what is up?"}, + ) + assert result.error is None, result.error + assert "raw cross-checked claims" in result.value["report"] + assert "claim" in result.value["report"] # the bullets made it in + assert result.value["claims_verified"] == 4 # all pass through + # 4 search agents only — no verify, no synthesize agent. + assert runner.call_count == 4