Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cpp/tensorrt_llm/nanobind/runtime/moeBindings.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -117,6 +117,9 @@ void initMoeBindings(nb::module_& m)
nb::call_guard<nb::gil_scoped_release>())
.def("end_iter", &tr::MoeLoadBalancer::endIter, nb::arg("iter_id"), "End the iteration with the given ID",
nb::call_guard<nb::gil_scoped_release>())
.def("reconfigure_mask_only", &tr::MoeLoadBalancer::reconfigureMaskOnly, nb::arg("dead_ranks"),
"Reconfigure EPLB routing metadata so slots on dead EP ranks are unreachable",
nb::call_guard<nb::gil_scoped_release>())
.def("shutdown", &tr::MoeLoadBalancer::shutdown, "Shutdown the load balancer and clean up resources",
nb::call_guard<nb::gil_scoped_release>());

Expand Down
191 changes: 179 additions & 12 deletions cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,11 +54,42 @@ struct ReplicaInfo
}
};

namespace
{

// Reports whether `rank` is flagged as dead in `deadRankMask`.
// A null mask pointer means no masking is in effect, so every rank counts as alive.
// No bounds check is performed: the caller must pass a `rank` that indexes into a non-null mask.
bool isRankMasked(std::vector<uint8_t> const* deadRankMask, int rank)
{
    if (deadRankMask == nullptr)
    {
        return false;
    }
    uint8_t const flag = (*deadRankMask)[rank];
    return flag != 0;
}

// Returns the number of expert slots that live on EP ranks not flagged in `deadRankMask`,
// i.e. (number of non-masked ranks) * metaInfo.slotCountPerRank.
// A null `deadRankMask` means no rank is masked. A non-null mask must have exactly
// metaInfo.epSize entries; this is enforced with TLLM_CHECK_WITH_INFO.
int getActiveSlotCount(
    tensorrt_llm::kernels::MoeLoadBalanceMetaInfo const& metaInfo, std::vector<uint8_t> const* deadRankMask)
{
    // Size reported in the diagnostic; 0 stands in for "no mask supplied".
    long const reportedMaskSize = deadRankMask == nullptr ? 0L : static_cast<long>(deadRankMask->size());
    TLLM_CHECK_WITH_INFO(deadRankMask == nullptr || static_cast<int>(deadRankMask->size()) == metaInfo.epSize,
        "deadRankMask size (%ld) must match epSize (%d)", reportedMaskSize, metaInfo.epSize);

    int liveRankCount = 0;
    for (int rank = 0; rank < metaInfo.epSize; ++rank)
    {
        liveRankCount += isRankMasked(deadRankMask, rank) ? 0 : 1;
    }
    return liveRankCount * metaInfo.slotCountPerRank;
}
Comment thread
chienchunhung marked this conversation as resolved.

} // namespace

void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor,
MoePlacementCpuInfo* cpuPlacement)
MoePlacementCpuInfo* cpuPlacement, std::vector<uint8_t> const* deadRankMask)
{
cpuPlacement->expertReplicaCount.resize(metaInfo.expertCount);
int totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank;
int totalSlotCount = getActiveSlotCount(metaInfo, deadRankMask);
TLLM_CHECK_WITH_INFO(totalSlotCount >= metaInfo.expertCount,
"Mask-only EPLB reconfigure would leave fewer active slots (%d) than experts (%d)", totalSlotCount,
metaInfo.expertCount);
// --- Edge Case 1: No replication needed ---
if (totalSlotCount == metaInfo.expertCount)
{
Expand Down Expand Up @@ -122,20 +153,21 @@ void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float
}

void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor,
MoePlacementCpuInfo* cpuPlacement)
MoePlacementCpuInfo* cpuPlacement, std::vector<uint8_t> const* deadRankMask)
{
// This function only updates these two vectors
auto& rankExpertIds = cpuPlacement->rankExpertIds;
auto& replicaCount = cpuPlacement->expertReplicaCount;

int totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank;
int totalSlotCount = getActiveSlotCount(metaInfo, deadRankMask);
// 1. Create all replica information
std::vector<ReplicaInfo> allReplicas;
allReplicas.reserve(totalSlotCount);

for (int expertId = 0; expertId < metaInfo.expertCount; ++expertId)
{
assert(replicaCount[expertId] > 0); // Ensure replica count is positive
TLLM_CHECK_WITH_INFO(replicaCount[expertId] > 0, "Replica count (%d) for expert %d must be positive",
replicaCount[expertId], expertId);
double slotSize = expertLoadFactor[expertId] / static_cast<double>(replicaCount[expertId]);
for (int replicaId = 0; replicaId < replicaCount[expertId]; ++replicaId)
{
Expand All @@ -144,14 +176,20 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float*
}
}

assert(static_cast<int>(allReplicas.size()) == totalSlotCount);
TLLM_CHECK_WITH_INFO(static_cast<int>(allReplicas.size()) == totalSlotCount,
"Replica count sum (%ld) must match active slot count (%d)", static_cast<long>(allReplicas.size()),
totalSlotCount);

// 2. Sort replicas by slotSize descending
std::sort(allReplicas.begin(), allReplicas.end());

// 3. Maintain Rank state and initialize Priority Queue
std::vector<double> currentRankLoad(metaInfo.epSize, 0.0);
std::vector<int> currentRankSlots(metaInfo.epSize, 0); // Tracks the count of assigned slots per rank
for (int rank = 0; rank < metaInfo.epSize; ++rank)
{
std::fill_n(rankExpertIds[rank].begin(), metaInfo.slotCountPerRank, -1);
}

// Define a min-priority queue storing pairs of {load, rank_id}
using RankLoadPair = std::pair<double, int>;
Expand All @@ -160,8 +198,12 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float*
// Initialize the priority queue with every non-masked rank, each starting at 0 load
for (int rank = 0; rank < metaInfo.epSize; ++rank)
{
pq.push({0.0, rank});
if (!isRankMasked(deadRankMask, rank))
{
pq.push({0.0, rank});
}
}
TLLM_CHECK_WITH_INFO(!pq.empty(), "Mask-only EPLB reconfigure requires at least one active rank");

// 4. Optimized Greedy assignment using Priority Queue, writing directly to rankExpertIds
for (auto const& replica : allReplicas)
Expand Down Expand Up @@ -252,19 +294,36 @@ void prepareGpuPlacementInfo(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaI
// globalSlotIds[i][j] is the global slot id of expert i's j-th replica
// different experts have different numbers of replicas, so globalSlotIds is a vector of vectors
// the sum of sizes of all vectors in globalSlotIds is equal to the number of assigned slots
// (slots whose expert id is negative are skipped below)
int const totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank;
int const invalidSlotId = totalSlotCount;
std::fill_n(cpuPlacement->placementInfoForGPU.globalSlotIds, totalSlotCount, invalidSlotId);

std::vector<std::vector<int>> globalSlotIds(metaInfo.expertCount);
int assignedSlotCount = 0;
for (int rank = 0; rank < metaInfo.epSize; ++rank)
{
for (int slotId = 0; slotId < metaInfo.slotCountPerRank; ++slotId)
{
int expertId = cpuPlacement->rankExpertIds[rank][slotId];
if (expertId < 0)
{
continue;
}
TLLM_CHECK_WITH_INFO(expertId < metaInfo.expertCount, "expertId=%d should be in range [0, %d)", expertId,
metaInfo.expertCount);
int replicaId = globalSlotIds[expertId].size();
TLLM_CHECK_WITH_INFO(replicaId < cpuPlacement->placementInfoForGPU.expertReplicaCount[expertId],
"Expert %d has more placed replicas than its replica count (%d)", expertId,
cpuPlacement->placementInfoForGPU.expertReplicaCount[expertId]);
int globalSlotId = rank * metaInfo.slotCountPerRank + slotId;
globalSlotIds[expertId].push_back(globalSlotId);
int offset = cpuPlacement->placementInfoForGPU.expertReplicaStartOffset[expertId] + replicaId;
cpuPlacement->placementInfoForGPU.globalSlotIds[offset] = globalSlotId;
++assignedSlotCount;
}
}
TLLM_CHECK_WITH_INFO(assignedSlotCount == startOffset,
"Placed slot count (%d) must match active replica count (%d)", assignedSlotCount, startOffset);
// printCpuPlacementInfo(metaInfo, cpuPlacement);
}

Expand Down Expand Up @@ -515,6 +574,61 @@ void SingleLayerMoeLoadBalancer::waitLastUpdateDone()
}
}

// Computes, for every expert, how many of its replicas in mCpuPlacementInfo.oldRankExpertIds sit on
// ranks that are NOT flagged in `deadRankMask`.
//
// Checks performed (via TLLM_CHECK_WITH_INFO):
//   - `deadRankMask` has exactly mMetaInfo.epSize entries;
//   - every stored expert id is either -1 or within [0, mMetaInfo.expertCount), including ids on dead ranks;
//   - every expert keeps at least one replica on a non-masked rank.
//
// Returns a vector of size mMetaInfo.expertCount holding the surviving replica count per expert.
// This function does not modify the balancer's state.
std::vector<int> SingleLayerMoeLoadBalancer::computeMaskOnlyReplicaCounts(
    std::vector<uint8_t> const& deadRankMask) const
{
    TLLM_CHECK_WITH_INFO(static_cast<int>(deadRankMask.size()) == mMetaInfo.epSize,
        "deadRankMask size (%ld) must match epSize (%d)", static_cast<long>(deadRankMask.size()), mMetaInfo.epSize);

    std::vector<int> survivingReplicas(mMetaInfo.expertCount, 0);
    for (int rank = 0; rank < mMetaInfo.epSize; ++rank)
    {
        bool const rankAlive = deadRankMask[rank] == 0;
        for (int slot = 0; slot < mMetaInfo.slotCountPerRank; ++slot)
        {
            int const expertId = mCpuPlacementInfo.oldRankExpertIds[rank][slot];
            // Validate the id even when the rank is dead so corrupted placement data is always reported.
            TLLM_CHECK_WITH_INFO(expertId == -1 || (expertId >= 0 && expertId < mMetaInfo.expertCount),
                "expertId=%d should be -1 or in range [0, %d)", expertId, mMetaInfo.expertCount);
            if (!rankAlive || expertId < 0)
            {
                // Dead rank or unassigned slot: contributes no replica.
                continue;
            }
            ++survivingReplicas[expertId];
        }
    }

    // Every expert must remain reachable after masking.
    for (int expertId = 0; expertId < mMetaInfo.expertCount; ++expertId)
    {
        TLLM_CHECK_WITH_INFO(survivingReplicas[expertId] > 0,
            "Mask-only EPLB reconfigure would leave expert %d with no surviving replica", expertId);
    }
    return survivingReplicas;
}

// Runs the same checks as computeMaskOnlyReplicaCounts() for `deadRankMask` and discards the
// resulting counts; only the checks' outcome matters here.
void SingleLayerMoeLoadBalancer::validateMaskOnly(std::vector<uint8_t> const& deadRankMask) const
{
    static_cast<void>(computeMaskOnlyReplicaCounts(deadRankMask));
}

// Rebuilds this layer's placement so that slots on ranks flagged in `deadRankMask` hold no expert,
// without changing which expert occupies the slots of surviving ranks.
//
// Steps, in order:
//   1. Recompute per-expert replica counts from the surviving ranks (this also validates the mask).
//   2. Copy oldRankExpertIds into rankExpertIds, writing -1 for every slot of a dead rank.
//   3. Rebuild the GPU placement tables and copy them to the GPU.
//   4. Record mUpdateWeightsDoneEvent on the balancer's stream and block until it completes.
void SingleLayerMoeLoadBalancer::reconfigureMaskOnly(std::vector<uint8_t> const& deadRankMask)
{
    mCpuPlacementInfo.expertReplicaCount = computeMaskOnlyReplicaCounts(deadRankMask);

    for (int rank = 0; rank < mMetaInfo.epSize; ++rank)
    {
        bool const keepRank = deadRankMask[rank] == 0;
        for (int slot = 0; slot < mMetaInfo.slotCountPerRank; ++slot)
        {
            int const previousExpertId = mCpuPlacementInfo.oldRankExpertIds[rank][slot];
            if (keepRank)
            {
                mCpuPlacementInfo.rankExpertIds[rank][slot] = previousExpertId;
            }
            else
            {
                // -1 marks the slot as unassigned.
                mCpuPlacementInfo.rankExpertIds[rank][slot] = -1;
            }
        }
    }

    prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo);
    copyPlacementInfoToGpu();
    // NOTE(review): presumably the synchronous wait ensures the placement copy has finished before
    // returning to the caller -- confirm against copyPlacementInfoToGpu().
    TLLM_CUDA_CHECK(cudaEventRecord(mUpdateWeightsDoneEvent, mMoeLoadBalancer->mStream));
    TLLM_CUDA_CHECK(cudaEventSynchronize(mUpdateWeightsDoneEvent));
}

cudaStream_t SingleLayerMoeLoadBalancer::getStream() const
{
return mMoeLoadBalancer->mStream;
Expand Down Expand Up @@ -561,8 +675,9 @@ void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpuByCpu()

void SingleLayerMoeLoadBalancer::updateWeightsRoutine()
{
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
auto const deadRankMask = mMoeLoadBalancer->getDeadRankMaskSnapshot();
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo);
mWeightUpdater->updateWeights(&mCpuPlacementInfo);
copyPlacementInfoToGpu();
Expand All @@ -575,8 +690,9 @@ void SingleLayerMoeLoadBalancer::updateWeightsRoutine()

void SingleLayerMoeLoadBalancer::updateWeightsRoutineByCpu()
{
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
auto const deadRankMask = mMoeLoadBalancer->getDeadRankMaskSnapshot();
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo);
mLastUpdateTaskId = mMoeLoadBalancer->addCopyTask(
[this](int rank, int size) { mWeightUpdater->updateWeights(&mCpuPlacementInfo, rank, size); });
Expand Down Expand Up @@ -832,6 +948,7 @@ void HostMemoryMoeWeightUpdater::updateWeights(
MoeLoadBalancer::MoeLoadBalancer(int epRank, int epSize, int layerUpdatesPerIter)
: mEpRank{epRank}
, mEpSize{epSize}
, mDeadRankMask(epSize, 0)
, mLayerUpdatesPerIter{layerUpdatesPerIter}
{
TLLM_CUDA_CHECK(cudaGetDevice(&mCudaDeviceId));
Expand Down Expand Up @@ -947,9 +1064,11 @@ void MoeLoadBalancer::startIter(int64_t iterId, bool enableStatistic, bool enabl
{
std::unique_lock<std::mutex> lock(mWorkerThreadMutex);
TLLM_CHECK_WITH_INFO(mModelFinalized == true, "Model is not finalized, cannot start iteration.");
TLLM_CHECK_WITH_INFO(!mIterActive, "Cannot start iteration while another iteration is active.");
TLLM_CHECK_WITH_INFO(mIterId + 1 == iterId, "Expected iterId=%ld, but got %ld", mIterId + 1, iterId);

mIterId = iterId;
mIterActive = true;
// disable update for warm up iters.
bool isWarmUpIter = mIterId <= mWarmUpUntilIter;
bool fixedUpdateWeightsEnabled = enableUpdateWeights && !isWarmUpIter;
Expand All @@ -961,7 +1080,10 @@ void MoeLoadBalancer::startIter(int64_t iterId, bool enableStatistic, bool enabl

// Marks the currently active iteration as finished.
//
// @param iterId  Id of the iteration being ended; must equal the id of the active iteration (mIterId).
//
// Takes mWorkerThreadMutex for the duration of the call. Fails (via TLLM_CHECK_WITH_INFO) when no
// iteration is active or when `iterId` does not match the active iteration.
void MoeLoadBalancer::endIter(int64_t iterId)
{
    std::unique_lock<std::mutex> lock(mWorkerThreadMutex);
    TLLM_CHECK_WITH_INFO(mIterActive, "No active iteration to end.");
    // int64_t is `long long` on some platforms, so "%ld" would be a format/argument mismatch there;
    // cast explicitly and use "%lld" so the diagnostic is well-defined everywhere.
    TLLM_CHECK_WITH_INFO(mIterId == iterId, "endIter expected iterId=%lld, but got %lld",
        static_cast<long long>(mIterId), static_cast<long long>(iterId));
    mIterActive = false;
}

void MoeLoadBalancer::shutdown()
Expand All @@ -979,6 +1101,45 @@ void MoeLoadBalancer::shutdown()
}
}

void MoeLoadBalancer::reconfigureMaskOnly(std::vector<int> const& deadRanks)
{
std::unique_lock<std::mutex> workerLock(mWorkerThreadMutex);
TLLM_CHECK_WITH_INFO(mModelFinalized == true, "Model is not finalized, cannot reconfigure mask-only placement.");
TLLM_CHECK_WITH_INFO(!mIterActive, "Cannot reconfigure mask-only placement while an iteration is active.");
TLLM_CHECK_WITH_INFO(mIterInfoQueue.empty(), "Cannot reconfigure mask-only placement while iterations are queued.");

std::vector<uint8_t> candidateDeadRankMask;
{
std::lock_guard<std::mutex> maskLock(mDeadRankMaskMutex);
candidateDeadRankMask = mDeadRankMask;
for (int deadRank : deadRanks)
{
TLLM_CHECK_WITH_INFO(
deadRank >= 0 && deadRank < mEpSize, "deadRank=%d should be in range [0, %d)", deadRank, mEpSize);
candidateDeadRankMask[deadRank] = 1;
}
TLLM_CHECK_WITH_INFO(
candidateDeadRankMask[mEpRank] == 0, "Local epRank (%d) cannot be masked by a survivor", mEpRank);
}

for (auto& layer : mLayers)
{
layer->waitLastUpdateDone();
}
for (auto& layer : mLayers)
{
layer->validateMaskOnly(candidateDeadRankMask);
}
for (auto& layer : mLayers)
{
layer->reconfigureMaskOnly(candidateDeadRankMask);
}
{
std::lock_guard<std::mutex> maskLock(mDeadRankMaskMutex);
mDeadRankMask = candidateDeadRankMask;
}
}

void MoeLoadBalancer::workerThread()
{
TLLM_CUDA_CHECK(cudaSetDevice(mCudaDeviceId));
Expand Down Expand Up @@ -1064,6 +1225,12 @@ void MoeLoadBalancer::waitCopyTaskDone(int64_t taskId)
}
}

// Returns a copy of the current dead-rank mask, taken while holding mDeadRankMaskMutex.
// The caller owns the returned vector; later changes to the stored mask do not affect it.
std::vector<uint8_t> MoeLoadBalancer::getDeadRankMaskSnapshot()
{
    std::lock_guard<std::mutex> maskGuard(mDeadRankMaskMutex);
    std::vector<uint8_t> snapshot(mDeadRankMask);
    return snapshot;
}

MultiThreadWorker::MultiThreadWorker(int numThreads, int cudaDeviceId)
: mNumThreads(numThreads)
, mCudaDeviceId(cudaDeviceId)
Expand Down
Loading
Loading