From a3ac34c82dd81c1a3fd4c0dfe2dcb67b0a2cf320 Mon Sep 17 00:00:00 2001 From: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> Date: Mon, 22 Jun 2026 12:43:17 -0700 Subject: [PATCH 1/2] [None][feat] WideEP FT: add EPLB mask-only reconfigure Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> --- .../nanobind/runtime/moeBindings.cpp | 5 +- .../moeLoadBalancer/moeLoadBalancer.cpp | 181 +++++++++++++++++- .../runtime/moeLoadBalancer/moeLoadBalancer.h | 19 +- .../runtime/moeLoadBalancerTest.cpp | 108 ++++++++++- .../modules/fused_moe/moe_load_balancer.py | 13 ++ .../_torch/modules/test_moe_load_balancer.py | 21 ++ 6 files changed, 330 insertions(+), 17 deletions(-) diff --git a/cpp/tensorrt_llm/nanobind/runtime/moeBindings.cpp b/cpp/tensorrt_llm/nanobind/runtime/moeBindings.cpp index c7c9dc8a6937..5f0982eeeb60 100644 --- a/cpp/tensorrt_llm/nanobind/runtime/moeBindings.cpp +++ b/cpp/tensorrt_llm/nanobind/runtime/moeBindings.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -117,6 +117,9 @@ void initMoeBindings(nb::module_& m) nb::call_guard()) .def("end_iter", &tr::MoeLoadBalancer::endIter, nb::arg("iter_id"), "End the iteration with the given ID", nb::call_guard()) + .def("reconfigure_mask_only", &tr::MoeLoadBalancer::reconfigureMaskOnly, nb::arg("dead_ranks"), + "Reconfigure EPLB routing metadata so slots on dead EP ranks are unreachable", + nb::call_guard()) .def("shutdown", &tr::MoeLoadBalancer::shutdown, "Shutdown the load balancer and clean up resources", nb::call_guard()); diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp index 10c879c69d9c..133d007d29f0 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,11 +54,39 @@ struct ReplicaInfo } }; +namespace +{ + +bool isRankMasked(std::vector const* deadRankMask, int rank) +{ + return deadRankMask != nullptr && rank >= 0 && rank < static_cast(deadRankMask->size()) + && ((*deadRankMask)[rank] != 0); +} + +int getActiveSlotCount( + tensorrt_llm::kernels::MoeLoadBalanceMetaInfo const& metaInfo, std::vector const* deadRankMask) +{ + int activeRankCount = 0; + for (int rank = 0; rank < metaInfo.epSize; ++rank) + { + if (!isRankMasked(deadRankMask, rank)) + { + ++activeRankCount; + } + } + return activeRankCount * metaInfo.slotCountPerRank; +} + +} // namespace + void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor, - MoePlacementCpuInfo* cpuPlacement) + MoePlacementCpuInfo* cpuPlacement, std::vector const* deadRankMask) { cpuPlacement->expertReplicaCount.resize(metaInfo.expertCount); - int totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank; + int totalSlotCount = getActiveSlotCount(metaInfo, deadRankMask); + TLLM_CHECK_WITH_INFO(totalSlotCount >= metaInfo.expertCount, + "Mask-only EPLB reconfigure would leave fewer active slots (%d) than experts (%d)", totalSlotCount, + metaInfo.expertCount); // --- Edge Case 1: No replication needed --- if (totalSlotCount == metaInfo.expertCount) { @@ -122,13 +150,13 @@ void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float } void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor, - MoePlacementCpuInfo* cpuPlacement) + MoePlacementCpuInfo* cpuPlacement, std::vector const* deadRankMask) { // This function only update these two vectors auto& rankExpertIds = cpuPlacement->rankExpertIds; auto& replicaCount = cpuPlacement->expertReplicaCount; - int totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank; + int totalSlotCount = getActiveSlotCount(metaInfo, deadRankMask); // 1. Create all replica information std::vector allReplicas; allReplicas.reserve(totalSlotCount); @@ -152,6 +180,10 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* // 3. Maintain Rank state and initialize Priority Queue std::vector currentRankLoad(metaInfo.epSize, 0.0); std::vector currentRankSlots(metaInfo.epSize, 0); // Tracks the count of assigned slots per rank + for (int rank = 0; rank < metaInfo.epSize; ++rank) + { + std::fill_n(rankExpertIds[rank].begin(), metaInfo.slotCountPerRank, -1); + } // Define a min-priority queue storing pairs of {load, rank_id} using RankLoadPair = std::pair; @@ -160,8 +192,12 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* // Initialize the priority queue with all ranks having 0 load for (int rank = 0; rank < metaInfo.epSize; ++rank) { - pq.push({0.0, rank}); + if (!isRankMasked(deadRankMask, rank)) + { + pq.push({0.0, rank}); + } } + TLLM_CHECK_WITH_INFO(!pq.empty(), "Mask-only EPLB reconfigure requires at least one active rank"); // 4. Optimized Greedy assignment using Priority Queue, writing directly to rankExpertIds for (auto const& replica : allReplicas) @@ -252,19 +288,36 @@ void prepareGpuPlacementInfo(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaI // globalSlotIds[i][j] is the list of global slot ids for expert i's j-th replica // different experts have different number of replicas, so globalSlotIds is a vector of vectors // the sum of sizes of all vectors in globalSlotIds is equal to the total number of slots + int const totalSlotCount = metaInfo.epSize * metaInfo.slotCountPerRank; + int const invalidSlotId = totalSlotCount; + std::fill_n(cpuPlacement->placementInfoForGPU.globalSlotIds, totalSlotCount, invalidSlotId); + std::vector> globalSlotIds(metaInfo.expertCount); + int assignedSlotCount = 0; for (int rank = 0; rank < metaInfo.epSize; ++rank) { for (int slotId = 0; slotId < metaInfo.slotCountPerRank; ++slotId) { int expertId = cpuPlacement->rankExpertIds[rank][slotId]; + if (expertId < 0) + { + continue; + } + TLLM_CHECK_WITH_INFO(expertId < metaInfo.expertCount, "expertId=%d should be in range [0, %d)", expertId, + metaInfo.expertCount); int replicaId = globalSlotIds[expertId].size(); + TLLM_CHECK_WITH_INFO(replicaId < cpuPlacement->placementInfoForGPU.expertReplicaCount[expertId], + "Expert %d has more placed replicas than its replica count (%d)", expertId, + cpuPlacement->placementInfoForGPU.expertReplicaCount[expertId]); int globalSlotId = rank * metaInfo.slotCountPerRank + slotId; globalSlotIds[expertId].push_back(globalSlotId); int offset = cpuPlacement->placementInfoForGPU.expertReplicaStartOffset[expertId] + replicaId; cpuPlacement->placementInfoForGPU.globalSlotIds[offset] = globalSlotId; + ++assignedSlotCount; } } + TLLM_CHECK_WITH_INFO(assignedSlotCount == startOffset, + "Placed slot count (%d) must match active replica count (%d)", assignedSlotCount, startOffset); // printCpuPlacementInfo(metaInfo, cpuPlacement); } @@ -515,6 +568,61 @@ void SingleLayerMoeLoadBalancer::waitLastUpdateDone() } } +std::vector SingleLayerMoeLoadBalancer::computeMaskOnlyReplicaCounts( + std::vector const& deadRankMask) const +{ + TLLM_CHECK_WITH_INFO(static_cast(deadRankMask.size()) == mMetaInfo.epSize, + "deadRankMask size (%ld) must match epSize (%d)", static_cast(deadRankMask.size()), mMetaInfo.epSize); + + std::vector expertReplicaCount(mMetaInfo.expertCount, 0); + for (int rank = 0; rank < mMetaInfo.epSize; ++rank) + { + bool const rankDead = deadRankMask[rank] != 0; + for (int localSlotId = 0; localSlotId < mMetaInfo.slotCountPerRank; ++localSlotId) + { + int expertId = mCpuPlacementInfo.oldRankExpertIds[rank][localSlotId]; + TLLM_CHECK_WITH_INFO(expertId == -1 || (expertId >= 0 && expertId < mMetaInfo.expertCount), + "expertId=%d should be -1 or in range [0, %d)", expertId, mMetaInfo.expertCount); + if (!rankDead && expertId >= 0) + { + ++expertReplicaCount[expertId]; + } + } + } + + for (int expertId = 0; expertId < mMetaInfo.expertCount; ++expertId) + { + TLLM_CHECK_WITH_INFO(expertReplicaCount[expertId] > 0, + "Mask-only EPLB reconfigure would leave expert %d with no surviving replica", expertId); + } + return expertReplicaCount; +} + +void SingleLayerMoeLoadBalancer::validateMaskOnly(std::vector const& deadRankMask) const +{ + (void) computeMaskOnlyReplicaCounts(deadRankMask); +} + +void SingleLayerMoeLoadBalancer::reconfigureMaskOnly(std::vector const& deadRankMask) +{ + mCpuPlacementInfo.expertReplicaCount = computeMaskOnlyReplicaCounts(deadRankMask); + + for (int rank = 0; rank < mMetaInfo.epSize; ++rank) + { + bool const rankDead = deadRankMask[rank] != 0; + for (int localSlotId = 0; localSlotId < mMetaInfo.slotCountPerRank; ++localSlotId) + { + int expertId = mCpuPlacementInfo.oldRankExpertIds[rank][localSlotId]; + mCpuPlacementInfo.rankExpertIds[rank][localSlotId] = rankDead ? -1 : expertId; + } + } + + prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo); + copyPlacementInfoToGpu(); + TLLM_CUDA_CHECK(cudaEventRecord(mUpdateWeightsDoneEvent, mMoeLoadBalancer->mStream)); + TLLM_CUDA_CHECK(cudaEventSynchronize(mUpdateWeightsDoneEvent)); +} + cudaStream_t SingleLayerMoeLoadBalancer::getStream() const { return mMoeLoadBalancer->mStream; @@ -561,8 +669,9 @@ void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpuByCpu() void SingleLayerMoeLoadBalancer::updateWeightsRoutine() { - doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo); - doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo); + auto const deadRankMask = mMoeLoadBalancer->getDeadRankMaskSnapshot(); + doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask); + doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask); prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo); mWeightUpdater->updateWeights(&mCpuPlacementInfo); copyPlacementInfoToGpu(); @@ -575,8 +684,9 @@ void SingleLayerMoeLoadBalancer::updateWeightsRoutine() void SingleLayerMoeLoadBalancer::updateWeightsRoutineByCpu() { - doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo); - doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo); + auto const deadRankMask = mMoeLoadBalancer->getDeadRankMaskSnapshot(); + doReplication(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask); + doPlacement(mMetaInfo, mStatisticInfo.expertLoadFactor, &mCpuPlacementInfo, &deadRankMask); prepareGpuPlacementInfo(mMetaInfo, &mCpuPlacementInfo); mLastUpdateTaskId = mMoeLoadBalancer->addCopyTask( [this](int rank, int size) { mWeightUpdater->updateWeights(&mCpuPlacementInfo, rank, size); }); @@ -832,6 +942,7 @@ void HostMemoryMoeWeightUpdater::updateWeights( MoeLoadBalancer::MoeLoadBalancer(int epRank, int epSize, int layerUpdatesPerIter) : mEpRank{epRank} , mEpSize{epSize} + , mDeadRankMask(epSize, 0) , mLayerUpdatesPerIter{layerUpdatesPerIter} { TLLM_CUDA_CHECK(cudaGetDevice(&mCudaDeviceId)); @@ -947,9 +1058,11 @@ void MoeLoadBalancer::startIter(int64_t iterId, bool enableStatistic, bool enabl { std::unique_lock lock(mWorkerThreadMutex); TLLM_CHECK_WITH_INFO(mModelFinalized == true, "Model is not finalized, cannot start iteration."); + TLLM_CHECK_WITH_INFO(!mIterActive, "Cannot start iteration while another iteration is active."); TLLM_CHECK_WITH_INFO(mIterId + 1 == iterId, "Expected iterId=%ld, but got %ld", mIterId + 1, iterId); mIterId = iterId; + mIterActive = true; // disable update for warm up iters. bool isWarmUpIter = mIterId <= mWarmUpUntilIter; bool fixedUpdateWeightsEnabled = enableUpdateWeights && !isWarmUpIter; @@ -961,7 +1074,10 @@ void MoeLoadBalancer::startIter(int64_t iterId, bool enableStatistic, bool enabl void MoeLoadBalancer::endIter(int64_t iterId) { + std::unique_lock lock(mWorkerThreadMutex); + TLLM_CHECK_WITH_INFO(mIterActive, "No active iteration to end."); TLLM_CHECK_WITH_INFO(mIterId == iterId, "endIter expected iterId=%ld, but got %ld", mIterId, iterId); + mIterActive = false; } void MoeLoadBalancer::shutdown() @@ -979,6 +1095,45 @@ void MoeLoadBalancer::shutdown() } } +void MoeLoadBalancer::reconfigureMaskOnly(std::vector const& deadRanks) +{ + std::unique_lock workerLock(mWorkerThreadMutex); + TLLM_CHECK_WITH_INFO(mModelFinalized == true, "Model is not finalized, cannot reconfigure mask-only placement."); + TLLM_CHECK_WITH_INFO(!mIterActive, "Cannot reconfigure mask-only placement while an iteration is active."); + TLLM_CHECK_WITH_INFO(mIterInfoQueue.empty(), "Cannot reconfigure mask-only placement while iterations are queued."); + + std::vector candidateDeadRankMask; + { + std::lock_guard maskLock(mDeadRankMaskMutex); + candidateDeadRankMask = mDeadRankMask; + for (int deadRank : deadRanks) + { + TLLM_CHECK_WITH_INFO( + deadRank >= 0 && deadRank < mEpSize, "deadRank=%d should be in range [0, %d)", deadRank, mEpSize); + candidateDeadRankMask[deadRank] = 1; + } + TLLM_CHECK_WITH_INFO( + candidateDeadRankMask[mEpRank] == 0, "Local epRank (%d) cannot be masked by a survivor", mEpRank); + } + + for (auto& layer : mLayers) + { + layer->waitLastUpdateDone(); + } + for (auto& layer : mLayers) + { + layer->validateMaskOnly(candidateDeadRankMask); + } + { + std::lock_guard maskLock(mDeadRankMaskMutex); + mDeadRankMask = candidateDeadRankMask; + } + for (auto& layer : mLayers) + { + layer->reconfigureMaskOnly(candidateDeadRankMask); + } +} + void MoeLoadBalancer::workerThread() { TLLM_CUDA_CHECK(cudaSetDevice(mCudaDeviceId)); @@ -1064,6 +1219,12 @@ void MoeLoadBalancer::waitCopyTaskDone(int64_t taskId) } } +std::vector MoeLoadBalancer::getDeadRankMaskSnapshot() +{ + std::lock_guard lock(mDeadRankMaskMutex); + return mDeadRankMask; +} + MultiThreadWorker::MultiThreadWorker(int numThreads, int cudaDeviceId) : mNumThreads(numThreads) , mCudaDeviceId(cudaDeviceId) diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h index a71e32bbe364..36908cea74bd 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -191,6 +191,9 @@ class SingleLayerMoeLoadBalancer void waitCpuStage(); void maybeStartUpdateWeights(); void waitLastUpdateDone(); + void validateMaskOnly(std::vector const& deadRankMask) const; + void reconfigureMaskOnly(std::vector const& deadRankMask); + std::vector computeMaskOnlyReplicaCounts(std::vector const& deadRankMask) const; MoeLoadBalancer* mMoeLoadBalancer = nullptr; @@ -283,6 +286,11 @@ class MoeLoadBalancer // should bind to python void shutdown(); + // should bind to python + // This API only validates placement safety, i.e. every expert keeps at least one surviving replica. + // The caller must separately gate degraded-mode capacity/HBM headroom before invoking it. + void reconfigureMaskOnly(std::vector const& deadRanks); + // Test interface to use GPU to do memcpy test functionality void setUseGpuMemcpy(bool useGpuMemcpy = false) { @@ -312,6 +320,7 @@ class MoeLoadBalancer void addUpdateTask(std::function task); int64_t addCopyTask(std::function task); void waitCopyTaskDone(int64_t taskId); + std::vector getDeadRankMaskSnapshot(); std::vector> mLayers; @@ -327,6 +336,7 @@ class MoeLoadBalancer std::queue mIterInfoQueue; bool mModelFinalized = false; + bool mIterActive = false; int mEpRank = 0; int mEpSize = 1; @@ -339,6 +349,9 @@ class MoeLoadBalancer std::unique_ptr mMultiThreadWorker; + std::mutex mDeadRankMaskMutex; + std::vector mDeadRankMask; + // update plan member and function int mLayerUpdatesPerIter = 1; std::deque> mUpdateLayerQueue; @@ -349,9 +362,9 @@ class MoeLoadBalancer // functions exposed for testing void doReplication(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor, - MoePlacementCpuInfo* cpuPlacement); + MoePlacementCpuInfo* cpuPlacement, std::vector const* deadRankMask = nullptr); void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* const expertLoadFactor, - MoePlacementCpuInfo* cpuPlacement); + MoePlacementCpuInfo* cpuPlacement, std::vector const* deadRankMask = nullptr); } // namespace tensorrt_llm::runtime diff --git a/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp b/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp index a7d4058a82f4..5e829ee7259c 100644 --- a/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp +++ b/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,10 @@ #include -#include - #include "tensorrt_llm/common/cudaUtils.h" #include "tensorrt_llm/kernels/moeLoadBalance/moeLoadBalanceKernels.h" #include "tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h" +#include using namespace tensorrt_llm::runtime; @@ -291,6 +290,109 @@ INSTANTIATE_TEST_SUITE_P(PlacementTests, MoePlacementTest, return name; }); +TEST(MoeLoadBalancerMaskOnlyTest, ReconfigureMaskOnlyRemovesDeadRankSlots) +{ + setenv("TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK", "1", 1); + TLLM_CUDA_CHECK(cudaSetDevice(0)); + + constexpr int kExpertCount = 4; + constexpr int kTopK = 2; + constexpr int kEpRank = 0; + constexpr int kEpSize = 4; + constexpr int kSlotCountPerRank = 2; + constexpr int kDeadRank = 2; + constexpr int kInvalidSlotId = kEpSize * kSlotCountPerRank; + + MoeLoadBalancer loadBalancer(kEpRank, kEpSize, /*layerUpdatesPerIter=*/0); + auto layer = loadBalancer.AddLayer(kExpertCount, kTopK, kSlotCountPerRank); + + // Each expert has at least one surviving replica after rank 2 is masked. + std::vector initialAssignments{ + 0, 1, // rank 0 + 0, 2, // rank 1 + 1, 3, // rank 2 (dead) + 2, 3 // rank 3 + }; + layer->setInitialWeightAssignments(initialAssignments); + loadBalancer.finalizeModel(); + + loadBalancer.reconfigureMaskOnly({kDeadRank}); + + auto* placementCpuInfo = layer->getPlacementCpuInfo(); + for (int localSlotId = 0; localSlotId < kSlotCountPerRank; ++localSlotId) + { + EXPECT_EQ(placementCpuInfo->oldRankExpertIds[kDeadRank][localSlotId], -1); + } + + std::vector expectedReplicaCounts{2, 1, 2, 1}; + int totalReplicaCount = 0; + for (int expertId = 0; expertId < kExpertCount; ++expertId) + { + EXPECT_EQ(placementCpuInfo->placementInfoForGPU.expertReplicaCount[expertId], expectedReplicaCounts[expertId]); + totalReplicaCount += expectedReplicaCounts[expertId]; + } + + for (int replicaOffset = 0; replicaOffset < totalReplicaCount; ++replicaOffset) + { + int const globalSlotId = placementCpuInfo->placementInfoForGPU.globalSlotIds[replicaOffset]; + EXPECT_GE(globalSlotId, 0); + EXPECT_LT(globalSlotId, kInvalidSlotId); + EXPECT_NE(globalSlotId / kSlotCountPerRank, kDeadRank); + } + for (int replicaOffset = totalReplicaCount; replicaOffset < kInvalidSlotId; ++replicaOffset) + { + EXPECT_EQ(placementCpuInfo->placementInfoForGPU.globalSlotIds[replicaOffset], kInvalidSlotId); + } + + loadBalancer.shutdown(); +} + +TEST(MoeLoadBalancerMaskOnlyTest, ReconfigureMaskOnlyRejectsActiveIteration) +{ + setenv("TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK", "1", 1); + TLLM_CUDA_CHECK(cudaSetDevice(0)); + + MoeLoadBalancer loadBalancer(/*epRank=*/0, /*epSize=*/4, /*layerUpdatesPerIter=*/0); + loadBalancer.finalizeModel(); + loadBalancer.startIter(/*iterId=*/0, /*enableStatistic=*/true, /*enableUpdateWeights=*/false); + + EXPECT_THROW(loadBalancer.reconfigureMaskOnly({2}), tensorrt_llm::common::TllmException); + loadBalancer.endIter(/*iterId=*/0); + + loadBalancer.shutdown(); +} + +TEST(MoeLoadBalancerMaskOnlyTest, ReconfigureMaskOnlyFailsClosedForLastReplica) +{ + setenv("TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK", "1", 1); + TLLM_CUDA_CHECK(cudaSetDevice(0)); + + constexpr int kExpertCount = 4; + constexpr int kTopK = 2; + constexpr int kEpRank = 0; + constexpr int kEpSize = 4; + constexpr int kSlotCountPerRank = 1; + constexpr int kDeadRank = 2; + + MoeLoadBalancer loadBalancer(kEpRank, kEpSize, /*layerUpdatesPerIter=*/0); + auto layer = loadBalancer.AddLayer(kExpertCount, kTopK, kSlotCountPerRank); + + // Expert 2 only exists on rank 2, so mask-only recovery must fail closed. + std::vector initialAssignments{ + 0, // rank 0 + 1, // rank 1 + 2, // rank 2 (dead) + 3 // rank 3 + }; + layer->setInitialWeightAssignments(initialAssignments); + loadBalancer.finalizeModel(); + + EXPECT_THROW(loadBalancer.reconfigureMaskOnly({kDeadRank}), tensorrt_llm::common::TllmException); + EXPECT_EQ(layer->getPlacementCpuInfo()->oldRankExpertIds[kDeadRank][0], 2); + + loadBalancer.shutdown(); +} + // Iteration control parameter structure struct IterConfig { diff --git a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py index eba30016e2fb..19bcec6530f1 100644 --- a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py +++ b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py @@ -989,6 +989,19 @@ def set_iter_info(self, enable_statistic: Optional[bool], if enable_update_weights is not None: self.enable_update_weights = enable_update_weights + def reconfigure_mask_only(self, dead_ranks: List[int]): + """ + Reconfigure EPLB routing so slots on dead EP ranks are unreachable. + + This validates that every expert still has at least one surviving + replica. The caller must separately gate degraded-mode capacity/HBM + headroom before invoking this method. + """ + if self.in_iter: + raise RuntimeError( + "Cannot reconfigure EPLB mask while an iteration is active") + self.load_balancer_impl.reconfigure_mask_only(list(dead_ranks)) + def start_iter(self): """ Start a new iteration. diff --git a/tests/unittest/_torch/modules/test_moe_load_balancer.py b/tests/unittest/_torch/modules/test_moe_load_balancer.py index a30c9f2fec28..816c26a0d88d 100644 --- a/tests/unittest/_torch/modules/test_moe_load_balancer.py +++ b/tests/unittest/_torch/modules/test_moe_load_balancer.py @@ -294,6 +294,11 @@ def test_moe_load_balancer_lifecycle_methods(self, mock_load_balancer_impl): mock_load_balancer_impl.return_value.set_warm_up_iter_count.assert_called_once_with( 10) + # reconfigure_mask_only + balancer.reconfigure_mask_only([2]) + mock_load_balancer_impl.return_value.reconfigure_mask_only.assert_called_once_with( + [2]) + balancer.set_iter_info(True, True) with MoeLoadBalancerIterContext(balancer): @@ -306,6 +311,22 @@ def test_moe_load_balancer_lifecycle_methods(self, mock_load_balancer_impl): balancer.shutdown() mock_load_balancer_impl.return_value.shutdown.assert_called_once() + @patch('tensorrt_llm.bindings.internal.runtime.MoeLoadBalancer') + def test_reconfigure_mask_only_rejects_active_iteration( + self, mock_load_balancer_impl): + """Test mask-only reconfigure is rejected during an active iteration.""" + + torch.cuda.set_device(0) + + balancer = MoeLoadBalancer(0, 4, 2) + balancer.in_iter = True + + with self.assertRaisesRegex(RuntimeError, "iteration is active"): + balancer.reconfigure_mask_only([2]) + + mock_load_balancer_impl.return_value.reconfigure_mask_only.assert_not_called( + ) + def test_real_statistic_kernel(self): """Test the real statistic kernel functionality.""" From 5f391a6b3d6a4735f54f4e5f2ca1cc4f300f6cf2 Mon Sep 17 00:00:00 2001 From: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> Date: Tue, 23 Jun 2026 10:03:18 -0700 Subject: [PATCH 2/2] fix: address EPLB mask review feedback Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> --- .../moeLoadBalancer/moeLoadBalancer.cpp | 22 ++-- .../runtime/moeLoadBalancerTest.cpp | 103 ++++++++++++++++++ .../modules/fused_moe/moe_load_balancer.py | 2 +- 3 files changed, 118 insertions(+), 9 deletions(-) diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp index 133d007d29f0..202f1d945f9e 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp @@ -59,13 +59,16 @@ namespace bool isRankMasked(std::vector const* deadRankMask, int rank) { - return deadRankMask != nullptr && rank >= 0 && rank < static_cast(deadRankMask->size()) - && ((*deadRankMask)[rank] != 0); + return deadRankMask != nullptr && ((*deadRankMask)[rank] != 0); } int getActiveSlotCount( tensorrt_llm::kernels::MoeLoadBalanceMetaInfo const& metaInfo, std::vector const* deadRankMask) { + TLLM_CHECK_WITH_INFO(deadRankMask == nullptr || static_cast(deadRankMask->size()) == metaInfo.epSize, + "deadRankMask size (%ld) must match epSize (%d)", + deadRankMask == nullptr ? 0L : static_cast(deadRankMask->size()), metaInfo.epSize); + int activeRankCount = 0; for (int rank = 0; rank < metaInfo.epSize; ++rank) { @@ -163,7 +166,8 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* for (int expertId = 0; expertId < metaInfo.expertCount; ++expertId) { - assert(replicaCount[expertId] > 0); // Ensure replica count is positive + TLLM_CHECK_WITH_INFO(replicaCount[expertId] > 0, "Replica count (%d) for expert %d must be positive", + replicaCount[expertId], expertId); double slotSize = expertLoadFactor[expertId] / static_cast(replicaCount[expertId]); for (int replicaId = 0; replicaId < replicaCount[expertId]; ++replicaId) { @@ -172,7 +176,9 @@ void doPlacement(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo, float* } } - assert(static_cast(allReplicas.size()) == totalSlotCount); + TLLM_CHECK_WITH_INFO(static_cast(allReplicas.size()) == totalSlotCount, + "Replica count sum (%ld) must match active slot count (%d)", static_cast(allReplicas.size()), + totalSlotCount); // 2. Sort replicas by slotSize descending std::sort(allReplicas.begin(), allReplicas.end()); @@ -1124,14 +1130,14 @@ void MoeLoadBalancer::reconfigureMaskOnly(std::vector const& deadRanks) { layer->validateMaskOnly(candidateDeadRankMask); } - { - std::lock_guard maskLock(mDeadRankMaskMutex); - mDeadRankMask = candidateDeadRankMask; - } for (auto& layer : mLayers) { layer->reconfigureMaskOnly(candidateDeadRankMask); } + { + std::lock_guard maskLock(mDeadRankMaskMutex); + mDeadRankMask = candidateDeadRankMask; + } } void MoeLoadBalancer::workerThread() diff --git a/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp b/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp index 5e829ee7259c..e6c77da77e96 100644 --- a/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp +++ b/cpp/tests/unit_tests/runtime/moeLoadBalancerTest.cpp @@ -290,6 +290,109 @@ INSTANTIATE_TEST_SUITE_P(PlacementTests, MoePlacementTest, return name; }); +TEST(MoeLoadBalancerMaskOnlyTest, DynamicPlacementHonorsDeadRankMask) +{ + constexpr int kExpertCount = 4; + constexpr int kTopK = 2; + constexpr int kEpRank = 0; + constexpr int kEpSize = 4; + constexpr int kSlotCountPerRank = 2; + constexpr int kDeadRank = 2; + constexpr int kActiveSlotCount = (kEpSize - 1) * kSlotCountPerRank; + + tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo{kExpertCount, kTopK, kEpRank, kEpSize, kSlotCountPerRank}; + std::vector expertLoadFactor{8.0F, 4.0F, 2.0F, 1.0F}; + std::vector deadRankMask{0, 0, 1, 0}; + + MoePlacementCpuInfo cpuPlacement; + doReplication(metaInfo, expertLoadFactor.data(), &cpuPlacement, &deadRankMask); + + int replicaSum = 0; + for (int replicaCount : cpuPlacement.expertReplicaCount) + { + replicaSum += replicaCount; + } + EXPECT_EQ(replicaSum, kActiveSlotCount); + + cpuPlacement.rankExpertIds.resize(kEpSize); + for (int rank = 0; rank < kEpSize; ++rank) + { + cpuPlacement.rankExpertIds[rank].resize(kSlotCountPerRank, 99); + } + + doPlacement(metaInfo, expertLoadFactor.data(), &cpuPlacement, &deadRankMask); + + std::vector placedReplicas(kExpertCount, 0); + int assignedSlotCount = 0; + for (int rank = 0; rank < kEpSize; ++rank) + { + for (int slot = 0; slot < kSlotCountPerRank; ++slot) + { + int const expertId = cpuPlacement.rankExpertIds[rank][slot]; + if (rank == kDeadRank) + { + EXPECT_EQ(expertId, -1); + continue; + } + + EXPECT_GE(expertId, 0); + EXPECT_LT(expertId, kExpertCount); + if (expertId >= 0 && expertId < kExpertCount) + { + ++placedReplicas[expertId]; + ++assignedSlotCount; + } + } + } + + EXPECT_EQ(assignedSlotCount, kActiveSlotCount); + for (int expertId = 0; expertId < kExpertCount; ++expertId) + { + EXPECT_EQ(placedReplicas[expertId], cpuPlacement.expertReplicaCount[expertId]); + } +} + +TEST(MoeLoadBalancerMaskOnlyTest, DynamicPlacementRejectsMismatchedDeadRankMask) +{ + constexpr int kExpertCount = 4; + constexpr int kTopK = 2; + constexpr int kEpRank = 0; + constexpr int kEpSize = 4; + constexpr int kSlotCountPerRank = 2; + + tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo{kExpertCount, kTopK, kEpRank, kEpSize, kSlotCountPerRank}; + std::vector expertLoadFactor{1.0F, 1.0F, 1.0F, 1.0F}; + std::vector deadRankMask{0, 1}; + + MoePlacementCpuInfo cpuPlacement; + EXPECT_THROW(doReplication(metaInfo, expertLoadFactor.data(), &cpuPlacement, &deadRankMask), + tensorrt_llm::common::TllmException); +} + +TEST(MoeLoadBalancerMaskOnlyTest, DynamicPlacementRejectsReplicaCountMismatch) +{ + constexpr int kExpertCount = 4; + constexpr int kTopK = 2; + constexpr int kEpRank = 0; + constexpr int kEpSize = 4; + constexpr int kSlotCountPerRank = 2; + + tensorrt_llm::kernels::MoeLoadBalanceMetaInfo metaInfo{kExpertCount, kTopK, kEpRank, kEpSize, kSlotCountPerRank}; + std::vector expertLoadFactor{1.0F, 1.0F, 1.0F, 1.0F}; + std::vector deadRankMask{0, 0, 1, 0}; + + MoePlacementCpuInfo cpuPlacement; + cpuPlacement.expertReplicaCount = {1, 1, 1, 1}; + cpuPlacement.rankExpertIds.resize(kEpSize); + for (int rank = 0; rank < kEpSize; ++rank) + { + cpuPlacement.rankExpertIds[rank].resize(kSlotCountPerRank, -1); + } + + EXPECT_THROW(doPlacement(metaInfo, expertLoadFactor.data(), &cpuPlacement, &deadRankMask), + tensorrt_llm::common::TllmException); +} + TEST(MoeLoadBalancerMaskOnlyTest, ReconfigureMaskOnlyRemovesDeadRankSlots) { setenv("TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK", "1", 1); diff --git a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py index 19bcec6530f1..77526d5de652 100644 --- a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py +++ b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py @@ -989,7 +989,7 @@ def set_iter_info(self, enable_statistic: Optional[bool], if enable_update_weights is not None: self.enable_update_weights = enable_update_weights - def reconfigure_mask_only(self, dead_ranks: List[int]): + def reconfigure_mask_only(self, dead_ranks: list[int]) -> None: """ Reconfigure EPLB routing so slots on dead EP ranks are unreachable.