Change MetricsAggregatorService to publisher to support live metrics without shmem#306

Open
nv-alicheng wants to merge 10 commits into main from prototype/alicheng-zmq-metrics

Conversation

@nv-alicheng
Collaborator

What does this PR do?

The shmem implementation of the KVStore in MetricsAggregatorService causes issues on ARM. Two solutions exist:

  1. Rewrite the shmem implementation in C/C++, where memory-fencing primitives are exposed.
  2. Restructure MetricsAggregatorService as a ZMQ publisher that publishes metrics at a fixed rate, which other processes, such as a TUI, can subscribe to.

This PR implements (2).
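A minimal sketch of approach (2): a periodic tick task that serializes a snapshot and hands it to a publish callable. The class and parameter names here are illustrative, not the PR's actual API, and a plain callable stands in for the ZMQ PUB socket.

```python
import asyncio
import json
import time

class MetricsPublisherSketch:
    """Periodically publishes metric snapshots; subscribers receive, never poll shmem."""

    def __init__(self, send, refresh_hz: float, collect) -> None:
        self._send = send        # stand-in for a ZMQ PUB socket's send()
        self._collect = collect  # callable returning the current metrics dict
        self._period = 1.0 / refresh_hz

    async def run(self, ticks: int) -> None:
        for _ in range(ticks):
            snapshot = {"ts": time.time(), "metrics": self._collect()}
            self._send(json.dumps(snapshot).encode())
            await asyncio.sleep(self._period)
```

A TUI would connect a SUB socket and decode each frame; since delivery is one-way push, no cross-process shared memory (and no ARM memory-fencing concern) is involved.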

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

@nv-alicheng nv-alicheng requested a review from a team May 5, 2026 17:21
@github-actions

github-actions Bot commented May 5, 2026

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

@github-actions github-actions Bot requested review from arekay-nv and nvzhihanj May 5, 2026 17:22
Comment thread src/inference_endpoint/async_utils/transport/protocol.py Fixed

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request refactors the metrics aggregation system to use a registry-based architecture with HDR histograms and a generalized pub/sub transport layer, replacing the legacy mmap-backed storage. The update introduces periodic snapshot publishing with disk fallback and updates reporting logic to consume these snapshots. Feedback suggests improving encapsulation by exposing in-flight task metrics through public properties and adopting a more numerically stable variance formula for high-precision latency calculations. Additionally, several legacy tests have been skipped pending migration to the new system.

self._refresh_hz,
get_runtime_state=lambda: (
    self._session_state,
    len(table._in_flight_tasks),

medium

Directly accessing the protected member _in_flight_tasks of MetricsTable breaks encapsulation. It is better to expose this information through a public property on the MetricsTable class.

Suggested change
len(table._in_flight_tasks),
table.in_flight_tasks_count,

# ENDED has been observed; transition to DRAINING so any tick
# that fires before publish_final reflects the new state.
self._session_state = SessionState.DRAINING
logger.info("Draining %d async tasks...", len(table._in_flight_tasks))

medium

Directly accessing the protected member _in_flight_tasks of MetricsTable breaks encapsulation. It is better to expose this information through a public property on the MetricsTable class.

Suggested change
logger.info("Draining %d async tasks...", len(table._in_flight_tasks))
logger.info("Draining %d async tasks...", table.in_flight_tasks_count)

Comment on lines +351 to +353
for t in list(table._in_flight_tasks):
    if not t.done():
        t.cancel()

medium

Iterating over and cancelling tasks directly from the aggregator violates the encapsulation of MetricsTable. This logic should be moved into a dedicated method within MetricsTable.

                table.cancel_all_tasks()

for t in list(table._in_flight_tasks):
    if not t.done():
        t.cancel()
n_pending = len(table._in_flight_tasks)

medium

Directly accessing the protected member _in_flight_tasks of MetricsTable breaks encapsulation. It is better to expose this information through a public property on the MetricsTable class.

Suggested change
n_pending = len(table._in_flight_tasks)
n_pending = table.in_flight_tasks_count

self._registry = registry
self._in_flight: dict[str, SampleRow] = {}
self._triggers: dict[str, list[EmitTrigger]] = {}
self._in_flight_tasks: set[asyncio.Task] = set()

medium

To improve encapsulation and support the refactored MetricsAggregatorService, please expose the count of in-flight tasks and a method to cancel them.

        self._in_flight_tasks: set[asyncio.Task] = set()

    @property
    def in_flight_tasks_count(self) -> int:
        """Number of async trigger tasks currently in flight."""
        return len(self._in_flight_tasks)

    def cancel_all_tasks(self) -> None:
        """Cancel all in-flight async trigger tasks."""
        for task in self._in_flight_tasks:
            if not task.done():
                task.cancel()

Comment on lines +52 to +53
var_num = stat.sum_sq - stat.total * stat.total / n
std_dev = math.sqrt(var_num / (n - 1)) if var_num > 0 else 0.0

medium

The naive variance formula sum_sq - total^2 / n is numerically unstable when total^2 / n is very close to sum_sq, which is common for high-precision latency metrics in nanoseconds. Since total and sum_sq are Python integers for latency series, using the integer-based formula (n * sum_sq - total^2) / (n * (n - 1)) provides exact precision before the final square root.

Suggested change
var_num = stat.sum_sq - stat.total * stat.total / n
std_dev = math.sqrt(var_num / (n - 1)) if var_num > 0 else 0.0
# Use a more numerically stable formula for variance when aggregates are large.
# If using integers, (n * sum_sq - total**2) is exact.
if isinstance(stat.total, int) and isinstance(stat.sum_sq, int):
    var_num = n * stat.sum_sq - stat.total**2
    std_dev = math.sqrt(max(0, var_num)) / math.sqrt(n * (n - 1))
else:
    var_num = stat.sum_sq - stat.total * stat.total / n
    std_dev = math.sqrt(max(0.0, var_num / (n - 1)))
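The cancellation this comment describes is easy to demonstrate. The values below are hypothetical nanosecond latencies, not data from this PR: the true variance signal sits ~23 orders of magnitude below the aggregates, so the naive float subtraction loses it entirely while the integer formula stays exact.

```python
import math

# Ten hypothetical latency samples in nanoseconds, clustered near 10**12.
samples = [10**12 + i for i in range(10)]
n = len(samples)
total = sum(samples)                  # exact Python int
sum_sq = sum(x * x for x in samples)  # exact Python int

# Naive formula: total * total / n rounds to float and the small
# variance signal cancels away against the huge operands.
var_num = sum_sq - total * total / n
naive_std = math.sqrt(var_num / (n - 1)) if var_num > 0 else 0.0

# Integer formula: the subtraction happens in exact int arithmetic.
exact_num = n * sum_sq - total**2
exact_std = math.sqrt(max(0, exact_num)) / math.sqrt(n * (n - 1))
# exact_std ≈ 3.0277, the sample std dev of 0..9; naive_std is garbage
```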

Collaborator

@arekay-nv arekay-nv left a comment


(superseded — see #pullrequestreview-4237753512 for the full review-council output)

Collaborator

@arekay-nv arekay-nv left a comment


Review Council — Multi-AI Code Review

Reviewed by: Codex + Claude | Depth: thorough

Found 12 issues across 7 files (excluding tests, pyproject.toml, uv.lock).

⚠️ GitHub's review-comment API is currently returning a persistent pull_request_review_thread.base internal error on this PR for inline comments via line+side and position-based posts (the github-code-quality and gemini-code-assist bot comments from yesterday were unaffected). Falling back to a single review with file:line links — clicking each link opens the PR diff at the cited line.

Note: the existing gemini comments on aggregator.py:286/342/353/354 already cover the encapsulation concern around table._in_flight_tasks direct access, so that issue is intentionally not duplicated here. The cancel-without-await issue (#4 below) is a separate concurrency concern at the same file:line.

🔴 Must Fix (critical/high)

Issues that will cause incorrect behavior, data loss, or security problems in production.

| # | File:Line | Reviewer | Category | Summary |
|---|-----------|----------|----------|---------|
| 1 | src/inference_endpoint/commands/benchmark/execute.py:482 | Claude | data-integrity | Subscriber late-binding can drop initial ticks (ZMQ slow-joiner) |
| 2 | src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:168 | Claude | performance | _write_atomic_fallback runs blocking I/O on the event loop |
| 3 | src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:208 | Codex | performance | Unbounded raw-sample retention in SeriesSampler |

🟡 Should Fix (medium)

Real issues that trigger under specific conditions or design flaws that will compound.

| # | File:Line | Reviewer | Category | Summary |
|---|-----------|----------|----------|---------|
| 4 | src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:353 | Claude | error-handling | Cancellations not awaited before reading n_pending |
| 5 | src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:161 | Claude | bug | HdrHistogram(low, high, sig_figs) constructed without high >= 2*low check |
| 6 | src/inference_endpoint/async_utils/services/metrics_aggregator/subscriber.py:55 | Claude | data-integrity | conflate=True default for the Report consumer is fragile |
| 7 | src/inference_endpoint/commands/benchmark/execute.py:423 | Codex | data-integrity | Stale final_snapshot.msgpack not cleared on report-dir reuse |

🔵 Consider (low)

Valid improvements that could be follow-ups.

| # | File:Line | Reviewer | Category | Summary |
|---|-----------|----------|----------|---------|
| 8 | src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:281 | Claude | bug | Double STARTED reassigns and orphans the tick task |
| 9 | src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:195 | Claude | concurrency | close() cancels tick task but doesn't await it |
| 10 | src/inference_endpoint/async_utils/services/metrics_aggregator/__main__.py:166 | Claude | error-handling | No top-level exception handling around run_until_complete(main()) |
| 11 | src/inference_endpoint/load_generator/session.py:408 | Claude | design | ERROR-before-COMPLETE relies on an undocumented publisher ordering contract |
| 12 | src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:132 | Claude | bug | SIGTERM bypasses publish_final |

Detailed findings

1. 🔴 high · data-integrity · [Claude]

src/inference_endpoint/commands/benchmark/execute.py:482

Subscriber late-binding can drop initial ticks (ZMQ slow-joiner)

MetricsSnapshotSubscriber is constructed (482) and start()ed (485) AFTER launcher.launch() waits for aggregator readiness. The aggregator can begin publishing as soon as STARTED is observed; ZMQ slow-joiner means snapshots emitted before the SUB handshake completes are dropped. The comment on line 480 acknowledges this. On a busy host the subscriber may also miss COMPLETE if the publisher closes before the subscription warms up, silently degrading to LIVE/DRAINING (incomplete report). Fix: move subscriber construction + start() BEFORE launcher.launch(). Connecting to a not-yet-bound IPC path is fine — ZMQ retries connect transparently.

2. 🔴 high · performance · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:168

_write_atomic_fallback runs blocking I/O on the event loop

f.write + f.flush + two os.fsync (file + parent dir) + os.rename execute synchronously on the aggregator subprocess's main event loop (awaited via publish_final from aggregator.py:362). On a busy box os.fsync can block tens-to-hundreds of ms — long enough to back-pressure event-record processing if any events are still in flight at finalization. Fix: wrap with await asyncio.to_thread(self._write_atomic_fallback, payload).
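The suggested `asyncio.to_thread` wrapping can be sketched as follows. The helper below is a simplified stand-in for `_write_atomic_fallback` (function names and the exact fsync sequence are illustrative, reconstructed from this finding's description).

```python
import asyncio
import os

def _write_atomic(path: str, payload: bytes) -> None:
    # Blocking sequence: tmp write + fsync(file) + rename + fsync(parent dir).
    tmp = path + ".tmp"
    with open(tmp, "wb") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())
    os.rename(tmp, path)
    dir_fd = os.open(os.path.dirname(path) or ".", os.O_RDONLY)
    try:
        os.fsync(dir_fd)  # persist the rename itself
    finally:
        os.close(dir_fd)

async def publish_final(path: str, payload: bytes) -> None:
    # Offloading to a worker thread keeps the event loop free to keep
    # processing event records while fsync blocks.
    await asyncio.to_thread(_write_atomic, path, payload)
```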

3. 🔴 high · performance · [Codex]

src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:208

Unbounded raw-sample retention in SeriesSampler

self._raw.append(value) retains every observed value in an array.array for the full run so the final snapshot can recompute exact percentiles. The prior mmap-backed store spilled this to disk; the new path keeps it in the aggregator subprocess RAM, so memory now scales linearly with run length. A 5-min 50k-QPS run produces ~15M samples (~120MB per int64 series); once latency/ISL/OSL/TTFT/TPOT are all tracked, several hundred MB; longer runs OOM or swap heavily before the report is emitted. Fix: periodically spill _raw to disk, cap with reservoir sampling, or rely solely on the HDR digest for percentiles.

4. 🟡 medium · error-handling · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:353

Cancellations not awaited before reading n_pending

After the drain timeout, t.cancel() only schedules cancellation at the next await point. n_pending = len(table._in_flight_tasks) on the next line therefore reads a count that's still high, and the not-yet-cancelled tasks may still be running when _publisher.close() and the loop teardown happen → "Task was destroyed but it is pending!" warnings on shutdown. Fix: after t.cancel(), do await asyncio.gather(*tasks, return_exceptions=True) (bounded by a short timeout) before recomputing n_pending. Independent of the encapsulation point gemini already raised.
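The suggested cancel-then-await sequence can be sketched as below (helper name hypothetical; the real code would operate on the table's task set).

```python
import asyncio

async def drain_and_cancel(tasks: set[asyncio.Task], timeout: float = 1.0) -> int:
    """Cancel in-flight tasks, await them, then return an accurate pending count."""
    for t in tasks:
        if not t.done():
            t.cancel()
    # cancel() only *requests* cancellation; without this await, a pending
    # count taken on the next line would still reflect the pre-cancel state.
    if tasks:
        await asyncio.wait(tasks, timeout=timeout)
    return sum(1 for t in tasks if not t.done())
```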

5. 🟡 medium · bug · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:161

HdrHistogram(low, high, sig_figs) constructed without high >= 2*low check

hdrh requires high >= 2*low. Current bounds (1ns–3.6e12ns, 1–1e7 tokens) satisfy this, but a future caller passing e.g. hdr_low=1, hdr_high=1 (or any hdr_high < 2*hdr_low) gets an opaque ValueError from deep inside the C library. Fix: explicit pre-check after the clamps: if self._hdr_high < self._hdr_low * 2: raise ValueError(...).
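The suggested pre-check, sketched as a standalone validator (function name hypothetical):

```python
def check_hdr_bounds(hdr_low: int, hdr_high: int) -> None:
    """Fail fast with a clear message instead of an opaque error from hdrh."""
    if hdr_low < 1:
        raise ValueError(f"hdr_low must be >= 1, got {hdr_low}")
    if hdr_high < 2 * hdr_low:
        raise ValueError(
            f"hdr_high ({hdr_high}) must be >= 2 * hdr_low ({2 * hdr_low})"
        )
```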

6. 🟡 medium · data-integrity · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/subscriber.py:55

conflate=True default for the Report consumer is fragile

The docstring argues this is safe because COMPLETE is the publisher's last message, but ZMQ CONFLATE only keeps the most recent unread message. If the subscriber's loop is starved (main process busy in wait_for_exit) and the publisher closes before the SUB sees COMPLETE (LINGER expires, IPC socket unlinked), complete never fires and latest may be a stale LIVE. The 2 s wait_for_complete timeout in execute.py:548 papers over most cases, but consider conflate=False for the Report consumer (a single producer at a few snapshots/sec, not a TUI).

7. 🟡 medium · data-integrity · [Codex]

src/inference_endpoint/commands/benchmark/execute.py:423

Stale final_snapshot.msgpack not cleared on report-dir reuse

metrics_output_dir = ctx.report_dir / "metrics" is created (423) but an existing final_snapshot.msgpack from a prior run is never removed. If a user reuses --report-dir and the new run exits before publishing a fresh final snapshot (SIGTERM, crash), _load_final_snapshot_from_disk() decodes the previous run's file and silently builds the new report from stale metrics. Fix: unlink any pre-existing final_snapshot.msgpack here, OR refuse to decode a snapshot whose timestamp/run-id doesn't match the current run.
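The unlink option can be sketched as follows (helper name hypothetical; the snapshot filename is taken from this finding):

```python
from pathlib import Path

def prepare_metrics_dir(report_dir: Path) -> Path:
    metrics_dir = report_dir / "metrics"
    metrics_dir.mkdir(parents=True, exist_ok=True)
    # Remove any snapshot left over from a previous run in the same report
    # dir, so a crash in this run cannot silently resurrect stale metrics.
    (metrics_dir / "final_snapshot.msgpack").unlink(missing_ok=True)
    return metrics_dir
```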

8. 🔵 low · bug · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:281

Double STARTED reassigns and orphans the tick task

self._publisher.start(...) is called every time a SessionEventType.STARTED record is observed. If two STARTED events ever land in the EventRecord stream (replay, buggy producer, test fixture), start() reassigns _tick_task; the previous task keeps running until garbage-collected and races with the new task to publish snapshots. Fix: guard with if self._tick_task is None: ... inside publisher.start(), or only call start() on the first STARTED in process().
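The suggested idempotency guard, sketched on a stripped-down publisher (the class is illustrative, not the PR's MetricsPublisher):

```python
import asyncio

class TickPublisher:
    def __init__(self) -> None:
        self._tick_task: asyncio.Task | None = None

    def start(self) -> None:
        # Idempotent: a duplicate STARTED event must not reassign the task
        # attribute and orphan the already-running tick loop.
        if self._tick_task is not None:
            return
        self._tick_task = asyncio.create_task(self._tick())

    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(0.01)

    async def close(self) -> None:
        if self._tick_task is not None:
            self._tick_task.cancel()
            await asyncio.gather(self._tick_task, return_exceptions=True)
```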

9. 🔵 low · concurrency · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:195

close() cancels tick task but doesn't await it

self._tick_task.cancel() (205) is followed immediately by self._publisher.close() (206). If close() is invoked from a sync path, the cancelled task may still be runnable when the underlying ZMQ publisher socket is closed → brief window where _tick could call publish() on a closed transport. ZmqMessagePublisher.publish() early-returns on is_closed so this isn't fatal, but produces noisy CancelledError-during-shutdown traces. Fix: make close() async-aware (mirror publish_final) or document that callers must await publish_final (which already cancels the task) before close.

10. 🔵 low · error-handling · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/__main__.py:166

No top-level exception handling around run_until_complete(main())

If main() raises (bad CLI args, ZMQ bind failure, tokenizer load error), the subprocess exits with a stack trace but the parent's ServiceLauncher.wait_for_exit sees only a non-zero exit code with no diagnostic context propagated upward. Fix: wrap with try/except + structured logging so failures are surfaced in the parent's logs alongside the exit code.

11. 🔵 low · design · [Claude]

src/inference_endpoint/load_generator/session.py:408

ERROR-before-COMPLETE relies on an undocumented publisher ordering contract

The aggregator's TRACKED_SAMPLES_FAILED logic (aggregator.py:303–305) assumes ERROR is observed BEFORE COMPLETE removes the row. The ordering is correct here, but it depends on ZmqMessagePublisher preserving publish() order through batching AND ZMQ PUB→SUB delivering in order to a single SUB (both currently true). A short comment in session.py noting "publisher must preserve publish-order" would harden this against future transport refactors.

12. 🔵 low · bug · [Claude]

src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:132

SIGTERM bypasses publish_final

_finalize() only sets _shutdown_event after publish_final and _publisher.close() complete, but __main__.py doesn't install a SIGTERM handler. On parent-side launcher.kill_all (or any external SIGTERM before ENDED arrives), neither pub/sub COMPLETE nor disk fallback runs, leaving the parent's triple-redundant snapshot path empty. Fix: loop.add_signal_handler(SIGTERM, ...) to flush the final snapshot defensively before exit.
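The suggested signal handler can be sketched as below (Unix-only; the flush body is a stand-in — real code would defensively run publish_final's disk fallback there).

```python
import asyncio
import os
import signal

async def main() -> str:
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    flushed: list[str] = []

    def _flush_and_exit() -> None:
        # Stand-in for defensively flushing the final snapshot before exit.
        flushed.append("final_snapshot")
        done.set()

    loop.add_signal_handler(signal.SIGTERM, _flush_and_exit)
    # Simulate the parent launcher sending SIGTERM to this subprocess.
    loop.call_later(0.01, os.kill, os.getpid(), signal.SIGTERM)
    await done.wait()
    return flushed[0]
```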


Generated by /review-council — Codex gpt-5.4 (review of git diff against main) + Claude (direct file review with HEAD-source line verification).

nv-alicheng added a commit that referenced this pull request May 7, 2026
Two high-severity issues raised by the review-council pass on PR #306:

1. (#306-1) Subscriber late-binding could drop early ticks via the ZMQ
   slow-joiner pattern. Move MetricsSnapshotSubscriber construction +
   start() BEFORE launcher.launch() so the SUB handshake completes
   during the subprocess-spawn window. ZMQ tolerates connect-before-
   bind on IPC — the connect resolves once the binder appears. The
   prior ordering (subscribe AFTER launch returns) had a window where
   the aggregator could begin ticking on STARTED before the SUB
   subscription warmed up, dropping early live snapshots and, in the
   worst case, missing COMPLETE entirely.

2. (#306-2) MetricsPublisher._write_atomic_fallback runs synchronous
   f.flush + fsync(file) + fsync(parent dir) + rename on the
   aggregator's event loop. On a busy host this can block tens-to-
   hundreds of ms — long enough to back-pressure event-record
   processing. Wrap with asyncio.to_thread inside publish_final.

Both fixes are localized — no API changes, no test changes required.
Existing integration tests (test_concurrency_benchmark,
test_end_to_end_oracle) exercise both paths end-to-end and still pass.

The third P0 item (#306-3, unbounded raw-sample retention) is the
agreed memory trade documented in metrics_pubsub_design_v5.md §11;
addressed by adding "--persist-raw" as a tracked follow-up rather
than a code change in this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread tests/unit/metrics/test_report_builder.py Dismissed
nv-alicheng and others added 10 commits May 7, 2026 16:24
Introduces the three primitives that the upcoming pub/sub metrics path
will compose on top of:

- snapshot.py: MetricsSnapshot wire struct (msgspec, tagged union of
  CounterStat | SeriesStat) plus SessionState enum (LIVE / DRAINING /
  COMPLETE) and msgpack codec.
- registry.py: MetricsRegistry holding CounterSamplers and
  SeriesSamplers. Series samplers carry an HDR Histogram for cheap live
  percentiles, an array.array of raw values for exact-final
  computation, and exact rollup primitives. Histogram bucket edges are
  log-spaced over the observed [min, max] per snapshot, so they
  auto-zoom to data instead of wasting buckets on empty range.
- New unit tests cover the wire codec round-trip, sampler hot path,
  and registry registration/collision behavior.

Adds hdrhistogram==0.10.3 as a runtime dependency.

Wiring of these primitives into the aggregator and removal of the old
KVStore path follow in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- publisher.py: MetricsPublisher owns the periodic tick task that
  publishes live MetricsSnapshots over IPC pub/sub at refresh_hz, plus
  publish_final() which is awaited by the aggregator on ENDED. Final
  delivery is dual-path:
  * pub/sub publish (best-effort, telemetry knobs sndhwm=4, linger=10s)
  * disk fallback (atomic: tmp + fsync(file) + rename + fsync(parent dir))
  Both paths are independently wrapped in try/except — neither failure
  suppresses the other. publish_final is async and awaits tick-task
  cancellation before publishing COMPLETE so a late LIVE/DRAINING tick
  can never land after COMPLETE on the wire.

- subscriber.py: MetricsSnapshotSubscriber tracks ``latest`` and the
  ``COMPLETE``-state snapshot. Defaults to conflate=True (TUI / report
  consumer) but parametrized for any consumer that needs every tick.

- New unit tests cover tick-task lifecycle, atomic disk fallback,
  independence of pub/sub vs disk failure paths, and the regression
  that publish_final must await tick-task cancellation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the mmap-backed BasicKVStore with the registry/publisher path
introduced in the previous two commits.

Aggregator changes:
- MetricsAggregatorService now constructs a MetricsRegistry and
  MetricsPublisher on entry; trigger callbacks call registry.record /
  registry.increment instead of kv_store.update.
- Tracks SessionState (LIVE → DRAINING on ENDED → COMPLETE on
  publish_final). The publisher tick task captures (state,
  n_pending_tasks) per tick via a callback; consumers detect drain
  timeout as state == COMPLETE and n_pending_tasks > 0.
- Adds TRACKED_SAMPLES_FAILED counter, incremented on ERROR events
  whose tracked row still exists at processing time. Correctness
  depends on the load_generator emitting ERROR before COMPLETE; the
  matching test asserts that order.
- ENDED handler awaits drain_tasks (30s timeout), publish_final, and
  closes the publisher (linger=10s drains pending pub/sub frames).

Report changes:
- Replaces from_kv_reader with from_snapshot (pure function on a
  MetricsSnapshot). complete is derived from state == COMPLETE and
  n_pending_tasks == 0. Display warns when not complete.

Main-process changes (commands/benchmark/execute.py):
- Spawns a MetricsSnapshotSubscriber on the main loop. Triple-redundant
  report sourcing: pub/sub COMPLETE → disk fallback → latest live.
- Removes _setup_kv_reader, ARM tmpfs branching, and mmap salvage in
  _salvage_tmpfs (events.jsonl salvage is preserved).
- Awaits subscriber.wait_for_complete(timeout=2.0) after launcher
  exit so the loop can dispatch the COMPLETE frame before deciding
  the pub/sub path missed.

Removed:
- async_utils/services/metrics_aggregator/kv_store.py
- async_utils/services/metrics_aggregator/fs_check.py

Tests:
- Deletes test_kv_store.py.
- Marks test_aggregator.py / test_aggregator_e2e.py /
  test_metrics_table.py / test_report_builder.py / conftest.py with
  module-level skip + a TODO referencing the design doc; rewriting
  these on the new fixtures is a tracked follow-up.
- Adds test_aggregator_error_handler.py covering the
  TRACKED_SAMPLES_FAILED increment path and the negative case where
  COMPLETE arrives before ERROR (documents the bug the ERROR/COMPLETE
  swap fixes).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Swaps the publish order in BenchmarkSession._handle_response so that a
QueryResult carrying an error emits ErrorEventType.GENERIC first, then
SampleEventType.COMPLETE.

This is required for metrics-aggregator correctness: COMPLETE causes
MetricsTable.set_field to remove the tracked row, so an ERROR observed
afterward has no row to inspect and TRACKED_SAMPLES_FAILED would
silently stay at 0. Emitting ERROR first keeps the row alive long
enough for the aggregator's error handler to identify the failure as
tracked. EventLoggerService and other event consumers treat the two
event types independently, so order is invisible to them.

The test_failed_query_published_as_error_event test now asserts the
order explicitly so a future revert is caught immediately, and the
aggregator-side regression is covered by test_aggregator_error_handler.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Core Data Flow diagram: replaces "EventRecorder + MetricsReporter"
  with the events PUB → EventLoggerService / MetricsAggregatorService
  fan-out + main-process Report subscriber.
- Key Components table: adds Metrics Aggregator and Report rows;
  notes the load_generator's ERROR-before-COMPLETE invariant.
- New "Metrics Aggregator subprocess (pub/sub)" section under
  Hot-Path Architecture: state machine, sampler storage layout, hot
  path API, dual-path final delivery, dynamic histogram edges.
- Code Organization tree: expands metrics_aggregator/ to list each
  module; removes deleted recorder.py / reporter.py from metrics/.
- Key Dependencies table: adds hdrhistogram (C-backed HDR Histogram).
- Test fixtures: removes events_db (deleted with the KVStore path).

Per AGENTS.md's own self-update rules: "Treat AGENTS.md changes as
part of the refactor itself — include them in the same PR".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two high-severity issues raised by the review-council pass on PR #306:

1. (#306-1) Subscriber late-binding could drop early ticks via the ZMQ
   slow-joiner pattern. Move MetricsSnapshotSubscriber construction +
   start() BEFORE launcher.launch() so the SUB handshake completes
   during the subprocess-spawn window. ZMQ tolerates connect-before-
   bind on IPC — the connect resolves once the binder appears. The
   prior ordering (subscribe AFTER launch returns) had a window where
   the aggregator could begin ticking on STARTED before the SUB
   subscription warmed up, dropping early live snapshots and, in the
   worst case, missing COMPLETE entirely.

2. (#306-2) MetricsPublisher._write_atomic_fallback runs synchronous
   f.flush + fsync(file) + fsync(parent dir) + rename on the
   aggregator's event loop. On a busy host this can block tens-to-
   hundreds of ms — long enough to back-pressure event-record
   processing. Wrap with asyncio.to_thread inside publish_final.

Both fixes are localized — no API changes, no test changes required.
Existing integration tests (test_concurrency_benchmark,
test_end_to_end_oracle) exercise both paths end-to-end and still pass.

The third P0 item (#306-3, unbounded raw-sample retention) is the
agreed memory trade documented in metrics_pubsub_design_v5.md §11;
addressed by adding "--persist-raw" as a tracked follow-up rather
than a code change in this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The metrics pub/sub refactor (PR #N) module-level-skipped four test
files plus their conftest because they hard-coupled to the deleted
KVStore API. This commit reinstates them on the new fixtures, in
scope with the design doc's "test impact" callout.

- conftest.py: rewrites shared fixtures to construct MetricsRegistry
  and MetricsTable instances directly. Drops events_db (SQLite
  fixture deleted with the KVStore path).
- test_metrics_table.py: 16 tests covering tracking-window lifecycle,
  trigger dispatch on field updates, tracked-block accounting, and
  the in-flight async-task drain path.
- test_aggregator.py: 31 tests covering MetricsAggregatorService end
  to end (in-process, MagicMock publisher) — counter accounting,
  ISSUED/COMPLETE/error event handling, ENDED → publish_final
  sequence, and the LIVE → DRAINING state transition. Adds a new
  TestCounterAccounting class to cover the total_* vs tracked_*
  counter split that the legacy tests conflated.
- test_aggregator_e2e.py: 3 tests round-tripping a real
  MetricsPublisher ↔ MetricsSnapshotSubscriber over IPC, covering
  COMPLETE-only delivery, LIVE-tick-then-COMPLETE lifecycle, and
  counter+series wire shape.
- test_report_builder.py: 14 tests on Report.from_snapshot, including
  the complete=False derivation when state != COMPLETE or
  n_pending_tasks > 0.

Net: 64 new tests across the 4 suites; full unit suite up from
660 to 724 passing. The 4 module-level skips are gone.

Production-code surfaces flagged for follow-up coverage:
- AsyncTokenTrigger exception path in metrics_table.py
- SeriesSampler HDR clamp warn-once branch
- MetricsAggregatorService._finalize shutdown_event signaling
- Report.tps() OSL-empty-with-duration case

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The 6 generated-template integration tests were skipped unconditionally
in CI/dev because the template placeholders default to gated
meta-llama/Llama-3.1-* repos that require HF_TOKEN to fetch the
tokenizer.

Substitute TinyLlama/TinyLlama-1.1B-Chat-v1.0 for the model name in
_resolve_template after placeholder expansion. TinyLlama is non-gated
(~1MB tokenizer download), shares the Llama-family tokenizer the
templates were written against, and the echo-server path doesn't care
about model identity — only that AutoTokenizer.from_pretrained
succeeds for the metrics aggregator's ISL/OSL/TPOT triggers.

Drops the @pytest.mark.skipif(not HF_TOKEN) decorator, removes the now-
unused os import.

Effect: integration suite goes from 20 passed / 8 skipped to 26 passed
/ 2 skipped. The remaining 2 skips need real LLM servers (vLLM/SGLang)
which aren't in scope.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds two new sections to AGENTS.md "Development Standards":

1. "Documentation references — no local-only artifacts" — docs and
   comments must not reference paths outside the repo (gitignored
   directories, local scratch dirs, contributor workstation paths).
   A reviewer fetching the PR should be able to follow every cited
   reference.

2. "Comments and docstrings — describe current state, not development
   history" — no comments narrating iteration on the codebase ("we
   tried X first", "an earlier implementation did Y"). Such pointers
   belong in the PR description and git log, not the source tree.
   Especially relevant under AI-assisted development where it's
   tempting to leave a paper trail of design pivots inline.

Sweeps existing violations across both rules:

Production code: drops cites to ``metrics_pubsub_design_v5.md`` from
module/class docstrings (snapshot.py, registry.py, publisher.py) and
inlines self-contained rationale where useful (aggregator.py HDR
bounds, TOTAL_DURATION_NS comment).

Tests: removes "Migrated to ..." / "The legacy tests ..." framing
from rewritten test module docstrings; reframes regression-test
docstrings (test_registry.py, test_publisher.py, test_aggregator.py)
to describe the invariant being protected rather than narrating the
prior bug's discovery.

AGENTS.md: removes its own self-violation cite to the gitignored
design doc.

Behavior: no functional changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Conflict resolution during rebase onto origin/main left the merged
VideoGen row with the older (narrower) column padding. Prettier
normalizes the column widths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@nv-alicheng nv-alicheng force-pushed the prototype/alicheng-zmq-metrics branch from 85dfac6 to 4110447 Compare May 7, 2026 23:27