Skip to content

Commit ac3b1da

Browse files
authored
Merge pull request #52 from FluffyAIcode/AgentMemory/v030-pr-e1c-fix-kv-live-bytes-reporting-8e7f
PR-E1c: fix kv_live_bytes reporting path
2 parents 9773190 + c17ae31 commit ac3b1da

10 files changed

Lines changed: 591 additions & 9 deletions

File tree

inference_engine/backends/mlx/verifier.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,25 @@ def __init__(self, config: Optional[VerifierConfig] = None) -> None:
103103
self.quantization: QuantizationInfo = detect_quantization(self.model)
104104
self.stats = VerifierStats(weight_bytes=self.quantization.total_weight_bytes)
105105

106+
# PR-E1c: precompute per-K/V-token byte cost for the
107+
# ``kv_live_bytes`` accessor. Mirrors the CPU verifier;
108+
# reads dims from the wrapped HF config so GQA / MQA via
109+
# ``num_key_value_heads`` is honored.
110+
cfg = self.model.config if hasattr(self.model, "config") else self.model
111+
num_layers = int(getattr(cfg, "num_hidden_layers"))
112+
num_kv_heads = int(
113+
getattr(cfg, "num_key_value_heads", None)
114+
or getattr(cfg, "num_attention_heads")
115+
)
116+
head_dim = int(
117+
getattr(cfg, "head_dim", None)
118+
or (cfg.hidden_size // cfg.num_attention_heads)
119+
)
120+
itemsize = torch.tensor([], dtype=self.config.dtype).element_size()
121+
self._bytes_per_kv_token = (
122+
num_layers * num_kv_heads * head_dim * itemsize * 2
123+
)
124+
106125
# ---------------------------- public API ---------------------------- #
107126

108127
def reset(self) -> None:
@@ -222,6 +241,17 @@ def k_seq_length(self, session: object) -> int:
222241
del session # unused in v0.3 single-tenant scope
223242
return self._cache_buffer_size()
224243

244+
def kv_live_bytes(self, session: object) -> int:
245+
"""Return the live K/V cache size in bytes for ``session``.
246+
247+
Mirrors the CPU verifier's :meth:`kv_live_bytes`; computed as
248+
``k_seq_length × num_layers × num_kv_heads × head_dim ×
249+
itemsize × 2``. PR-E1c — feeds ``GetSessionInfo.kv_live_bytes``
250+
through the coordinator's slab-write-through.
251+
"""
252+
del session # unused in v0.3 single-tenant scope
253+
return self._cache_buffer_size() * self._bytes_per_kv_token
254+
225255
# --------------------------- internals --------------------------- #
226256

227257
def _cache_buffer_size(self) -> int:

inference_engine/session/coordinator.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,28 @@ def commit_or_truncate(self, *, forwarded: int, accepted: int) -> None:
9696
def k_seq_length(self, session: Session) -> int:
9797
... # pragma: no cover - Protocol body, never executed
9898

99+
def kv_live_bytes(self, session: Session) -> int:
100+
... # pragma: no cover - Protocol body, never executed
101+
102+
103+
def _sync_slab_bytes(session: Session, verifier: "VerifierProtocol") -> None:
104+
"""Mirror the verifier's current KV byte count onto the session's
105+
slab placeholder (PR-E1c).
106+
107+
The slab's ``live_kv_bytes`` is the source of truth for
108+
:meth:`Session.kv_live_bytes`, which in turn feeds
109+
``GetSessionInfo.kv_live_bytes`` over gRPC. The verifier owns
110+
the actual K/V tensors; the slab is a placeholder that holds
111+
one capacity unit per active session. Without this sync the
112+
gauge reads 0 forever (PR-E1b's 4h bench surfaced this).
113+
114+
No-op when the session has no slab (pool-less SessionStore — the
115+
test / pure-data-layer mode the coordinator unit tests use).
116+
"""
117+
if session.slab is None:
118+
return
119+
session.slab.live_kv_bytes_override = int(verifier.kv_live_bytes(session))
120+
99121

100122
class AppendTokensCoordinator:
101123
"""Orchestrator for the §2.3 byte-exact prefill-incremental contract.
@@ -194,4 +216,9 @@ def append_tokens(
194216
session_id, self._verifier.next_global_position,
195217
)
196218

219+
# Mirror the verifier's current KV byte count onto the slab
220+
# so GetSessionInfo.kv_live_bytes reports physical bytes
221+
# rather than the slab's placeholder zero. PR-E1c.
222+
_sync_slab_bytes(session, self._verifier)
223+
197224
return new_history_length

inference_engine/session/generator.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@
5151

5252
import torch
5353

54-
from inference_engine.session.coordinator import VerifierProtocol
54+
from inference_engine.session.coordinator import (
55+
VerifierProtocol,
56+
_sync_slab_bytes,
57+
)
5558
from inference_engine.session.store import SessionStore
5659

5760

@@ -226,6 +229,12 @@ def generate(
226229
yield TokenEvent(token_id=next_token)
227230

228231
if next_token in eos_set:
232+
# Mirror final KV bytes onto the slab so the next
233+
# GetSessionInfo reads the correct live count
234+
# (PR-E1c). Once the cache is at sink+window
235+
# capacity, this value plateaus and the caller can
236+
# observe the architectural KV bound empirically.
237+
_sync_slab_bytes(session, self._verifier)
229238
yield DoneEvent(
230239
stop_reason=STOP_REASON_EOS,
231240
generated_token_count=generated_count,
@@ -234,6 +243,7 @@ def generate(
234243
)
235244
return
236245

246+
_sync_slab_bytes(session, self._verifier)
237247
yield DoneEvent(
238248
stop_reason=STOP_REASON_MAX_TOKENS,
239249
generated_token_count=generated_count,

inference_engine/session/store.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,23 @@ def idle_seconds(self) -> float:
182182
return time.monotonic() - self.last_active_at
183183

184184
def kv_live_bytes(self) -> int:
185-
"""Live KV bytes held by this session's slab.
186-
187-
Returns the slab's reported live KV bytes (the verifier
188-
wiring keeps ``slab.live_kv_bytes_override`` synced to its
189-
real ``stats.peak_kv_bytes`` snapshot — see
190-
``PooledVerifier._sync_slab_bytes`` for the existing CPU/MLX
191-
contract that PR-A3b reuses). When the session has no slab
192-
(pool-less store), returns 0.
185+
"""Live KV bytes held by this session's KV cache.
186+
187+
Returns the slab's ``live_kv_bytes_override`` field, which is
188+
kept in sync with the verifier's true cache size by:
189+
190+
* The HTTP shim's :class:`PooledVerifier._sync_slab_bytes`
191+
(writes ``verifier.stats.peak_kv_bytes`` after every
192+
forward — running max).
193+
* The gRPC path's coordinator-level ``_sync_slab_bytes``
194+
helper (writes ``verifier.kv_live_bytes(session)`` after
195+
every forward — current live bytes; PR-E1c).
196+
197+
The ``Session`` object itself never knows about the verifier;
198+
the slab is the single piece of session-bound state that
199+
bridges the verifier and the gRPC ``GetSessionInfo`` field.
200+
201+
Returns 0 when the session has no slab (pool-less store).
193202
"""
194203
if self.slab is None:
195204
return 0

kv_cache_proposer/verifier.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,27 @@ def __init__(self, config: Optional[VerifierConfig] = None) -> None:
9898
weight_bytes=sum(p.numel() * p.element_size() for p in self.model.parameters())
9999
)
100100

101+
# PR-E1c: precompute per-K/V-token byte cost so the
102+
# ``kv_live_bytes`` accessor is O(1). Two factors of 2 — one
103+
# for K + V, one already absorbed into the dim product. Read
104+
# the dims from the HF config so GQA / MQA variants
105+
# (Qwen3 / Gemma / DeepSeek) are accounted for correctly via
106+
# ``num_key_value_heads`` rather than ``num_attention_heads``.
107+
cfg = self.model.config
108+
num_layers = int(getattr(cfg, "num_hidden_layers"))
109+
num_kv_heads = int(
110+
getattr(cfg, "num_key_value_heads", None)
111+
or getattr(cfg, "num_attention_heads")
112+
)
113+
head_dim = int(
114+
getattr(cfg, "head_dim", None)
115+
or (cfg.hidden_size // cfg.num_attention_heads)
116+
)
117+
itemsize = torch.tensor([], dtype=self.config.dtype).element_size()
118+
self._bytes_per_kv_token = (
119+
num_layers * num_kv_heads * head_dim * itemsize * 2
120+
)
121+
101122
# ---------------------------- public API ---------------------------- #
102123
def reset(self) -> None:
103124
self.cache = DynamicCache(config=self.model.config)
@@ -320,6 +341,23 @@ def k_seq_length(self, session: object) -> int:
320341
del session # unused in v0.3 single-tenant scope
321342
return self._cache_seq_length()
322343

344+
def kv_live_bytes(self, session: object) -> int:
345+
"""Return the live K/V cache size in bytes for ``session``.
346+
347+
Implements the :class:`VerifierProtocol.kv_live_bytes` contract
348+
introduced by PR-E1c. Computed as
349+
``k_seq_length × num_layers × num_kv_heads × head_dim ×
350+
itemsize × 2`` (the trailing 2 = K + V).
351+
352+
After PR-E1c the gRPC ``GetSessionInfo.kv_live_bytes`` field is
353+
sourced from this method via the coordinator's slab-write-
354+
through (PR-E1b's 4h bench surfaced that the previous source —
355+
``slab.live_kv_bytes`` — was always 0 because the slab is a
356+
capacity placeholder, not a real KV-tensor sink).
357+
"""
358+
del session # unused in v0.3 single-tenant scope
359+
return self._cache_seq_length() * self._bytes_per_kv_token
360+
323361
def _assert_cache_invariant_1(self) -> None:
324362
"""ADR 0007 §2.9 INV-1: parallel-sequence consistency.
325363

scripts/review_pr_e1c_on_mac.sh

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
#!/usr/bin/env bash
2+
# Mac M4 review aid for PR-E1c (kv_live_bytes reporting fix).
3+
#
4+
# This PR closes the GetSessionInfo.kv_live_bytes=0 reporting bug
5+
# PR-E1b's 4-hour bench surfaced. The Linux unit gate exercises the
6+
# coordinator-level slab-write-through against a deterministic
7+
# FakeVerifier. The Mac M4 review here adds two further checks:
8+
#
9+
# 1. The CPU verifier's kv_live_bytes accessor against real
10+
# Qwen3-0.6B numerics — non-zero, plateaus at sink+window
11+
# capacity, equals k_seq_length × per-token bytes.
12+
# 2. A short (5-min) gRPC bench run that confirms
13+
# GetSessionInfo.kv_live_bytes is no longer 0 over the wire.
14+
#
15+
# Produces 2 artifacts:
16+
#
17+
# results/platform-tests/pr-e1c-mac-verifier-tests-<unix>.json
18+
# pytest tests/core/test_verifier.py + tests/backends/mlx/test_verifier.py
19+
# (the kv_live_bytes-related tests + INV-1 baseline).
20+
#
21+
# results/platform-tests/pr-e1c-mac-bench-session-5min-<unix>.json
22+
# bench_session_long_run.py @ 300s. Purpose: visually confirm
23+
# kv_live_bytes goes 0 -> capped multi-MB once cache hits
24+
# sink+window. Expected: kv_bounded=True, prefill_bounded=True,
25+
# min/mean/max kv_live_bytes all > 0.
26+
#
27+
# Usage (from repo root, on Mac M4):
28+
#
29+
# bash scripts/review_pr_e1c_on_mac.sh
30+
#
31+
# Then commit:
32+
#
33+
# git add results/platform-tests/pr-e1c-mac-*
34+
# git commit -m "Mac M4 review evidence for PR-E1c"
35+
# git push
36+
37+
set -euo pipefail
38+
39+
ROOT="$(cd "$(dirname "$0")/.." && pwd)"
40+
cd "$ROOT"
41+
42+
stamp="$(date +%s)"
43+
out_dir="results/platform-tests"
44+
mkdir -p "$out_dir"
45+
46+
# --- Part 1: verifier-level tests -----------------------------------------
47+
verif_junit="$out_dir/pr-e1c-mac-verifier-tests-${stamp}.junit.xml"
48+
verif_report="$out_dir/pr-e1c-mac-verifier-tests-${stamp}.json"
49+
50+
echo "==> CPU + MLX verifier tests covering kv_live_bytes (PR-E1c)"
51+
PYTHONPATH=.:sdks/python python3 -m pytest \
52+
tests/core/test_verifier.py \
53+
tests/backends/mlx/test_verifier.py \
54+
-k "kv_live_bytes or k_seq_length or cache_inspector" \
55+
--junitxml="$verif_junit" \
56+
-v
57+
58+
PYTHONPATH=.:sdks/python python3 - "$verif_junit" "$verif_report" <<'PY'
59+
import json
60+
import platform
61+
import sys
62+
import xml.etree.ElementTree as ET
63+
junit_path, out_path = sys.argv[1:3]
64+
jr = ET.parse(junit_path).getroot()
65+
testsuites = list(jr.iter("testsuite"))
66+
total_tests = sum(int(ts.get("tests", "0")) for ts in testsuites)
67+
total_failures = sum(int(ts.get("failures", "0")) for ts in testsuites)
68+
total_errors = sum(int(ts.get("errors", "0")) for ts in testsuites)
69+
total_skipped = sum(int(ts.get("skipped", "0")) for ts in testsuites)
70+
report = {
71+
"schema_version": 1,
72+
"kind": "pr_e1c_mac_verifier_tests",
73+
"host": {
74+
"platform": platform.platform(),
75+
"machine": platform.machine(),
76+
"python": platform.python_version(),
77+
},
78+
"junit": {
79+
"tests": total_tests, "failures": total_failures,
80+
"errors": total_errors, "skipped": total_skipped,
81+
},
82+
}
83+
with open(out_path, "w", encoding="utf-8") as fh:
84+
json.dump(report, fh, indent=2)
85+
print(f" -> {out_path}")
86+
PY
87+
88+
# --- Part 2: 5-min gRPC bench ---------------------------------------------
89+
# This part requires PR-E1b's scripts/start_grpc_runtime_server.py and
90+
# scripts/bench_agentic/bench_session_long_run.py to be present on the
91+
# checked-out tree. PR-E1c merges *after* PR-E1b in the recommended
92+
# sequence; if PR-E1c is exercised against a tree where PR-E1b hasn't
93+
# landed yet, skip the bench gracefully so Part 1 evidence still
94+
# commits cleanly.
95+
if [[ ! -f scripts/start_grpc_runtime_server.py \
96+
|| ! -f scripts/bench_agentic/bench_session_long_run.py ]]; then
97+
echo
98+
echo "==> Part 2 skipped: PR-E1b artifacts not present on this tree."
99+
echo " Re-run after PR-E1b lands to capture the bench evidence."
100+
echo
101+
echo "==> Done. Commit Part 1 evidence:"
102+
echo " git add $out_dir/pr-e1c-mac-verifier-tests-${stamp}.*"
103+
echo " git commit -m 'Mac M4 review evidence for PR-E1c (verifier tests)'"
104+
echo " git push"
105+
exit 0
106+
fi
107+
108+
bench_json="$out_dir/pr-e1c-mac-bench-session-5min-${stamp}.json"
109+
server_log="$out_dir/pr-e1c-mac-bench-session-5min-${stamp}.server.log"
110+
111+
server_pid=""
112+
cleanup() {
113+
if [[ -n "$server_pid" ]] && kill -0 "$server_pid" 2>/dev/null; then
114+
kill "$server_pid" 2>/dev/null || true
115+
wait "$server_pid" 2>/dev/null || true
116+
fi
117+
}
118+
trap cleanup EXIT
119+
120+
echo
121+
echo "==> starting gRPC server (logs: $server_log)"
122+
PYTHONPATH=.:sdks/python python3 scripts/start_grpc_runtime_server.py \
123+
--backend cpu --verifier-id Qwen/Qwen3-0.6B \
124+
--bind 127.0.0.1:50051 --capacity 1 --sink 4 --window 64 \
125+
>"$server_log" 2>&1 &
126+
server_pid=$!
127+
128+
ready=0
129+
for _ in $(seq 1 60); do
130+
if grep -q "kakeya gRPC RuntimeService listening on" "$server_log" 2>/dev/null; then
131+
ready=1
132+
break
133+
fi
134+
sleep 1
135+
done
136+
137+
if [[ "$ready" != "1" ]]; then
138+
echo "!!! gRPC server didn't become ready"
139+
tail -20 "$server_log" || true
140+
exit 1
141+
fi
142+
143+
echo "==> running 5-min bench (validates kv_live_bytes is non-zero)"
144+
PYTHONPATH=.:sdks/python python3 \
145+
scripts/bench_agentic/bench_session_long_run.py \
146+
--grpc-address 127.0.0.1:50051 \
147+
--tokenizer-id Qwen/Qwen3-0.6B \
148+
--duration-s 300 --turn-spacing-s 30 \
149+
--max-tokens 64 \
150+
--output "$bench_json"
151+
152+
echo
153+
echo "==> Headline KPIs from $bench_json:"
154+
PYTHONPATH=.:sdks/python python3 - "$bench_json" <<'PY'
155+
import json
156+
import sys
157+
with open(sys.argv[1], encoding="utf-8") as fh:
158+
payload = json.load(fh)
159+
agg = payload["agg"]
160+
print(f" n_turns = {agg['n_turns']}")
161+
print(f" n_errors = {agg['n_errors']}")
162+
print(f" p50_latency_s = {agg['p50_latency_s']}")
163+
print(f" kv min/mean/max = "
164+
f"{agg['min_kv_live_bytes']} / "
165+
f"{agg['mean_kv_live_bytes']} / "
166+
f"{agg['max_kv_live_bytes']}")
167+
print(f" kv_bounded = {agg['kv_bounded']}")
168+
print(f" prefill_bounded = {agg['prefill_bounded']}")
169+
m = agg["max_kv_live_bytes"]
170+
if m and m > 0:
171+
print(f" -> kv_live_bytes is non-zero; PR-E1c reporting fix VERIFIED.")
172+
else:
173+
print(f" -> kv_live_bytes is still 0; PR-E1c FAILED.")
174+
sys.exit(1)
175+
PY
176+
177+
echo
178+
echo "==> Done. Commit:"
179+
echo " git add $out_dir/pr-e1c-mac-*"
180+
echo " git commit -m 'Mac M4 review evidence for PR-E1c'"
181+
echo " git push"

0 commit comments

Comments
 (0)