Commit 1a72e81

nv-alicheng and claude committed
fix(metrics): address P0 review-council findings
Two high-severity issues raised by the review-council pass on PR #306:

1. (#306-1) Subscriber late-binding could drop early ticks via the ZMQ
   slow-joiner pattern. Move MetricsSnapshotSubscriber construction +
   start() BEFORE launcher.launch() so the SUB handshake completes during
   the subprocess-spawn window. ZMQ tolerates connect-before-bind on IPC:
   the connect resolves once the binder appears. The prior ordering
   (subscribe AFTER launch returns) had a window where the aggregator
   could begin ticking on STARTED before the SUB subscription warmed up,
   dropping early live snapshots and, in the worst case, missing COMPLETE
   entirely.

2. (#306-2) MetricsPublisher._write_atomic_fallback runs synchronous
   f.flush + fsync(file) + fsync(parent dir) + rename on the aggregator's
   event loop. On a busy host this can block tens-to-hundreds of ms, long
   enough to back-pressure event-record processing. Wrap with
   asyncio.to_thread inside publish_final.

Both fixes are localized: no API changes, no test changes required.
Existing integration tests (test_concurrency_benchmark,
test_end_to_end_oracle) exercise both paths end-to-end and still pass.

The third P0 item (#306-3, unbounded raw-sample retention) is the agreed
memory trade documented in metrics_pubsub_design_v5.md §11; addressed by
adding "--persist-raw" as a tracked follow-up rather than a code change in
this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4a6fa28 commit 1a72e81

2 files changed

Lines changed: 23 additions & 17 deletions

src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py

Lines changed: 8 additions & 1 deletion
@@ -160,8 +160,15 @@ async def publish_final(
             logger.exception("metrics: pub/sub final publish failed")

         # Disk fallback — best-effort, must not affect pub/sub above.
+        # The atomic write does synchronous f.flush + fsync(file) +
+        # fsync(parent dir) + rename, which can block tens-to-hundreds of
+        # ms on a busy host. Run it on a worker thread so it doesn't
+        # back-pressure any in-flight event-record processing on the
+        # aggregator's event loop.
         try:
-            self._write_atomic_fallback(self._encoder.encode(snap))
+            await asyncio.to_thread(
+                self._write_atomic_fallback, self._encoder.encode(snap)
+            )
         except Exception:  # noqa: BLE001 — best-effort.
             logger.exception("metrics: disk fallback write failed")
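
The change above moves a blocking, fsync-heavy write off the event loop. A minimal standalone sketch of that pattern follows. The names `write_atomic` and `publish_final_sketch` are hypothetical stand-ins, not the repository's `_write_atomic_fallback` or `publish_final`, and the fsync/rename ordering shown is one common choice rather than a copy of the real helper. It assumes Python 3.9+ and a POSIX filesystem (the directory fsync does not work on Windows).

```python
import asyncio
import os
import tempfile
from pathlib import Path


def write_atomic(path: Path, payload: bytes) -> None:
    """Blocking atomic write (hypothetical helper, not the repo's code)."""
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "wb") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())  # force file contents to disk
    os.replace(tmp, path)  # atomic rename over the final name
    # fsync the parent directory so the rename itself is durable (POSIX).
    dir_fd = os.open(path.parent, os.O_RDONLY)
    try:
        os.fsync(dir_fd)
    finally:
        os.close(dir_fd)


async def publish_final_sketch(path: Path, payload: bytes) -> bool:
    """Run the blocking write on a worker thread; best-effort like the diff."""
    try:
        # The event loop stays free to run other tasks while this awaits.
        await asyncio.to_thread(write_atomic, path, payload)
        return True
    except Exception:  # best-effort: report failure instead of raising
        return False
```

Calling `asyncio.run(publish_final_sketch(Path("final.bin"), b"..."))` writes the file while other coroutines on the same loop keep running. The encode step in the diff still executes on the loop; only the write itself is offloaded.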

src/inference_endpoint/commands/benchmark/execute.py

Lines changed: 15 additions & 16 deletions
@@ -423,18 +423,25 @@ async def _run_benchmark_async(
     metrics_output_dir = ctx.report_dir / "metrics"
     metrics_output_dir.mkdir(parents=True, exist_ok=True)

-    # Subscribe to the metrics PUB socket BEFORE the aggregator binds it,
-    # so we never miss the STARTED-time first ticks. The aggregator's
-    # ManagedZMQContext is a separate process; we share socket_dir.
     metrics_socket_name = f"metrics_pub_{uuid.uuid4().hex[:8]}"
-    # The aggregator subprocess will bind metrics_socket_name; the main
-    # process just needs to know the path to connect to. Connect is
-    # deferred until after launcher.launch() so the IPC file exists.

-    # Launch service subprocesses
-    launcher = ServiceLauncher(zmq_ctx)
+    # Connect the metrics-snapshot subscriber BEFORE launching the
+    # aggregator subprocess that binds the matching PUB socket. ZMQ
+    # tolerates connect-before-bind on IPC (the connect resolves once
+    # the binder appears), and starting the SUB reader early gives
+    # the subscription handshake time to complete during the
+    # ~1-2 second subprocess-launch window. This eliminates the
+    # slow-joiner risk of dropping early live ticks (or the worst
+    # case: missing COMPLETE if the SUB handshake never warms up).
     if zmq_ctx.socket_dir is None:
         raise RuntimeError("ZMQ socket_dir must be set after publisher bind")
+    metrics_subscriber = MetricsSnapshotSubscriber(
+        metrics_socket_name, zmq_ctx, loop
+    )
+    metrics_subscriber.start()
+
+    # Launch service subprocesses
+    launcher = ServiceLauncher(zmq_ctx)
     aggregator_args: list[str] = [
         "--socket-dir",
         zmq_ctx.socket_dir,
@@ -476,14 +483,6 @@ async def _run_benchmark_async(
         timeout=30.0,
     )

-    # Connect the metrics-snapshot subscriber AFTER aggregator readiness
-    # so the IPC bind is in place. We may still miss the very first tick;
-    # the disk fallback covers the missing-final case.
-    metrics_subscriber = MetricsSnapshotSubscriber(
-        metrics_socket_name, zmq_ctx, loop
-    )
-    metrics_subscriber.start()
-
     # Create endpoint client on the shared loop
     endpoints = config.endpoint_config.endpoints
     logger.info(f"Connecting: {endpoints}")
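
The reordering above relies on ZMQ accepting a connect on an IPC endpoint that nobody has bound yet. A rough sketch of that behaviour with plain pyzmq follows. It is illustrative only: `late_binding_publisher`, `connect_before_bind_demo`, and the `metrics_pub_demo` socket name are hypothetical, a thread stands in for the aggregator subprocess, and none of this uses the project's `MetricsSnapshotSubscriber` or `ServiceLauncher`. It assumes pyzmq is installed and a platform with IPC transport support.

```python
import os
import tempfile
import threading
import time

import zmq


def late_binding_publisher(ctx, endpoint, delay_s, stop):
    """Bind a PUB socket after a delay, then tick until told to stop."""
    time.sleep(delay_s)  # stand-in for subprocess spawn time
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.LINGER, 0)
    pub.bind(endpoint)
    try:
        seq = 0
        while not stop.is_set():
            pub.send_string(f"tick {seq}")
            seq += 1
            time.sleep(0.01)
    finally:
        pub.close()


def connect_before_bind_demo(delay_s=0.3, timeout_ms=5000):
    """Connect a SUB first, bind the PUB later; return the first message seen."""
    ctx = zmq.Context()
    with tempfile.TemporaryDirectory() as socket_dir:
        endpoint = "ipc://" + os.path.join(socket_dir, "metrics_pub_demo")
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.LINGER, 0)
        sub.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to everything
        sub.connect(endpoint)  # no binder exists yet; libzmq retries in background
        stop = threading.Event()
        worker = threading.Thread(
            target=late_binding_publisher, args=(ctx, endpoint, delay_s, stop)
        )
        worker.start()
        try:
            # poll() returns a non-zero event mask once a message is readable.
            if sub.poll(timeout_ms):
                return sub.recv_string()
            return None
        finally:
            stop.set()
            worker.join()
            sub.close()
            ctx.term()
```

The sketch only shows that the early connect eventually delivers messages once the binder appears. It makes no claim about which tick arrives first, since a publisher that sends immediately after bind can still outrun the background reconnect.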
