Skip to content

Commit e37d86b

Browse files
author
amabito
committed
feat(evaluators): add contrib budget evaluator for per-agent cost tracking
New contrib evaluator "budget" that tracks cumulative token/cost usage per agent, channel, user. Configurable time windows via window_seconds. Design per reviewer feedback: - Contrib evaluator (not builtin) for production hardening - Integer limit + Currency enum (USD/EUR/tokens) - window_seconds (int) instead of named windows - group_by for dynamic per-user/per-channel budgets - Evaluator owns cost computation from pricing table - BudgetStore protocol + InMemoryBudgetStore (dict + Lock) - Store derives period keys internally, injectable clock Addresses #130. 55 tests (incl. thread safety, NaN/Inf, scope injection, double-count).
1 parent b965e66 commit e37d86b

12 files changed

Lines changed: 1289 additions & 0 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
# Budget Evaluator

Cumulative LLM cost and token budget tracking for agent-control.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
[project]
name = "agent-control-evaluator-budget"
version = "0.1.0"
description = "Budget evaluator for agent-control -- cumulative LLM cost and token tracking"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "Apache-2.0" }
authors = [{ name = "Agent Control Team" }]
dependencies = [
    "agent-control-evaluators>=3.0.0",
    "agent-control-models>=3.0.0",
]

[project.optional-dependencies]
# NOTE(review): dev tooling is declared both here and in [dependency-groups]
# below, with different version floors for pytest / pytest-asyncio. Confirm
# which declaration is authoritative and align the pins.
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "ruff>=0.1.0",
    "mypy>=1.8.0",
]

# Plugin discovery: agent-control loads evaluators via this entry-point group.
[project.entry-points."agent_control.evaluators"]
budget = "agent_control_evaluator_budget.budget:BudgetEvaluator"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/agent_control_evaluator_budget"]

[tool.ruff]
line-length = 100
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "I"]

# Local development: resolve the sibling workspace packages from source.
[tool.uv.sources]
agent-control-evaluators = { path = "../../builtin", editable = true }
agent-control-models = { path = "../../../models", editable = true }

# PEP 735 dependency groups (used by uv); see NOTE above about the overlap
# with [project.optional-dependencies].dev.
[dependency-groups]
dev = [
    "pytest>=9.0.2",
    "pytest-asyncio>=1.3.0",
]

evaluators/contrib/budget/src/agent_control_evaluator_budget/__init__.py

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
"""Budget evaluator for per-agent LLM cost and token tracking."""
2+
3+
from agent_control_evaluator_budget.budget.config import BudgetEvaluatorConfig
4+
from agent_control_evaluator_budget.budget.evaluator import BudgetEvaluator
5+
from agent_control_evaluator_budget.budget.memory_store import InMemoryBudgetStore
6+
from agent_control_evaluator_budget.budget.store import BudgetSnapshot, BudgetStore
7+
8+
__all__ = [
9+
"BudgetEvaluator",
10+
"BudgetEvaluatorConfig",
11+
"BudgetSnapshot",
12+
"BudgetStore",
13+
"InMemoryBudgetStore",
14+
]
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""Configuration for the budget evaluator."""
2+
3+
from __future__ import annotations
4+
5+
from enum import Enum
6+
7+
from agent_control_evaluators._base import EvaluatorConfig
8+
from pydantic import Field, field_validator, model_validator
9+
10+
# ---------------------------------------------------------------------------
# Window convenience constants (seconds)
# Intended as ready-made values for BudgetLimitRule.window_seconds.
# ---------------------------------------------------------------------------

WINDOW_HOURLY = 3600  # 60 * 60
WINDOW_DAILY = 86400  # 24 * 3600
WINDOW_WEEKLY = 604800  # 7 * 86400
WINDOW_MONTHLY = 2592000  # 30 days (fixed-length window, not a calendar month)
18+
19+
20+
class Currency(str, Enum):
    """Supported budget currencies.

    str-mixin so config files can supply the plain strings "usd"/"eur"/"tokens".
    """

    USD = "usd"
    EUR = "eur"
    # NOTE(review): a monetary `limit` with currency=TOKENS overlaps with
    # BudgetLimitRule.limit_tokens -- confirm the intended interpretation.
    TOKENS = "tokens"
26+
27+
28+
class BudgetLimitRule(EvaluatorConfig):
    """A single budget limit rule.

    Each rule defines a ceiling for a combination of scope dimensions
    and time window. Multiple rules can apply to the same step -- the
    evaluator checks all of them and triggers on the first breach.

    Attributes:
        scope: Static scope dimensions that must match for this rule
            to apply. Empty dict = global rule.
            Examples:
                {"agent": "summarizer"} -- per-agent limit
                {"agent": "summarizer", "channel": "slack"} -- agent+channel limit
        group_by: If set, the limit is applied independently for each
            unique value of this dimension. e.g. group_by="user_id" means
            each user gets their own budget. None = shared/global limit.
        window_seconds: Time window for accumulation in seconds.
            None = cumulative (no reset). See WINDOW_* constants.
        limit: Maximum spend in the window, in minor units (e.g. cents
            for USD). None = uncapped on this dimension.
        currency: Currency for the limit. Defaults to USD.
        limit_tokens: Maximum tokens in the window. None = uncapped.
    """

    scope: dict[str, str] = Field(default_factory=dict)
    group_by: str | None = None
    window_seconds: int | None = None
    limit: int | None = None
    currency: Currency = Currency.USD
    limit_tokens: int | None = None

    # Runs after per-field validation: a rule with neither a cost ceiling nor
    # a token ceiling could never trigger, so reject it at config-load time.
    @model_validator(mode="after")
    def at_least_one_limit(self) -> "BudgetLimitRule":
        if self.limit is None and self.limit_tokens is None:
            raise ValueError("At least one of limit or limit_tokens must be set")
        return self

    @field_validator("limit")
    @classmethod
    def validate_limit(cls, v: int | None) -> int | None:
        # Zero/negative ceilings are rejected rather than treated as
        # "always breached"; None means uncapped and is allowed through.
        if v is not None and v <= 0:
            raise ValueError("limit must be a positive integer")
        return v

    @field_validator("limit_tokens")
    @classmethod
    def validate_limit_tokens(cls, v: int | None) -> int | None:
        if v is not None and v <= 0:
            raise ValueError("limit_tokens must be positive")
        return v

    @field_validator("window_seconds")
    @classmethod
    def validate_window_seconds(cls, v: int | None) -> int | None:
        # None = cumulative budget with no reset; any concrete window must
        # be a positive number of seconds.
        if v is not None and v <= 0:
            raise ValueError("window_seconds must be positive")
        return v
85+
86+
87+
class BudgetEvaluatorConfig(EvaluatorConfig):
    """Configuration for the budget evaluator.

    Attributes:
        limits: List of budget limit rules. Each is checked independently.
        pricing: Optional model pricing table. Maps model name to per-1K
            token rates. Used to derive cost in USD from token counts and
            model name.
        token_path: Dot-notation path to extract token usage from step
            data (e.g. "usage.total_tokens"). If None, looks for standard
            fields (input_tokens, output_tokens, total_tokens, usage).
        model_path: Dot-notation path to extract model name (for pricing lookup).
        metadata_paths: Mapping of metadata field name to dot-notation path
            in step data. Used to extract scope dimensions (channel, user_id, etc).
    """

    # At least one rule is required -- an empty budget evaluator is a no-op.
    limits: list[BudgetLimitRule] = Field(min_length=1)
    # Inner dict is expected to carry "input_per_1k" / "output_per_1k" rates
    # (see _estimate_cost in evaluator.py); None disables cost estimation.
    pricing: dict[str, dict[str, float]] | None = None
    token_path: str | None = None
    model_path: str | None = None
    metadata_paths: dict[str, str] = Field(default_factory=dict)
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
"""Budget evaluator -- tracks cumulative LLM token/cost usage.
2+
3+
Deterministic evaluator: confidence is always 1.0, matched is True when
4+
any configured limit is exceeded. Utilization ratio and spend breakdown
5+
are returned in result metadata, not in confidence.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import logging
11+
import math
12+
from typing import Any
13+
14+
from agent_control_evaluators._base import Evaluator, EvaluatorMetadata
15+
from agent_control_evaluators._registry import register_evaluator
16+
from agent_control_models import EvaluatorResult
17+
18+
from .config import BudgetEvaluatorConfig
19+
from .memory_store import InMemoryBudgetStore
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
def _extract_by_path(data: Any, path: str) -> Any:
25+
"""Extract a value from nested data using dot-notation path."""
26+
current = data
27+
for part in path.split("."):
28+
if part.startswith("__"):
29+
return None
30+
if isinstance(current, dict):
31+
current = current.get(part)
32+
elif hasattr(current, part):
33+
current = getattr(current, part)
34+
else:
35+
return None
36+
if current is None:
37+
return None
38+
return current
39+
40+
41+
def _extract_tokens(data: Any, token_path: str | None) -> tuple[int, int]:
42+
"""Extract (input_tokens, output_tokens) from step data.
43+
44+
Tries token_path first, then standard field names.
45+
Returns (0, 0) if no token information found.
46+
"""
47+
if data is None:
48+
return 0, 0
49+
50+
if token_path:
51+
val = _extract_by_path(data, token_path)
52+
if isinstance(val, int) and not isinstance(val, bool) and val >= 0:
53+
return 0, val
54+
if isinstance(val, dict):
55+
data = val
56+
57+
if isinstance(data, dict):
58+
usage = data.get("usage", data)
59+
if isinstance(usage, dict):
60+
inp = usage.get("input_tokens")
61+
if inp is None:
62+
inp = usage.get("prompt_tokens")
63+
out = usage.get("output_tokens")
64+
if out is None:
65+
out = usage.get("completion_tokens")
66+
inp_ok = isinstance(inp, int) and not isinstance(inp, bool)
67+
out_ok = isinstance(out, int) and not isinstance(out, bool)
68+
if inp_ok and out_ok:
69+
return max(0, inp), max(0, out)
70+
total = usage.get("total_tokens")
71+
if isinstance(total, int) and not isinstance(total, bool) and total > 0:
72+
return 0, max(0, total)
73+
return 0, 0
74+
75+
76+
def _estimate_cost(
77+
model: str | None,
78+
input_tokens: int,
79+
output_tokens: int,
80+
pricing: dict[str, dict[str, float]] | None,
81+
) -> int:
82+
"""Estimate cost in minor units from model pricing table. Returns 0 if unknown."""
83+
if not model or not pricing:
84+
return 0
85+
rates = pricing.get(model)
86+
if not rates:
87+
return 0
88+
input_rate = rates.get("input_per_1k", 0.0)
89+
output_rate = rates.get("output_per_1k", 0.0)
90+
cost = (input_tokens * input_rate + output_tokens * output_rate) / 1000.0
91+
if not math.isfinite(cost) or cost < 0:
92+
return 0
93+
return math.ceil(cost)
94+
95+
96+
def _extract_metadata(data: Any, metadata_paths: dict[str, str]) -> dict[str, str]:
97+
"""Extract metadata fields from step data using configured paths."""
98+
result: dict[str, str] = {}
99+
for field_name, path in metadata_paths.items():
100+
val = _extract_by_path(data, path)
101+
if val is not None:
102+
result[field_name] = str(val)
103+
return result
104+
105+
106+
@register_evaluator
class BudgetEvaluator(Evaluator[BudgetEvaluatorConfig]):
    """Tracks cumulative LLM token and cost usage per scope and time window.

    Deterministic evaluator: matched=True when any configured limit is
    exceeded, confidence=1.0 always. Utilization and spend breakdown are
    returned in result metadata, never encoded in confidence.

    The evaluator is stateful -- it accumulates usage in a BudgetStore.
    The store is created per evaluator config and is thread-safe.
    """

    metadata = EvaluatorMetadata(
        name="budget",
        version="2.0.0",
        description="Cumulative LLM token and cost budget tracking",
    )
    config_model = BudgetEvaluatorConfig

    def __init__(self, config: BudgetEvaluatorConfig) -> None:
        super().__init__(config)
        # One store per evaluator instance; the store handles period keying
        # for each rule's window internally.
        self._store = InMemoryBudgetStore(rules=config.limits)

    async def evaluate(self, data: Any) -> EvaluatorResult:
        """Evaluate step data against all configured budget limits.

        Extracts token counts and (via the pricing table) an estimated cost
        in minor units from *data*, records them in the store, and reports
        matched=True if any rule's snapshot is exceeded.
        """
        if data is None:
            return EvaluatorResult(
                matched=False,
                confidence=1.0,
                message="No data to evaluate",
            )

        input_tokens, output_tokens = _extract_tokens(data, self.config.token_path)

        # Model name is only needed for the pricing lookup; without it cost
        # is recorded as 0 and only token limits can bite.
        model: str | None = None
        if self.config.model_path:
            val = _extract_by_path(data, self.config.model_path)
            if val is not None:
                model = str(val)

        cost = _estimate_cost(model, input_tokens, output_tokens, self.config.pricing)

        step_metadata = _extract_metadata(data, self.config.metadata_paths)

        # Record this step's usage and check every rule in one store call.
        snapshots = self._store.record_and_check(
            scope=step_metadata,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
        )

        # Fix: the original iterated with enumerate() but never used the
        # index; a comprehension + filter expresses the same thing directly.
        all_snaps: list[dict[str, Any]] = [
            {
                "spent": snap.spent,
                "spent_tokens": snap.spent_tokens,
                "limit": snap.limit,
                "limit_tokens": snap.limit_tokens,
                "utilization": round(snap.utilization, 4),
                "exceeded": snap.exceeded,
            }
            for snap in snapshots
        ]
        breached: list[dict[str, Any]] = [s for s in all_snaps if s["exceeded"]]

        if breached:
            # Headline message reports the first breached rule; the full
            # breakdown is in metadata for callers that need it.
            first = breached[0]
            return EvaluatorResult(
                matched=True,
                confidence=1.0,
                message=f"Budget exceeded (utilization={first['utilization']:.0%})",
                metadata={
                    "breached_rules": breached,
                    "all_snapshots": all_snaps,
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "cost": cost,
                },
            )

        max_util = max((s["utilization"] for s in all_snaps), default=0.0)
        return EvaluatorResult(
            matched=False,
            confidence=1.0,
            message=f"Within budget (utilization={max_util:.0%})",
            metadata={
                "all_snapshots": all_snaps,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cost": cost,
                "max_utilization": round(max_util, 4),
            },
        )

0 commit comments

Comments
 (0)