Skip to content

Commit 2e6abd1

Browse files
[https://nvbugs/6179661][fix] Harden disagg cache transceiver teardown (#15422)
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
1 parent 4d7bf0d commit 2e6abd1

2 files changed

Lines changed: 21 additions & 9 deletions

File tree

cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -508,6 +508,11 @@ CacheTransceiver::CacheTransceiver(kv_cache_manager::BaseKVCacheManager* cacheMa
508508

509509
CacheTransceiver::~CacheTransceiver()
510510
{
511+
// Stop sender/receiver workers while the connection manager and transfer
512+
// plugin are still alive. The workers can access both during termination.
513+
mCacheSender.reset();
514+
mCacheReceiver.reset();
515+
511516
if (mWrapperLibHandle)
512517
{
513518
std::lock_guard<std::mutex> lock(mDllMutex);

cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp

Lines changed: 16 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
/*
2-
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
33
* SPDX-License-Identifier: Apache-2.0
44
*
55
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -33,6 +33,7 @@
3333
#include <future>
3434
#include <map>
3535
#include <memory>
36+
#include <optional>
3637
#include <unordered_map>
3738

3839
namespace tensorrt_llm::batch_manager
@@ -351,7 +352,7 @@ class CacheSender::Impl
351352
mRequestToSession.erase(it);
352353
}
353354

354-
[[nodiscard]] RequestInfo recvRequestInfo()
355+
[[nodiscard]] std::optional<RequestInfo> recvRequestInfo()
355356
{
356357
auto* agentConnectionManager = dynamic_cast<executor::kv_cache::AgentConnectionManager*>(mManager);
357358
bool isAgent = agentConnectionManager != nullptr;
@@ -361,10 +362,10 @@ class CacheSender::Impl
361362
auto const* connection = isAgent
362363
? agentConnectionManager->recvConnectionAndRequestInfo(info, mTerminate)
363364
: mManager->recvConnect(DataContext{TransceiverTag::kID_TAG, mTerminate}, &id, sizeof(id));
364-
if (connection == nullptr && !mManager->isRunning())
365+
if (connection == nullptr)
365366
{
366-
TLLM_LOG_WARNING(" recvRequestInfo connection is nullptr, maybe the server is terminating");
367-
return info;
367+
TLLM_LOG_WARNING("recvRequestInfo connection is nullptr, maybe the server is terminating");
368+
return std::nullopt;
368369
}
369370

370371
if (!isAgent)
@@ -639,12 +640,12 @@ class CacheSender::Impl
639640
}
640641
if (!mReadyResponses.empty())
641642
{
642-
auto const& requestInfo = recvRequestInfo();
643-
if (mTerminate || !mManager->isRunning())
643+
auto requestInfo = recvRequestInfo();
644+
if (!requestInfo.has_value() || mTerminate || !mManager->isRunning())
644645
{
645646
return;
646647
}
647-
auto reqId = requestInfo.getRequestId();
648+
auto reqId = requestInfo->getRequestId();
648649

649650
{
650651
std::scoped_lock lk(mSenderMutex);
@@ -674,6 +675,10 @@ class CacheSender::Impl
674675
}
675676
it = getCurrentResponse();
676677
}
678+
if (mTerminate || it == mReadyResponses.end())
679+
{
680+
break;
681+
}
677682
sendResponse(it);
678683
}
679684
}
@@ -1288,7 +1293,9 @@ void CacheSender::sendSync(LlmRequest const& llmRequest)
12881293

12891294
RequestInfo CacheSender::recvRequestInfo()
12901295
{
1291-
return mImpl->recvRequestInfo();
1296+
auto requestInfo = mImpl->recvRequestInfo();
1297+
TLLM_CHECK(requestInfo.has_value());
1298+
return *requestInfo;
12921299
}
12931300

12941301
bool CacheSender::cancelRequest(LlmRequest const& llmRequest)

0 commit comments

Comments
 (0)