
Commit 2a60f63

nv-alicheng, claude, and viraatc authored
Major refactor to move multi-threaded load generator to async event loops (#282)
* Major refactor: move multi-threaded load generator to async event loops

  Squashed combination of 8 commits developed on feat/alicheng-pubsub-integration:

  - Initial cleanup to remove all features that will be replaced and permanently removed (#214)
  - Add KVStore, ready-check mechanism, and ServiceLauncher (#215)
  - Add report building (#216)
  - Fix rebase errors
  - Make Loadgen Async (#255)
  - Do not use /dev/shm tmpfs for KVStore on ARM, fix rebase errors in e2e_oracle tests
  - Add batched publishing to pubsub (#281)
  - Disable warmup temporarily, bugfixes. (#288)

  Squashed locally prior to rebase onto origin/main to avoid a case-insensitive-filesystem conflict on docs/load_generator/DESIGN.md between intermediate commits. Full per-sub-PR history is preserved in the review threads on PR #282 and on the backup ref backup/pre-squash-pubsub-integration.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Regenerate template files
* Use HF_TOKEN to determine whether or not to run test_template_runs
* update regenerate_template script, pass ci

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Viraat Chandra <viraatc@nvidia.com>
1 parent ed1704e commit 2a60f63

98 files changed

Lines changed: 8156 additions & 8462 deletions


.gitignore

Lines changed: 4 additions & 0 deletions
@@ -193,3 +193,7 @@ examples/03_BenchmarkComparison/vllm_venv/
 .cursor_artifacts/
 .cursor/
 docs/superpowers/
+.claude/agent-memory/
+
+# User-specific local dev configs; do not commit
+CLAUDE.local.md

AGENTS.md

Lines changed: 2 additions & 0 deletions
@@ -351,6 +351,8 @@ Known failure modes when AI tools generate code for this project. Reference thes
 - **Importing removed or renamed modules**: After refactors, AI (working from stale context) may import old module paths. Always verify imports resolve to actual files.
 - **Over-documenting**: AI generates verbose docstrings, inline comments explaining obvious code, and type annotations on trivial variables. This project prefers minimal comments, only where the _why_ isn't obvious from the code.
 - **Adding backwards-compatibility shims**: If something was renamed or removed, AI may add re-exports, aliases, or deprecation wrappers. In this project, just delete the old thing and update all call sites.
+- **Empty except blocks**: Every `except` block must contain either a comment explaining why the exception is ignored, or a logging statement. Bare `except: pass` without explanation is disallowed. AI often generates empty handlers; always add the reason.
+- **No lazy imports**: All imports must be at the top of the file. Imports inside functions, methods, or conditional blocks (other than `TYPE_CHECKING`) are disallowed. The only exceptions are: (1) circular import avoidance with a documenting comment, (2) optional dependencies with a top-level try/except that sets the import to `None`, (3) security sandboxing code that intentionally restricts imports.
 
 ### Dependency & Environment
 
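The two rules added in this hunk could look like the following in practice. This is a minimal sketch, not code from the project: `hypothetical_fastjson` is an invented optional dependency, and `dumps` and `parse_port` are illustrative names.

```python
import json
import logging

logger = logging.getLogger(__name__)

# Optional dependency: imported at the top of the file and set to None when
# it is missing. `hypothetical_fastjson` is an invented package name.
try:
    import hypothetical_fastjson
except ImportError:
    # The optional accelerator is not installed; stdlib json is used instead.
    hypothetical_fastjson = None


def dumps(obj) -> str:
    """Serialize with the optional backend when present, else stdlib json."""
    if hypothetical_fastjson is not None:
        return hypothetical_fastjson.dumps(obj)
    return json.dumps(obj)


def parse_port(raw: str, default: int = 8080) -> int:
    """Parse a port number; bad input is logged rather than silently ignored."""
    try:
        return int(raw)
    except ValueError:
        logger.warning("Invalid port %r; using default %d", raw, default)
        return default
```

Both handlers state why the exception is tolerated, one with a comment and one with a log call, and no import appears below the top of the file.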

docs/ENDPOINT_CLIENT.md

Lines changed: 0 additions & 2 deletions
@@ -261,8 +261,6 @@ class HTTPClientConfig(BaseModel):
     # -1 = auto: min(max(8, numa_domain_size), 24)
     num_workers: int = -1
 
-    record_worker_events: bool = False
-    event_logs_dir: Path | None = None
     log_level: str = "INFO"
 
     # When True, all SSE chunks emitted via IPC (high main-thread overhead).
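The `num_workers` comment in this hunk describes an auto mode that clamps the worker count. One way to read it is sketched below. `resolve_num_workers` is a hypothetical helper name, and how the project obtains `numa_domain_size` is not shown in the diff, so it is passed in as a plain argument.

```python
def resolve_num_workers(num_workers: int, numa_domain_size: int) -> int:
    """Return the effective worker count for an HTTPClientConfig-style value."""
    if num_workers == -1:
        # Auto mode from the config comment: at least 8, at most 24.
        return min(max(8, numa_domain_size), 24)
    # Any explicit value is passed through unchanged (an assumption here).
    return num_workers
```

Under this reading, a NUMA domain of 4 cores resolves to 8 workers, 16 cores to 16, and 64 cores to 24.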

docs/async_utils/services/metrics_aggregator/DESIGN.md

Lines changed: 0 additions & 389 deletions
This file was deleted.
Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
# ReadyCheck Design

## Problem

Subprocess startup is asynchronous. The main process spawns workers or service
subprocesses, but cannot use them until they have completed initialization
(bound sockets, subscribed to topics, loaded resources). Without synchronization,
the main process may send messages that are dropped because the subprocess isn't
listening yet.

## Solution

A generic PUSH/PULL readiness protocol that works for any subprocess type:

```
Main Process                       Subprocess (worker or service)
┌───────────────────┐              ┌───────────────────────────┐
│ ReadyCheckReceiver│              │                           │
│ (PULL, bind)      │              │  1. Initialize transports │
│                   │              │  2. Subscribe / connect   │
│ await wait(N)     │◄─── READY ───│  3. send_ready_signal()   │
│ blocks until N    │   (PUSH)     │     (ctx, path, id)       │
│ signals arrive    │              │  4. Start processing      │
└───────────────────┘              └───────────────────────────┘
```

## Why PUSH/PULL

PUB/SUB has a "slow joiner" problem: the subscriber may miss messages
published before it connects. PUSH/PULL guarantees delivery: if the PULL
socket is bound before the PUSH connects, no messages are lost.

Multiple PUSH sockets can connect to a single PULL socket (ZMQ fan-in).
This means one receiver socket handles readiness from all subprocesses.

## Components
37+
38+
### ReadyCheckReceiver (host side)
39+
40+
- Binds a ZMQ PULL socket on an IPC path
41+
- `wait(timeout)` blocks until `count` signals arrive
42+
- Returns list of identities in arrival order
43+
- Closes socket after all signals received (or on timeout)
44+
- Timeout is a total deadline, not per-message
45+
46+
### `send_ready_signal()` (subprocess side)
47+
48+
- Free async function: `send_ready_signal(zmq_context, path, identity)`
49+
- Uses the subprocess's **existing** ZMQ context — no new context created
50+
- Opens one PUSH socket, sends one msgpack-encoded int, closes the socket
51+
- Bounded LINGER (5s) to avoid hanging if receiver is gone
52+
53+
## Usage Patterns
54+
55+
### Workers (PUSH/PULL primary transport)
56+
57+
The `_ZmqWorkerConnector` calls `send_ready_signal()` with the worker's
58+
existing ZMQ context after connecting its request/response transports:
59+
60+
```python
61+
requests = _create_receiver(loop, request_path, zmq_context, ...)
62+
responses = _create_sender(loop, response_path, zmq_context, ...)
63+
64+
await send_ready_signal(zmq_context, self.readiness_path, worker_id)
65+
66+
yield requests, responses
67+
```
68+
69+
The `ZmqWorkerPoolTransport` creates a `ReadyCheckReceiver` and delegates
70+
`wait_for_workers_ready()` to it.
71+
### Services (PUB/SUB primary transport)

Services (EventLoggerService, MetricsAggregatorService) accept
`--readiness-path` and `--readiness-id` CLI arguments. After calling
`service.start()`, they signal readiness using the same ZMQ context:

```python
service.start()

if args.readiness_path:
    await send_ready_signal(zmq_ctx, args.readiness_path, args.readiness_id)

await shutdown_event.wait()
```

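A service entry point might parse the readiness arguments roughly as follows. This is a sketch under assumptions: the flag names come from this document, but the defaults, the integer type of `--readiness-id`, and the `build_parser` helper are guesses rather than the project's code.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for a service that can optionally signal readiness."""
    parser = argparse.ArgumentParser(description="Example service entry point")
    parser.add_argument("--socket-dir", required=True,
                        help="Directory holding the service's IPC sockets")
    # Both readiness flags are optional so the service can also run standalone.
    parser.add_argument("--readiness-path", default=None,
                        help="IPC path of the launcher's readiness receiver")
    parser.add_argument("--readiness-id", type=int, default=0,
                        help="Identity reported in the ready signal")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args.readiness_path, args.readiness_id)
```

Leaving `--readiness-path` unset keeps `args.readiness_path` as `None`, which matches the `if args.readiness_path:` guard in the snippet above.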
### ServiceLauncher

```python
launcher = ServiceLauncher(zmq_context)
procs = await launcher.launch([
    ServiceConfig(module="...event_logger", args=["--socket-dir", d, ...]),
    ServiceConfig(module="...metrics_aggregator", args=["--socket-dir", d, ...]),
], timeout=30.0)

# ... run benchmark, publish ENDED ...

ServiceLauncher.wait_for_exit(procs, timeout=60.0)
```

The launcher:

1. Creates a `ReadyCheckReceiver` bound to a unique IPC path
2. Spawns each service as `python -m <module> ... --readiness-path <path> --readiness-id <i>`
3. Awaits all readiness signals (total deadline timeout)
4. Returns subprocess handles for later `wait_for_exit()`
5. On failure, checks for subprocess crashes and kills remaining processes

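Step 2 of the list above can be illustrated by building the command lines. The sketch below assumes `ServiceSpec` as a simplified stand-in for `ServiceConfig` and `build_commands` as a hypothetical helper; the real launcher may assemble its arguments differently.

```python
import sys
from dataclasses import dataclass, field


@dataclass
class ServiceSpec:
    """Hypothetical stand-in for ServiceConfig: a module plus its CLI args."""
    module: str
    args: list[str] = field(default_factory=list)


def build_commands(specs: list[ServiceSpec], readiness_path: str) -> list[list[str]]:
    """Return one `python -m <module> ...` argv list per service."""
    return [
        [
            sys.executable, "-m", spec.module, *spec.args,
            # Each service reports to the same receiver under its own index.
            "--readiness-path", readiness_path,
            "--readiness-id", str(index),
        ]
        for index, spec in enumerate(specs)
    ]


if __name__ == "__main__":
    specs = [ServiceSpec("pkg.event_logger", ["--socket-dir", "/tmp/d"]),
             ServiceSpec("pkg.metrics_aggregator", ["--socket-dir", "/tmp/d"])]
    for command in build_commands(specs, "/tmp/ready.sock"):
        print(" ".join(command))
```

The module names and paths in the usage lines are placeholders. Sharing one readiness path across all services relies on the fan-in property described under "Why PUSH/PULL".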
## Ordering Guarantee

The ready signal is sent **after** the subprocess has completed its
initialization (transport connect, topic subscribe, reader registration).
This guarantees that when the main process's `wait()` returns, all
subprocesses are ready to process messages.
