Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 82 additions & 16 deletions src/workflow/bundled/deep_research.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
{"title": "Verify", "detail": "Cross-check each claim with independent agents"},
{"title": "Synthesize", "detail": "Write a cited report from surviving claims"},
],
# #283: a verbose model burned ~888k tokens in Search+Verify with no
# ceiling. Applied by the engine when the caller set no budget;
# CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET overrides (0 disables).
"default_budget": 400000,
}

# Bundled deep-research workflow (port of the upstream /deep-research bundle).
Expand Down Expand Up @@ -101,6 +105,39 @@

# ── Phase 2: cross-check every claim independently ───────────────────────────
phase("Verify")

# Budget-aware degradation (#283): surface the Search spend, and only
# launch as many verifiers as the remaining budget affords, reserving
# headroom for Synthesize. Claims that can't be afforded pass through
# UNVERIFIED rather than being dropped (the cross-check default is
# "supported unless contradicted", and an unrun check contradicts
# nothing). With no budget set, everything is verified as before.
# Note: the estimate gates how many verifiers LAUNCH; spend within an
# already-launched wave is uncapped (the engine's ceiling only trips
# between calls), so the Synthesize step below re-checks the budget and
# falls back to the raw claims if the waves overshot.
SYNTH_RESERVE = 40000
to_verify = claims
unverified = []
if budget.total:
search_spent = budget.spent()
log(f"Search spent ~{search_spent:,} of the {budget.total:,}-token budget.")
per_verifier = max(2000, search_spent // max(1, len(ANGLES)))
affordable = int(max(0, (budget.remaining() - SYNTH_RESERVE) // per_verifier))
if affordable < len(claims):
to_verify = claims[:affordable]
unverified = claims[affordable:]
if not to_verify:
log(
f"Token budget nearly exhausted ({budget.remaining():,.0f} left); "
f"skipping cross-check — all {len(claims)} claims pass through unverified."
)
else:
log(
f"Token budget affords cross-checking {len(to_verify)} of {len(claims)} "
f"claims (~{per_verifier:,} tokens each); {len(unverified)} pass through unverified."
)

verdicts = await parallel([
agent(
f'Fact-check this claim about "{question}":\n\n "{c["claim"]}"\n\n'
Expand All @@ -116,30 +153,59 @@
phase="Verify",
schema=VERDICT_SCHEMA,
)
for c in claims
])
for c in to_verify
]) if to_verify else []

# Keep claims the cross-check did not refute (supported, and unclear-but-not-wrong),
# so the report covers the breadth of the topic rather than only claims that happened
# to be independently re-confirmed. Refuted ("unsupported") claims are dropped.
survivors = [
c for c, v in zip(claims, verdicts)
if v and v.get("verdict") in ("supported", "unclear")
]
log(f"{len(survivors)} of {len(claims)} claims survived cross-checking.")
# A None verdict (verifier failed or hit the budget ceiling) contradicts
# nothing — the claim passes through unverified instead of vanishing.
survivors = []
failed_checks = 0
for c, v in zip(to_verify, verdicts):
if v is None:
failed_checks += 1
survivors.append(c)
elif v.get("verdict") in ("supported", "unclear"):
survivors.append(c)
survivors.extend(unverified)
log(
f"{len(survivors)} of {len(claims)} claims survived cross-checking"
+ (f" ({failed_checks} verifier(s) did not complete; their claims pass through unverified)" if failed_checks else "")
+ (f" ({len(unverified)} not checked: budget)" if unverified else "")
+ "."
)
if budget.total:
log(f"Spend before Synthesize: ~{budget.spent():,} of {budget.total:,} tokens.")

# ── Phase 3: synthesize a cited report ───────────────────────────────────────
phase("Synthesize")
bullet_lines = "\n".join(f"- {c['claim']} (source: {c['source']})" for c in survivors)
report = await agent(
f'Write a clear, well-organized report answering: "{question}".\n\n'
f"You already have everything you need below — do NOT use any tools (no web search, "
f"no web fetch, no retrieving anything). Write the report DIRECTLY from these "
f"cross-checked claims, citing the source for each point:\n\n{bullet_lines}\n\n"
f"Structure it with a short summary followed by the details. Note any open questions.",
label="synthesize",
phase="Synthesize",
)
# Re-check the ceiling before the final call (#283): a verbose model can
# overshoot the estimate WITHIN the already-launched search/verify waves,
# and a tripped ceiling here would otherwise fail the whole run with no
# report after spending the entire budget. Better an un-synthesized
# claim list than expensive nothing.
if budget.total and budget.spent() >= budget.total:
log(
f"Budget exhausted before Synthesize (~{budget.spent():,} of {budget.total:,}); "
f"returning the raw surviving claims instead of a synthesized report."
)
report = (
"(Token budget exhausted before synthesis — raw cross-checked claims:)\n\n"
+ bullet_lines
)
else:
report = await agent(
f'Write a clear, well-organized report answering: "{question}".\n\n'
f"You already have everything you need below — do NOT use any tools (no web search, "
f"no web fetch, no retrieving anything). Write the report DIRECTLY from these "
f"cross-checked claims, citing the source for each point:\n\n{bullet_lines}\n\n"
f"Structure it with a short summary followed by the details. Note any open questions.",
label="synthesize",
phase="Synthesize",
)

return {
"question": question,
Expand Down
38 changes: 38 additions & 0 deletions src/workflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import asyncio
import inspect
import os
import re
from dataclasses import dataclass, field
from typing import Any, Callable, Mapping, Optional

Expand Down Expand Up @@ -313,6 +315,36 @@ def _close_coroutines(items) -> None:
item.close()


def _resolve_default_budget(meta: WorkflowMeta) -> Optional[int]:
"""The workflow's own token ceiling when the caller set none (#283).

Resolution order:

1. ``CLAWCODEX_<NAME>_TOKEN_BUDGET`` env var (name uppercased,
non-alphanumerics → ``_`` — e.g. deep-research reads
``CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET``). ``0`` disables the
default entirely; malformed values are ignored.
2. ``meta.default_budget`` — a positive int literal in the meta block.
"""
env_key = (
"CLAWCODEX_"
+ re.sub(r"[^A-Za-z0-9]+", "_", meta.name).upper().strip("_")
+ "_TOKEN_BUDGET"
)
raw_env = os.environ.get(env_key)
if raw_env is not None:
try:
value = int(raw_env)
except ValueError:
pass # malformed — fall through to the meta default
else:
return value if value > 0 else None
declared = meta.raw.get("default_budget")
if isinstance(declared, int) and not isinstance(declared, bool) and declared > 0:
return declared
return None


async def run_workflow(
source: str,
*,
Expand Down Expand Up @@ -340,6 +372,12 @@ async def run_workflow(
meta = extract_meta(source)

scheduler = scheduler if scheduler is not None else Scheduler(max_concurrent)
# A workflow may declare its own ceiling via ``meta.default_budget``
# (#283: deep-research burned ~888k tokens on a verbose model with no
# cap). Applied only when the caller set no budget — an explicit
# budget_total or an inherited parent Budget (nested workflow()) wins.
if budget is None and budget_total is None:
budget_total = _resolve_default_budget(meta)
budget = budget if budget is not None else Budget(budget_total)
journal = Journal(resume)
progress = WorkflowProgress(meta.phases, on_change=on_progress)
Expand Down
208 changes: 208 additions & 0 deletions tests/workflow/test_research_budget.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
"""#283 — deep-research token budget: meta default + graceful degradation.

Covers the engine-side ``meta.default_budget`` resolution (with the
``CLAWCODEX_<NAME>_TOKEN_BUDGET`` env override) and the bundled
deep-research script's budget-aware Verify gating: when the remaining
budget can't afford the full verify fan-out, unaffordable claims pass
through unverified (logged), and Synthesize still runs.
"""

from __future__ import annotations

import pathlib
from types import SimpleNamespace

import pytest

from src.workflow.runtime import _resolve_default_budget, run_workflow
from src.workflow.sandbox import extract_meta
from src.workflow.types import AgentOutcome

_DEEP_RESEARCH = (
pathlib.Path(__file__).resolve().parents[2]
/ "src"
/ "workflow"
/ "bundled"
/ "deep_research.py"
).read_text()


@pytest.fixture(autouse=True)
def _no_ambient_budget_env(monkeypatch):
"""A developer's exported override must not change test meaning."""
monkeypatch.delenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", raising=False)


def _meta(name="deep-research", **raw):
return SimpleNamespace(name=name, raw=raw)


class TestResolveDefaultBudget:
def test_meta_default_applies(self):
assert _resolve_default_budget(_meta(default_budget=400000)) == 400000

def test_no_default_returns_none(self):
assert _resolve_default_budget(_meta()) is None

def test_non_positive_or_bool_defaults_ignored(self):
assert _resolve_default_budget(_meta(default_budget=0)) is None
assert _resolve_default_budget(_meta(default_budget=-5)) is None
assert _resolve_default_budget(_meta(default_budget=True)) is None
assert _resolve_default_budget(_meta(default_budget="300000")) is None

def test_env_override_wins(self, monkeypatch):
monkeypatch.setenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", "123456")
assert _resolve_default_budget(_meta(default_budget=400000)) == 123456

def test_env_zero_disables(self, monkeypatch):
monkeypatch.setenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", "0")
assert _resolve_default_budget(_meta(default_budget=400000)) is None

def test_malformed_env_falls_back_to_meta(self, monkeypatch):
monkeypatch.setenv("CLAWCODEX_DEEP_RESEARCH_TOKEN_BUDGET", "lots")
assert _resolve_default_budget(_meta(default_budget=400000)) == 400000

def test_bundled_deep_research_declares_a_default(self):
meta = extract_meta(_DEEP_RESEARCH)
assert _resolve_default_budget(meta) == 400000


class TestMetaDefaultWiring:
async def test_meta_default_reaches_the_script(self, runner):
source = (
'meta = {"name": "t", "description": "d", "default_budget": 1234}\n'
"return {'total': budget.total}\n"
)
result = await run_workflow(source, runner=runner)
assert result.error is None
assert result.value == {"total": 1234}

async def test_explicit_budget_beats_meta_default(self, runner):
source = (
'meta = {"name": "t", "description": "d", "default_budget": 1234}\n'
"return {'total': budget.total}\n"
)
result = await run_workflow(source, runner=runner, budget_total=99)
assert result.value == {"total": 99}


def _research_handler(search_tokens: int, claim_count: int = 3):
"""A runner handler producing deterministic search/verify/synthesize
outcomes with controllable token spend."""

def handler(spec, index):
schema_props = (spec.schema or {}).get("properties", {})
if "claims" in schema_props:
return AgentOutcome(
structured={
"claims": [
{"claim": f"claim {index}-{i}", "source": f"https://s/{index}/{i}"}
for i in range(claim_count)
]
},
tokens=search_tokens,
)
if "verdict" in schema_props:
return AgentOutcome(
structured={"verdict": "supported", "reason": "checks out"},
tokens=1000,
)
return AgentOutcome(text="the report", tokens=1000)

return handler


class TestDeepResearchDegradation:
async def test_tight_budget_skips_verification_but_synthesizes(self, make_runner):
# 4 searches x 4k = 16k spent of 20k; remaining 4k < the 40k
# synthesize reserve -> zero verifiers, everything passes through.
runner = make_runner(handler=_research_handler(search_tokens=4000, claim_count=1))
result = await run_workflow(
_DEEP_RESEARCH,
runner=runner,
args={"question": "what is up?"},
budget_total=20000,
)
assert result.error is None, result.error
assert result.value["report"] == "the report"
# 4 distinct claims (one per angle), none dropped.
assert result.value["claims_verified"] == result.value["claims_gathered"] == 4
# 4 search agents + 1 synthesize; NO verify agents launched.
assert runner.call_count == 5

async def test_partial_budget_verifies_what_it_can_afford(self, make_runner):
# 4 searches x 10k = 40k spent of 100k; remaining 60k - 40k
# reserve = 20k at ~10k/verifier -> 2 of 4 claims verified, the
# other 2 pass through unverified.
runner = make_runner(handler=_research_handler(search_tokens=10000, claim_count=1))
result = await run_workflow(
_DEEP_RESEARCH,
runner=runner,
args={"question": "what is up?"},
budget_total=100000,
)
assert result.error is None, result.error
assert result.value["claims_verified"] == 4 # 2 verified + 2 pass-through
# 4 search + 2 verify + 1 synthesize.
assert runner.call_count == 7

async def test_no_budget_verifies_everything(self, make_runner):
runner = make_runner(handler=_research_handler(search_tokens=10000, claim_count=1))
result = await run_workflow(
_DEEP_RESEARCH,
runner=runner,
args={"question": "what is up?"},
budget_total=None,
)
# The meta default (400k) applies — far above this run's spend,
# so all 4 claims are verified.
assert result.error is None, result.error
assert result.value["claims_verified"] == 4
assert runner.call_count == 9 # 4 search + 4 verify + 1 synthesize

async def test_failed_verifier_passes_claim_through(self, make_runner):
calls = {"verify": 0}

def handler(spec, index):
schema_props = (spec.schema or {}).get("properties", {})
if "claims" in schema_props:
return AgentOutcome(
structured={"claims": [{"claim": f"c{index}", "source": "https://s"}]},
tokens=10,
)
if "verdict" in schema_props:
calls["verify"] += 1
if calls["verify"] == 1:
return AgentOutcome(error="verifier crashed")
return AgentOutcome(
structured={"verdict": "supported", "reason": "ok"}, tokens=10
)
return AgentOutcome(text="the report", tokens=10)

runner = make_runner(handler=handler)
result = await run_workflow(
_DEEP_RESEARCH,
runner=runner,
args={"question": "q?"},
)
assert result.error is None, result.error
# The crashed verifier's claim is NOT dropped.
assert result.value["claims_verified"] == 4

async def test_wave_overshoot_falls_back_to_raw_claims(self, make_runner):
# The verbose-model incident profile: spend within the
# already-launched Search wave blows past the whole budget
# (4 x 120k > 400k). The run must still produce a report-shaped
# result — the raw claims — instead of failing after full spend.
runner = make_runner(handler=_research_handler(search_tokens=120000, claim_count=1))
result = await run_workflow(
_DEEP_RESEARCH,
runner=runner,
args={"question": "what is up?"},
)
assert result.error is None, result.error
assert "raw cross-checked claims" in result.value["report"]
assert "claim" in result.value["report"] # the bullets made it in
assert result.value["claims_verified"] == 4 # all pass through
# 4 search agents only — no verify, no synthesize agent.
assert runner.call_count == 4