Commit 584fc1e

fluffy314 authored and committed
Merge remote-tracking branch 'origin/main' into AgentMemory/v030-pr-n2-remove-engine-tokenizer-doubles-8e7f
Co-authored-by: Cursor <cursoragent@cursor.com>

# Conflicts:
#	.github/workflows/ci.yaml
#	tests/integration/conftest.py
2 parents 0f2493b + 6e674c4 commit 584fc1e

39 files changed

Lines changed: 3704 additions & 1872 deletions

.github/workflows/ci.yaml

Lines changed: 1 addition & 0 deletions
@@ -88,6 +88,7 @@ jobs:
 tests/inference_engine/scheduler/ \
 tests/inference_engine/pipeline/ \
 tests/inference_engine/session/ \
+tests/inference_engine/bench/ \
 tests/sdk/python/ \
 tests/training/repr_align/ \
 tests/backends/mlx/test_env.py \

docs/adr/0008-session-bound-runtime-and-grpc-protocol.md

Lines changed: 38 additions & 6 deletions
@@ -775,12 +775,44 @@ parallelize.
 
 ### 6.4 Phase D — Deprecated HTTP+SSE shim
 
-- **PR-D1**: Update `inference_engine/server/app.py` so each
-  `/v1/chat/completions` request creates a single-shot session under
-  the new `SessionStore`, prefills, generates, and closes. Removes
-  any path-selection / cross-request logic (none of which exists on
-  `main` after C3). Adds `Deprecation` / `Sunset` headers. Updates
-  the existing 461-test integration suite to match.
+*(scope split, recorded 2026-06-01 during implementation of PR-D1.)*
+
+The original PR-D1 entry conflated two coupled changes:
+
+(a) Remove the ADR 0007 dead code from the server-side surface
+    (path_selection metrics, `_emit_path_selection_metric` helper,
+    `engine_result` field on the scheduler session, etc.).
+(b) Refactor the HTTP shim's chat-completions handler onto the new
+    `SessionStore` so each request becomes a single-shot session
+    (prefill → generate → close) instead of being driven by the
+    legacy `PooledVerifier`.
+
+(a) is a pure subtraction: the dead code was reachable only from the
+ADR 0007 path_select stack that PR-A3 already removed from the
+verifier side; the server-side metrics and helpers it left behind
+are unreachable at runtime in any healthy completion. (b) is a
+larger refactor of feature-frozen code (per §2.7), with a
+corresponding test-update tail.
+
+The two are split, same pattern as PR-A3 / PR-A3b:
+
+- **PR-D1** (this PR, dead-code removal): cleans up §6.6 rows for
+  `app.py` / `engine.py` / `metrics.py` / `scheduler/session.py` /
+  `bench_long_session.py`. The HTTP shim continues to use
+  `PooledVerifier` exactly as before; nothing user-observable
+  changes except the disappearance of the four ADR 0007 metrics
+  from `/metrics` and the `acceptance_rate` field from the OpenAI
+  response (the latter was sourced from `engine_result`, which is
+  gone). 100% Linux unit coverage.
+
+- **PR-D2** (queued, not in PR-D1's diff): the HTTP-shim refactor
+  proper. Each `/v1/chat/completions` request creates a single-shot
+  session under `SessionStore`, prefills, generates, and closes;
+  `PooledVerifier` is retired. Adds `Deprecation` / `Sunset`
+  headers per §2.7. Updates the existing integration suite to
+  match. Linux-only path; §9 carve-out continues to apply. PR-D2
+  is non-blocking for v0.3 GA — the deprecated shim works on
+  `main` post-PR-D1 in its v0.3.0-rc1 shape, just lighter.
 
 ### 6.5 Phase E — Mac M4 integration test marker + CI workflow
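The PR-D2 entry above mentions `Deprecation` / `Sunset` headers without showing their shape. As a rough sketch only: the helper name `deprecation_headers` and both dates are hypothetical, and the value formats assume RFC 9745 (structured-field date for `Deprecation`) and RFC 8594 (HTTP-date for `Sunset`) rather than anything confirmed by the ADR or this diff.

```python
from datetime import datetime, timezone
from email.utils import format_datetime
from typing import Dict


def deprecation_headers(deprecated_at: datetime, sunset_at: datetime) -> Dict[str, str]:
    """Build response headers announcing a deprecated endpoint (hypothetical helper)."""
    return {
        # Assumed RFC 9745 form: "@" followed by Unix seconds.
        "Deprecation": f"@{int(deprecated_at.timestamp())}",
        # Assumed RFC 8594 form: an HTTP-date, always expressed in GMT.
        "Sunset": format_datetime(sunset_at, usegmt=True),
    }


# Hypothetical dates, chosen only to make the example concrete.
headers = deprecation_headers(
    deprecated_at=datetime(2026, 6, 1, tzinfo=timezone.utc),
    sunset_at=datetime(2027, 1, 1, tzinfo=timezone.utc),
)
```

Both datetimes need to be timezone-aware UTC values; `format_datetime(..., usegmt=True)` raises `ValueError` otherwise.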

inference_engine/backends/mlx/verifier.py

Lines changed: 30 additions & 0 deletions
@@ -103,6 +103,25 @@ def __init__(self, config: Optional[VerifierConfig] = None) -> None:
         self.quantization: QuantizationInfo = detect_quantization(self.model)
         self.stats = VerifierStats(weight_bytes=self.quantization.total_weight_bytes)
 
+        # PR-E1c: precompute per-K/V-token byte cost for the
+        # ``kv_live_bytes`` accessor. Mirrors the CPU verifier;
+        # reads dims from the wrapped HF config so GQA / MQA via
+        # ``num_key_value_heads`` is honored.
+        cfg = self.model.config if hasattr(self.model, "config") else self.model
+        num_layers = int(getattr(cfg, "num_hidden_layers"))
+        num_kv_heads = int(
+            getattr(cfg, "num_key_value_heads", None)
+            or getattr(cfg, "num_attention_heads")
+        )
+        head_dim = int(
+            getattr(cfg, "head_dim", None)
+            or (cfg.hidden_size // cfg.num_attention_heads)
+        )
+        itemsize = torch.tensor([], dtype=self.config.dtype).element_size()
+        self._bytes_per_kv_token = (
+            num_layers * num_kv_heads * head_dim * itemsize * 2
+        )
+
     # ---------------------------- public API ---------------------------- #
 
     def reset(self) -> None:
@@ -222,6 +241,17 @@ def k_seq_length(self, session: object) -> int:
         del session # unused in v0.3 single-tenant scope
         return self._cache_buffer_size()
 
+    def kv_live_bytes(self, session: object) -> int:
+        """Return the live K/V cache size in bytes for ``session``.
+
+        Mirrors the CPU verifier's :meth:`kv_live_bytes`; computed as
+        ``k_seq_length × num_layers × num_kv_heads × head_dim ×
+        itemsize × 2``. PR-E1c — feeds ``GetSessionInfo.kv_live_bytes``
+        through the coordinator's slab-write-through.
+        """
+        del session # unused in v0.3 single-tenant scope
+        return self._cache_buffer_size() * self._bytes_per_kv_token
+
     # --------------------------- internals --------------------------- #
 
     def _cache_buffer_size(self) -> int:
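To make the `kv_live_bytes` arithmetic above concrete, here is a small standalone sketch of the same formula. The config values are hypothetical (they are not taken from any model in this repository), `bytes_per_kv_token` is an illustrative name, and the item size is passed in directly instead of being read from a torch dtype.

```python
from types import SimpleNamespace


def bytes_per_kv_token(cfg, itemsize: int) -> int:
    """Bytes needed to cache one token's keys and values across all layers."""
    num_layers = int(cfg.num_hidden_layers)
    # GQA / MQA configs expose fewer K/V heads than attention heads;
    # fall back to the attention head count when the field is absent.
    num_kv_heads = int(
        getattr(cfg, "num_key_value_heads", None) or cfg.num_attention_heads
    )
    head_dim = int(
        getattr(cfg, "head_dim", None)
        or (cfg.hidden_size // cfg.num_attention_heads)
    )
    # The trailing factor of 2 covers storing both K and V.
    return num_layers * num_kv_heads * head_dim * itemsize * 2


# Hypothetical GQA config: 32 layers, 32 attention heads, 8 K/V heads.
gqa_cfg = SimpleNamespace(
    num_hidden_layers=32,
    num_attention_heads=32,
    num_key_value_heads=8,
    hidden_size=4096,
)
# Same shape without num_key_value_heads, i.e. plain multi-head attention.
mha_cfg = SimpleNamespace(
    num_hidden_layers=32, num_attention_heads=32, hidden_size=4096
)

gqa_per_token = bytes_per_kv_token(gqa_cfg, itemsize=2)  # 16-bit dtype
mha_per_token = bytes_per_kv_token(mha_cfg, itemsize=2)
live_bytes = 1024 * gqa_per_token  # a 1024-token cache buffer
```

With these assumed numbers the GQA config costs 131072 bytes per cached token and the MHA config four times that, so a 1024-token buffer comes to 128 MiB under GQA.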

inference_engine/bench/__init__.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+"""Pure-Python aggregation helpers used by ``scripts/bench_agentic/``.
+
+These helpers are split out of the CLI scripts so they can be unit-
+tested under the Linux 100% coverage gate. The CLI scripts that
+import them are themselves exempt from the coverage gate (CLI
+plumbing convention; see ``scripts/serve.py`` for precedent).
+"""
Lines changed: 210 additions & 0 deletions
@@ -0,0 +1,210 @@
+"""Pure aggregation helpers for the gRPC long-session bench.
+
+The bench script under ``scripts/bench_agentic/bench_session_long_run.py``
+walks one gRPC session through many turns, recording per-turn
+metrics: latency, KV bytes, history length, error / success. After
+the run it calls :func:`aggregate_run` here to compute the headline
+KPIs:
+
+* ``kv_bounded`` — does ``kv_live_bytes`` stay under a tight band
+  across all turns? (ADR 0006 §2.3.a, ADR 0008 §7 G2.)
+* ``prefill_bounded`` — does per-turn latency stay flat as the
+  history grows? (ADR 0008 §7 G2 prefill claim, the v0.3 GA gate
+  that was a non-claim on the deprecated HTTP shim.)
+* Latency p50/p95, KV min/mean/max, n_turns, n_errors.
+
+Splitting this out of the CLI script means the aggregation logic is
+fully unit-testable and the script itself stays focused on IO. The
+script also computes a 10-minute bucket breakdown for visual sanity-
+check on long runs (4h+); that bucketing logic lives here too.
+"""
+
+from __future__ import annotations
+
+import statistics
+from typing import Any, Dict, List, Optional
+
+
+# ---------------------------------------------------------------------------
+# Aggregation
+# ---------------------------------------------------------------------------
+
+
+def _percentile(values: List[float], pct: float) -> Optional[float]:
+    """Linear-interpolated percentile, ``None`` if input is empty.
+
+    Implemented locally instead of pulling in ``numpy`` so the bench
+    has no scientific-stack dependency.
+    """
+    if not values:
+        return None
+    if not 0.0 <= pct <= 1.0:
+        raise ValueError(f"pct must be in [0, 1], got {pct}")
+    sorted_values = sorted(values)
+    if len(sorted_values) == 1:
+        return float(sorted_values[0])
+    rank = pct * (len(sorted_values) - 1)
+    lo = int(rank)
+    hi = min(lo + 1, len(sorted_values) - 1)
+    frac = rank - lo
+    return float(sorted_values[lo] + (sorted_values[hi] - sorted_values[lo]) * frac)
+
+
+def _kv_bounded(kv_values: List[int], *, tolerance: float = 0.10) -> Optional[bool]:
+    """Returns ``True`` iff the KV-bytes series stays within
+    ``tolerance`` (default 10%) of its minimum across every turn.
+
+    Returns ``None`` when there are not enough successful turns to
+    answer (≤1 sample). The tolerance is a relative band — if the
+    minimum is 0 we treat that as a pathologically small denominator
+    and use ``max(min, 1)`` to avoid div-by-zero, the same convention
+    ``bench_long_session.py`` uses.
+    """
+    if len(kv_values) <= 1:
+        return None
+    lo = min(kv_values)
+    hi = max(kv_values)
+    return (hi - lo) / max(lo, 1) < tolerance
+
+
+def _prefill_bounded(
+    latencies: List[float],
+    *,
+    head_window: int = 5,
+    tail_window: int = 5,
+    drift_threshold_s: float = 5.0,
+) -> Optional[bool]:
+    """Returns ``True`` iff median per-turn latency on the LAST
+    ``tail_window`` turns is within ``drift_threshold_s`` seconds of
+    the median on the FIRST ``head_window`` turns.
+
+    This is the prefill-bounded contract: a healthy session-bound
+    runtime processes only the new user message per turn, so latency
+    should not grow with conversation length. On the deprecated HTTP
+    shim, by contrast, every turn re-prefills the full history and
+    latency grows linearly — that's the failure mode this metric
+    catches.
+
+    ``None`` when the run is too short to bracket head and tail
+    windows without overlap.
+    """
+    if len(latencies) < head_window + tail_window:
+        return None
+    head = latencies[:head_window]
+    tail = latencies[-tail_window:]
+    head_p50 = statistics.median(head)
+    tail_p50 = statistics.median(tail)
+    return (tail_p50 - head_p50) <= drift_threshold_s
+
+
+def _latency_drift_p50_s(
+    latencies: List[float],
+    *,
+    head_window: int = 5,
+    tail_window: int = 5,
+) -> Optional[float]:
+    """Drift in seconds between head-window p50 and tail-window p50.
+
+    Positive = latency grew over the run. Returns ``None`` for
+    runs too short to bracket head and tail without overlap.
+    """
+    if len(latencies) < head_window + tail_window:
+        return None
+    head = latencies[:head_window]
+    tail = latencies[-tail_window:]
+    return float(statistics.median(tail) - statistics.median(head))
+
+
+def _bucketize_10min(turns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """Partition successful turns by their wall-clock bucket
+    (10-minute granularity, indexed from 0). Each bucket reports
+    ``n_turns``, p50/p95 latency, and mean kv_live_bytes — gives a
+    visual sanity check of latency / memory drift across a long run.
+
+    Empty input or all-error input returns an empty list.
+    """
+    buckets: Dict[int, List[Dict[str, Any]]] = {}
+    for t in turns:
+        if not t.get("ok"):
+            continue
+        bucket_idx = int(t["t_relative_s"] // 600)
+        buckets.setdefault(bucket_idx, []).append(t)
+
+    out: List[Dict[str, Any]] = []
+    for idx in sorted(buckets):
+        items = buckets[idx]
+        latencies = [float(t["latency_s"]) for t in items]
+        kv_values = [
+            int(t["kv_live_bytes"]) for t in items
+            if t.get("kv_live_bytes") is not None
+        ]
+        out.append(
+            {
+                "bucket_index": idx,
+                "n_turns": len(items),
+                "p50_latency_s": _percentile(latencies, 0.50),
+                "p95_latency_s": _percentile(latencies, 0.95),
+                "mean_kv_live_bytes": (
+                    statistics.mean(kv_values) if kv_values else None
+                ),
+            }
+        )
+    return out
+
+
+def aggregate_run(
+    turns: List[Dict[str, Any]],
+    *,
+    duration_s: float,
+    kv_tolerance: float = 0.10,
+    drift_head_window: int = 5,
+    drift_tail_window: int = 5,
+    drift_threshold_s: float = 5.0,
+) -> Dict[str, Any]:
+    """Build the aggregate report from a list of per-turn records.
+
+    Each turn dict must carry at least:
+    * ``ok`` — bool
+    * ``t_relative_s`` — float, seconds since run start
+    * ``latency_s`` — float (only if ``ok``)
+    * ``kv_live_bytes`` — int or ``None`` (only if ``ok``)
+
+    Returns a dict with the headline KPIs ADR 0006 §2.3.a / ADR 0008
+    §7 G2 speak to: ``kv_bounded``, ``prefill_bounded``, latency
+    p50/p95, kv min/mean/max, error count, 10-minute bucket break-
+    down.
+    """
+    successes = [t for t in turns if t.get("ok")]
+    errors = [t for t in turns if not t.get("ok")]
+
+    latencies = [float(t["latency_s"]) for t in successes]
+    kv_values = [
+        int(t["kv_live_bytes"]) for t in successes
+        if t.get("kv_live_bytes") is not None
+    ]
+
+    return {
+        "n_turns": len(successes),
+        "n_errors": len(errors),
+        "duration_s": float(duration_s),
+        "p50_latency_s": _percentile(latencies, 0.50),
+        "p95_latency_s": _percentile(latencies, 0.95),
+        "min_kv_live_bytes": min(kv_values) if kv_values else None,
+        "mean_kv_live_bytes": (
+            statistics.mean(kv_values) if kv_values else None
+        ),
+        "max_kv_live_bytes": max(kv_values) if kv_values else None,
+        "kv_bounded": _kv_bounded(kv_values, tolerance=kv_tolerance),
+        "prefill_bounded": _prefill_bounded(
+            latencies,
+            head_window=drift_head_window,
+            tail_window=drift_tail_window,
+            drift_threshold_s=drift_threshold_s,
+        ),
+        "latency_drift_p50_s": _latency_drift_p50_s(
+            latencies,
+            head_window=drift_head_window,
+            tail_window=drift_tail_window,
+        ),
+        "buckets_10min": _bucketize_10min(turns),
+    }
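A few worked cases may help when reading the three core checks in the module above. The sketch below re-implements them locally under shortened names (`percentile`, `kv_bounded`, `prefill_bounded`) instead of importing the new module, whose file name is not shown here; all inputs are made-up numbers, not bench results.

```python
import statistics
from typing import List, Optional


def percentile(values: List[float], pct: float) -> Optional[float]:
    """Linear-interpolated percentile with pct in [0, 1]; None on empty input."""
    if not values:
        return None
    ordered = sorted(values)
    rank = pct * (len(ordered) - 1)
    lo = int(rank)
    hi = min(lo + 1, len(ordered) - 1)
    return float(ordered[lo] + (ordered[hi] - ordered[lo]) * (rank - lo))


def kv_bounded(kv_values: List[int], tolerance: float = 0.10) -> Optional[bool]:
    """True iff the (max - min) spread is strictly below tolerance * min."""
    if len(kv_values) <= 1:
        return None
    lo, hi = min(kv_values), max(kv_values)
    return (hi - lo) / max(lo, 1) < tolerance


def prefill_bounded(
    latencies: List[float], window: int = 5, drift_threshold_s: float = 5.0
) -> Optional[bool]:
    """True iff the tail median is at most drift_threshold_s above the head median."""
    if len(latencies) < 2 * window:
        return None
    head = statistics.median(latencies[:window])
    tail = statistics.median(latencies[-window:])
    return (tail - head) <= drift_threshold_s


# Median of four samples sits halfway between the middle two.
p50 = percentile([1.0, 2.0, 3.0, 4.0], 0.5)

# 9% spread passes; exactly 10% fails because the comparison is strict.
kv_ok = kv_bounded([1000, 1050, 1090])
kv_edge = kv_bounded([1000, 1100])

# Flat latency passes; latency growing 1s per turn over 20 turns does not.
flat_run = prefill_bounded([1.0] * 10)
growing_run = prefill_bounded([float(i) for i in range(1, 21)])
short_run = prefill_bounded([1.0] * 9)  # too short to bracket both windows
```

One detail worth noticing when tuning thresholds: the KV check uses a strict `<` while the latency-drift check uses `<=`, so a run landing exactly on the boundary fails the former and passes the latter.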

inference_engine/scheduler/scheduler.py

Lines changed: 6 additions & 6 deletions
@@ -358,12 +358,12 @@ def on_token(tok_id: int) -> bool:
     session.eos_token_ids, on_token,
 )
 
-# Out of engine lock — finalize state.
-# Stash the engine result on the session so route handlers
-# can read path-selection observability fields (ADR 0007
-# §2.10) and acceptance rate. tokens were already streamed
-# via on_token.
-session.engine_result = result
+# Out of engine lock — finalize state. Tokens were already
+# streamed via on_token; the engine result is otherwise
+# discarded (PR-D1 of ADR 0008 removed the engine_result
+# stash that ADR 0007 §2.10 used for path-selection
+# observability).
+del result
 if session.state == SessionState.CANCELLED:
     # Already counted by cancel_session caller; we just
     # observe the terminal state here.

inference_engine/scheduler/session.py

Lines changed: 0 additions & 7 deletions
@@ -64,13 +64,6 @@ class Session:
     # the scheduler.iter_tokens() async iterator drain this; the
     # scheduler's worker pushes into it.
     token_queue: asyncio.Queue = field(default_factory=lambda: asyncio.Queue())
-    # The engine's full result, set by the scheduler worker after
-    # ``engine.generate()`` returns. Route handlers read this to
-    # populate ADR 0007 §2.10 path-selection observability metrics
-    # (path_selection, tokens_skipped, prefill_duration_seconds) and
-    # acceptance-rate stats. ``None`` until the engine returns —
-    # callers must check before reading.
-    engine_result: Optional[object] = None
 
     def __post_init__(self) -> None:
         if not self.prompt_ids:
