Skip to content

Commit a3ac34c

Browse files
committed
[None][feat] WideEP FT: add EPLB mask-only reconfigure
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
1 parent 7193f41 commit a3ac34c

6 files changed

Lines changed: 330 additions & 17 deletions

File tree

cpp/tensorrt_llm/nanobind/runtime/moeBindings.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
* SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
33
* SPDX-License-Identifier: Apache-2.0
44
*
55
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -117,6 +117,9 @@ void initMoeBindings(nb::module_& m)
117117
nb::call_guard<nb::gil_scoped_release>())
118118
.def("end_iter", &tr::MoeLoadBalancer::endIter, nb::arg("iter_id"), "End the iteration with the given ID",
119119
nb::call_guard<nb::gil_scoped_release>())
120+
.def("reconfigure_mask_only", &tr::MoeLoadBalancer::reconfigureMaskOnly, nb::arg("dead_ranks"),
121+
"Reconfigure EPLB routing metadata so slots on dead EP ranks are unreachable",
122+
nb::call_guard<nb::gil_scoped_release>())
120123
.def("shutdown", &tr::MoeLoadBalancer::shutdown, "Shutdown the load balancer and clean up resources",
121124
nb::call_guard<nb::gil_scoped_release>());
122125

cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp

Lines changed: 171 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
2+
* Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,11 +54,39 @@ struct ReplicaInfo
5454
}
5555
};
5656

57+
namespace
58+
{
59+
60+
bool isRankMasked(std::vector<uint8_t> const* deadRankMask, int rank)
61+
{
62+
return deadRankMask != nullptr && rank >= 0 && rank < static_cast<int>(deadRankMask->size())
63+
&& ((*deadRankMask)[rank] != 0);
64+
}
65+
66+
int getActiveSlotCount(
67+
tensorrt_llm::kernels::MoeLoadBalanceMetaInfo const& metaInfo, std::vector<uint8_t> const* deadRankMask)
68+
{
69+
int activeRankCount = 0;
70+
for (int rank = 0; rank < metaInfo.epSize; ++rank)
71+
{
72+
if (!isRankMasked(deadRankMask, rank))
73+
{
74+
++activeRankCount;
75+
}
76+
}
77+
return activeRankCount * metaInfo.slotCountPerRank;
78+
}
79+
80+
} // namespace
81+
5782
void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor,
58-
MoePlacementCpuInfo* cpuPlacement)
83+
MoePlacementCpuInfo* cpuPlacement, std::vector<uint8_t> const* deadRankMask)
5984
{
6085
cpuPlacement->expertReplicaCount.resize(metaInfo.expertCount);
61-
int totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank;
86+
int totalSlotCount = getActiveSlotCount(metaInfo, deadRankMask);
87+
TLLM_CHECK_WITH_INFO(totalSlotCount >= metaInfo.expertCount,
88+
"Mask-only EPLB reconfigure would leave fewer active slots (%d) than experts (%d)", totalSlotCount,
89+
metaInfo.expertCount);
6290
// --- Edge Case 1: No replication needed ---
6391
if (totalSlotCount == metaInfo.expertCount)
6492
{
@@ -122,13 +150,13 @@ void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float
122150
}
123151

124152
void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor,
125-
MoePlacementCpuInfo* cpuPlacement)
153+
MoePlacementCpuInfo* cpuPlacement, std::vector<uint8_t> const* deadRankMask)
126154
{
127155
// This function only update these two vectors
128156
auto& rankExpertIds = cpuPlacement->rankExpertIds;
129157
auto& replicaCount = cpuPlacement->expertReplicaCount;
130158

131-
int totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank;
159+
int totalSlotCount = getActiveSlotCount(metaInfo, deadRankMask);
132160
// 1. Create all replica information
133161
std::vector<ReplicaInfo> allReplicas;
134162
allReplicas.reserve(totalSlotCount);
@@ -152,6 +180,10 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float*
152180
// 3. Maintain Rank state and initialize Priority Queue
153181
std::vector<double> currentRankLoad(metaInfo.epSize, 0.0);
154182
std::vector<int> currentRankSlots(metaInfo.epSize, 0); // Tracks the count of assigned slots per rank
183+
for (int rank = 0; rank < metaInfo.epSize; ++rank)
184+
{
185+
std::fill_n(rankExpertIds[rank].begin(), metaInfo.slotCountPerRank, -1);
186+
}
155187

156188
// Define a min-priority queue storing pairs of {load, rank_id}
157189
using RankLoadPair = std::pair<double, int>;
@@ -160,8 +192,12 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float*
160192
// Initialize the priority queue with all ranks having 0 load
161193
for (int rank = 0; rank < metaInfo.epSize; ++rank)
162194
{
163-
pq.push({0.0, rank});
195+
if (!isRankMasked(deadRankMask, rank))
196+
{
197+
pq.push({0.0, rank});
198+
}
164199
}
200+
TLLM_CHECK_WITH_INFO(!pq.empty(), "Mask-only EPLB reconfigure requires at least one active rank");
165201

166202
// 4. Optimized Greedy assignment using Priority Queue, writing directly to rankExpertIds
167203
for (auto const& replica : allReplicas)
@@ -252,19 +288,36 @@ void prepareGpuPlacementInfo(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaI
252288
// globalSlotIds[i][j] is the list of global slot ids for expert i's j-th replica
253289
// different experts have different number of replicas, so globalSlotIds is a vector of vectors
254290
// the sum of sizes of all vectors in globalSlotIds is equal to the total number of slots
291+
int const totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank;
292+
int const invalidSlotId = totalSlotCount;
293+
std::fill_n(cpuPlacement->placementInfoForGPU.globalSlotIds, totalSlotCount, invalidSlotId);
294+
255295
std::vector<std::vector<int>> globalSlotIds(metaInfo.expertCount);
296+
int assignedSlotCount = 0;
256297
for (int rank = 0; rank < metaInfo.epSize; ++rank)
257298
{
258299
for (int slotId = 0; slotId < metaInfo.slotCountPerRank; ++slotId)
259300
{
260301
int expertId = cpuPlacement->rankExpertIds[rank][slotId];
302+
if (expertId < 0)
303+
{
304+
continue;
305+
}
306+
TLLM_CHECK_WITH_INFO(expertId < metaInfo.expertCount, "expertId=%d should be in range [0, %d)", expertId,
307+
metaInfo.expertCount);
261308
int replicaId = globalSlotIds[expertId].size();
309+
TLLM_CHECK_WITH_INFO(replicaId < cpuPlacement->placementInfoForGPU.expertReplicaCount[expertId],
310+
"Expert %d has more placed replicas than its replica count (%d)", expertId,
311+
cpuPlacement->placementInfoForGPU.expertReplicaCount[expertId]);
262312
int globalSlotId = rank * metaInfo.slotCountPerRank + slotId;
263313
globalSlotIds[expertId].push_back(globalSlotId);
264314
int offset = cpuPlacement->placementInfoForGPU.expertReplicaStartOffset[expertId] + replicaId;
265315
cpuPlacement->placementInfoForGPU.globalSlotIds[offset] = globalSlotId;
316+
++assignedSlotCount;
266317
}
267318
}
319+
TLLM_CHECK_WITH_INFO(assignedSlotCount == startOffset,
320+
"Placed slot count (%d) must match active replica count (%d)", assignedSlotCount, startOffset);
268321
// printCpuPlacementInfo(metaInfo, cpuPlacement);
269322
}
270323

@@ -515,6 +568,61 @@ void SingleLayerMoeLoadBalancer::waitLastUpdateDone()
515568
}
516569
}
517570

571+
std::vector<int> SingleLayerMoeLoadBalancer::computeMaskOnlyReplicaCounts(
572+
std::vector<uint8_t> const& deadRankMask) const
573+
{
574+
TLLM_CHECK_WITH_INFO(static_cast<int>(deadRankMask.size()) == mMetaInfo.epSize,
575+
"deadRankMask size (%ld) must match epSize (%d)", static_cast<long>(deadRankMask.size()), mMetaInfo.epSize);
576+
577+
std::vector<int> expertReplicaCount(mMetaInfo.expertCount, 0);
578+
for (int rank = 0; rank < mMetaInfo.epSize; ++rank)
579+
{
580+
bool const rankDead = deadRankMask[rank] != 0;
581+
for (int localSlotId = 0; localSlotId < mMetaInfo.slotCountPerRank; ++localSlotId)
582+
{
583+
int expertId = mCpuPlacementInfo.oldRankExpertIds[rank][localSlotId];
584+
TLLM_CHECK_WITH_INFO(expertId == -1 || (expertId >= 0 && expertId < mMetaInfo.expertCount),
585+
"expertId=%d should be -1 or in range [0, %d)", expertId, mMetaInfo.expertCount);
586+
if (!rankDead && expertId >= 0)
587+
{
588+
++expertReplicaCount[expertId];
589+
}
590+
}
591+
}
592+
593+
for (int expertId = 0; expertId < mMetaInfo.expertCount; ++expertId)
594+
{
595+
TLLM_CHECK_WITH_INFO(expertReplicaCount[expertId] > 0,
596+
"Mask-only EPLB reconfigure would leave expert %d with no surviving replica", expertId);
597+
}
598+
return expertReplicaCount;
599+
}
600+
601+
void SingleLayerMoeLoadBalancer::validateMaskOnly(std::vector<uint8_t> const& deadRankMask) const
602+
{
603+
(void) computeMaskOnlyReplicaCounts(deadRankMask);
604+
}
605+
606+
void SingleLayerMoeLoadBalancer::reconfigureMaskOnly(std::vector<uint8_t> const& deadRankMask)
607+
{
608+
mCpuPlacementInfo.expertReplicaCount = computeMaskOnlyReplicaCounts(deadRankMask);
609+
610+
for (int rank = 0; rank < mMetaInfo.epSize; ++rank)
611+
{
612+
bool const rankDead = deadRankMask[rank] != 0;
613+
for (int localSlotId = 0; localSlotId < mMetaInfo.slotCountPerRank; ++localSlotId)
614+
{
615+
int expertId = mCpuPlacementInfo.oldRankExpertIds[rank][localSlotId];
616+
mCpuPlacementInfo.rankExpertIds[rank][localSlotId] = rankDead ? -1 : expertId;
617+
}
618+
}
619+
620+
prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo);
621+
copyPlacementInfoToGpu();
622+
TLLM_CUDA_CHECK(cudaEventRecord(mUpdateWeightsDoneEvent, mMoeLoadBalancer->mStream));
623+
TLLM_CUDA_CHECK(cudaEventSynchronize(mUpdateWeightsDoneEvent));
624+
}
625+
518626
cudaStream_t SingleLayerMoeLoadBalancer::getStream() const
519627
{
520628
return mMoeLoadBalancer->mStream;
@@ -561,8 +669,9 @@ void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpuByCpu()
561669

562670
void SingleLayerMoeLoadBalancer::updateWeightsRoutine()
563671
{
564-
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
565-
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
672+
auto const deadRankMask = mMoeLoadBalancer->getDeadRankMaskSnapshot();
673+
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
674+
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
566675
prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo);
567676
mWeightUpdater->updateWeights(&mCpuPlacementInfo);
568677
copyPlacementInfoToGpu();
@@ -575,8 +684,9 @@ void SingleLayerMoeLoadBalancer::updateWeightsRoutine()
575684

576685
void SingleLayerMoeLoadBalancer::updateWeightsRoutineByCpu()
577686
{
578-
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
579-
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo);
687+
auto const deadRankMask = mMoeLoadBalancer->getDeadRankMaskSnapshot();
688+
doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
689+
doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask);
580690
prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo);
581691
mLastUpdateTaskId = mMoeLoadBalancer->addCopyTask(
582692
[this](int rank, int size) { mWeightUpdater->updateWeights(&mCpuPlacementInfo, rank, size); });
@@ -832,6 +942,7 @@ void HostMemoryMoeWeightUpdater::updateWeights(
832942
MoeLoadBalancer::MoeLoadBalancer(int epRank, int epSize, int layerUpdatesPerIter)
833943
: mEpRank{epRank}
834944
, mEpSize{epSize}
945+
, mDeadRankMask(epSize, 0)
835946
, mLayerUpdatesPerIter{layerUpdatesPerIter}
836947
{
837948
TLLM_CUDA_CHECK(cudaGetDevice(&mCudaDeviceId));
@@ -947,9 +1058,11 @@ void MoeLoadBalancer::startIter(int64_t iterId, bool enableStatistic, bool enabl
9471058
{
9481059
std::unique_lock<std::mutex> lock(mWorkerThreadMutex);
9491060
TLLM_CHECK_WITH_INFO(mModelFinalized == true, "Model is not finalized, cannot start iteration.");
1061+
TLLM_CHECK_WITH_INFO(!mIterActive, "Cannot start iteration while another iteration is active.");
9501062
TLLM_CHECK_WITH_INFO(mIterId + 1 == iterId, "Expected iterId=%ld, but got %ld", mIterId + 1, iterId);
9511063

9521064
mIterId = iterId;
1065+
mIterActive = true;
9531066
// disable update for warm up iters.
9541067
bool isWarmUpIter = mIterId <= mWarmUpUntilIter;
9551068
bool fixedUpdateWeightsEnabled = enableUpdateWeights && !isWarmUpIter;
@@ -961,7 +1074,10 @@ void MoeLoadBalancer::startIter(int64_t iterId, bool enableStatistic, bool enabl
9611074

9621075
void MoeLoadBalancer::endIter(int64_t iterId)
9631076
{
1077+
std::unique_lock<std::mutex> lock(mWorkerThreadMutex);
1078+
TLLM_CHECK_WITH_INFO(mIterActive, "No active iteration to end.");
9641079
TLLM_CHECK_WITH_INFO(mIterId == iterId, "endIter expected iterId=%ld, but got %ld", mIterId, iterId);
1080+
mIterActive = false;
9651081
}
9661082

9671083
void MoeLoadBalancer::shutdown()
@@ -979,6 +1095,45 @@ void MoeLoadBalancer::shutdown()
9791095
}
9801096
}
9811097

1098+
void MoeLoadBalancer::reconfigureMaskOnly(std::vector<int> const& deadRanks)
1099+
{
1100+
std::unique_lock<std::mutex> workerLock(mWorkerThreadMutex);
1101+
TLLM_CHECK_WITH_INFO(mModelFinalized == true, "Model is not finalized, cannot reconfigure mask-only placement.");
1102+
TLLM_CHECK_WITH_INFO(!mIterActive, "Cannot reconfigure mask-only placement while an iteration is active.");
1103+
TLLM_CHECK_WITH_INFO(mIterInfoQueue.empty(), "Cannot reconfigure mask-only placement while iterations are queued.");
1104+
1105+
std::vector<uint8_t> candidateDeadRankMask;
1106+
{
1107+
std::lock_guard<std::mutex> maskLock(mDeadRankMaskMutex);
1108+
candidateDeadRankMask = mDeadRankMask;
1109+
for (int deadRank : deadRanks)
1110+
{
1111+
TLLM_CHECK_WITH_INFO(
1112+
deadRank >= 0 && deadRank < mEpSize, "deadRank=%d should be in range [0, %d)", deadRank, mEpSize);
1113+
candidateDeadRankMask[deadRank] = 1;
1114+
}
1115+
TLLM_CHECK_WITH_INFO(
1116+
candidateDeadRankMask[mEpRank] == 0, "Local epRank (%d) cannot be masked by a survivor", mEpRank);
1117+
}
1118+
1119+
for (auto& layer : mLayers)
1120+
{
1121+
layer->waitLastUpdateDone();
1122+
}
1123+
for (auto& layer : mLayers)
1124+
{
1125+
layer->validateMaskOnly(candidateDeadRankMask);
1126+
}
1127+
{
1128+
std::lock_guard<std::mutex> maskLock(mDeadRankMaskMutex);
1129+
mDeadRankMask = candidateDeadRankMask;
1130+
}
1131+
for (auto& layer : mLayers)
1132+
{
1133+
layer->reconfigureMaskOnly(candidateDeadRankMask);
1134+
}
1135+
}
1136+
9821137
void MoeLoadBalancer::workerThread()
9831138
{
9841139
TLLM_CUDA_CHECK(cudaSetDevice(mCudaDeviceId));
@@ -1064,6 +1219,12 @@ void MoeLoadBalancer::waitCopyTaskDone(int64_t taskId)
10641219
}
10651220
}
10661221

1222+
std::vector<uint8_t> MoeLoadBalancer::getDeadRankMaskSnapshot()
1223+
{
1224+
std::lock_guard<std::mutex> lock(mDeadRankMaskMutex);
1225+
return mDeadRankMask;
1226+
}
1227+
10671228
MultiThreadWorker::MultiThreadWorker(int numThreads, int cudaDeviceId)
10681229
: mNumThreads(numThreads)
10691230
, mCudaDeviceId(cudaDeviceId)

cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
2+
* Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -191,6 +191,9 @@ class SingleLayerMoeLoadBalancer
191191
void waitCpuStage();
192192
void maybeStartUpdateWeights();
193193
void waitLastUpdateDone();
194+
void validateMaskOnly(std::vector<uint8_t> const& deadRankMask) const;
195+
void reconfigureMaskOnly(std::vector<uint8_t> const& deadRankMask);
196+
std::vector<int> computeMaskOnlyReplicaCounts(std::vector<uint8_t> const& deadRankMask) const;
194197

195198
MoeLoadBalancer* mMoeLoadBalancer = nullptr;
196199

@@ -283,6 +286,11 @@ class MoeLoadBalancer
283286
// should bind to python
284287
void shutdown();
285288

289+
// should bind to python
290+
// This API only validates placement safety, i.e. every expert keeps at least one surviving replica.
291+
// The caller must separately gate degraded-mode capacity/HBM headroom before invoking it.
292+
void reconfigureMaskOnly(std::vector<int> const& deadRanks);
293+
286294
// Test interface to use GPU to do memcpy test functionality
287295
void setUseGpuMemcpy(bool useGpuMemcpy = false)
288296
{
@@ -312,6 +320,7 @@ class MoeLoadBalancer
312320
void addUpdateTask(std::function<void()> task);
313321
int64_t addCopyTask(std::function<void(int, int)> task);
314322
void waitCopyTaskDone(int64_t taskId);
323+
std::vector<uint8_t> getDeadRankMaskSnapshot();
315324

316325
std::vector<std::shared_ptr<SingleLayerMoeLoadBalancer>> mLayers;
317326

@@ -327,6 +336,7 @@ class MoeLoadBalancer
327336
std::queue<IterInfo> mIterInfoQueue;
328337

329338
bool mModelFinalized = false;
339+
bool mIterActive = false;
330340

331341
int mEpRank = 0;
332342
int mEpSize = 1;
@@ -339,6 +349,9 @@ class MoeLoadBalancer
339349

340350
std::unique_ptr<MultiThreadWorker> mMultiThreadWorker;
341351

352+
std::mutex mDeadRankMaskMutex;
353+
std::vector<uint8_t> mDeadRankMask;
354+
342355
// update plan member and function
343356
int mLayerUpdatesPerIter = 1;
344357
std::deque<std::set<int>> mUpdateLayerQueue;
@@ -349,9 +362,9 @@ class MoeLoadBalancer
349362

350363
// functions exposed for testing
351364
void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor,
352-
MoePlacementCpuInfo* cpuPlacement);
365+
MoePlacementCpuInfo* cpuPlacement, std::vector<uint8_t> const* deadRankMask = nullptr);
353366

354367
void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor,
355-
MoePlacementCpuInfo* cpuPlacement);
368+
MoePlacementCpuInfo* cpuPlacement, std::vector<uint8_t> const* deadRankMask = nullptr);
356369

357370
} // namespace tensorrt_llm::runtime

0 commit comments

Comments
 (0)