Skip to content

Commit a2cbdcf

Browse files
committed
Revert early block release implementation
Signed-off-by: Athena Cai <athenac@nvidia.com>
1 parent 849febf commit a2cbdcf

10 files changed

Lines changed: 16 additions & 474 deletions

File tree

cpp/include/tensorrt_llm/batch_manager/kvCacheManager.h

Lines changed: 0 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -681,15 +681,6 @@ class GenerationRequest
681681
++mNumFrontBlocksRemovedPerWindow.at(windowSize);
682682
}
683683

684-
//! \brief Advance the per-window front-block counter without touching cache blocks.
685-
//! \details Used by ``BlockManager::releasePrefixBlocks`` to advance the
686-
//! single-window front-block counter once after every ``WindowBlockManager`` has
687-
//! processed the same prefix range.
688-
void incrementNumFrontBlocksRemoved(SizeType32 windowSize)
689-
{
690-
++mNumFrontBlocksRemovedPerWindow.at(windowSize);
691-
}
692-
693684
void removeLastBlock(SizeType32 windowSize)
694685
{
695686
for (auto& beamBlockIds : mCacheBlockIds.at(windowSize))
@@ -982,14 +973,6 @@ class WindowBlockManager
982973
std::optional<KVCacheBlock::IdType> releaseBlocks(
983974
GenerationRequest& sequence, OptionalRef<LlmRequest const> llmRequest);
984975

985-
//! \brief Release prefix blocks in range [startIdx, numBlocks) for a sequence.
986-
//! \details Used by disaggregated serving to free sender-side KV memory
987-
//! for blocks whose data has already been transferred. Reuses the
988-
//! detachFrontBlock mechanism (decRefCount + eviction policy release).
989-
//! Called by BlockManager::releasePrefixBlocks which coordinates the
990-
//! single-window front-block counter across all window managers.
991-
void releasePrefixBlocks(GenerationRequest& sequence, SizeType32 startIdx, SizeType32 numBlocks);
992-
993976
//! \brief Simulate freeing all blocks for that sequence to check impact on number of free blocks
994977
void schedulingReleaseBlocks(LlmRequest::RequestIdType requestId);
995978

@@ -1531,13 +1514,6 @@ class BlockManager
15311514
std::optional<KVCacheBlock::IdType> releaseBlocks(
15321515
GenerationRequest& sequence, OptionalRef<LlmRequest const> llmRequest = std::nullopt, bool pinBlocks = false);
15331516

1534-
//! \brief Release the first numBlocks prefix blocks of a sequence.
1535-
//! \details Mirrors detachFrontBlock logic: decRefCount + eviction policy
1536-
//! release for each prefix block. The front-block counter on
1537-
//! GenerationRequest ensures releaseBlocks (called during removeSequence)
1538-
//! skips already-freed prefix blocks.
1539-
void releasePrefixBlocks(GenerationRequest& sequence, SizeType32 numBlocks);
1540-
15411517
[[nodiscard]] std::vector<KVCacheBlock::IdType> storeBlocksForReuse(
15421518
GenerationRequest& sequence, OptionalRef<LlmRequest const> llmRequest = std::nullopt, bool pinBlocks = false);
15431519

@@ -2455,11 +2431,6 @@ class KVCacheManager : public BaseKVCacheManager
24552431
[[nodiscard]] std::optional<KVCacheBlock::IdType> removeSequence(LlmRequest::RequestIdType requestId,
24562432
OptionalRef<LlmRequest const> llmRequest = std::nullopt, bool pinOnRelease = false) override;
24572433

2458-
//! \brief Release prefix blocks for a sequence without removing it.
2459-
//! \details Used by disaggregated serving for early block release during
2460-
//! chunked KV cache transfer. No-op if the sequence does not exist.
2461-
void releasePrefixBlocks(LlmRequest::RequestIdType requestId, SizeType32 numBlocks);
2462-
24632434
void schedulingRemoveSequence(LlmRequest::RequestIdType requestId) override;
24642435

24652436
[[nodiscard]] runtime::ITensor::SharedPtr getBlockPoolPointers() const override

cpp/tensorrt_llm/batch_manager/kvCacheManager.cpp

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -2897,34 +2897,6 @@ std::optional<KVCacheBlock::IdType> BlockManager::releaseBlocks(
28972897
return lastStoredId;
28982898
}
28992899

2900-
void BlockManager::releasePrefixBlocks(GenerationRequest& sequence, SizeType32 numBlocks)
2901-
{
2902-
// NOTE: This assumes a single window size (no VSWA). With different window
2903-
// sizes, each WindowBlockManager may have a different number of allocated
2904-
// blocks, so releasing the same numBlocks from all managers would need
2905-
// per-window-size handling. Disaggregated serving does not support VSWA
2906-
// today (gated by should_store_blocks: not is_vswa in the executor and
2907-
// beamWidth == 1 assertion in WindowBlockManager::releasePrefixBlocks).
2908-
//
2909-
auto const windowSize = mWindowBlockManagers.cbegin()->first;
2910-
// Snapshot the counter before iterating so that every WindowBlockManager
2911-
// releases the same range. Without this, the first manager would advance
2912-
// the single-window front-block counter and subsequent managers would see
2913-
// the counter already at the target, skipping their own blocks.
2914-
SizeType32 const startIdx = sequence.getNumFrontBlocksRemoved(windowSize);
2915-
for (auto& [_, manager] : mWindowBlockManagers)
2916-
{
2917-
manager.releasePrefixBlocks(sequence, startIdx, numBlocks);
2918-
}
2919-
// Advance the single-window counter once, after all managers have released.
2920-
// Uses incrementNumFrontBlocksRemoved (counter-only) instead of
2921-
// removeFrontBlock so the intent is explicit.
2922-
while (sequence.getNumFrontBlocksRemoved(windowSize) < numBlocks)
2923-
{
2924-
sequence.incrementNumFrontBlocksRemoved(windowSize);
2925-
}
2926-
}
2927-
29282900
void BlockManager::pinBlocks(GenerationRequest& sequence)
29292901
{
29302902
for (auto& [_, manager] : mWindowBlockManagers)
@@ -3737,43 +3709,6 @@ void WindowBlockManager::detachFrontBlock(GenerationRequest& sequence)
37373709
sequence.getNumFrontBlocksRemoved(mWindowSize));
37383710
}
37393711

3740-
void WindowBlockManager::releasePrefixBlocks(GenerationRequest& sequence, SizeType32 startIdx, SizeType32 numBlocks)
3741-
{
3742-
TLLM_CHECK_WITH_INFO(
3743-
sequence.getBeamWidth() == 1, "[kv cache manager] releasePrefixBlocks does not support beamWidth > 1");
3744-
3745-
auto const requestId = sequence.getRequestId();
3746-
auto& allocatedBlocks = mAllocatedBlocksPerSeq.at(requestId);
3747-
SizeType32 const target = std::min(numBlocks, static_cast<SizeType32>(allocatedBlocks.size()));
3748-
3749-
// Release blocks in range [startIdx, target). The single-window
3750-
// front-block counter is advanced by BlockManager after
3751-
// all WindowBlockManagers have processed the same range.
3752-
for (SizeType32 blockIdx = startIdx; blockIdx < target; ++blockIdx)
3753-
{
3754-
auto& block = allocatedBlocks.at(blockIdx);
3755-
auto releasedBlock = block;
3756-
3757-
TLLM_LOG_DEBUG("%s::releasePrefixBlocks - Releasing block %d from sequence %lu", mLogPrefix.c_str(),
3758-
releasedBlock->getBlockId(), requestId);
3759-
3760-
// Replace the sequence slot with a placeholder, matching detachFrontBlock().
3761-
// removeSequence later walks allocatedBlocks in releaseBlocks(); leaving the
3762-
// real block here would release it a second time and corrupt the eviction
3763-
// policy's free-block count.
3764-
block = KVCacheBlock::createPlaceholder();
3765-
3766-
if (releasedBlock->hasRefs())
3767-
{
3768-
releasedBlock->decRefCount();
3769-
}
3770-
if (!releasedBlock->hasRefs())
3771-
{
3772-
mEvictionPolicy->releaseBlock(releasedBlock);
3773-
}
3774-
}
3775-
}
3776-
37773712
PrefixReuseSummary KVCacheManager::analyzePrefixReuse(
37783713
VecUniqueTokens const& uniqueTokens, LlmRequest const& llmRequest) const
37793714
{
@@ -3950,31 +3885,6 @@ std::optional<KVCacheBlock::IdType> KVCacheManager::removeSequence(
39503885
return lastStoredId;
39513886
}
39523887

3953-
void KVCacheManager::releasePrefixBlocks(RequestIdType requestId, SizeType32 numBlocks)
3954-
{
3955-
// Hard precondition: BlockManager::releasePrefixBlocks advances the
3956-
// single-window front-block counter to numBlocks for every WindowBlockManager,
3957-
// even when a window has fewer than numBlocks allocated. Under variable
3958-
// sliding window attention (VSWA), that would cause WindowBlockManager::
3959-
// releaseBlocks (called during removeSequence) to underrun rbegin() and
3960-
// skip tail blocks for the smaller window. Disagg serving already gates
3961-
// VSWA out, but we enforce the assumption here so the C++ API contract is
3962-
// self-defending instead of relying on caller discipline.
3963-
TLLM_CHECK_WITH_INFO(
3964-
!mBlockManager.isVariableWindow(), "releasePrefixBlocks does not support variable sliding window attention");
3965-
if (numBlocks <= 0)
3966-
{
3967-
return;
3968-
}
3969-
std::scoped_lock lock(mSequencesMtx);
3970-
auto it = mSequences.find(requestId);
3971-
if (it == mSequences.end())
3972-
{
3973-
return;
3974-
}
3975-
mBlockManager.releasePrefixBlocks(it->second, numBlocks);
3976-
}
3977-
39783888
std::vector<KVCacheBlock::IdType> KVCacheManager::storeBlocksForReuse(
39793889
RequestIdType requestId, OptionalRef<LlmRequest const> llmRequest, bool pinBlocks)
39803890
{

cpp/tensorrt_llm/nanobind/batch_manager/kvCacheManager.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -683,9 +683,7 @@ void tb::kv_cache_manager::KVCacheManagerBindings::initBindings(nb::module_& m)
683683
.def("copy_linear_attention_block", &tbk::KVCacheManager::copyLinearAttentionBlock, nb::arg("llm_request"),
684684
nb::call_guard<nb::gil_scoped_release>())
685685
.def("copy_linear_attention_block_batch", &tbk::KVCacheManager::copyLinearAttentionBlockBatch,
686-
nb::arg("llm_requests"), nb::call_guard<nb::gil_scoped_release>())
687-
.def("release_prefix_blocks", &tbk::KVCacheManager::releasePrefixBlocks, nb::arg("request_id"),
688-
nb::arg("num_blocks"), nb::call_guard<nb::gil_scoped_release>());
686+
nb::arg("llm_requests"), nb::call_guard<nb::gil_scoped_release>());
689687
}
690688

691689
void tb::BasePeftCacheManagerBindings::initBindings(nb::module_& m)

cpp/tests/unit_tests/batch_manager/kvCacheManagerTest.cpp

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -270,59 +270,6 @@ TEST_F(KVCacheManagerTest, BlockManagerTest)
270270
std::runtime_error);
271271
}
272272

273-
TEST_F(KVCacheManagerTest, BlockManagerReleasePrefixBlocksDoesNotDoubleFreeOnTeardown)
274-
{
275-
auto constexpr numLayers = 12;
276-
auto constexpr numKvHeads = 6;
277-
auto constexpr sizePerHead = 128;
278-
auto constexpr tokensPerBlock = 4;
279-
auto constexpr blocksInPrimaryPool = 8;
280-
auto constexpr blocksInSecondaryPool = 0;
281-
auto constexpr maxNumSequences = 8;
282-
auto const stream = std::make_shared<tr::CudaStream>();
283-
284-
auto constexpr beamWidth = 1;
285-
auto constexpr numBlocksPerBeam = 4;
286-
auto constexpr numTokens = tokensPerBlock * numBlocksPerBeam;
287-
auto constexpr maxAttentionWindow = numTokens;
288-
289-
auto const blocksPerWindow = BlocksPerWindow{{maxAttentionWindow, {blocksInPrimaryPool, blocksInSecondaryPool}}};
290-
291-
BlockManager blockManager(std::vector(numLayers, numKvHeads), sizePerHead, tokensPerBlock, blocksPerWindow,
292-
maxNumSequences, stream, maxAttentionWindow, beamWidth,
293-
std::vector<BlockManager::SizeType32>{maxAttentionWindow}, nvinfer1::DataType::kHALF, 0, maxAttentionWindow);
294-
blockManager.allocatePools(false);
295-
296-
SizeType32 constexpr maxNewTokens{0};
297-
tr::SamplingConfig const samplingConfig{beamWidth};
298-
bool constexpr isStreaming{false};
299-
300-
auto tokens = std::make_shared<VecTokens>();
301-
for (SizeType32 i = 0; i < numTokens; ++i)
302-
{
303-
tokens->push_back(i);
304-
}
305-
306-
LlmRequest::RequestIdType constexpr requestId{42};
307-
auto llmReq = std::make_shared<LlmRequest>(requestId, maxNewTokens, tokens, samplingConfig, isStreaming);
308-
GenerationRequest seq{requestId, numTokens, beamWidth, blockManager.getWindowSizesMetadata()};
309-
310-
(void) blockManager.addSequenceBatch(
311-
{&seq}, {numTokens}, {numBlocksPerBeam}, {std::ref(*llmReq)}, maxAttentionWindow, /*isEnableBlockReuse=*/false);
312-
EXPECT_EQ(blockManager.getNumFreeBlocks(), blocksInPrimaryPool - numBlocksPerBeam);
313-
314-
blockManager.releasePrefixBlocks(seq, 2);
315-
EXPECT_EQ(blockManager.getNumFreeBlocks(), blocksInPrimaryPool - 2);
316-
317-
// releasePrefixBlocks has cumulative semantics. This should release only
318-
// one additional block rather than releasing the first two again.
319-
blockManager.releasePrefixBlocks(seq, 3);
320-
EXPECT_EQ(blockManager.getNumFreeBlocks(), blocksInPrimaryPool - 1);
321-
322-
blockManager.releaseBlocks(seq);
323-
EXPECT_EQ(blockManager.getNumFreeBlocks(), blocksInPrimaryPool);
324-
}
325-
326273
template <typename T>
327274
void writePatternToOffloadedBlocksDRAM(T* rawBlockPtr, int blockSize, int mask)
328275
{

tensorrt_llm/_torch/disaggregation/native/transfer.py

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import weakref
88
from dataclasses import dataclass
99
from enum import Enum
10-
from typing import Callable, List, Optional, Union
10+
from typing import List, Optional, Union
1111

1212
import msgpack
1313
import numpy as np
@@ -57,8 +57,6 @@
5757
AttentionTypeCpp = tensorrt_llm.bindings.internal.batch_manager.AttentionType
5858
LlmRequestType = tensorrt_llm.bindings.internal.batch_manager.LlmRequestType
5959

60-
OnChunkTransferredCallback = Callable[[int, int, int], None]
61-
6260
# Number of worker threads for KV transfer queues (default: 1)
6361
KV_TRANSFER_NUM_THREADS = int(os.environ.get("TRTLLM_KV_TRANSFER_NUM_THREADS", "1"))
6462

@@ -569,29 +567,6 @@ def _deliver_kv_to_agent(self, write_meta: WriteMeta):
569567
)
570568
else:
571569
task.complete()
572-
if session._on_chunk_transferred is not None:
573-
try:
574-
# Use the max across layer groups as the
575-
# cumulative release count. For asymmetric
576-
# layer groups (e.g., sliding window), shorter
577-
# groups may have fewer blocks per chunk, but
578-
# each WindowBlockManager independently clamps
579-
# to its own allocated block count via
580-
# min(numBlocks, allocatedBlocks.size()).
581-
num_blocks = max(
582-
(len(ids) for ids in task._slice.block_ids_per_layer_groups),
583-
default=0,
584-
)
585-
session._on_chunk_transferred(
586-
request_id=session.request_id,
587-
chunk_block_offset=task._slice.chunk_block_offset,
588-
num_blocks=num_blocks,
589-
)
590-
except Exception as e:
591-
logger.warning(
592-
f"on_chunk_transferred callback failed for "
593-
f"request {session.request_id} slice {write_meta.slice_id}: {e}"
594-
)
595570

596571
logger.debug(
597572
f"deliver_kv_to_agent completed: unique_rid={write_meta.unique_rid}, "
@@ -751,10 +726,10 @@ def _build_kv_write_meta(self, task: KVSendTask, req_info: RecvReqInfo) -> Write
751726
f"src={src_block_ids.size}, dst={dst_block_ids.size}"
752727
)
753728
dst_block_ids = dst_block_ids[:-1]
754-
elif block_diff != 0:
729+
elif block_diff > 1:
755730
raise ValueError(
756731
f"src/dst block count mismatch: {src_block_ids.size} vs "
757-
f"{dst_block_ids.size} (expected 0 <= diff <= 1)"
732+
f"{dst_block_ids.size} (expected diff <= 1)"
758733
)
759734
tpb = extractor.page_table.tokens_per_block
760735
token_range = task._slice.token_range
@@ -1131,7 +1106,6 @@ def __init__(
11311106
timeout_s: Optional[float] = None,
11321107
prompt_len: Optional[int] = None,
11331108
beam_width: int = 1,
1134-
on_chunk_transferred: Optional[OnChunkTransferredCallback] = None,
11351109
):
11361110
super().__init__(
11371111
sender,
@@ -1147,7 +1121,6 @@ def __init__(
11471121
self.kv_tasks = []
11481122
self.aux_task = None
11491123
self.lock = threading.Lock()
1150-
self._on_chunk_transferred = on_chunk_transferred
11511124

11521125
self._exception: Optional[Exception] = None
11531126
self._closed = False
@@ -2046,16 +2019,11 @@ def populate_instance_and_rank_info(self, endpoints: list[str], layer_num_per_pp
20462019
def create_tx_session(
20472020
self,
20482021
request: LlmRequest,
2049-
on_chunk_transferred: Optional[OnChunkTransferredCallback] = None,
20502022
) -> TxSession:
20512023
"""Create a TxSession for the given request.
20522024
20532025
Args:
20542026
request: The LLM request to create a send session for.
2055-
on_chunk_transferred: Optional callback invoked on the
2056-
sender worker thread after each chunk's RDMA completes.
2057-
Signature: ``(request_id: int, chunk_block_offset: int,
2058-
num_blocks: int) -> None``.
20592027
20602028
Returns:
20612029
A new ``TxSession`` ready to accept ``send()`` calls.
@@ -2070,7 +2038,6 @@ def create_tx_session(
20702038
timeout_s=self._config.tx_timeout_s,
20712039
prompt_len=request.prompt_len,
20722040
beam_width=request.py_beam_width,
2073-
on_chunk_transferred=on_chunk_transferred,
20742041
)
20752042

20762043
def create_rx_session(self, request: LlmRequest) -> RxSession:

0 commit comments

Comments
 (0)