[DEP-950][feat] add multi-rank sleep/wakeup support to MPI executor path#14636
Signed-off-by: Hannah Zhang <hannahz@nvidia.com>
📝 Walkthrough
This PR enables coordinated GPU virtual memory sleep/wakeup across multiple MPI-distributed ranks. It adds a dedicated control communicator, background listener threads on non-rank-0 workers, orchestration logic on rank 0 to broadcast commands and collect ACKs, updates RPC guards to allowlist these operations for multi-rank deployments, and includes integration and unit tests for the full control flow.
Changes
Multi-rank GPU VMM Sleep/Wakeup Support
Sequence Diagram(s)
sequenceDiagram
participant Rank0Worker as Rank 0 Worker
participant Rank0Lock as _control_action_lock
participant ControlComm as control_comm
participant RankNListener as Rank N Listener
Rank0Worker->>Rank0Lock: acquire
Rank0Worker->>Rank0Worker: release_with_tag (local)
Rank0Worker->>ControlComm: send sleep/wakeup action
ControlComm->>RankNListener: recv action
RankNListener->>RankNListener: release/materialize_with_tag
RankNListener->>ControlComm: send ACK
Rank0Worker->>ControlComm: recv all ACKs
Rank0Worker->>Rank0Lock: release
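The round trip in the diagram above can be sketched without MPI. This is a minimal simulation, assuming in-process queues stand in for control_comm and a log entry stands in for release_with_tag/materialize_with_tag. `FakePeer`, `run_action`, the message layout, and the `"kv_cache"` tag are hypothetical and are not taken from the PR's code.

```python
import queue
import threading


class FakePeer:
    """Hypothetical stand-in for one non-rank-0 listener thread."""

    def __init__(self, rank, acks):
        self.rank = rank
        self.inbox = queue.Queue()  # plays the role of control_comm delivery
        self.acks = acks            # shared queue back to rank 0
        self.log = []
        self.thread = threading.Thread(target=self._listen, daemon=True)
        self.thread.start()

    def _listen(self):
        while True:
            msg = self.inbox.get()
            if msg["action"] == "shutdown":
                return
            # A real listener would release or materialize VMM here.
            self.log.append((msg["action"], tuple(msg["tags"])))
            self.acks.put({"rank": self.rank, "status": "ok"})


def run_action(lock, peers, acks, action, tags, local_op):
    """Rank-0 side: lock, local op, send, then collect one ACK per peer."""
    with lock:
        local_op(action, tags)
        for peer in peers:
            peer.inbox.put({"action": action, "tags": list(tags)})
        return [acks.get(timeout=5) for _ in peers]


acks = queue.Queue()
peers = [FakePeer(rank, acks) for rank in (1, 2)]
lock = threading.Lock()
local_log = []
received = run_action(lock, peers, acks, "sleep", ["kv_cache"],
                      lambda action, tags: local_log.append(action))

# Stop the listeners with a sentinel, mirroring the shutdown message.
for peer in peers:
    peer.inbox.put({"action": "shutdown", "tags": []})
    peer.thread.join(timeout=5)
```

Holding the lock across the send and the ACK collection is what keeps a second caller from interleaving its own messages with the first caller's ACKs.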
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
Actionable comments posted: 7
🧹 Nitpick comments (3)
tests/unittest/executor/test_sleep_collective_rpc_guards.py (1)
502-577: No QA test-list update is needed for this unittest file.
These additions are narrow unit coverage under tests/unittest/, so they do not need a tests/integration/test_lists/qa/* entry on their own. Any QA-list work belongs to the separate integration-test changes in this PR, not this file.
As per coding guidelines, "If the PR only touches unittest/ or narrow unit scope, say explicitly whether QA list updates are unnecessary or optional."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unittest/executor/test_sleep_collective_rpc_guards.py` around lines 502 - 577, This unittest-only change adds narrow unit coverage under tests/unittest (e.g., TestProxyCollectiveRpcGuards and TestIpcProxyRpcClientGuard) and does not require any QA test-list updates; update the PR description or commit message to explicitly state that QA list updates are unnecessary for this change (mentioning the added tests in TestProxyCollectiveRpcGuards and TestIpcProxyRpcClientGuard) so reviewers know no tests/integration/test_lists/qa/* entry is required.
tests/unittest/_torch/multi_gpu/test_mpi_sleep_wakeup.py (1)
81-191: QA list update looks unnecessary for this addition.
This lands under tests/unittest/_torch/multi_gpu/, not tests/integration/defs/, so I do not see a required update under tests/integration/test_lists/qa/ for this PR.
As per coding guidelines, "If the PR only touches unittest/ or narrow unit scope, say explicitly whether QA list updates are unnecessary or optional."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unittest/_torch/multi_gpu/test_mpi_sleep_wakeup.py` around lines 81 - 191, This PR adds unit tests (functions test_mpi_sleep_wakeup_tp2 and test_mpi_sleep_wakeup_kv_cache_only_tp2) under the multi-GPU unittest suite and therefore does not require QA list changes; update the PR description (or the commit message) with a one-line note stating "QA list update unnecessary — changes are limited to unit tests under the unittest multi-GPU suite" so reviewers see you considered QA guidance and explicitly declared it unnecessary.
tensorrt_llm/executor/base_worker.py (1)
719-723: ⚡ Quick win
Use PEP 585 generics in these new annotations.
Since this repo targets Python 3.10+, the new/modified signatures should use list[...] instead of List[...] for consistency with the rest of the codebase.
As per coding guidelines, "Prefer built-in types `list`, `dict`, and `tuple` to legacy `typing.List`, `typing.Dict`, `typing.Tuple` in Python"; the retrieved learning also notes this repo requires Python >=3.10 and can use PEP 585 generics like `list[str]`.
Suggested fix
-    def _multi_rank_sleep_wakeup(
-        self,
-        action: str,
-        tags: List[ExecutorMemoryType],
-    ) -> None:
+    def _multi_rank_sleep_wakeup(
+        self,
+        action: str,
+        tags: list[ExecutorMemoryType],
+    ) -> None:
@@
-    def sleep(self, sleep_tags: List[str]) -> None:
+    def sleep(self, sleep_tags: list[str]) -> None:
@@
-    def wakeup(self, wakeup_tags: List[str]) -> None:
+    def wakeup(self, wakeup_tags: list[str]) -> None:
Also applies to: 812-812, 861-861
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tensorrt_llm/executor/base_worker.py` around lines 719 - 723, The function _multi_rank_sleep_wakeup currently uses typing.List for its parameter annotation; update this and all other occurrences in this file to use PEP 585 built-in generics (e.g., use list[ExecutorMemoryType] instead of List[ExecutorMemoryType]). Locate _multi_rank_sleep_wakeup and the other annotated functions mentioned in the comment (the additional occurrences noted around the other two locations) and replace typing.List, typing.Dict, typing.Tuple usages with their modern built-in equivalents to match the codebase style.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tensorrt_llm/_torch/pyexecutor/py_executor.py`:
- Around line 2116-2153: The control listener decodes msg["tags"] and constructs
ExecutorMemoryType(t) before entering the try/except, so decoding errors can
bypass the finally ACK; move the decoding/validation of msg (accessing
msg["tags"], building tags list via ExecutorMemoryType) inside the try block
that ends with the finally which sends the ACK, and narrow the except to
specific exceptions (e.g., KeyError, TypeError, ValueError, and the runtime
errors from ExecutorMemoryType/materialize/release) instead of a bare except
Exception so any decode/runtime failure results in an error ACK being sent via
self._control_comm.send in the finally; keep the existing action handling
(sleep/wakeup/shutdown/unknown) and logging unchanged.
In `@tensorrt_llm/executor/base_worker.py`:
- Around line 747-748: Reformat the wrapped import of materialize_with_tag and
release_with_tag from tensorrt_llm._torch.virtual_memory to a normal inline
import to satisfy Flake8/isort; replace the parenthesized, wrapped form with a
single-line import statement that imports materialize_with_tag and
release_with_tag from tensorrt_llm._torch.virtual_memory (so references to
materialize_with_tag and release_with_tag in base_worker.py continue to work).
- Around line 784-788: Refactor the rank-0 VMM local error handling so ACK
draining always runs by moving the ACK drain logic out of the broad except and
into a finally block (ensure the drain code that currently follows the
try/except always executes); narrow the exception capture around the local VMM
call (the block that calls release_with_tag / materialize_with_tag) to only
catch CUDA-related errors (RuntimeError and torch.OutOfMemoryError) and build
local_error/wrap into RuntimeError for those cases, but for any other unexpected
exception re-raise it after the finally drain so it is not masked; keep the
existing local_error variable semantics for the CUDA-related path and ensure
traceback.format_exc() is only included for the CUDA-handled branch.
In `@tests/unittest/_torch/multi_gpu/test_mpi_sleep_wakeup.py`:
- Around line 114-145: Snapshot the set of active devices from
_per_device_gpu_memory() into a variable (e.g., active_devices = {dev for dev,
bytes in mem_active.items() if bytes > 0}) when
process_gpu_memory_info_available is true, assert that active_devices is
non-empty before calling llm._collective_rpc("sleep", ...) to ensure the test
can fail, then use that active_devices set for both the post-sleep and
post-wakeup comparisons (iterating over active_devices and comparing mem_sleep
and mem_wakeup for those devices) instead of relying on mem_active/mem_sleep
emptiness checks inside the loops.
- Around line 57-77: The NVML error handling in _per_device_gpu_memory() is too
broad; narrow the excepts so we don't silently hide probe/init failures: only
swallow ImportError when importing pynvml (return {}), but let pynvml.nvmlInit()
raise pynvml.NVMLError (do not convert it to {}); in the per-device loop around
pynvml.nvmlDeviceGetComputeRunningProcesses(handle) only catch specific NVML
errors that mean “no data available” (e.g., pynvml.NVMLError_NotSupported or
NVMLError_NoPermission) and set result[idx]=0 for those, otherwise re-raise
other pynvml.NVMLError exceptions so real probe failures surface.
In `@tests/unittest/executor/test_sleep_collective_rpc_guards.py`:
- Around line 397-404: Reindent the multi-line context-manager "with patch(...)"
blocks in tests/unittest/executor/test_sleep_collective_rpc_guards.py so
continuation lines align under the opening parenthesis (use 4 spaces) and remove
the stray backslash continuation; specifically reformat the block that patches
"tensorrt_llm._torch.virtual_memory.release_with_tag",
"tensorrt_llm._torch.virtual_memory.materialize_with_tag",
"torch.cuda.synchronize", "gc.collect", and "torch.cuda.empty_cache" (and the
similar block around the other occurrence) so each patch(...) call is a properly
indented argument to the initial with and no over-indented continuation lines
remain, conforming to Python 3.10+ 4-space indentation and fixing Flake8 E127.
- Around line 193-195: Update the failing multiline docstrings in the test file
so they conform to Ruff D205: for the docstring that begins
"_control_action_lock must be acquired before and released after the
control_action + send/recv sequence." (and the other occurrences reported),
either collapse the docstring to a single-line or insert a blank line between
the one-line summary and the following descriptive text; ensure you update each
offending docstring (the ones around the _control_action_lock comment and the
other reported ranges) to follow that pattern.
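The first inline comment above asks for message decoding to move inside the try so that the finally always sends an ACK. A minimal sketch of that shape, assuming a dict-based message: `MemoryTag` is a hypothetical stand-in for ExecutorMemoryType, and `handle_control_message`, `send_ack`, and `apply_action` are illustrative names rather than the PR's actual functions.

```python
from enum import Enum


class MemoryTag(Enum):
    """Hypothetical stand-in for ExecutorMemoryType."""
    KV_CACHE = "kv_cache"
    MODEL_WEIGHTS = "model_weights"


def handle_control_message(msg, send_ack, apply_action):
    """Process one control message and always send exactly one ACK."""
    ack = {"status": "ok"}
    try:
        # Decoding lives inside the try, so a malformed message is ACKed too.
        action = msg["action"]
        tags = [MemoryTag(t) for t in msg["tags"]]
        apply_action(action, tags)
    except (KeyError, TypeError, ValueError, RuntimeError) as exc:
        ack = {"status": "error", "error": f"{type(exc).__name__}: {exc}"}
    finally:
        send_ack(ack)


sent = []
noop = lambda action, tags: None
# Well-formed message: ACK ok.
handle_control_message({"action": "sleep", "tags": ["kv_cache"]},
                       sent.append, noop)
# Missing "tags" key: KeyError during decode, still ACKed as error.
handle_control_message({"action": "sleep"}, sent.append, noop)
# Unknown tag value: ValueError from the enum constructor, still ACKed.
handle_control_message({"action": "sleep", "tags": ["bogus"]},
                       sent.append, noop)
```

The narrow except tuple follows the review suggestion; any exception outside it still triggers the finally, so rank 0 is not left waiting for a missing ACK.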
📒 Files selected for processing (6)
tensorrt_llm/_torch/pyexecutor/py_executor.py
tensorrt_llm/executor/base_worker.py
tensorrt_llm/executor/proxy.py
tensorrt_llm/executor/rpc_proxy.py
tests/unittest/_torch/multi_gpu/test_mpi_sleep_wakeup.py
tests/unittest/executor/test_sleep_collective_rpc_guards.py
chienchunhung left a comment
Thanks for the PR. The overall approach looks good. I took a first pass and left a couple of comments.
Question: what is the expected behavior if the sleep/wakeup actions succeed only partially, e.g., some ranks succeed while others fail? Do we reconcile?
There's no reconciliation -- this is an intentional design decision matching how the rest of the executor treats unrecoverable errors. If any rank fails its VMM op, then the system is in an inconsistent state due to some ranks having released/materialized their memory and some not. Attempting to reconcile would require another coordinated round trip and could itself fail. Instead, all errors are aggregated and a single |
chienchunhung left a comment
Overall LGTM. It'd be great to address the following in follow-up PRs: (1) add load tests, (2) harden the corner-case thread cleanup.
The real PR-caused failure is the new MPI sleep/wakeup path:
Both fail on The hang happens after So the listener executes/receives sleep-wakeup messages, but it does not solve the executor control barrier on peer ranks. The PR’s rank-0-only |
@hhzhang16 One recommendation: initialize the sleep/wakeup communicator/listener only when multi-rank sleep/wakeup can actually be used, e.g. PyTorch backend + sleep_config enabled + MPI executor path, rather than for all multi-rank PyExecutor workers. I believe the current implementation would start the listener thread even when Ray orchestration is used, which is unnecessary.
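The recommended gate could look something like the following. This is only a sketch of the condition as stated in the comment: `WorkerSetup`, its field names, and `needs_control_listener` are hypothetical and do not correspond to known TensorRT-LLM attributes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerSetup:
    """Hypothetical summary of the settings the recommendation mentions."""
    backend: str         # e.g. "pytorch"
    sleep_enabled: bool  # whether a sleep_config is set
    orchestrator: str    # e.g. "mpi" or "ray"
    world_size: int


def needs_control_listener(setup: WorkerSetup) -> bool:
    """Return True only when multi-rank sleep/wakeup can actually be used."""
    return (setup.backend == "pytorch"
            and setup.sleep_enabled
            and setup.orchestrator == "mpi"
            and setup.world_size > 1)


mpi_tp2 = WorkerSetup("pytorch", True, "mpi", 2)
ray_tp2 = WorkerSetup("pytorch", True, "ray", 2)
mpi_tp1 = WorkerSetup("pytorch", True, "mpi", 1)
no_sleep = WorkerSetup("pytorch", False, "mpi", 2)
```

Checking the condition once, before creating the communicator and the listener thread, keeps Ray-orchestrated and single-rank workers free of an idle background thread.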
@chienchunhung Good catch, added the gate.
Extends sleep()/wakeup() to work correctly when tensor_parallel_size > 1.
Previously the MPI/IPC path rejected multi-rank sleep/wakeup because control_action() only pauses the local rank-0 executor loop; it does not run code in peer rank processes. Since release_with_tag()/materialize_with_tag() must execute in each rank's own process, this MR adds a dedicated control communicator and listener thread for non-rank-0 workers.
Control-Listener Infrastructure (py_executor.py)
- Dedicated communicator (_control_comm) in start_worker() after PP broadcast-comm setup. This is collective and runs on the main thread before the worker thread starts.
- _control_listener_thread daemon on non-rank-0 ranks.
  - _CONTROL_ACTION_TAG.
  - _CONTROL_ACK_TAG with status: ok/error.
- _control_action_lock to serialize concurrent callers.
  - control_action() uses an Event barrier, not a mutex.
  - _control_comm or mutate VMM out of order.
- "shutdown" sentinel to all non-rank-0 listeners.
Coordination (base_worker.py)
- Removes the world_size > 1 NotImplementedError from _check_sleep_wakeup_preconditions().
- _multi_rank_sleep_wakeup():
  - _control_action_lock.
  - control_action().
- sleep() and wakeup() so both branches are serialized:
  - world_size == 1
  - world_size > 1
Proxy Allowlist (proxy.py, rpc_proxy.py)
- _MULTI_RANK_ALLOWED_METHODS = {"sleep", "wakeup"}.
- _check_collective_rpc_guard() to accept method=.
- collective_rpc() still rejects general methods, but allows sleep()/wakeup() because those now coordinate internally across ranks.
Sleep/Wakeup Sequence
sequenceDiagram
participant API as API caller
participant R0 as Rank 0 worker
participant Loop0 as Rank 0 executor loop
participant C as _control_comm
participant R1 as Rank 1 listener
participant R2 as Rank 2 listener
API->>R0: sleep(tags) or wakeup(tags)
R0->>R0: acquire _control_action_lock
R0->>Loop0: enter control_action()
Loop0-->>R0: drained and paused
R0->>C: send action/tags to rank 1
C->>R1: deliver control action
R0->>C: send action/tags to rank 2
C->>R2: deliver control action
par local VMM ops
R0->>R0: release/materialize local VMM
and peer VMM ops
R1->>R1: release/materialize local VMM
R2->>R2: release/materialize local VMM
end
R1-->>C: ACK status ok/error
R2-->>C: ACK status ok/error
C-->>R0: ACKs
R0->>R0: drain all ACKs
R0->>Loop0: exit control_action()
R0->>R0: release _control_action_lock
R0-->>API: success or aggregate error
Error Handling
flowchart TD
Start["Rank 0 broadcasts action"]
Local["Rank 0 local VMM op"]
Peer1["Rank 1 VMM op"]
Peer2["Rank 2 VMM op"]
Ack1["Rank 1 ACK: status ok/error"]
Ack2["Rank 2 ACK: status ok/error"]
Drain["Rank 0 drains all ACKs"]
AnyError{"Any local or peer error?"}
Raise["Raise aggregate RuntimeError"]
Return["Return success"]
Start --> Local
Start --> Peer1 --> Ack1 --> Drain
Start --> Peer2 --> Ack2 --> Drain
Local --> Drain
Drain --> AnyError
AnyError -->|yes| Raise
AnyError -->|no| Return
The important guarantee is that rank 0 always drains peer ACKs before raising. That prevents stale ACKs from being consumed by the next sleep/wakeup operation.
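The drain-before-raise guarantee can be sketched as a small function. This is an illustration under assumptions, not the PR's implementation: `drain_acks_then_raise`, the callable parameters, and the error message format are hypothetical, and the ACK dictionaries only mirror the status: ok/error shape described above.

```python
def drain_acks_then_raise(run_local_op, recv_ack, peer_ranks):
    """Run the rank-0 local op, always drain peer ACKs, then report errors."""
    errors = []
    try:
        run_local_op()
    except RuntimeError as exc:
        # Handled failures are recorded instead of raised immediately.
        errors.append(f"rank 0: {exc}")
    finally:
        # Draining happens even when the local op failed, so no ACK is left
        # behind for the next sleep/wakeup call to pick up by mistake.
        for rank in peer_ranks:
            ack = recv_ack(rank)
            if ack.get("status") != "ok":
                detail = ack.get("error", "unknown error")
                errors.append(f"rank {rank}: {detail}")
    if errors:
        raise RuntimeError("sleep/wakeup failed: " + "; ".join(errors))


# Scripted ACKs: rank 1 succeeds, rank 2 reports an error.
scripted = {1: {"status": "ok"}, 2: {"status": "error", "error": "bad tag"}}
consumed = []


def fake_recv(rank):
    consumed.append(rank)
    return scripted[rank]


def failing_local_op():
    raise RuntimeError("local release failed")


message = ""
try:
    drain_acks_then_raise(failing_local_op, fake_recv, [1, 2])
except RuntimeError as exc:
    message = str(exc)
```

Because the drain sits in the finally, an unexpected exception type from the local op would still propagate after the ACKs are consumed rather than being folded into the aggregate error.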
Tests
test_sleep_collective_rpc_guards.py
- TestMultiRankSleepWakeupLock.
  - SpyLock to verify acquire/release ordering.
- TestMultiRankAckErrorPropagation
  - _multi_rank_sleep_wakeup() directly with a fake recv() that returns scripted ACKs.
  - test_peer_error_ack_raises verifies a single non-ok ACK surfaces its error detail.
  - test_multiple_peer_errors_aggregated uses world_size=3 with two error ACKs and verifies both messages appear in the aggregate error.
- TestMultiRankRank0LocalFailureDrainsAcks
  - release_with_tag() to raise.
- TestSingleRankLockAcquired
  - SpyLock to verify single-rank sleep()/wakeup() also acquire _control_action_lock.
  - _control_comm because the single-rank path never touches it.
- _make_proto_worker
  - world_size=3 so tests cover multiple peers instead of only the degenerate single-peer case.
test_mpi_sleep_wakeup.py (new, @pytest.mark.gpu2)
- _per_device_gpu_memory() queries NVML per device.
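The lock-ordering checks mentioned for the unit tests can be illustrated with a recording lock. The PR description names SpyLock but does not show it, so the class below is an assumed shape, and `guarded_sleep` with its event strings is a hypothetical stand-in for the code under test.

```python
import threading


class SpyLock:
    """Lock wrapper that records acquire/release events in a shared list."""

    def __init__(self, events):
        self._lock = threading.Lock()
        self._events = events

    def __enter__(self):
        self._lock.acquire()
        self._events.append("acquire")
        return self

    def __exit__(self, exc_type, exc, tb):
        self._events.append("release")
        self._lock.release()
        return False  # never swallow exceptions from the guarded block


events = []
lock = SpyLock(events)


def guarded_sleep():
    # Stand-in for the control_action + send/recv sequence under the lock.
    with lock:
        events.append("control_action")
        events.append("send")
        events.append("recv")


guarded_sleep()
```

Sharing one event list between the lock and the guarded steps lets a test assert the full ordering with a single list comparison.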