Skip to content

Commit 76a3f09

Browse files
committed
fix(agentic): fail jobs with excessive aiperf errors
1 parent ba65df8 commit 76a3f09

3 files changed

Lines changed: 202 additions & 3 deletions

File tree

benchmarks/benchmark_lib.sh

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,7 @@ run_eval() {
899899
INFMAX_CONTAINER_WORKSPACE="${INFMAX_CONTAINER_WORKSPACE:-/workspace}"
900900
AGENTIC_DIR="${AGENTIC_DIR:-${INFMAX_CONTAINER_WORKSPACE}/utils/agentic-benchmark}"
901901
AIPERF_DIR="${AIPERF_DIR:-${INFMAX_CONTAINER_WORKSPACE}/utils/aiperf}"
902+
AIPERF_FAILED_REQUEST_THRESHOLD=0.10
902903

903904
agentic_pip_install() {
904905
local pip_install=(python3 -m pip install)
@@ -1034,7 +1035,7 @@ build_replay_cmd() {
10341035
# transient low-rate failures from killing long sweeps while still
10351036
# catching malformed payloads or server crashes before they get aggregated
10361037
# as benchmarkable data.
1037-
REPLAY_CMD+=" --failed-request-threshold 0.10"
1038+
REPLAY_CMD+=" --failed-request-threshold $AIPERF_FAILED_REQUEST_THRESHOLD"
10381039
# Sample each trajectory's warmup start position uniformly from
10391040
# [25%, 75%] of the trace's turn count (was hardcoded 0%-70% upstream).
10401041
# Avoids starting trajectories right at turn 0 where the KV cache is
@@ -1095,8 +1096,9 @@ build_replay_cmd() {
10951096

10961097
write_agentic_result_json() {
10971098
# Aggregate aiperf's profile_export.{json,jsonl} + server_metrics_export.json
1098-
# into $AGENTIC_OUTPUT_DIR/$RESULT_FILENAME.json. The workflow's existing
1099-
# retry-based existence check is the single success gate.
1099+
# into $AGENTIC_OUTPUT_DIR/$RESULT_FILENAME.json. The workflow checks that
1100+
# this file exists; run_agentic_replay_and_write_outputs separately rejects
1101+
# aggregates whose request error rate exceeds the configured limit.
11001102
local result_dir="$1"
11011103
RESULT_DIR="$result_dir" AGENTIC_OUTPUT_DIR="${AGENTIC_OUTPUT_DIR:-$INFMAX_CONTAINER_WORKSPACE}" \
11021104
python3 "$INFMAX_CONTAINER_WORKSPACE/utils/process_agentic_result.py"
@@ -1110,6 +1112,7 @@ write_agentic_result_json() {
11101112
run_agentic_replay_and_write_outputs() {
11111113
local result_dir="$1"
11121114
local replay_rc
1115+
local validation_rc
11131116

11141117
echo "$REPLAY_CMD" > "$result_dir/benchmark_command.txt"
11151118

@@ -1125,8 +1128,20 @@ run_agentic_replay_and_write_outputs() {
11251128
python3 "$AGENTIC_DIR/scripts/analyze_benchmark_distributions.py" \
11261129
"$result_dir/aiperf_artifacts" -o "$result_dir" 2>&1 || true
11271130

1131+
set +e
1132+
python3 "$INFMAX_CONTAINER_WORKSPACE/utils/validate_agentic_result.py" \
1133+
"$result_dir/aiperf_artifacts" \
1134+
--failed-request-threshold "$AIPERF_FAILED_REQUEST_THRESHOLD"
1135+
validation_rc=$?
1136+
set -e
1137+
11281138
if [ "$replay_rc" -ne 0 ]; then
11291139
echo "ERROR: agentic trace replay exited with code $replay_rc after writing available results" >&2
11301140
return "$replay_rc"
11311141
fi
1142+
1143+
if [ "$validation_rc" -ne 0 ]; then
1144+
echo "ERROR: agentic trace replay produced invalid results after writing available artifacts" >&2
1145+
return "$validation_rc"
1146+
fi
11321147
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Tests for the agentic aiperf result validity gate."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
from pathlib import Path
7+
8+
from validate_agentic_result import validate_result
9+
10+
11+
def _write_aggregate(tmp_path: Path, aggregate: dict, *, per_run: bool = False) -> Path:
    """Serialize *aggregate* as aiperf's aggregate JSON under *tmp_path*.

    The file is named ``profile_export_aiperf.json``. It is written directly
    inside ``aiperf_artifacts`` by default, or inside ``aiperf_artifacts/run_0``
    when *per_run* is true. The ``aiperf_artifacts`` directory is returned in
    both cases.
    """
    artifact_dir = tmp_path / "aiperf_artifacts"
    target_dir = artifact_dir / "run_0" if per_run else artifact_dir
    target_dir.mkdir(parents=True)
    (target_dir / "profile_export_aiperf.json").write_text(json.dumps(aggregate))
    return artifact_dir
18+
19+
20+
def test_passes_when_request_error_rate_is_within_limit(tmp_path: Path):
    """An error rate exactly equal to the threshold (10/100) is accepted."""
    aggregate = {
        "request_count": {"avg": 90},
        "error_request_count": {"avg": 10},
        "completed_request_count": {"avg": 100},
    }
    artifact_dir = _write_aggregate(tmp_path, aggregate)

    problems = validate_result(artifact_dir, 0.10)

    assert problems == []
31+
32+
33+
def test_fails_when_request_error_rate_exceeds_limit(tmp_path: Path):
    """A 65/67 error rate is rejected with a message quoting both rates."""
    aggregate = {
        "request_count": {"avg": 2},
        "error_request_count": {"avg": 65},
        "completed_request_count": {"avg": 67},
    }
    artifact_dir = _write_aggregate(tmp_path, aggregate)
    expected_message = (
        "aiperf request error rate exceeded the benchmark limit: "
        "65/67 = 97.015% > 10.000%"
    )

    assert validate_result(artifact_dir, 0.10) == [expected_message]
48+
49+
50+
def test_treats_missing_error_count_as_zero(tmp_path: Path):
    """An aggregate with only ``request_count`` validates cleanly."""
    only_successes = {"request_count": {"avg": 12}}
    artifact_dir = _write_aggregate(tmp_path, only_successes)

    problems = validate_result(artifact_dir, 0.10)

    assert problems == []
57+
58+
59+
def test_supports_per_run_artifact_layout(tmp_path: Path):
    """The aggregate is also found when it lives in a ``run_0`` subdirectory."""
    only_successes = {"request_count": {"avg": 12}}
    artifact_dir = _write_aggregate(tmp_path, only_successes, per_run=True)

    problems = validate_result(artifact_dir, 0.10)

    assert problems == []
67+
68+
69+
def test_fails_when_aggregate_is_missing(tmp_path: Path):
    """A nonexistent artifact directory yields a single 'not found' error."""
    missing_dir = tmp_path / "aiperf_artifacts"

    problems = validate_result(missing_dir, 0.10)

    (only_problem,) = problems  # unpacking fails unless exactly one error is reported
    assert only_problem.endswith("profile_export_aiperf.json not found")

utils/validate_agentic_result.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#!/usr/bin/env python3
2+
"""Validate whether an aiperf agentic replay produced benchmarkable results."""
3+
4+
from __future__ import annotations
5+
6+
import argparse
7+
import json
8+
import math
9+
import sys
10+
from pathlib import Path
11+
from typing import Any
12+
13+
14+
def _resolve_aggregate_path(artifact_dir: Path) -> Path:
    """Locate ``profile_export_aiperf.json`` for an artifact directory.

    The file directly inside *artifact_dir* wins when it exists. Otherwise the
    immediate subdirectories are scanned in sorted order and the first one
    containing the file is used. When nothing is found (or *artifact_dir* is
    not a directory) the direct path is returned anyway, so the caller can
    report it as missing.
    """
    filename = "profile_export_aiperf.json"
    top_level = artifact_dir / filename
    if top_level.is_file() or not artifact_dir.is_dir():
        return top_level

    nested = (
        entry / filename
        for entry in sorted(artifact_dir.iterdir())
        if entry.is_dir()
    )
    return next((path for path in nested if path.is_file()), top_level)
27+
28+
29+
def _metric_avg(aggregate: dict[str, Any], name: str) -> float | None:
    """Read an aggregate metric's numeric average, if present.

    Args:
        aggregate: Parsed aggregate JSON object.
        name: Key of the metric to read; its value is expected to be an
            object carrying an ``avg`` field.

    Returns:
        The average as a float, or ``None`` when the metric or its ``avg``
        field is absent (or JSON ``null``).

    Raises:
        ValueError: If the metric is not an object, or ``avg`` is not a
            finite, non-negative number (booleans are rejected).
    """
    metric = aggregate.get(name)
    if metric is None:
        return None
    if not isinstance(metric, dict):
        raise ValueError(f"{name} must be an object")

    value = metric.get("avg")
    if value is None:
        return None
    # bool is a subclass of int, so it has to be excluded explicitly. The
    # tuple form of isinstance also works on interpreters older than 3.10.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError(f"{name}.avg must be numeric")

    try:
        number = float(value)
    except OverflowError:
        # JSON integers are unbounded; one too large for a double is reported
        # as a validation error rather than escaping as an uncaught exception.
        raise ValueError(
            f"{name}.avg must be a finite non-negative number"
        ) from None
    if not math.isfinite(number) or number < 0:
        raise ValueError(f"{name}.avg must be a finite non-negative number")
    return number
47+
48+
49+
def validate_result(artifact_dir: Path, failed_request_threshold: float) -> list[str]:
    """Check an aiperf artifact directory against the request error limit.

    The aggregate JSON is located via ``_resolve_aggregate_path`` and the
    ``avg`` fields of ``request_count``, ``error_request_count`` and
    ``completed_request_count`` are read from it. A missing error count is
    treated as zero, and a missing completed count falls back to
    ``request_count + error_request_count``.

    Returns:
        An empty list when the error fraction does not exceed
        *failed_request_threshold* (a summary line is printed to stdout in
        that case). Otherwise a one-element list describing the problem:
        missing or unreadable aggregate, missing ``request_count``, zero
        completed requests, or an error rate above the threshold.
    """
    source = _resolve_aggregate_path(artifact_dir)
    if not source.is_file():
        return [f"{source} not found"]

    try:
        with open(source) as handle:
            payload = json.load(handle)
        if not isinstance(payload, dict):
            return [f"{source} must contain a JSON object"]

        # Read in a fixed order so the first malformed metric is the one reported.
        succeeded = _metric_avg(payload, "request_count")
        failed = _metric_avg(payload, "error_request_count")
        total = _metric_avg(payload, "completed_request_count")
    except (OSError, json.JSONDecodeError, ValueError) as exc:
        return [f"failed to read {source}: {exc}"]

    if not failed:
        failed = 0.0
    if succeeded is None:
        return ["request_count.avg is missing"]
    if total is None:
        total = succeeded + failed
    if total <= 0:
        return ["aiperf completed zero requests"]

    rate = failed / total
    if not rate > failed_request_threshold:
        print(
            "Validated aiperf request error rate: "
            f"{failed:g}/{total:g} = {rate:.3%} <= "
            f"{failed_request_threshold:.3%}"
        )
        return []

    return [
        "aiperf request error rate exceeded the benchmark limit: "
        f"{failed:g}/{total:g} = {rate:.3%} > "
        f"{failed_request_threshold:.3%}"
    ]
88+
89+
90+
def main() -> int:
    """Command-line entry point: validate one artifact directory.

    Parses the positional ``artifact_dir`` and the required
    ``--failed-request-threshold`` option, rejects thresholds outside
    ``[0, 1]`` via ``parser.error``, then prints each validation error to
    stderr. Returns ``1`` if any error was reported and ``0`` otherwise.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("artifact_dir", type=Path)
    cli.add_argument(
        "--failed-request-threshold",
        type=float,
        required=True,
        help="Maximum accepted error fraction, inclusive",
    )
    options = cli.parse_args()

    threshold = options.failed_request_threshold
    # Kept in negated chained form so that NaN is rejected as well.
    if not (0 <= threshold <= 1):
        cli.error("--failed-request-threshold must be between 0 and 1")

    problems = validate_result(options.artifact_dir, threshold)
    if not problems:
        return 0
    for problem in problems:
        print(f"ERROR: {problem}", file=sys.stderr)
    return 1
108+
109+
110+
if __name__ == "__main__":
111+
sys.exit(main())

0 commit comments

Comments
 (0)