Major refactor to move multi-threaded load generator to async event loops #282
Merged
Commits (4):
- 936cb5a Major refactor: move multi-threaded load generator to async event loops (nv-alicheng)
- d44be10 Regenerate template files (nv-alicheng)
- 46da470 Use HF_TOKEN to determine whether or not to run test_template_runs (nv-alicheng)
- a056e0a update regenerate_template script, pass ci (viraatc)
@@ -0,0 +1,114 @@

# ReadyCheck Design

## Problem

Subprocess startup is asynchronous. The main process spawns workers or service
subprocesses, but cannot use them until they have completed initialization
(bound sockets, subscribed to topics, loaded resources). Without synchronization,
the main process may send messages that are dropped because the subprocess isn't
listening yet.

## Solution

A generic PUSH/PULL readiness protocol that works for any subprocess type:

```
Main Process                       Subprocess (worker or service)
┌───────────────────┐              ┌───────────────────────────┐
│ ReadyCheckReceiver│              │                           │
│ (PULL, bind)      │              │ 1. Initialize transports  │
│                   │              │ 2. Subscribe / connect    │
│ await wait(N)     │◄─── READY ───│ 3. send_ready_signal()    │
│ blocks until N    │    (PUSH)    │    (ctx, path, id)        │
│ signals arrive    │              │ 4. Start processing       │
└───────────────────┘              └───────────────────────────┘
```

## Why PUSH/PULL

PUB/SUB has a "slow joiner" problem: a subscriber may miss messages that were
published before it connected. PUSH/PULL avoids this. If the PULL socket is
bound before the PUSH socket connects, no messages are lost.

Multiple PUSH sockets can connect to a single PULL socket (ZMQ fan-in).
This means one receiver socket handles readiness from all subprocesses.

## Components

### ReadyCheckReceiver (host side)

- Binds a ZMQ PULL socket on an IPC path
- `wait(timeout)` blocks until `count` signals arrive
- Returns list of identities in arrival order
- Closes socket after all signals received (or on timeout)
- Timeout is a total deadline, not per-message

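The "total deadline" behaviour can be sketched without ZMQ. The snippet below is a minimal illustration, not the actual `ReadyCheckReceiver`: an `asyncio.Queue` stands in for the PULL socket, and `wait_for_signals` is a hypothetical helper name. It recomputes the remaining time before each receive, so a slow early signal shortens the budget left for the later ones.

```python
import asyncio


async def wait_for_signals(queue: asyncio.Queue, count: int, timeout: float) -> list:
    """Collect `count` identities, sharing one deadline across all of them."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout  # fixed once, not reset per message
    identities = []
    while len(identities) < count:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise TimeoutError(f"got {len(identities)}/{count} ready signals")
        try:
            # Each receive may only use what is left of the shared budget.
            identities.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            raise TimeoutError(f"got {len(identities)}/{count} ready signals") from None
    return identities  # arrival order
```

With a per-message timeout, N signals could take up to N times `timeout` in total; with the shared deadline, the wait is bounded by `timeout` regardless of N.
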
### `send_ready_signal()` (subprocess side)

- Free async function: `send_ready_signal(zmq_context, path, identity)`
- Uses the subprocess's **existing** ZMQ context; no new context is created
- Opens one PUSH socket, sends one msgpack-encoded int, closes the socket
- Sets a bounded LINGER (5s) so that closing does not hang if the receiver is gone

## Usage Patterns

### Workers (PUSH/PULL primary transport)

The `_ZmqWorkerConnector` calls `send_ready_signal()` with the worker's
existing ZMQ context after connecting its request/response transports:

```python
requests = _create_receiver(loop, request_path, zmq_context, ...)
responses = _create_sender(loop, response_path, zmq_context, ...)

await send_ready_signal(zmq_context, self.readiness_path, worker_id)

yield requests, responses
```

The `ZmqWorkerPoolTransport` creates a `ReadyCheckReceiver` and delegates
`wait_for_workers_ready()` to it.

### Services (PUB/SUB primary transport)

Services (EventLoggerService, MetricsAggregatorService) accept
`--readiness-path` and `--readiness-id` CLI arguments. After calling
`service.start()`, they signal readiness using the same ZMQ context:

```python
service.start()

if args.readiness_path:
    await send_ready_signal(zmq_ctx, args.readiness_path, args.readiness_id)

await shutdown_event.wait()
```

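A minimal sketch of how a service entry point might declare these two flags with `argparse`. The flag names come from the text above; `build_parser`, the defaults, and parsing `--readiness-id` as an `int` are assumptions made for illustration, not the services' actual code.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="hypothetical service entry point")
    # Optional: when omitted, the `if args.readiness_path:` guard skips signalling.
    parser.add_argument(
        "--readiness-path",
        default=None,
        help="IPC path of the ReadyCheckReceiver",
    )
    # Assumed to be an int because the ready signal carries a msgpack-encoded int.
    parser.add_argument(
        "--readiness-id",
        type=int,
        default=0,
        help="identity reported in the ready signal",
    )
    return parser
```

Leaving `--readiness-path` unset keeps the service runnable on its own, outside a launcher.
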
### ServiceLauncher

```python
launcher = ServiceLauncher(zmq_context)
procs = await launcher.launch([
    ServiceConfig(module="...event_logger", args=["--socket-dir", d, ...]),
    ServiceConfig(module="...metrics_aggregator", args=["--socket-dir", d, ...]),
], timeout=30.0)

# ... run benchmark, publish ENDED ...

ServiceLauncher.wait_for_exit(procs, timeout=60.0)
```

The launcher:

1. Creates a `ReadyCheckReceiver` bound to a unique IPC path
2. Spawns each service as `python -m <module> ... --readiness-path <path> --readiness-id <i>`
3. Awaits all readiness signals (total deadline timeout)
4. Returns subprocess handles for later `wait_for_exit()`
5. On failure, checks for subprocess crashes and kills remaining processes

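Step 2 can be sketched as a small helper that builds the argument vector for one service. This is an illustration under stated assumptions, not the launcher's code: `ServiceSpec` is a hypothetical stand-in for `ServiceConfig` with only the `module` and `args` fields shown above, `build_command` is an invented name, and `sys.executable` is assumed as the interpreter.

```python
import sys
from dataclasses import dataclass, field


@dataclass
class ServiceSpec:
    # Hypothetical stand-in for ServiceConfig; the real class may have more fields.
    module: str
    args: list = field(default_factory=list)


def build_command(spec: ServiceSpec, readiness_path: str, index: int) -> list:
    """Build argv: python -m <module> ... --readiness-path <path> --readiness-id <i>."""
    return [
        sys.executable, "-m", spec.module,
        *spec.args,                          # service-specific arguments come first
        "--readiness-path", readiness_path,  # same receiver path for every service
        "--readiness-id", str(index),        # position in the launch list
    ]
```

Because every service gets the same readiness path and a distinct id, a single receiver can tell which services have reported in.
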
## Ordering Guarantee

The ready signal is sent **after** the subprocess has completed its
initialization (transport connect, topic subscribe, reader registration).
This guarantees that when the main process's `wait()` returns, all
subprocesses are ready to process messages.
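
The guarantee can be illustrated with a toy model in which asyncio tasks play the subprocesses and an `asyncio.Queue` plays the readiness socket. Both are stand-ins, and `fake_subprocess` and `run_demo` are hypothetical names. Each task records that it has initialized before it signals, so by the time the main coroutine has collected all signals, every task is already initialized.

```python
import asyncio


async def fake_subprocess(ident: int, ready: asyncio.Queue, initialized: set) -> None:
    await asyncio.sleep(0.01 * ident)  # simulated initialization of varying length
    initialized.add(ident)             # init finished (connect, subscribe, ...)
    await ready.put(ident)             # signal readiness only afterwards


async def run_demo(count: int):
    ready: asyncio.Queue = asyncio.Queue()
    initialized: set = set()
    tasks = [
        asyncio.create_task(fake_subprocess(i, ready, initialized))
        for i in range(count)
    ]
    # Equivalent of wait(): return only once `count` signals have arrived.
    arrived = [await ready.get() for _ in range(count)]
    snapshot = set(initialized)        # state observed at the moment wait() returns
    await asyncio.gather(*tasks)
    return arrived, snapshot
```

If a task signalled before finishing its initialization, `snapshot` could be missing identities that appear in `arrived`; signalling last rules that out.
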