
Commit 731689c

nv-alicheng and claude committed
chore: address second round of PR #282 review comments
- docs(report_design): add "reports reproducible from event log" principle
- refactor(metrics_table): rename subtract_field -> delta_start_fieldname
- docs(metrics_table): reword ISL token_ids/text comments so SGLang/OpenAI are examples, not defining conditions
- test(kv_store): pin empty SeriesStats min/max sentinels; add snapshot isolation regression test
- test(aggregator): add explanatory messages to tracking-window asserts
- test(report_builder): pin max/std_dev on empty compute_summary
- test(sample_order): parametrize over [3, 100, 10_000] dataset sizes
- test(zmq_pool_transport): collapse two pool transport classes into one parametrized over (num_workers, create_publisher)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d8142f1 commit 731689c

7 files changed

Lines changed: 136 additions & 97 deletions


docs/metrics/report_design.md

Lines changed: 13 additions & 0 deletions
@@ -38,6 +38,19 @@ this complexity is needed when the input is a `list[float]` from the KVStore.
 The entire rollup is a single function: `compute_summary(values) → dict`.
 It calls numpy for percentiles and histograms. No classes, no state.
 
+**Reports are reproducible from the event log.**
+
+The KVStore is lossy aggregation — it stores per-metric series, not per-sample
+provenance. The authoritative record of what happened during a run is the event
+log written by the `EventLoggerService`. Every number in a `Report` can be
+recomputed by replaying the event log through the same aggregator logic: if a
+production report shows a TTFT spike, the event log is the ground truth a user
+can mine to attribute the spike to specific samples or time windows.
+
+New metrics must preserve this property: the aggregator may only derive values
+from event fields, never from out-of-band state. If a metric cannot be rebuilt
+from the event log alone, it does not belong in the KVStore.
+
 ## Components
 
 ### `compute_summary(values, percentiles, n_histogram_buckets) → dict`
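A minimal replay sketch of the principle above. The names here (replay_event_log, the event dict fields) are hypothetical illustrations, not this repo's API; only the idea of rebuilding every series from logged events and rolling it up with compute_summary comes from the doc:

from collections import defaultdict


def replay_event_log(event_records):
    """Rebuild per-metric series from logged events alone (illustrative)."""
    series = defaultdict(list)
    issued_ns = {}  # sample_id -> ISSUED timestamp, which opens the TTFT delta
    for ev in event_records:
        if ev["type"] == "ISSUED":
            issued_ns[ev["sample_id"]] = ev["timestamp_ns"]
        elif ev["type"] == "FIRST_TOKEN" and ev["sample_id"] in issued_ns:
            series["ttft_ns"].append(
                ev["timestamp_ns"] - issued_ns[ev["sample_id"]]
            )
    return series  # each list can then be rolled up with compute_summary(values)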

src/inference_endpoint/async_utils/services/metrics_aggregator/metrics_table.py

Lines changed: 17 additions & 12 deletions
@@ -150,18 +150,20 @@ def fire(
 
 
 class TimeDeltaTrigger(EmitTrigger):
-    """Sync trigger: emits ev_rec.timestamp_ns - pre_change[required_field].
+    """Sync trigger: emits ev_rec.timestamp_ns - pre_change[delta_start_fieldname].
 
-    Subclass only needs to set metric_name and the required field name.
-    Skips silently if the required field is None (event hasn't occurred yet).
+    The emitted metric is a time delta: the firing event marks the end of the
+    delta, and ``delta_start_fieldname`` names the SampleField whose timestamp
+    marks the start. Skips silently if the start field is None (the delta has
+    not yet opened for this sample).
     """
 
-    def __init__(self, metric_name: str, kv_store: KVStore, subtract_field: str):
-        super().__init__(metric_name, kv_store, requires=(subtract_field,))
-        self._subtract_field = subtract_field
+    def __init__(self, metric_name: str, kv_store: KVStore, delta_start_fieldname: str):
+        super().__init__(metric_name, kv_store, requires=(delta_start_fieldname,))
+        self._delta_start_fieldname = delta_start_fieldname
 
     def fire(self, ev_rec, row, pre_change):
-        baseline = pre_change.get(self._subtract_field)
+        baseline = pre_change.get(self._delta_start_fieldname)
         if baseline is not None:
             self.kv_store.update(self.metric_name, ev_rec.timestamp_ns - baseline)
         return None
@@ -235,7 +237,9 @@ class TtftTrigger(TimeDeltaTrigger):
 
     def __init__(self, kv_store: KVStore):
         super().__init__(
-            MetricSeriesKey.TTFT_NS, kv_store, subtract_field=SampleField.ISSUED_NS
+            MetricSeriesKey.TTFT_NS,
+            kv_store,
+            delta_start_fieldname=SampleField.ISSUED_NS,
         )
 
 
@@ -249,7 +253,7 @@ def __init__(self, kv_store: KVStore):
         super().__init__(
             MetricSeriesKey.CHUNK_DELTA_NS,
             kv_store,
-            subtract_field=SampleField.LAST_RECV_NS,
+            delta_start_fieldname=SampleField.LAST_RECV_NS,
         )
 
 
@@ -260,7 +264,7 @@ def __init__(self, kv_store: KVStore):
         super().__init__(
             MetricSeriesKey.SAMPLE_LATENCY_NS,
             kv_store,
-            subtract_field=SampleField.ISSUED_NS,
+            delta_start_fieldname=SampleField.ISSUED_NS,
         )
 
 
@@ -281,11 +285,12 @@ def __init__(
         super().__init__(MetricSeriesKey.ISL, kv_store, tokenize_pool, loop)
 
     def fire(self, ev_rec, row, pre_change):
-        # Sync fast path: pre-tokenized IDs (SGLang)
+        # Sync fast path: any backend that pre-populates token_ids (e.g. SGLang).
         if isinstance(ev_rec.data, PromptData) and ev_rec.data.token_ids is not None:
            self.kv_store.update(self.metric_name, len(ev_rec.data.token_ids))
            return None
-        # Async path: tokenize raw text (OpenAI) — handled by base class
+        # Async path: tokenize raw text — used when token_ids are unavailable
+        # (e.g. OpenAI-compatible endpoints). Handled by the base class.
         return super().fire(ev_rec, row, pre_change)
 
     def _extract_text(self, ev_rec, row, pre_change):
tests/unit/async_utils/services/metrics_aggregator/test_aggregator.py

Lines changed: 19 additions & 5 deletions
@@ -58,9 +58,17 @@ async def test_not_tracked_before_start(self):
                 sample_event(SampleEventType.ISSUED, "s1", ts=100),
             ]
         )
-        assert agg._table.get_row("s1") is None
-        assert store.get_series_values("ttft_ns") == []
-        assert store.get_series_values("sample_latency_ns") == []
+        assert agg._table.get_row("s1") is None, (
+            "Sample issued before START_PERFORMANCE_TRACKING must not create a "
+            "table row — warmup samples should be excluded from the tracked set."
+        )
+        assert (
+            store.get_series_values("ttft_ns") == []
+        ), "No TTFT should be recorded for samples issued before tracking begins."
+        assert store.get_series_values("sample_latency_ns") == [], (
+            "No sample_latency should be recorded for samples issued before "
+            "tracking begins."
+        )
 
     @pytest.mark.asyncio
     async def test_tracked_after_start(self):
@@ -72,7 +80,10 @@ async def test_tracked_after_start(self):
                 sample_event(SampleEventType.ISSUED, "s1", ts=100),
             ]
         )
-        assert agg._table.get_row("s1") is not None
+        assert agg._table.get_row("s1") is not None, (
+            "Sample issued after START_PERFORMANCE_TRACKING must create a table "
+            "row so its metrics are included in the tracked set."
+        )
 
     @pytest.mark.asyncio
     async def test_not_tracked_after_stop(self):
@@ -85,7 +96,10 @@ async def test_not_tracked_after_stop(self):
                 sample_event(SampleEventType.ISSUED, "s1", ts=100),
             ]
         )
-        assert agg._table.get_row("s1") is None
+        assert agg._table.get_row("s1") is None, (
+            "Sample issued after STOP_PERFORMANCE_TRACKING must not create a "
+            "table row — the tracking window has closed."
+        )
 
     @pytest.mark.asyncio
     async def test_inflight_sample_continues_after_stop(self):
tests/unit/async_utils/services/metrics_aggregator/test_kv_store.py

Lines changed: 27 additions & 0 deletions
@@ -15,6 +15,7 @@
 
 """Tests for the KVStore (BasicKVStore + BasicKVStoreReader)."""
 
+import math
 import multiprocessing
 import struct
 from pathlib import Path
@@ -48,6 +49,10 @@ def test_empty(self):
         stats = SeriesStats()
         assert stats.count == 0
         assert stats.total == 0.0
+        # Sentinel values for an empty series — compute_summary() is responsible
+        # for normalizing these to 0 before exposing them to users.
+        assert stats.min_val == math.inf
+        assert stats.max_val == -math.inf
 
     def test_incremental_rollup(self):
         stats = SeriesStats([1.0, 2.0])
@@ -119,6 +124,28 @@ def test_snapshot(self, tmp_path: Path):
         assert snap["latency"].count == 2
         store.close()
 
+    def test_snapshot_is_isolated_from_later_writes(self, tmp_path: Path):
+        """Mutations after snapshot() must not alter the captured snapshot."""
+        store = BasicKVStore(tmp_path / "kv")
+        store.create_key("n_issued", "counter")
+        store.create_key("latency", "series")
+        store.update("n_issued", 5)
+        store.update("latency", 100)
+        store.update("latency", 200)
+
+        snap = store.snapshot()
+
+        store.update("n_issued", 99)
+        store.update("latency", 300)
+
+        assert snap["n_issued"] == 5
+        latency_snap = snap["latency"]
+        assert isinstance(latency_snap, SeriesStats)
+        assert latency_snap.count == 2
+        assert latency_snap.values == [100, 200]
+        assert latency_snap.total == 300
+        store.close()
+
     def test_update_unknown_key_raises(self, tmp_path: Path):
         store = BasicKVStore(tmp_path / "kv")
         with pytest.raises(KeyError, match="Key not created"):
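Why ±inf makes sense as the empty-series sentinels the new asserts pin: they are the identity elements for min/max under incremental rollup, so the first update needs no special casing. A simplified stand-in, not the repo's SeriesStats:

import math


class MiniSeriesStats:
    def __init__(self):
        self.count, self.total = 0, 0.0
        self.min_val = math.inf    # identity for min: min(inf, x) == x
        self.max_val = -math.inf   # identity for max: max(-inf, x) == x

    def update(self, x):
        self.count += 1
        self.total += x
        self.min_val = min(self.min_val, x)
        self.max_val = max(self.max_val, x)


s = MiniSeriesStats()
assert (s.min_val, s.max_val) == (math.inf, -math.inf)  # empty sentinels
s.update(100.0)
assert s.min_val == s.max_val == 100.0  # first sample replaces both sentinels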

tests/unit/load_generator/test_sample_order.py

Lines changed: 40 additions & 29 deletions
@@ -23,43 +23,52 @@
     WithReplacementSampleOrder,
 )
 
+# Exercise small/medium/large dataset sizes so shuffle-buffer behavior is
+# covered for inputs both much smaller and much larger than typical batches.
+_DATASET_SIZES = [3, 100, 10_000]
+
 
 @pytest.mark.unit
 class TestWithoutReplacementSampleOrder:
-    def test_yields_all_indices(self):
+    @pytest.mark.parametrize("n_samples", _DATASET_SIZES)
+    def test_yields_all_indices(self, n_samples: int):
         order = WithoutReplacementSampleOrder(
-            n_samples_in_dataset=5, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
         )
-        indices = [next(order) for _ in range(5)]
-        assert sorted(indices) == [0, 1, 2, 3, 4]
+        indices = [next(order) for _ in range(n_samples)]
+        assert sorted(indices) == list(range(n_samples))
 
-    def test_reshuffles_after_exhaustion(self):
+    @pytest.mark.parametrize("n_samples", _DATASET_SIZES)
+    def test_reshuffles_after_exhaustion(self, n_samples: int):
         order = WithoutReplacementSampleOrder(
-            n_samples_in_dataset=3, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
         )
-        first_pass = [next(order) for _ in range(3)]
-        second_pass = [next(order) for _ in range(3)]
-        assert sorted(first_pass) == [0, 1, 2]
-        assert sorted(second_pass) == [0, 1, 2]
+        first_pass = [next(order) for _ in range(n_samples)]
+        second_pass = [next(order) for _ in range(n_samples)]
+        assert sorted(first_pass) == list(range(n_samples))
+        assert sorted(second_pass) == list(range(n_samples))
 
-    def test_never_raises_stop_iteration(self):
+    @pytest.mark.parametrize("n_samples", _DATASET_SIZES)
+    def test_never_raises_stop_iteration(self, n_samples: int):
         order = WithoutReplacementSampleOrder(
-            n_samples_in_dataset=2, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
         )
         # Should be able to draw far more than dataset size
-        indices = [next(order) for _ in range(100)]
-        assert len(indices) == 100
-        assert all(0 <= i < 2 for i in indices)
+        draws = max(100, n_samples * 3)
+        indices = [next(order) for _ in range(draws)]
+        assert len(indices) == draws
+        assert all(0 <= i < n_samples for i in indices)
 
-    def test_reproducible_with_seed(self):
+    @pytest.mark.parametrize("n_samples", _DATASET_SIZES)
+    def test_reproducible_with_seed(self, n_samples: int):
         order1 = WithoutReplacementSampleOrder(
-            n_samples_in_dataset=10, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
         )
         order2 = WithoutReplacementSampleOrder(
-            n_samples_in_dataset=10, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
         )
-        seq1 = [next(order1) for _ in range(20)]
-        seq2 = [next(order2) for _ in range(20)]
+        seq1 = [next(order1) for _ in range(n_samples * 2)]
+        seq2 = [next(order2) for _ in range(n_samples * 2)]
         assert seq1 == seq2
 
     def test_invalid_size_raises(self):
@@ -69,20 +78,22 @@ def test_invalid_size_raises(self):
 
 @pytest.mark.unit
 class TestWithReplacementSampleOrder:
-    def test_yields_valid_indices(self):
+    @pytest.mark.parametrize("n_samples", _DATASET_SIZES)
+    def test_yields_valid_indices(self, n_samples: int):
         order = WithReplacementSampleOrder(
-            n_samples_in_dataset=5, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
         )
-        indices = [next(order) for _ in range(100)]
-        assert all(0 <= i < 5 for i in indices)
+        indices = [next(order) for _ in range(max(100, n_samples))]
+        assert all(0 <= i < n_samples for i in indices)
 
-    def test_reproducible_with_seed(self):
+    @pytest.mark.parametrize("n_samples", _DATASET_SIZES)
+    def test_reproducible_with_seed(self, n_samples: int):
         order1 = WithReplacementSampleOrder(
-            n_samples_in_dataset=10, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
        )
         order2 = WithReplacementSampleOrder(
-            n_samples_in_dataset=10, rng=random.Random(42)
+            n_samples_in_dataset=n_samples, rng=random.Random(42)
        )
-        seq1 = [next(order1) for _ in range(20)]
-        seq2 = [next(order2) for _ in range(20)]
+        seq1 = [next(order1) for _ in range(n_samples * 2)]
+        seq2 = [next(order2) for _ in range(n_samples * 2)]
         assert seq1 == seq2
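For reference, the contract these parametrized tests pin (every index once per pass, reshuffle on exhaustion, never StopIteration) fits in a few lines. A sketch under assumed semantics, not the repo's implementation:

import random


class MiniWithoutReplacementOrder:
    def __init__(self, n_samples_in_dataset, rng):
        if n_samples_in_dataset <= 0:
            raise ValueError("n_samples_in_dataset must be positive")
        self._n, self._rng = n_samples_in_dataset, rng
        self._pending = []

    def __next__(self):
        if not self._pending:  # exhausted: start a fresh shuffled pass
            self._pending = list(range(self._n))
            self._rng.shuffle(self._pending)
        return self._pending.pop()


order = MiniWithoutReplacementOrder(3, random.Random(42))
assert sorted(next(order) for _ in range(3)) == [0, 1, 2]  # first pass
assert sorted(next(order) for _ in range(3)) == [0, 1, 2]  # reshuffled pass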

tests/unit/metrics/test_report_builder.py

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,8 @@ def test_empty(self):
         s = compute_summary(SeriesStats())
         assert s["total"] == 0
         assert s["min"] == 0
+        assert s["max"] == 0
+        assert s["std_dev"] == 0
         assert s["histogram"]["buckets"] == []
 
     def test_single_value(self):
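The two new asserts pin the empty-series normalization: the ±inf sentinels from SeriesStats must surface as 0. A hypothetical sketch of that shape; the real compute_summary also emits percentiles and numpy histograms:

import math


def mini_compute_summary(values):
    if not values:  # normalize: no inf/NaN sentinels may leak into a report
        return {"total": 0, "min": 0, "max": 0, "std_dev": 0,
                "histogram": {"buckets": []}}
    mean = sum(values) / len(values)
    return {
        "total": sum(values),
        "min": min(values),
        "max": max(values),
        "std_dev": math.sqrt(sum((v - mean) ** 2 for v in values) / len(values)),
        "histogram": {"buckets": []},  # bucket computation omitted here
    }


assert mini_compute_summary([])["max"] == 0
assert mini_compute_summary([])["std_dev"] == 0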

tests/unit/transport/test_zmq_pool_transport.py

Lines changed: 18 additions & 51 deletions
@@ -109,60 +109,24 @@ async def test_socket_closed_on_cancellation(self):
 
 @pytest.mark.unit
 @pytest.mark.asyncio
-class TestZmqPoolTransportWithPublisher:
-    """Test pool transport creation with a publisher on the same context."""
-
-    async def _create_publisher_and_pool(
-        self, loop: asyncio.AbstractEventLoop, num_workers: int
-    ):
-        """Helper: create publisher + pool transport, test ready check socket."""
-        sid = uuid.uuid4().hex[:8]
-        zmq_ctx = ManagedZMQContext(io_threads=2)
-        publisher = ZmqEventRecordPublisher(f"ev_pub_{sid}", zmq_ctx, loop=loop)
-
-        pool = ZmqWorkerPoolTransport.create(
-            loop, num_workers, config=ZMQTransportConfig()
-        )
-
-        rc = pool._ready_check
-        assert not rc._sock.closed
-        _ = rc._sock.rcvtimeo
-
-        with pytest.raises(TimeoutError):
-            await pool.wait_for_workers_ready(timeout=0.1)
-
-        pool.cleanup()
-        publisher.close()
-        zmq_ctx.cleanup()
-
-    async def test_2_workers(self):
-        loop = asyncio.get_running_loop()
-        await self._create_publisher_and_pool(loop, 2)
-
-    async def test_3_workers(self):
-        loop = asyncio.get_running_loop()
-        await self._create_publisher_and_pool(loop, 3)
-
-    async def test_4_workers(self):
-        loop = asyncio.get_running_loop()
-        await self._create_publisher_and_pool(loop, 4)
-
-    async def test_8_workers(self):
-        loop = asyncio.get_running_loop()
-        await self._create_publisher_and_pool(loop, 8)
-
-
-@pytest.mark.unit
-@pytest.mark.asyncio
-class TestZmqPoolTransportWithoutPublisher:
-    """Test pool transport creation without a publisher (baseline)."""
+class TestZmqPoolTransport:
+    """Pool transport creation with and without a publisher on the same context."""
 
     @pytest.mark.parametrize("num_workers", [2, 3, 4, 8])
-    async def test_pool_only(self, num_workers: int):
+    @pytest.mark.parametrize("create_publisher", [True, False])
+    async def test_pool(self, num_workers: int, create_publisher: bool):
         loop = asyncio.get_running_loop()
         zmq_ctx = ManagedZMQContext(io_threads=2)
-        dummy = zmq_ctx.socket(zmq.PUB)
-        zmq_ctx.bind(dummy, "dummy")
+
+        publisher = None
+        dummy = None
+        if create_publisher:
+            sid = uuid.uuid4().hex[:8]
+            publisher = ZmqEventRecordPublisher(f"ev_pub_{sid}", zmq_ctx, loop=loop)
+        else:
+            # Baseline: bind an unrelated PUB socket so the context is non-empty.
+            dummy = zmq_ctx.socket(zmq.PUB)
+            zmq_ctx.bind(dummy, "dummy")
 
         pool = ZmqWorkerPoolTransport.create(
             loop, num_workers, config=ZMQTransportConfig()
@@ -176,5 +140,8 @@ async def test_pool_only(self, num_workers: int):
             await pool.wait_for_workers_ready(timeout=0.1)
 
         pool.cleanup()
-        dummy.close()
+        if publisher is not None:
+            publisher.close()
+        if dummy is not None:
+            dummy.close()
         zmq_ctx.cleanup()
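Stacked @pytest.mark.parametrize decorators take the cross-product, so the collapsed test above runs 4 x 2 = 8 cases, the same coverage the two deleted classes provided. A self-contained illustration:

import pytest


@pytest.mark.parametrize("num_workers", [2, 3, 4, 8])
@pytest.mark.parametrize("create_publisher", [True, False])
def test_cross_product(num_workers, create_publisher):
    # pytest collects one test per (create_publisher, num_workers) pair: 8 total
    assert num_workers in (2, 3, 4, 8)
    assert isinstance(create_publisher, bool)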
