
Commit c774295

PR-E1b (ADR 0008 §6.5): gRPC long-session bench + server CLI
PR-E1's deferred sibling, per the scope split documented in PR-E1's
description. Ships everything needed to validate the two ADR 0008 §7
GA gates the deprecated HTTP shim's bench cannot answer:

  * memory bounded:  agg.kv_bounded
  * prefill bounded: agg.prefill_bounded

The HTTP shim's bench_long_session.py fails on prefill-bounded by
architecture (every /v1/chat/completions request re-prefills the full
conversation history). The session-bound gRPC contract makes prefill
cost depend only on the size of each new user message, regardless of
how long the conversation is. PR-E1b measures this empirically.

What ships
----------

inference_engine/bench/ (new package)
  __init__.py
  session_long_run.py [56 stmts, 100% covered]
    Pure-Python aggregation helpers split out of the CLI script so
    they can be unit-tested under the Linux 100% coverage gate.
    Exposes:
      _percentile           linear-interpolated, no numpy dep
      _kv_bounded           tolerance band on KV-bytes series
      _prefill_bounded      tail-vs-head p50 latency drift gate
      _latency_drift_p50_s  drift in seconds
      _bucketize_10min      10-minute bucket breakdown for long runs
      aggregate_run         the full report builder

scripts/start_grpc_runtime_server.py (new, CLI)
  Boots a real Qwen3 verifier (CPU or MLX), wires it through
  SessionStore + AppendTokensCoordinator + GenerationCoordinator,
  serves the v0.3 gRPC RuntimeService. Pulls slab dims (num_layers,
  num_kv_heads, head_dim) from the verifier's HF config so
  GetSessionInfo.kv_live_bytes reports physically meaningful bytes.
  Symmetric to scripts/serve.py for the deprecated HTTP shim. CLI
  plumbing exempt from coverage by the same convention.

scripts/bench_agentic/bench_session_long_run.py (new, CLI)
  Walks ONE gRPC session through many turns, recording per-turn
  latency and session.info().kv_live_bytes. Tokenizes ONLY the new
  user message per turn (the whole point of session-bound runtime:
  prefill cost is O(new_user_message), not O(history)). Writes JSON
  via atomic os.replace + 10-min partial checkpoints so a host reboot
  mid-run doesn't lose evidence.

tests/inference_engine/bench/test_session_long_run.py (new, 35 tests)
  Covers the aggregator to 100%: _percentile (5 tests),
  _kv_bounded (5), _prefill_bounded (5), _latency_drift_p50_s (3),
  _bucketize_10min (6), aggregate_run (7) including empty /
  all-error / mixed / bounded / unbounded / custom-threshold paths.

scripts/review_pr_e1b_on_mac.sh (new, executable)
  Default invocation: 30-min smoke. Boots the gRPC server in the
  background, runs the bench against it, kills the server, prints the
  headline KPIs. Two helper subcommands print the bare server and
  4-hour bench commands for separate-terminal manual runs:

    bash scripts/review_pr_e1b_on_mac.sh --print-server-cmd
    bash scripts/review_pr_e1b_on_mac.sh --print-4h-cmd

CI wiring
---------

.github/workflows/ci.yaml
  + tests/inference_engine/bench/ in the Linux pytest gate
  + --cov=inference_engine.bench in the coverage gate

Local verification (Linux VM, py3.12)
-------------------------------------

Linux CI gate (extended path set + new bench tests):
  730 passed, 100% coverage on 1750 stmts (was 682 / 1660 stmts).
  +35 new tests under tests/inference_engine/bench/.

CLI scripts: py_compile passes on both new scripts. Runtime
validation happens on Mac M4 via review_pr_e1b_on_mac.sh.

Per ADR 0008 §9
---------------

This PR ships CLI plumbing + a pure-Python aggregator. The aggregator
is fully covered on Linux. The CLI scripts that drive real model
weights are platform-agnostic Python but only validated end-to-end on
Mac M4. Reviewer pushes the 30-min smoke JSON to the PR branch; the
4-hour evidence run is committed separately when wall-clock budget
allows.

Sequence for the 4-hour evidence run (per user spec, exactly):

  1. Start gRPC server (Mac mini local, Qwen3-0.6B):

       PYTHONPATH=.:sdks/python python3 \
         scripts/start_grpc_runtime_server.py \
         --backend cpu --verifier-id Qwen/Qwen3-0.6B \
         --bind 127.0.0.1:50051 \
         --capacity 1 --sink 4 --window 64

  2. Run bench:

       PYTHONPATH=.:sdks/python python3 \
         scripts/bench_agentic/bench_session_long_run.py \
         --grpc-address 127.0.0.1:50051 \
         --tokenizer-id Qwen/Qwen3-0.6B \
         --duration-s 14400 --turn-spacing-s 30 \
         --output results/platform-tests/bench_session_4h_$(date +%s).json

  3. Commit JSON to branch.

Stack
-----

PR-E1b is independent of PR-D1 (#49) and PR-E1 (#50) at the file
level. Branched off main directly. Can merge in any order with the
others.

Co-authored-by: FluffyAIcode <FluffyAIcode@users.noreply.github.com>
1 parent 0642902 commit c774295
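
The commit message says the bench writes its JSON "via atomic os.replace" so that a reboot mid-run does not lose evidence. The bench script itself is not shown on this page, so the following is only a minimal sketch of that general pattern, not the script's actual code. The helper name `write_json_atomic` and the payload fields are hypothetical.

```python
import json
import os
import tempfile
from typing import Any, Dict


def write_json_atomic(path: str, payload: Dict[str, Any]) -> None:
    """Hypothetical helper: write JSON so readers never see a partial file.

    The data goes to a temp file in the SAME directory (os.replace is only
    atomic within one filesystem), is flushed and fsynced, and is then
    renamed over the destination in a single step.
    """
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, indent=2)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        out = os.path.join(workdir, "results", "partial.json")
        # A periodic checkpoint simply overwrites the same path.
        write_json_atomic(out, {"turns": [], "checkpoint": 1})
        write_json_atomic(out, {"turns": [{"ok": True}], "checkpoint": 2})
        with open(out, encoding="utf-8") as fh:
            print(json.load(fh)["checkpoint"])
```

Because each checkpoint replaces the previous file in one rename, an interrupted run leaves either the last complete checkpoint or the one before it on disk, never a truncated file.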

8 files changed

Lines changed: 1266 additions & 0 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 2 additions & 0 deletions
@@ -78,6 +78,7 @@ jobs:
 tests/inference_engine/scheduler/ \
 tests/inference_engine/pipeline/ \
 tests/inference_engine/session/ \
+tests/inference_engine/bench/ \
 tests/sdk/python/ \
 tests/training/repr_align/ \
 tests/backends/mlx/test_env.py \
@@ -86,6 +87,7 @@ jobs:
 --cov=inference_engine.scheduler \
 --cov=inference_engine.pipeline \
 --cov=inference_engine.session \
+--cov=inference_engine.bench \
 --cov=kakeya \
 --cov=training.repr_align \
 --cov-report=term \

inference_engine/bench/__init__.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+"""Pure-Python aggregation helpers used by ``scripts/bench_agentic/``.
+
+These helpers are split out of the CLI scripts so they can be unit-
+tested under the Linux 100% coverage gate. The CLI scripts that
+import them are themselves exempt from the coverage gate (CLI
+plumbing convention; see ``scripts/serve.py`` for precedent).
+"""

inference_engine/bench/session_long_run.py

Lines changed: 210 additions & 0 deletions
@@ -0,0 +1,210 @@
+"""Pure aggregation helpers for the gRPC long-session bench.
+
+The bench script under ``scripts/bench_agentic/bench_session_long_run.py``
+walks one gRPC session through many turns, recording per-turn
+metrics: latency, KV bytes, history length, error / success. After
+the run it calls :func:`aggregate_run` here to compute the headline
+KPIs:
+
+* ``kv_bounded`` — does ``kv_live_bytes`` stay under a tight band
+  across all turns? (ADR 0006 §2.3.a, ADR 0008 §7 G2.)
+* ``prefill_bounded`` — does per-turn latency stay flat as the
+  history grows? (ADR 0008 §7 G2 prefill claim, the v0.3 GA gate
+  that was a non-claim on the deprecated HTTP shim.)
+* Latency p50/p95, KV min/mean/max, n_turns, n_errors.
+
+Splitting this out of the CLI script means the aggregation logic is
+fully unit-testable and the script itself stays focused on IO. The
+script also computes a 10-minute bucket breakdown for visual sanity-
+check on long runs (4h+); that bucketing logic lives here too.
+"""
+
+from __future__ import annotations
+
+import statistics
+from typing import Any, Dict, List, Optional
+
+
+# ---------------------------------------------------------------------------
+# Aggregation
+# ---------------------------------------------------------------------------
+
+
+def _percentile(values: List[float], pct: float) -> Optional[float]:
+    """Linear-interpolated percentile, ``None`` if input is empty.
+
+    Implemented locally instead of pulling in ``numpy`` so the bench
+    has no scientific-stack dependency.
+    """
+    if not values:
+        return None
+    if not 0.0 <= pct <= 1.0:
+        raise ValueError(f"pct must be in [0, 1], got {pct}")
+    sorted_values = sorted(values)
+    if len(sorted_values) == 1:
+        return float(sorted_values[0])
+    rank = pct * (len(sorted_values) - 1)
+    lo = int(rank)
+    hi = min(lo + 1, len(sorted_values) - 1)
+    frac = rank - lo
+    return float(sorted_values[lo] + (sorted_values[hi] - sorted_values[lo]) * frac)
+
+
+def _kv_bounded(kv_values: List[int], *, tolerance: float = 0.10) -> Optional[bool]:
+    """Returns ``True`` iff the KV-bytes series stays within
+    ``tolerance`` (default 10%) of its minimum across every turn.
+
+    Returns ``None`` when there are not enough successful turns to
+    answer (≤1 sample). The tolerance is a relative band — if the
+    minimum is 0 we treat that as a pathologically small denominator
+    and use ``max(min, 1)`` to avoid div-by-zero, the same convention
+    ``bench_long_session.py`` uses.
+    """
+    if len(kv_values) <= 1:
+        return None
+    lo = min(kv_values)
+    hi = max(kv_values)
+    return (hi - lo) / max(lo, 1) < tolerance
+
+
+def _prefill_bounded(
+    latencies: List[float],
+    *,
+    head_window: int = 5,
+    tail_window: int = 5,
+    drift_threshold_s: float = 5.0,
+) -> Optional[bool]:
+    """Returns ``True`` iff median per-turn latency on the LAST
+    ``tail_window`` turns is within ``drift_threshold_s`` seconds of
+    the median on the FIRST ``head_window`` turns.
+
+    This is the prefill-bounded contract: a healthy session-bound
+    runtime processes only the new user message per turn, so latency
+    should not grow with conversation length. On the deprecated HTTP
+    shim, by contrast, every turn re-prefills the full history and
+    latency grows linearly — that's the failure mode this metric
+    catches.
+
+    ``None`` when the run is too short to bracket head and tail
+    windows without overlap.
+    """
+    if len(latencies) < head_window + tail_window:
+        return None
+    head = latencies[:head_window]
+    tail = latencies[-tail_window:]
+    head_p50 = statistics.median(head)
+    tail_p50 = statistics.median(tail)
+    return (tail_p50 - head_p50) <= drift_threshold_s
+
+
+def _latency_drift_p50_s(
+    latencies: List[float],
+    *,
+    head_window: int = 5,
+    tail_window: int = 5,
+) -> Optional[float]:
+    """Drift in seconds between head-window p50 and tail-window p50.
+
+    Positive = latency grew over the run. Returns ``None`` for
+    runs too short to bracket head and tail without overlap.
+    """
+    if len(latencies) < head_window + tail_window:
+        return None
+    head = latencies[:head_window]
+    tail = latencies[-tail_window:]
+    return float(statistics.median(tail) - statistics.median(head))
+
+
+def _bucketize_10min(turns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """Partition successful turns by their wall-clock bucket
+    (10-minute granularity, indexed from 0). Each bucket reports
+    ``n_turns``, p50/p95 latency, and mean kv_live_bytes — gives a
+    visual sanity check of latency / memory drift across a long run.
+
+    Empty input or all-error input returns an empty list.
+    """
+    buckets: Dict[int, List[Dict[str, Any]]] = {}
+    for t in turns:
+        if not t.get("ok"):
+            continue
+        bucket_idx = int(t["t_relative_s"] // 600)
+        buckets.setdefault(bucket_idx, []).append(t)
+
+    out: List[Dict[str, Any]] = []
+    for idx in sorted(buckets):
+        items = buckets[idx]
+        latencies = [float(t["latency_s"]) for t in items]
+        kv_values = [
+            int(t["kv_live_bytes"]) for t in items
+            if t.get("kv_live_bytes") is not None
+        ]
+        out.append(
+            {
+                "bucket_index": idx,
+                "n_turns": len(items),
+                "p50_latency_s": _percentile(latencies, 0.50),
+                "p95_latency_s": _percentile(latencies, 0.95),
+                "mean_kv_live_bytes": (
+                    statistics.mean(kv_values) if kv_values else None
+                ),
+            }
+        )
+    return out
+
+
+def aggregate_run(
+    turns: List[Dict[str, Any]],
+    *,
+    duration_s: float,
+    kv_tolerance: float = 0.10,
+    drift_head_window: int = 5,
+    drift_tail_window: int = 5,
+    drift_threshold_s: float = 5.0,
+) -> Dict[str, Any]:
+    """Build the aggregate report from a list of per-turn records.
+
+    Each turn dict must carry at least:
+      * ``ok`` — bool
+      * ``t_relative_s`` — float, seconds since run start
+      * ``latency_s`` — float (only if ``ok``)
+      * ``kv_live_bytes`` — int or ``None`` (only if ``ok``)
+
+    Returns a dict with the headline KPIs ADR 0006 §2.3.a / ADR 0008
+    §7 G2 speak to: ``kv_bounded``, ``prefill_bounded``, latency
+    p50/p95, kv min/mean/max, error count, 10-minute bucket break-
+    down.
+    """
+    successes = [t for t in turns if t.get("ok")]
+    errors = [t for t in turns if not t.get("ok")]
+
+    latencies = [float(t["latency_s"]) for t in successes]
+    kv_values = [
+        int(t["kv_live_bytes"]) for t in successes
+        if t.get("kv_live_bytes") is not None
+    ]
+
+    return {
+        "n_turns": len(successes),
+        "n_errors": len(errors),
+        "duration_s": float(duration_s),
+        "p50_latency_s": _percentile(latencies, 0.50),
+        "p95_latency_s": _percentile(latencies, 0.95),
+        "min_kv_live_bytes": min(kv_values) if kv_values else None,
+        "mean_kv_live_bytes": (
+            statistics.mean(kv_values) if kv_values else None
+        ),
+        "max_kv_live_bytes": max(kv_values) if kv_values else None,
+        "kv_bounded": _kv_bounded(kv_values, tolerance=kv_tolerance),
+        "prefill_bounded": _prefill_bounded(
+            latencies,
+            head_window=drift_head_window,
+            tail_window=drift_tail_window,
+            drift_threshold_s=drift_threshold_s,
+        ),
+        "latency_drift_p50_s": _latency_drift_p50_s(
+            latencies,
+            head_window=drift_head_window,
+            tail_window=drift_tail_window,
+        ),
+        "buckets_10min": _bucketize_10min(turns),
+    }
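
To make the two gates concrete, the sketch below re-states the `_kv_bounded` and `_prefill_bounded` checks from the diff above as standalone functions and runs them on made-up series. It assumes nothing about the real bench output. The synthetic latency and KV series are hypothetical and exist only to show which shapes pass and which fail. Note that the drift check is one-sided: a tail that is faster than the head always passes.

```python
import statistics
from typing import List, Optional


def kv_bounded(kv_values: List[int], tolerance: float = 0.10) -> Optional[bool]:
    """Relative spread (max - min) / max(min, 1) must stay under tolerance."""
    if len(kv_values) <= 1:
        return None
    lo, hi = min(kv_values), max(kv_values)
    return (hi - lo) / max(lo, 1) < tolerance


def prefill_bounded(
    latencies: List[float],
    head_window: int = 5,
    tail_window: int = 5,
    drift_threshold_s: float = 5.0,
) -> Optional[bool]:
    """Tail-window median may exceed head-window median by at most the threshold."""
    if len(latencies) < head_window + tail_window:
        return None
    head_p50 = statistics.median(latencies[:head_window])
    tail_p50 = statistics.median(latencies[-tail_window:])
    return (tail_p50 - head_p50) <= drift_threshold_s


# Hypothetical series: 20 turns each.
flat_latencies = [1.0 + 0.01 * (i % 3) for i in range(20)]    # session-bound shape
growing_latencies = [1.0 + 0.5 * i for i in range(20)]        # re-prefill shape
flat_kv = [1_000_000 + 1_000 * (i % 2) for i in range(20)]    # 0.1% spread
growing_kv = [1_000_000 + 50_000 * i for i in range(20)]      # 95% spread

results = {
    "flat_prefill": prefill_bounded(flat_latencies),
    "growing_prefill": prefill_bounded(growing_latencies),
    "flat_kv": kv_bounded(flat_kv),
    "growing_kv": kv_bounded(growing_kv),
    "too_short": prefill_bounded([1.0] * 9),
}

if __name__ == "__main__":
    for name, verdict in results.items():
        print(f"{name}: {verdict}")
```

With the default windows, the growing latency series has a head median of 2.0 s and a tail median of 9.5 s, so its 7.5 s drift exceeds the 5.0 s threshold and the gate fails. Nine turns are too few to fill both five-turn windows, so that case yields `None`.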
