[TRTLLM-12499][feat] (WIP) Add support for pipelined KVCache transfer for disaggregated serving in Python Cache Transceiver#15727
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
… counter

- Replace _collect_block_ids with _collect_base_slice to preserve the full KVSlice metadata (including mamba_state_index) through all new code paths: _create_kv_slices (sender) and request_and_receive_async (receiver). Without this, Mamba/hybrid-state model transfers would lose required state metadata.
- Fix VSWA shared counter bug in WindowBlockManager::releasePrefixBlocks: snapshot mNumFrontBlocksRemoved before iterating window managers so each manager releases blocks from the same range. Previously the first manager advanced the shared counter, causing subsequent managers to skip their own blocks entirely.
- Guard chunking integrity assertion with __debug__ to avoid O(N) CPU overhead on the hot path in optimized builds.
- Add tests for mamba_state_index propagation through chunked slices.

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
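The shared-counter fix described in the commit above can be illustrated with a small Python sketch. This is not the C++ code from the PR: `WindowManager`, `release_prefix_buggy`, `release_prefix_fixed`, and the `state` dictionary are hypothetical stand-ins, used only to show why reading the counter once before the loop matters.

```python
class WindowManager:
    """Hypothetical stand-in for one per-window block manager."""

    def __init__(self, num_blocks):
        self.blocks = list(range(num_blocks))
        self.released = []

    def release_range(self, start, end):
        # Record which blocks this manager released.
        self.released.extend(self.blocks[start:end])


def release_prefix_buggy(managers, state, num_to_release):
    # Bug pattern: every manager re-reads the shared counter, so the first
    # manager's update makes the later managers see an empty range.
    for manager in managers:
        start = state["front_removed"]
        manager.release_range(start, num_to_release)
        state["front_removed"] = max(state["front_removed"], num_to_release)


def release_prefix_fixed(managers, state, num_to_release):
    # Fix pattern: snapshot the shared counter once, release the same range
    # in every manager, then advance the counter a single time.
    start = state["front_removed"]
    for manager in managers:
        manager.release_range(start, num_to_release)
    state["front_removed"] = max(start, num_to_release)
```

With two managers and a counter starting at 0, the buggy variant releases blocks only in the first manager, while the fixed variant releases the same range in both.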
- Update copyright year to 2026 in nanobind kvCacheManager.cpp
- Add OnChunkTransferredCallback type alias for precise callback typing
- Add strict=True to zip() calls in chunked transfer tests

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
- Fix chunking integrity check: use np.array_equal() instead of == for numpy array comparison, raise ValueError instead of assert (eopXD comment on transceiver.py)
- Add explicit VSWA limitation comment in BlockManager::releasePrefixBlocks documenting the single-window-size assumption (eopXD comment on kvCacheManager.cpp)
- Auto-select Python transceiver when chunk_size_blocks is set and backend is NIXL/DEFAULT. The C++ transceiver does not support chunked transfer; this makes chunking work without requiring users to manually set transceiver_runtime="PYTHON" (pcastonguay comment on transceiver.py)

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
Per reviewer feedback (chuangz0, Shixiaowei02): chunk_block_offset belongs as a member of KVSlice rather than a function parameter on send(). The KVSlice dataclass was designed to carry all slice metadata.

- Add chunk_block_offset: int = 0 to KVSlice dataclass
- Remove chunk_block_offset from TxSessionBase.send() signature
- Remove chunk_block_offset from TxSession.send() signature
- Remove chunk_block_offset from KVSendTask.__init__
- Read chunk_block_offset from task._slice in _build_kv_write_meta and _deliver_kv_to_agent callback
- Set chunk_block_offset on each KVSlice in _create_kv_slices
- Update all tests accordingly

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
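As a rough illustration of the design choice in the commit above (the offset travels on the slice object instead of being passed to send()), here is a minimal sketch. `SliceSketch` is a hypothetical, reduced stand-in for KVSlice and `make_chunked_slices` is an illustrative helper, not the PR's `_create_kv_slices`. It assumes every chunk starts at a multiple of `chunk_size_blocks`.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SliceSketch:
    """Hypothetical, reduced stand-in for KVSlice."""

    block_ids_per_layer_groups: List[List[int]]
    is_last_slice: bool = False
    chunk_block_offset: int = 0  # position of this chunk in the full block list


def make_chunked_slices(block_ids_per_layer_groups, chunk_size_blocks):
    """Split per-group block lists into chunks that carry their own offset."""
    if chunk_size_blocks <= 0:
        raise ValueError("chunk_size_blocks must be positive")
    max_blocks = max((len(ids) for ids in block_ids_per_layer_groups), default=0)
    # Ceiling division; always emit at least one (possibly empty) slice.
    num_chunks = max(1, -(-max_blocks // chunk_size_blocks))
    slices = []
    for chunk_idx in range(num_chunks):
        start = chunk_idx * chunk_size_blocks
        end = start + chunk_size_blocks
        slices.append(
            SliceSketch(
                block_ids_per_layer_groups=[
                    ids[start:end] for ids in block_ids_per_layer_groups
                ],
                is_last_slice=(chunk_idx == num_chunks - 1),
                chunk_block_offset=start,
            )
        )
    return slices
```

Because each slice carries its own offset, a consumer only needs the slice object to know where the chunk belongs. Shorter layer groups simply yield empty lists in later chunks.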
Signed-off-by: Athena Cai <athenac@nvidia.com>
Signed-off-by: Athena Cai <athenac@nvidia.com>
…isaggregated serving in Python Cache Transceiver

Credit: the design and much of the implementation in this PR comes from NVIDIA#12781 by @chienchunhung

TODO: fix _create_kv_slices and add e2e and accuracy tests

Signed-off-by: Athena Cai <athenac@nvidia.com>
📝 Walkthrough
Adds chunked and pipelined KV cache transfer to disaggregated serving.
Changes
Chunked & Pipelined KV Transfer
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tensorrt_llm/_torch/disaggregation/native/transfer.py (1)
488-507: 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Mark the task as in-flight before waiting on the CUDA event.
Line 492 waits while the task is still INIT, so cancel_request() can see no TRANSFERRING tasks and free KV pages before the event completes. Move the INIT→TRANSFERRING transition before the event wait, and keep the cancelled/error abort path before synchronization.
Suggested fix
- # For pipelined prefill-transfer: wait for the GPU forward
- # to finish writing KV data before starting RDMA. This
- # blocks only this worker thread, not the GPU or main thread.
- if task._slice.cuda_event is not None: # TODO: should I sync after the task status is set to TRANSFERRING?
-     task._slice.cuda_event.synchronize()
-
- if timer:
-     timer.record_push_end(write_meta.peer_rank)
  # Hold session.lock to serialize the INIT→TRANSFERRING transition with
  # cancel(): prevents cancel_request() from freeing KV pages while a
  # worker is about to write into them.
  with session.lock:
      status = session.status
      if status in (SessionStatus.ERROR, SessionStatus.CANCELLED):
          should_abort = True
      else:
          task.status = TaskStatus.TRANSFERRING
          should_abort = False
+
+ if should_abort:
+     ...
+     return
+
+ # For pipelined prefill-transfer: wait for the GPU forward
+ # to finish writing KV data before starting RDMA.
+ if task._slice.cuda_event is not None:
+     task._slice.cuda_event.synchronize()
+
+ if timer:
+     timer.record_push_end(write_meta.peer_rank)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tensorrt_llm/_torch/disaggregation/native/transfer.py` around lines 488 - 507, The task transition in transfer.py is happening too late in the prefill-transfer flow: `task._slice.cuda_event.synchronize()` runs while the task is still `INIT`, so `cancel_request()` can miss it and free KV pages too early. In the transfer path around `task`, `session.lock`, and `TaskStatus.TRANSFERRING`, move the INIT→TRANSFERRING state update (with the session ERROR/CANCELLED abort check) before waiting on the CUDA event, and keep the abort branch ahead of synchronization so in-flight work is visible before any blocking wait.
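The ordering the reviewer asks for can be sketched in isolation. The classes below are hypothetical stand-ins, not the PR's session or task types, and a `threading.Event` plays the role of the CUDA event. The point is only that the task becomes visible as in flight, under the lock, before any blocking wait.

```python
import threading
from enum import Enum


class TaskStatus(Enum):
    INIT = "init"
    TRANSFERRING = "transferring"


class Session:
    """Hypothetical session: a lock plus a cancelled flag."""

    def __init__(self):
        self.lock = threading.Lock()
        self.cancelled = False


class Task:
    """Hypothetical task; `ready` stands in for the CUDA event."""

    def __init__(self, ready):
        self.status = TaskStatus.INIT
        self.ready = ready


def start_transfer(session, task, timeout=None):
    """Return True if the transfer may proceed, False if it was aborted."""
    with session.lock:
        if session.cancelled:
            return False  # abort before any blocking wait
        task.status = TaskStatus.TRANSFERRING
    # Blocking wait happens outside the lock; cancellation already sees
    # this task as in flight.
    task.ready.wait(timeout)
    return True


def try_cancel(session, tasks):
    """Return True only if no task is in flight, so pages may be freed."""
    with session.lock:
        if any(t.status is TaskStatus.TRANSFERRING for t in tasks):
            return False
        session.cancelled = True
        return True
```

If the status update happened after the wait instead, `try_cancel` could succeed while a worker was still blocked on the event, which is the race described in the comment above.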
🧹 Nitpick comments (2)
tensorrt_llm/_torch/disaggregation/transceiver.py (2)
582-585: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Redundant session assignment.
_get_or_create_send_session already inserts the session into self._send_sessions, so re-assigning the return value is redundant (and could mask a future divergence between the two code paths). Mirror the simpler form used in respond_and_send_async.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tensorrt_llm/_torch/disaggregation/transceiver.py` around lines 582 - 585, The send-session initialization in transceiver logic has a redundant assignment because _get_or_create_send_session already stores the session in self._send_sessions. Update the rid-not-in-self._send_sessions branch in transceiver.py to follow the same pattern as respond_and_send_async by simply invoking _get_or_create_send_session(req) for its side effects, then keep setting _ever_had_send_session and _pipelined_chunk_offsets[rid] as before.
602-604: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Numerous open TODOs in the pipelined-send path before merge.
send_prefill_chunk and respond_and_send_async carry several unresolved TODO(athenac) questions on correctness-critical fields (token_range, mamba_state_index, layer_range, the req.state transition, the offset accumulation "might be a faulty calculation", and the redundancy between the two methods). Since the PR is marked WIP, these need resolution before this is production-ready. I can help draft the offset/metadata handling and consolidate the shared logic into a single helper.
Also applies to: 608-611, 628-631, 657-666, 675-675
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tensorrt_llm/_torch/disaggregation/transceiver.py` around lines 602 - 604, The pipelined-send path in transceiver.py still contains unresolved correctness TODOs in send_prefill_chunk and respond_and_send_async, especially around token_range, mamba_state_index, layer_range, req.state transitions, and the offset accumulation logic. Resolve these TODO(athenac) questions by verifying the metadata semantics, fixing the offset calculation, and making the state update explicit and correct before merge. Also remove the duplicated logic between send_prefill_chunk and respond_and_send_async by consolidating the shared send/metadata assembly into a single helper so the two paths stay consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tensorrt_llm/_torch/disaggregation/native/transfer.py`:
- Around line 728-745: The chunked destination slicing in transfer logic is now
using chunk offsets, but the token alignment still assumes each chunk maps to a
suffix ending at token_range.end. Update the code around the chunked path in
transfer.py and the downstream token-start calculation to derive starts from
chunk_block_offset, or require callers to provide per-chunk KVSlice.token_range
for each chunk. Make sure the block selection and token-range alignment stay
consistent for prefix-cache and SWA cases so the written blocks match the
intended chunk.
- Around line 523-524: The abort/result notification path in transfer.py still
uses write_meta.slice_id, which can conflict with the receiver’s single-task
slice handling. Update the abort send logic in the relevant transfer routine to
mirror the success path by reporting receiver_slice_id as 0 for aborts too, so
the receiver does not see a later-chunk slice ID and hit its slice assertion.
Keep the existing task/event unblocking behavior intact while ensuring the
aborted/failure result is always sent to receiver slice 0.
- Around line 736-743: The chunk-to-destination mapping in transfer.py is too
strict for exhausted layer groups: when len(src_block_ids) is 0, the current
bounds check in the chunk slicing logic still raises on advanced chunk_offset
values. Update the chunk handling around the dst_block_ids slice so empty source
chunks become a no-op and do not trigger the out-of-bounds error; keep the
existing bounds validation for non-empty chunks in the same chunk
offset/full_dst_block_ids path.
In `@tensorrt_llm/_torch/disaggregation/transceiver.py`:
- Line 648: The `respond_and_send_async` skip guard in `transceiver.py` needs
both a lint fix and a logic check: move the `return` onto its own line to
satisfy E701, and verify the condition around `rid in self._send_sessions and
rid not in self._pipelined_chunk_offsets` correctly prevents duplicate sends
while pipelined chunks are still outstanding. If needed, adjust the guard so the
full `_create_kv_slices` resend path only runs when it is safe, using the
existing `_send_sessions` and `_pipelined_chunk_offsets` state to avoid
duplicate transfer.
- Around line 591-611: The chunking logic in send_prefill_chunk() and the
_pipelined_chunk_offsets update can split KV slices on token boundaries that are
not aligned to tokens_per_block, which causes the boundary block to be resent
and offsets to drift. Adjust the prefill chunk selection so every chunk boundary
lands on a KV block boundary (or clamp the sliding-window fallback so it only
overlaps when it evenly divides tokens_per_block), and then recompute
_pipelined_chunk_offsets from the actual block count in the chunk.
In `@tests/unittest/disaggregated/test_kv_transfer.py`:
- Around line 1789-1825: The send/receive flow is using the wrong API shape:
TxSession.send() and RxSession.receive() should be called with a fully populated
KVSlice rather than extra kwargs, and they do not return futures. Update the
test setup around KVSlice, sender_session.send(), and
receiver_sessions/RxSession.receive() to set chunk_block_offset and cuda_event
on the slice object before calling send/receive, then replace the .result()
waits with wait_complete()/wait_complete(blocking=True) on the session or slice
as appropriate.
---
Outside diff comments:
In `@tensorrt_llm/_torch/disaggregation/native/transfer.py`:
- Around line 488-507: The task transition in transfer.py is happening too late
in the prefill-transfer flow: `task._slice.cuda_event.synchronize()` runs while
the task is still `INIT`, so `cancel_request()` can miss it and free KV pages
too early. In the transfer path around `task`, `session.lock`, and
`TaskStatus.TRANSFERRING`, move the INIT→TRANSFERRING state update (with the
session ERROR/CANCELLED abort check) before waiting on the CUDA event, and keep
the abort branch ahead of synchronization so in-flight work is visible before
any blocking wait.
---
Nitpick comments:
In `@tensorrt_llm/_torch/disaggregation/transceiver.py`:
- Around line 582-585: The send-session initialization in transceiver logic has
a redundant assignment because _get_or_create_send_session already stores the
session in self._send_sessions. Update the rid-not-in-self._send_sessions branch
in transceiver.py to follow the same pattern as respond_and_send_async by simply
invoking _get_or_create_send_session(req) for its side effects, then keep
setting _ever_had_send_session and _pipelined_chunk_offsets[rid] as before.
- Around line 602-604: The pipelined-send path in transceiver.py still contains
unresolved correctness TODOs in send_prefill_chunk and respond_and_send_async,
especially around token_range, mamba_state_index, layer_range, req.state
transitions, and the offset accumulation logic. Resolve these TODO(athenac)
questions by verifying the metadata semantics, fixing the offset calculation,
and making the state update explicit and correct before merge. Also remove the
duplicated logic between send_prefill_chunk and respond_and_send_async by
consolidating the shared send/metadata assembly into a single helper so the two
paths stay consistent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 784a62c4-609a-4545-bc83-7d70311b9078
📒 Files selected for processing (11)
tensorrt_llm/_torch/disaggregation/base/transfer.py
tensorrt_llm/_torch/disaggregation/native/transfer.py
tensorrt_llm/_torch/disaggregation/transceiver.py
tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
tensorrt_llm/llmapi/llm_args.py
tests/integration/defs/accuracy/test_disaggregated_serving.py
tests/integration/test_lists/test-db/l0_dgx_b200.yml
tests/unittest/disaggregated/test_chunked_transfer.py
tests/unittest/disaggregated/test_kv_transfer.py
tests/unittest/llmapi/test_llm_args.py
  str(write_meta.slice_id).encode("ascii"),
- b"True",  # is_last_slice — ensures receiver resolves its task future
+ b"True",  # is_last_slice — ensures receiver resolves its task event
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Send abort results to receiver slice 0 as well.
The success path makes sender-side chunking transparent by reporting receiver_slice_id = 0; this abort path still sends write_meta.slice_id, so a cancelled/failed later chunk can trip the receiver’s single-task slice assertion instead of unblocking it.
Suggested fix
- str(write_meta.slice_id).encode("ascii"),
+ b"0",
chunk_offset = task._slice.chunk_block_offset
for (self_lg, self_pi), (peer_lg, peer_pi) in pool_mapping.items():
    src_block_ids = src_block_ids_per_groups[self_lg]
    dst_block_ids = dst_block_ids_per_groups[peer_lg]
    full_dst_block_ids = dst_block_ids_per_groups[peer_lg]

    # When sender uses chunking, the receiver sends all dst
    # blocks in a single RecvReqInfo. Slice dst to match
    # this task's src chunk position.
    if chunk_offset > 0 or not task._slice.is_last_slice:
        chunk_end = chunk_offset + len(src_block_ids)
        if chunk_end > full_dst_block_ids.size:
            raise ValueError(
                f"dst chunk range out of bounds: offset={chunk_offset}, "
                f"len={len(src_block_ids)}, dst_blocks={full_dst_block_ids.size}"
            )
        dst_block_ids = full_dst_block_ids[chunk_offset:chunk_end]
    else:
        dst_block_ids = full_dst_block_ids
🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
Keep chunk token ranges consistent with chunk_block_offset.
This path now slices destination blocks by chunk, but the alignment below still infers token starts as if each chunk’s block list were the suffix ending at token_range.end. If chunked slices carry the full request range, prefix-cache/SWA cases can align and write the wrong blocks. Either require per-chunk KVSlice.token_range from callers or derive the token starts from chunk_block_offset.
if chunk_offset > 0 or not task._slice.is_last_slice:
    chunk_end = chunk_offset + len(src_block_ids)
    if chunk_end > full_dst_block_ids.size:
        raise ValueError(
            f"dst chunk range out of bounds: offset={chunk_offset}, "
            f"len={len(src_block_ids)}, dst_blocks={full_dst_block_ids.size}"
        )
    dst_block_ids = full_dst_block_ids[chunk_offset:chunk_end]
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Allow exhausted layer groups to map to empty destination slices.
With asymmetric layer-group lengths, later chunks can have len(src_block_ids) == 0 while chunk_offset has advanced past that group’s destination length. That should be a no-op, not an out-of-bounds error.
Suggested fix
  if chunk_offset > 0 or not task._slice.is_last_slice:
+     if len(src_block_ids) == 0:
+         dst_block_ids = full_dst_block_ids[:0]
+         continue
      chunk_end = chunk_offset + len(src_block_ids)
      if chunk_end > full_dst_block_ids.size:
          raise ValueError(
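The behavior requested in this comment can be sketched as a standalone helper. `slice_dst_for_chunk` is a hypothetical function written for illustration. It uses plain Python lists and `len()` where the reviewed code uses array objects with `.size`, and it returns the slice instead of continuing a loop.

```python
def slice_dst_for_chunk(full_dst_block_ids, num_src_blocks, chunk_offset):
    """Pick the destination blocks that match one source chunk.

    An exhausted layer group (no source blocks in this chunk) maps to an
    empty destination slice instead of raising, even when chunk_offset has
    already advanced past the end of this group's destination list.
    """
    if num_src_blocks == 0:
        return full_dst_block_ids[:0]
    chunk_end = chunk_offset + num_src_blocks
    if chunk_end > len(full_dst_block_ids):
        raise ValueError(
            f"dst chunk range out of bounds: offset={chunk_offset}, "
            f"len={num_src_blocks}, dst_blocks={len(full_dst_block_ids)}"
        )
    return full_dst_block_ids[chunk_offset:chunk_end]
```

Non-empty chunks keep the strict bounds check, so a real mismatch between sender and receiver block counts still surfaces as an error.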
base_slice = self._collect_base_slice(req)
chunk_block_ids = [
    block_ids[chunk_start_block:chunk_end_block]
    for block_ids in base_slice.block_ids_per_layer_groups
]

kv_slice = KVSlice(
    is_last_slice=is_last_chunk,
    block_ids_per_layer_groups=chunk_block_ids,
    chunk_block_offset=chunk_block_offset,
    cuda_event=cuda_event,
    token_range=base_slice.token_range,  # TODO(athenac): is this correct? probably not. what is token range used for?
    mamba_state_index=base_slice.mamba_state_index,  # TODO(athenac): is this correct?
    layer_range=base_slice.layer_range,  # TODO(athenac): is this correct?
)

session.send(kv_slice)
req.state = LlmRequestState.DISAGG_CONTEXT_TRANS_IN_PROGRESS  # TODO(athenac): is this correct?

num_blocks_this_chunk = max((len(ids) for ids in chunk_block_ids), default=0)
self._pipelined_chunk_offsets[rid] = chunk_block_offset + num_blocks_this_chunk  # TODO(athenac): this might be a faulty calculation
🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Inspect how context_chunk_size relates to tokens_per_block for chunked prefill
rg -nP -C3 '(context_chunk_size|chunk_size|tokens_per_block)' --type=py -g '!**/test*' tensorrt_llm/_torch/pyexecutor | rg -nP -C3 'tokens_per_block|chunk'
Repository: NVIDIA/TensorRT-LLM
Length of output: 50376
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== transceiver slice =="
sed -n '560,630p' tensorrt_llm/_torch/disaggregation/transceiver.py
echo
echo "== scheduler chunk sizing =="
sed -n '540,620p' tensorrt_llm/_torch/pyexecutor/scheduler/scheduler_v2.py
echo
echo "== alignment helper =="
sed -n '611,780p' tensorrt_llm/_torch/pyexecutor/scheduler/scheduler_v2.py
echo
echo "== search for context_chunk_size consumers =="
rg -n "context_chunk_size|is_last_context_chunk|chunk_unit_size|tokens_per_block" tensorrt_llm/_torch/pyexecutor tensorrt_llm/_torch/disaggregation -g '*.py'
Repository: NVIDIA/TensorRT-LLM
Length of output: 42122
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== transceiver callers =="
rg -n "chunk_start_block|chunk_end_block|send_kv|send_async|_send.*chunk|KVCacheTransceiverV2" tensorrt_llm/_torch -g '*.py' -C 3
echo
echo "== request_position / current_position handling around chunk scheduling =="
rg -n "context_current_position|context_remaining_length|context_chunk_size|is_last_context_chunk" tensorrt_llm/_torch/pyexecutor -g '*.py' -C 2 | sed -n '1,220p'
Repository: NVIDIA/TensorRT-LLM
Length of output: 38455
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== chunk_unit_size setup =="
sed -n '690,720p' tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
echo
echo "== request chunk position bookkeeping =="
sed -n '4940,4985p' tensorrt_llm/_torch/pyexecutor/py_executor.py
echo
echo "== LlmRequest chunk helpers =="
rg -n "py_last_context_chunk|move_to_next_context_chunk|context_current_position" tensorrt_llm/_torch -g '*.py' -C 2
Repository: NVIDIA/TensorRT-LLM
Length of output: 25841
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== tokens_per_block defaults and overrides =="
rg -n "tokens_per_block *=|tokens_per_block: int|tokens_per_block=.*64|tokens_per_block=.*32|max\(256, chunk_unit_size\)|chunk_unit_size = max\(256, chunk_unit_size\)" tensorrt_llm/_torch -g '*.py' -C 2
echo
echo "== any validation on tokens_per_block divisibility or allowed values =="
rg -n "tokens_per_block.*(must|should|assert|raise|divis|power of two|64|32)" tensorrt_llm/_torch -g '*.py' -C 2
Repository: NVIDIA/TensorRT-LLM
Length of output: 50375
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== any validation / normalization of tokens_per_block =="
rg -n "tokens_per_block.*(32|64|128|256|power|validate|assert|must|allowed|divisible)" tensorrt_llm/_torch tensorrt_llm -g '*.py' -C 2
echo
echo "== kv cache config field docs =="
rg -n "tokens_per_block" tensorrt_llm/_torch -g '*.py' -C 1 | sed -n '1,120p'
Repository: NVIDIA/TensorRT-LLM
Length of output: 29457
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== explicit tokens_per_block assignments =="
rg -n "tokens_per_block\s*=\s*(32|64|128|256|512|1024)" tensorrt_llm -g '*.py' -C 1
echo
echo "== tokens_per_block in config schema / docs =="
rg -n "tokens_per_block" tensorrt_llm -g '*.py' -g '*.md' -C 1 | rg -n "default|32|64|128|256|power|multiple"
Repository: NVIDIA/TensorRT-LLM
Length of output: 17643
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== prepopulated prompt len / current position semantics =="
rg -n "set_prepopulated_prompt_len|prepopulated_prompt_len|num_committed_tokens" tensorrt_llm/_torch/pyexecutor -g '*.py' -C 2
Repository: NVIDIA/TensorRT-LLM
Length of output: 5346
Keep prefill chunk boundaries on KV block boundaries.
send_prefill_chunk() slices by block index, so a boundary that isn’t a tokens_per_block multiple will resend the boundary block and skew _pipelined_chunk_offsets. The 256-token sliding-window fallback is only safe when it still divides tokens_per_block; otherwise clamp the chunk size or drop the overlap.
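One way to picture the alignment concern is with two small helpers. This is a sketch under stated assumptions, not the scheduler's logic: `aligned_chunk_end` and `block_range` are hypothetical names, and the only rule encoded is that every chunk boundary except the final one is rounded down to a multiple of tokens_per_block.

```python
def aligned_chunk_end(start_token, desired_chunk_tokens, total_tokens, tokens_per_block):
    """Return a chunk end that lands on a KV block boundary.

    The final chunk may end at total_tokens even if that is unaligned.
    """
    if start_token % tokens_per_block != 0:
        raise ValueError("chunk start must be block aligned")
    end = min(start_token + desired_chunk_tokens, total_tokens)
    if end == total_tokens:
        return end
    aligned = (end // tokens_per_block) * tokens_per_block
    if aligned <= start_token:
        raise ValueError("chunk is smaller than one KV block")
    return aligned


def block_range(start_token, end_token, tokens_per_block):
    """Half-open range of block indices touched by tokens [start, end)."""
    first = start_token // tokens_per_block
    last = -(-end_token // tokens_per_block)  # ceiling division
    return first, last
```

With tokens_per_block = 32, an unaligned boundary at token 100 makes the ranges for tokens [0, 100) and [100, 200) both include block 3, so that block would be sent twice. Rounding the first chunk down to token 96 gives disjoint block ranges.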
assert rid is not None

# If pipelined transfer already sent all chunks, skip re-sending.
if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets: return
🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win
Move return to its own line (lint failure) and confirm the skip guard.
Static analysis flags E701 (multiple statements on one line). Beyond the formatting, note the guard rid in self._send_sessions and rid not in self._pipelined_chunk_offsets only skips re-send once the last pipelined chunk has run (which pops rid from _pipelined_chunk_offsets). If respond_and_send_async is ever reached while pipelined chunks are still in flight (rid still present), it falls through and re-sends the full slice set via _create_kv_slices, duplicating the transfer.
🐛 Proposed formatting fix
- if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets: return
+ if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets:
+     return
🧰 Tools
🪛 Ruff (0.15.20)
[error] 648-648: Multiple statements on one line (colon)
(E701)
Source: Linters/SAST tools
    for sender_session, block_ids_per_groups in zip(sender_sessions, ctx_block_ids):
        max_blocks = max(len(ids) for ids in block_ids_per_groups)
        num_chunks = math.ceil(max_blocks / chunk_size_blocks)
        chunk_offset = 0
        for chunk_idx in range(num_chunks):
            start = chunk_idx * chunk_size_blocks
            end = start + chunk_size_blocks
            is_last = chunk_idx == num_chunks - 1
            chunk_block_ids = [ids[start:end] for ids in block_ids_per_groups]
            kv_slice = KVSlice(
                is_last_slice=is_last,
                block_ids_per_layer_groups=chunk_block_ids,
            )
            cuda_event = torch.cuda.Event()
            cuda_event.record()
            send_futures.append(
                sender_session.send(
                    kv_slice, chunk_block_offset=chunk_offset, cuda_event=cuda_event
                )
            )
            chunk_offset += max(len(ids) for ids in chunk_block_ids)

    receiver_sessions = [
        tw.create_rx_session(ctx_info["gen_request"]) for tw in gen_transfer_workers
    ]
    recv_futures = []
    for recv_session, block_ids_per_groups in zip(receiver_sessions, gen_block_ids):
        full_slice = KVSlice(
            is_last_slice=True,
            block_ids_per_layer_groups=block_ids_per_groups,
        )
        recv_futures.append(recv_session.receive(full_slice))

    for f in send_futures:
        f.result()
    for f in recv_futures:
        f.result()
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm send/receive signatures and return types.
rg -nP 'def send\(|def receive\(' \
tensorrt_llm/_torch/disaggregation/native/transfer.py tensorrt_llm/_torch/disaggregation/base/transfer.py -A4
Repository: NVIDIA/TensorRT-LLM
Length of output: 1946
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the test helper around the flagged lines.
sed -n '1760,1845p' tests/unittest/disaggregated/test_kv_transfer.py
printf '\n--- KVSlice definition ---\n'
rg -n "class KVSlice|chunk_block_offset|cuda_event" tensorrt_llm/_torch/disaggregation -A6 -B4
printf '\n--- wait_complete usage in tests ---\n'
rg -n "wait_complete\(" tests/unittest/disaggregated/test_kv_transfer.py tensorrt_llm/_torch/disaggregation -A3 -B3
Repository: NVIDIA/TensorRT-LLM
Length of output: 26431
Build the KVSlice metadata before calling send()/receive()
TxSession.send()/RxSession.receive() take only the slice and return None, so passing chunk_block_offset/cuda_event as kwargs and then calling .result() on the return values will fail. Set those fields on KVSlice here, and synchronize with wait_complete()/wait_complete(blocking=True) instead.
🧰 Tools
🪛 Ruff (0.15.20)
[warning] 1789-1789: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
[warning] 1815-1815: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/unittest/disaggregated/test_kv_transfer.py` around lines 1789 - 1825,
The send/receive flow is using the wrong API shape: TxSession.send() and
RxSession.receive() should be called with a fully populated KVSlice rather than
extra kwargs, and they do not return futures. Update the test setup around
KVSlice, sender_session.send(), and receiver_sessions/RxSession.receive() to set
chunk_block_offset and cuda_event on the slice object before calling
send/receive, then replace the .result() waits with
wait_complete()/wait_complete(blocking=True) on the session or slice as
appropriate.
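A minimal sketch of the call shape this comment describes, built on stand-in classes so that it runs without the repository. `KVSliceSketch`, `TxSessionSketch`, and `send_in_chunks` are hypothetical; the field names `chunk_block_offset` and `cuda_event` and the method name `wait_complete()` are taken from the comment, but their real signatures and return values are assumptions here.

```python
import math
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class KVSliceSketch:
    """Hypothetical stand-in for KVSlice; field names follow the review comment."""
    block_ids_per_layer_groups: List[List[int]]
    is_last_slice: bool = False
    chunk_block_offset: int = 0
    cuda_event: Optional[Any] = None  # a torch.cuda.Event in the real test; left unset here


class TxSessionSketch:
    """Hypothetical stand-in: send() takes only the slice and returns None."""

    def __init__(self):
        self.sent = []

    def send(self, kv_slice):
        self.sent.append(kv_slice)

    def wait_complete(self):
        # Assumption: the real method blocks until all sends finish.
        return True


def send_in_chunks(session, block_ids_per_groups, chunk_size_blocks):
    max_blocks = max(len(ids) for ids in block_ids_per_groups)
    num_chunks = math.ceil(max_blocks / chunk_size_blocks)
    chunk_offset = 0
    for chunk_idx in range(num_chunks):
        start = chunk_idx * chunk_size_blocks
        chunk_block_ids = [ids[start:start + chunk_size_blocks] for ids in block_ids_per_groups]
        # All per-chunk metadata lives on the slice, not in send() kwargs.
        kv_slice = KVSliceSketch(
            block_ids_per_layer_groups=chunk_block_ids,
            is_last_slice=(chunk_idx == num_chunks - 1),
            chunk_block_offset=chunk_offset,
        )
        session.send(kv_slice)  # returns None, so there is no future to collect
        chunk_offset += max(len(ids) for ids in chunk_block_ids)
    # Synchronize once on the session instead of calling .result() per chunk.
    return session.wait_complete()


tx = TxSessionSketch()
done = send_in_chunks(tx, [[0, 1, 2, 3, 4]], chunk_size_blocks=2)
offsets = [s.chunk_block_offset for s in tx.sent]
```

The receiver side would follow the same shape: build one full slice, pass it to `receive()`, then wait on the session.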
Synchronization on KVSlice::cuda_event for pipelined KV block transfer is redundant because PyExecutor::_forward_step() already does this. Signed-off-by: Athena Cai <athenac@nvidia.com>
@coderabbitai check the test coverage, including functional, perf, stress, and multi-node tests.
Tip: For best results, initiate chat on the files or code changes.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Changed/mentioned test files =="
for f in \
tests/unittest/disaggregated/test_chunked_transfer.py \
tests/unittest/disaggregated/test_kv_transfer.py \
tests/unittest/llmapi/test_llm_args.py \
tests/integration/defs/accuracy/test_disaggregated_serving.py \
tests/integration/test_lists/test-db/l0_dgx_b200.yml \
tensorrt_llm/_torch/disaggregation/transceiver.py \
tensorrt_llm/_torch/pyexecutor/py_executor.py \
tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py
do
if [ -f "$f" ]; then
echo "-- outline: $f"
ast-grep outline "$f" --view expanded || true
echo
else
echo "Missing: $f"
fi
done
echo "== Search for perf/stress/multi-node/disaggregated related tests =="
rg -n -i -C2 \
'(perf|performance|benchmark|stress|soak|multi[-_ ]node|multinode|disaggregated serving|disaggregated|chunked|pipelined)' \
tests tensorrt_llm/_torch | sed -n '1,260p'
echo
echo "== Inspect relevant slices from changed tests =="
sed -n '1,260p' tests/unittest/disaggregated/test_chunked_transfer.py
echo "-----"
sed -n '1,260p' tests/unittest/disaggregated/test_kv_transfer.py
echo "-----"
sed -n '1,220p' tests/integration/defs/accuracy/test_disaggregated_serving.py
echo "-----"
sed -n '1,220p' tests/unittest/llmapi/test_llm_args.py
echo "-----"
sed -n '1,220p' tests/integration/test_lists/test-db/l0_dgx_b200.yml
echo
echo "== Inspect relevant implementation slices for testability points =="
sed -n '1,260p' tensorrt_llm/_torch/disaggregation/transceiver.py
echo "-----"
sed -n '1,220p' tensorrt_llm/_torch/pyexecutor/py_executor.py
echo "-----"
sed -n '1,220p' tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py
Length of output: 144072
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Direct test references for new pipelined/chunked entry points =="
rg -n -C2 \
'_maybe_send_prefill_chunk|send_prefill_chunk|enable_pipelined_transfer|respond_and_send_async|request_and_receive_async|_create_kv_slices|create_kv_cache_transceiver' \
tests tensorrt_llm/_torch | sed -n '1,260p'
echo
echo "== Disaggregated multi-node / stress / soak style tests in test tree =="
rg -n -i -C2 \
'(multi[-_ ]node|multinode|soak|stress|long[-_ ]run|longevity|burn[-_ ]in)' \
tests | sed -n '1,220p'
echo
echo "== Show pipelined accuracy test and nearby YAML entries =="
sed -n '720,820p' tests/integration/defs/accuracy/test_disaggregated_serving.py
echo "-----"
rg -n -C2 'chunked_kv_transfer|pipelined_kv_transfer' tests/integration/test_lists/test-db/l0_dgx_b200.yml
Length of output: 49120
What is covered well
Function / unit tests
Integration / accuracy
Gaps I found
1. Some new entry points are not directly tested
So the core mechanics are exercised indirectly, but several newly added integration points are still missing targeted tests.
2. Pipelined accuracy test does not appear to be scheduled in the shown pre-merge list
So pipelined accuracy exists in code, but it does not look wired into this pre-merge list yet.
3. No PR-specific perf validation
Given the PR objective is explicitly performance-oriented, this is a notable gap.
4. No PR-specific stress coverage
But I did not find stress coverage specifically targeting:
5. No PR-specific multi-node coverage
But I did not find multi-node tests specifically exercising this PR’s new chunked/pipelined KV transfer path.
Overall assessment
Recommended follow-ups
If you want, I can turn this into a concrete review comment with prioritized test additions.
Description
Summary
Implements pipelined prefill-transfer for disaggregated serving. Instead of waiting for all prefill chunks to complete before starting KV cache transfer, each chunk's KV data is transferred to the generation server immediately after its prefill completes. This overlaps GPU compute with RDMA transfer, hiding transfer latency behind prefill computation. When transfer time per chunk is less than prefill time per chunk (typical for 100+ Gbps NIC with long-context workloads), transfer latency is nearly fully hidden.
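The overlap claim can be illustrated with a small timing model. This is a sketch under simplifying assumptions (uniform chunks, one transfer in flight at a time, no fixed per-transfer overhead), not a measurement of this PR; the function names and the example numbers are made up for illustration.

```python
def sequential_time(num_chunks, prefill_per_chunk, transfer_per_chunk):
    """All prefill chunks finish first, then all KV data is transferred."""
    return num_chunks * prefill_per_chunk + num_chunks * transfer_per_chunk


def pipelined_time(num_chunks, prefill_per_chunk, transfer_per_chunk):
    """Each chunk's transfer starts once its prefill and the previous transfer are done."""
    prefill_done = 0.0
    transfer_done = 0.0
    for _ in range(num_chunks):
        prefill_done += prefill_per_chunk
        # A transfer cannot start before its chunk is computed,
        # nor before the previous chunk's transfer has finished.
        transfer_done = max(transfer_done, prefill_done) + transfer_per_chunk
    return transfer_done


# Transfer faster than prefill: only the last chunk's transfer is exposed.
fast_link = pipelined_time(8, prefill_per_chunk=10, transfer_per_chunk=4)
baseline = sequential_time(8, prefill_per_chunk=10, transfer_per_chunk=4)

# Transfer slower than prefill: the link becomes the bottleneck.
slow_link = pipelined_time(8, prefill_per_chunk=10, transfer_per_chunk=12)
```

In this model the pipelined total is the prefill time plus one chunk's transfer when transfer is the faster stage, which matches the "nearly fully hidden" wording above. When transfer is the slower stage, the total is dominated by the link instead.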
TODO: fix _create_kv_slices and add e2e and accuracy tests
Configuration
Enable chunked KV cache transfer by setting chunk_size_blocks in the YAML config:
Phased Roadmap
This PR is Phase 1 of a 2-phase effort: