-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrunner.py
More file actions
333 lines (276 loc) · 11.2 KB
/
Copy pathrunner.py
File metadata and controls
333 lines (276 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
"""Benchmark runner artifact plumbing.
This module intentionally contains only local fake/baseline execution. Real
provider adapters and report generation are follow-up work packages.
"""
from __future__ import annotations
import json
import platform
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import yaml
_SAFE_ID = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]*$")
_EXECUTABLE_ADAPTERS = {"fake", "no-mcp-baseline", "no_mcp_baseline"}
class BenchmarkValidationError(ValueError):
"""Raised when corpus or manifest input is not runnable."""
@dataclass(frozen=True)
class BenchmarkConfig:
"""Benchmark runner configuration."""
corpus_path: Path
manifest_path: Path
out_dir: Path
run_id: str | None = None
dry_run: bool = False
@dataclass(frozen=True)
class Question:
"""Validated corpus question."""
id: str
prompt: str
raw: dict[str, Any]
@dataclass(frozen=True)
class Competitor:
"""Validated competitor manifest entry."""
id: str
adapter: str
raw: dict[str, Any]
@dataclass(frozen=True)
class BenchmarkCell:
"""One competitor/question execution cell."""
competitor: Competitor
question: Question
@property
def cell_id(self) -> str:
return f"{self.competitor.id}/{self.question.id}"
def run_benchmark(config: BenchmarkConfig) -> dict[str, Any]:
"""Run or dry-run a benchmark and write the stable artifact layout."""
corpus_data = _load_yaml_mapping(config.corpus_path, "corpus")
manifest_data = _load_yaml_mapping(config.manifest_path, "manifest")
questions = _load_questions(corpus_data)
competitors = _load_competitors(manifest_data)
cells = [
BenchmarkCell(competitor=competitor, question=question)
for competitor in competitors
for question in questions
]
run_id = config.run_id or _default_run_id()
run_dir = config.out_dir
run_dir.mkdir(parents=True, exist_ok=True)
_write_artifact(run_dir / "snapshots" / "competitor-manifest.yml", manifest_data)
_write_artifact(run_dir / "snapshots" / "corpus.yml", corpus_data)
started_at = _utc_now()
environment = _environment_metadata(run_id=run_id, dry_run=config.dry_run)
_write_json(run_dir / "environment.json", environment)
planned_cells = [
{"competitor_id": cell.competitor.id, "corpus_id": cell.question.id}
for cell in cells
]
_write_json(run_dir / "planned-cells.json", {"cells": planned_cells})
succeeded = 0
failed = 0
if not config.dry_run:
for cell in cells:
result = _execute_cell(cell)
_write_cell_artifacts(run_dir, cell, result)
if result["status"] == "succeeded":
succeeded += 1
else:
failed += 1
summary = {
"run_id": run_id,
"dry_run": config.dry_run,
"started_at": started_at,
"completed_at": _utc_now(),
"corpus_path": str(config.corpus_path),
"manifest_path": str(config.manifest_path),
"artifact_root": str(run_dir),
"repo_commit": environment["repo_commit"],
"external_provider_calls": False,
"planned_cells": len(cells),
"succeeded_cells": succeeded,
"failed_cells": failed,
"competitors": [competitor.id for competitor in competitors],
"corpus_ids": [question.id for question in questions],
}
_write_json(run_dir / "run-summary.json", summary)
return summary
def _load_yaml_mapping(path: Path, label: str) -> dict[str, Any]:
if not path.exists():
raise BenchmarkValidationError(f"{label} file does not exist: {path}")
with path.open("r", encoding="utf-8") as file:
data = yaml.safe_load(file)
if not isinstance(data, dict):
raise BenchmarkValidationError(f"{label} file must contain a YAML mapping")
return data
def _load_questions(data: dict[str, Any]) -> list[Question]:
items = data.get("questions")
if not isinstance(items, list) or not items:
raise BenchmarkValidationError("corpus must contain a non-empty 'questions' list")
questions: list[Question] = []
seen: set[str] = set()
for index, item in enumerate(items):
if not isinstance(item, dict):
raise BenchmarkValidationError(f"corpus question at index {index} must be a mapping")
question_id = _required_safe_id(item, "id", f"corpus question at index {index}")
if question_id in seen:
raise BenchmarkValidationError(f"duplicate corpus id: {question_id}")
seen.add(question_id)
prompt = item.get("prompt")
if not isinstance(prompt, str) or not prompt.strip():
raise BenchmarkValidationError(f"corpus question {question_id} must include prompt")
questions.append(Question(id=question_id, prompt=prompt, raw=item))
return questions
def _load_competitors(data: dict[str, Any]) -> list[Competitor]:
items = data.get("competitors")
if not isinstance(items, list) or not items:
raise BenchmarkValidationError("manifest must contain a non-empty 'competitors' list")
competitors: list[Competitor] = []
seen: set[str] = set()
for index, item in enumerate(items):
if not isinstance(item, dict):
raise BenchmarkValidationError(f"competitor at index {index} must be a mapping")
competitor_id = _required_safe_id(item, "id", f"competitor at index {index}")
if competitor_id in seen:
raise BenchmarkValidationError(f"duplicate competitor id: {competitor_id}")
seen.add(competitor_id)
adapter = item.get("adapter")
if not isinstance(adapter, str) or not adapter.strip():
raise BenchmarkValidationError(f"competitor {competitor_id} must include adapter")
competitors.append(Competitor(id=competitor_id, adapter=adapter, raw=item))
return competitors
def _required_safe_id(item: dict[str, Any], key: str, label: str) -> str:
value = item.get(key)
if not isinstance(value, str) or not value.strip():
raise BenchmarkValidationError(f"{label} is missing required {key!r}")
if not _SAFE_ID.fullmatch(value):
raise BenchmarkValidationError(
f"{label} has unsafe {key!r}: {value!r}; use letters, numbers, dots, dashes, "
"or underscores"
)
return value
def _execute_cell(cell: BenchmarkCell) -> dict[str, Any]:
started = time.perf_counter()
started_at = _utc_now()
status = "succeeded"
error: dict[str, str] | None = None
answer = ""
try:
answer = _fake_provider_answer(cell)
except Exception as exc: # noqa: BLE001 - failures are benchmark artifacts
status = "failed"
error = {"type": type(exc).__name__, "message": str(exc)}
latency_ms = round((time.perf_counter() - started) * 1000, 3)
completed_at = _utc_now()
transcript = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"adapter": cell.competitor.adapter,
"status": status,
"started_at": started_at,
"completed_at": completed_at,
"messages": [{"role": "user", "content": cell.question.prompt}],
"answer": answer,
"error": error,
"external_provider_calls": False,
}
token_record = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"status": "placeholder",
"input_characters": len(cell.question.prompt),
"output_characters": len(answer),
"client_wrapped_tokens": None,
"raw_payload_tokens": None,
"notes": "Token counting integration is intentionally out of scope for issue #72.",
}
latency_record = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"status": status,
"latency_ms": latency_ms,
"started_at": started_at,
"completed_at": completed_at,
}
scoring_record = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"status": "placeholder",
"score": None,
"requires_manual_scoring": True,
"notes": "Correctness scoring automation is intentionally out of scope for issue #72.",
}
return {
"status": status,
"transcript": transcript,
"tokens": token_record,
"latency": latency_record,
"scoring": scoring_record,
"failure": error,
}
def _fake_provider_answer(cell: BenchmarkCell) -> str:
adapter = cell.competitor.adapter
if adapter not in _EXECUTABLE_ADAPTERS:
raise RuntimeError(f"adapter {adapter!r} is not implemented in issue #72 runner")
if cell.competitor.raw.get("force_failure") is True:
raise RuntimeError("forced fake provider failure")
answer = cell.competitor.raw.get("fake_answer")
if isinstance(answer, str):
return answer
return f"[fake:{cell.competitor.id}] {cell.question.prompt}"
def _write_cell_artifacts(run_dir: Path, cell: BenchmarkCell, result: dict[str, Any]) -> None:
competitor_id = cell.competitor.id
corpus_id = cell.question.id
_write_json(run_dir / "transcripts" / competitor_id / f"{corpus_id}.json", result["transcript"])
_write_json(run_dir / "tokens" / competitor_id / f"{corpus_id}.json", result["tokens"])
_write_json(run_dir / "latency" / competitor_id / f"{corpus_id}.json", result["latency"])
_write_json(run_dir / "scoring" / competitor_id / f"{corpus_id}.json", result["scoring"])
if result["failure"] is not None:
_write_json(
run_dir / "failures" / competitor_id / f"{corpus_id}.json",
{
"competitor_id": competitor_id,
"corpus_id": corpus_id,
"status": "failed",
"error": result["failure"],
},
)
def _environment_metadata(*, run_id: str, dry_run: bool) -> dict[str, Any]:
return {
"run_id": run_id,
"created_at": _utc_now(),
"dry_run": dry_run,
"repo_commit": _repo_commit_sha(),
"python_version": sys.version,
"python_executable": sys.executable,
"platform": platform.platform(),
"system": platform.system(),
"machine": platform.machine(),
"benchmark_runner": "benchmarks",
"external_provider_calls": False,
}
def _repo_commit_sha() -> str:
try:
result = subprocess.run(
["git", "rev-parse", "HEAD"],
check=True,
capture_output=True,
text=True,
timeout=5,
)
except (OSError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
return "unknown"
return result.stdout.strip() or "unknown"
def _write_json(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def _write_artifact(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(yaml.safe_dump(data, sort_keys=True), encoding="utf-8")
def _default_run_id() -> str:
return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
def _utc_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")