diff --git a/cpp/tensorrt_llm/common/ncclUtils.cpp b/cpp/tensorrt_llm/common/ncclUtils.cpp index 243cc7b2884..e36cfddcd40 100644 --- a/cpp/tensorrt_llm/common/ncclUtils.cpp +++ b/cpp/tensorrt_llm/common/ncclUtils.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ namespace { -// RAII guard for cudaMalloc — frees the pointer on destruction, logging a warning on failure. +// RAII guard for cudaMalloc. Frees the pointer on destruction, logging a warning on failure. struct CudaMallocGuard { void* ptr{nullptr}; @@ -56,7 +56,7 @@ struct CudaMallocGuard CudaMallocGuard& operator=(CudaMallocGuard const&) = delete; }; -// RAII guard for ncclMemAlloc — frees the pointer on destruction, logging a warning on failure. +// RAII guard for ncclMemAlloc. Frees the pointer on destruction, logging a warning on failure. struct NcclMemGuard { void* ptr{nullptr}; @@ -440,6 +440,17 @@ NCCLWindowBuffer NCCLWindowAllocator::requestBuffer(ncclComm_t comm, size_t size return bestFit->buffer; } + // If a previous allocateAndRegisterBuffer call collectively failed for this comm at a size + // no larger than this request, do not retry the known-failing new allocation path. Smaller + // requests and already-pooled buffers can still use NCCL windows. 
+    auto const failureIt = mMinSymmetricFailureSize.find(comm);
+    if (failureIt != mMinSymmetricFailureSize.end() && size >= failureIt->second)
+    {
+        TLLM_LOG_DEBUG("[NCCLUtil] Skipping NCCL window allocation for comm %p, size=%zu; known failure threshold=%zu",
+            static_cast<void*>(comm), size, failureIt->second);
+        return NCCLWindowBuffer();
+    }
+
     // No available buffer found, avoid registration during CUDA graph capture
     auto stream = at::cuda::getCurrentCUDAStream();
     cudaStreamCaptureStatus capture_status = cudaStreamCaptureStatusNone;
@@ -460,11 +471,47 @@ NCCLWindowBuffer NCCLWindowAllocator::requestBuffer(ncclComm_t comm, size_t size
         "[NCCLUtil] Allocating new NCCL window buffer for comm %p, size=%zu", static_cast<void*>(comm), size);
     int handle = static_cast<int>(commBuffers.size());
     NCCLWindowBuffer buffer = allocateAndRegisterBuffer(comm, size, handle);
-    commBuffers.push_back({buffer, true});
+    // Only cache valid buffers. allocateAndRegisterBuffer returns an empty buffer when any rank
+    // failed ncclMemAlloc (collective fallback to plain allreduce); caching it would leak a
+    // permanently "in use" empty entry per request because releaseBuffer is a no-op for nullptr.
+    if (buffer.isValid())
+    {
+        commBuffers.push_back({buffer, true});
+    }
+    else
+    {
+        // Remember the smallest failing request size so too-large autotuner probes skip this path.
+        // NOTE(review): an invalid buffer can also come from a window-registration failure; confirm
+        // that outcome is identical on every rank, or ranks may disagree on entering the allreduce.
+ recordSymmetricFailureLocked(comm, size); + } return buffer; } +void NCCLWindowAllocator::recordSymmetricFailureLocked(ncclComm_t comm, size_t size) +{ + auto failureIt = mMinSymmetricFailureSize.find(comm); + if (failureIt == mMinSymmetricFailureSize.end()) + { + mMinSymmetricFailureSize.emplace(comm, size); + } + else if (size < failureIt->second) + { + failureIt->second = size; + } +} + +cudaError_t NCCLWindowAllocator::clearCudaErrorIfSymmetricAllocationFailed( + int localAllocOk, CudaGetLastErrorFunc getLastError) noexcept +{ + if (localAllocOk == 0) + { + return getLastError(); + } + return cudaSuccess; +} + NCCLWindowBuffer NCCLWindowAllocator::searchBuffer(ncclComm_t comm, void* ptr) const { if (!comm || !ptr) @@ -562,24 +609,37 @@ bool NCCLWindowAllocator::isCommValid(ncclComm_t comm) const noexcept NCCLWindowBuffer NCCLWindowAllocator::allocateAndRegisterBuffer(ncclComm_t comm, size_t size, int handle) { - // Step 1: Allocate symmetric memory (per-rank, non-collective — can fail asymmetrically). + // Step 1: Pre-allocate the rank-sync flag before ncclMemAlloc. ncclMemAlloc can fail + // asymmetrically with ncclUnhandledCudaError on configurations where the symmetric/VMM path + // is unavailable; that failure may leave a sticky CUDA last-error on the device. If we + // deferred this cudaMalloc until after the failure, the sticky error would propagate into + // cudaMalloc, TLLM_CUDA_CHECK would throw, and the failing rank would never reach the + // collective ncclAllReduce(min) below, hanging every other rank that did succeed. + int* rankSyncFlag = nullptr; + TLLM_CUDA_CHECK(cudaMalloc(&rankSyncFlag, sizeof(int))); + CudaMallocGuard flagGuard{rankSyncFlag}; // frees rankSyncFlag on any early return or exception + auto stream = at::cuda::getCurrentCUDAStream().stream(); + TLLM_CUDA_CHECK(cudaMemsetAsync(rankSyncFlag, 0, sizeof(int), stream)); + + // Step 2: Allocate symmetric memory. This per-rank, non-collective call can fail + // asymmetrically. 
When it fails, NCCL may leave a sticky CUDA error behind; clear it before + // the stream-ordered flag copy and collective fallback so the failing rank still reaches + // ncclAllReduce with the other ranks. void* ncclPtr = nullptr; TLLM_NCCL_CHECK_WARN(ncclMemAlloc(&ncclPtr, size)); int const localAllocOk = (ncclPtr != nullptr) ? 1 : 0; NcclMemGuard ncclGuard{ncclPtr}; // frees ncclPtr on any early return or exception + clearCudaErrorIfSymmetricAllocationFailed(localAllocOk); - // Step 2: ncclCommWindowRegister is collective — if any rank skips it, all other ranks hang. - // Synchronize the per-rank alloc status using a small cudaMalloc flag (not ncclMemAlloc, so - // OOM on symmetric memory does not prevent us from allocating the flag). - int* rankSyncFlag = nullptr; - TLLM_CUDA_CHECK(cudaMalloc(&rankSyncFlag, sizeof(int))); - CudaMallocGuard flagGuard{rankSyncFlag}; // frees rankSyncFlag on any early return or exception - - // Step 3: Populate flag, reduce with min across ranks (0 if any rank failed), then read back. - // H2D failure is non-fatal: warn and continue — device flag may be stale but the allreduce - // must still be reached by all ranks. allreduce and D2H failures are catastrophic (throw). - auto stream = at::cuda::getCurrentCUDAStream().stream(); - TLLM_CUDA_CHECK_WARN(cudaMemcpy(rankSyncFlag, &localAllocOk, sizeof(int), cudaMemcpyHostToDevice)); + // Step 3: ncclCommWindowRegister is collective. If any rank skips it, all other ranks hang. + // Populate flag, reduce with min across ranks (0 if any rank failed), then read back. + // The flag is initialized to 0, so H2D failure is non-fatal and conservatively falls back + // to regular NCCL while still reaching the collective. allreduce and D2H failures throw. 
+ if (localAllocOk != 0) + { + TLLM_CUDA_CHECK_WARN( + cudaMemcpyAsync(rankSyncFlag, &localAllocOk, sizeof(localAllocOk), cudaMemcpyHostToDevice, stream)); + } TLLM_NCCL_CHECK(ncclAllReduce(rankSyncFlag, rankSyncFlag, 1, ncclInt32, ncclMin, comm, stream)); TLLM_CUDA_CHECK_WARN(cudaStreamSynchronize(stream)); @@ -599,7 +659,7 @@ NCCLWindowBuffer NCCLWindowAllocator::allocateAndRegisterBuffer(ncclComm_t comm, return NCCLWindowBuffer{}; // ncclGuard frees ncclPtr } - // Step 4: Register with NCCL as a window (collective — all ranks must reach this call). + // Step 4: Register with NCCL as a window. This is collective, so all ranks must reach it. // Failure here is non-fatal: warn and fall back to regular allreduce. // ncclGuard frees ncclPtr on return. ncclWindow_t window = nullptr; @@ -610,7 +670,7 @@ NCCLWindowBuffer NCCLWindowAllocator::allocateAndRegisterBuffer(ncclComm_t comm, return NCCLWindowBuffer{}; } - // Step 5: Success — transfer ownership to the returned buffer. + // Step 5: Success. Transfer ownership to the returned buffer. ncclGuard.release(); NCCLWindowBuffer buffer{ncclPtr, handle, size, window}; TLLM_LOG_TRACE("[NCCLUtil] Allocated and registered NCCL window buffer: handle=%d, ptr=%p, size=%zu, window=%p", @@ -683,6 +743,7 @@ void NCCLWindowAllocator::cleanupBuffersForComm(ncclComm_t comm) noexcept { // No buffers to clean up, but mark as cleaned mRegisteredComms.erase(comm); + mMinSymmetricFailureSize.erase(comm); return; } @@ -758,6 +819,7 @@ void NCCLWindowAllocator::cleanupBuffersForComm(ncclComm_t comm) noexcept mBufferPool.erase(commIt); mRegisteredComms.erase(comm); + mMinSymmetricFailureSize.erase(comm); } #endif // NCCL_VERSION_CODE >= NCCL_VERSION(2, 28, 0) diff --git a/cpp/tensorrt_llm/common/ncclUtils.h b/cpp/tensorrt_llm/common/ncclUtils.h index 672613f32e4..fdc88f1aaab 100644 --- a/cpp/tensorrt_llm/common/ncclUtils.h +++ b/cpp/tensorrt_llm/common/ncclUtils.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. 
All rights reserved.
+ * Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -258,12 +258,23 @@ class NCCLWindowAllocator
     NCCLWindowAllocator& operator=(NCCLWindowAllocator&&) = delete;
 
 private:
+    friend class NCCLWindowAllocatorTestAccess;
+
     NCCLWindowAllocator() = default;
     ~NCCLWindowAllocator() = default;
 
     // Allocate a new buffer and register it with NCCL as a window
     NCCLWindowBuffer allocateAndRegisterBuffer(ncclComm_t comm, size_t size, int handle);
 
+    // Record a failed new symmetric allocation (assumes mMutex is already locked).
+    void recordSymmetricFailureLocked(ncclComm_t comm, size_t size);
+
+    using CudaGetLastErrorFunc = cudaError_t (*)();
+
+    // Reset the CUDA last-error after a failed symmetric allocation (no-op when localAllocOk != 0).
+    static cudaError_t clearCudaErrorIfSymmetricAllocationFailed(
+        int localAllocOk, CudaGetLastErrorFunc getLastError = cudaGetLastError) noexcept;
+
     // Search for a buffer by pointer (assumes mMutex is already locked)
     NCCLWindowBuffer searchBufferLocked(ncclComm_t comm, void* ptr) const;
 
@@ -282,6 +293,10 @@ class NCCLWindowAllocator
     mutable std::mutex mMutex;
     std::unordered_map<ncclComm_t, std::vector<BufferEntry>> mBufferPool;
     std::unordered_set<ncclComm_t> mRegisteredComms;
+    // Smallest request size that is known to fail collectively for each communicator.
+    // Requests below the recorded size may still succeed and already-pooled buffers are always
+    // reused before consulting this cache.
+    std::unordered_map<ncclComm_t, size_t> mMinSymmetricFailureSize;
 };
 
 // RAII wrapper for NCCL window buffers
diff --git a/cpp/tests/unit_tests/multi_gpu/ncclUtilsTest.cpp b/cpp/tests/unit_tests/multi_gpu/ncclUtilsTest.cpp
index 2dca1418ecf..4f3309431a2 100644
--- a/cpp/tests/unit_tests/multi_gpu/ncclUtilsTest.cpp
+++ b/cpp/tests/unit_tests/multi_gpu/ncclUtilsTest.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ #include "tensorrt_llm/runtime/utils/mpiUtils.h" #include +#include #include #include #include @@ -38,6 +39,36 @@ namespace nccl_util = tensorrt_llm::common::nccl_util; using tensorrt_llm::getComm; +namespace tensorrt_llm::common::nccl_util +{ +class NCCLWindowAllocatorTestAccess +{ +public: + static void recordSymmetricFailure(NCCLWindowAllocator& allocator, ncclComm_t comm, size_t size) + { + std::lock_guard lock(allocator.mMutex); + allocator.recordSymmetricFailureLocked(comm, size); + } + + static cudaError_t clearCudaErrorIfSymmetricAllocationFailed( + int localAllocOk, NCCLWindowAllocator::CudaGetLastErrorFunc getLastError = cudaGetLastError) + { + return NCCLWindowAllocator::clearCudaErrorIfSymmetricAllocationFailed(localAllocOk, getLastError); + } +}; +} // namespace tensorrt_llm::common::nccl_util + +namespace +{ +int gCudaGetLastErrorCallCount = 0; + +cudaError_t fakeCudaGetLastError() +{ + ++gCudaGetLastErrorCallCount; + return cudaErrorLaunchFailure; +} +} // namespace + TEST(NCCLWindowSupportTest, RuntimeVersionAndGB10Gate) { #if NCCL_VERSION_CODE >= NCCL_VERSION(2, 28, 0) @@ -337,6 +368,87 @@ TEST_F(NCCLWindowAllocatorTest, BestFitReuse) allocator.releaseBuffer(*mComm, buffer768KB.ptr); } +TEST_F(NCCLWindowAllocatorTest, FailureCacheIsSizeAwareForNewAllocations) +{ + auto& allocator = nccl_util::NCCLWindowAllocator::getInstance(); + auto testComm = createSplitComm(*mComm, 0, mRank); + + constexpr size_t failureSize = 1024 * 1024; + nccl_util::NCCLWindowAllocatorTestAccess::recordSymmetricFailure(allocator, *testComm, failureSize); + + auto smallBuffer = allocator.requestBuffer(*testComm, failureSize / 2); + ASSERT_TRUE(smallBuffer.isValid()); + EXPECT_EQ(allocator.getBufferCount(*testComm), 1); + + auto failedBuffer = 
allocator.requestBuffer(*testComm, failureSize); + EXPECT_FALSE(failedBuffer.isValid()); + EXPECT_EQ(allocator.getBufferCount(*testComm), 1); + + allocator.releaseBuffer(*testComm, smallBuffer.ptr); + testComm.reset(); +} + +TEST_F(NCCLWindowAllocatorTest, FailureCacheDoesNotDisableReusableBuffers) +{ + auto& allocator = nccl_util::NCCLWindowAllocator::getInstance(); + auto testComm = createSplitComm(*mComm, 0, mRank); + + auto buffer1MB = allocator.requestBuffer(*testComm, 1024 * 1024); + ASSERT_TRUE(buffer1MB.isValid()); + void* ptr1MB = buffer1MB.ptr; + allocator.releaseBuffer(*testComm, ptr1MB); + + nccl_util::NCCLWindowAllocatorTestAccess::recordSymmetricFailure(allocator, *testComm, 512 * 1024); + + auto reusedBuffer = allocator.requestBuffer(*testComm, 768 * 1024); + ASSERT_TRUE(reusedBuffer.isValid()); + EXPECT_EQ(reusedBuffer.ptr, ptr1MB); + EXPECT_EQ(allocator.getBufferCount(*testComm), 1); + allocator.releaseBuffer(*testComm, reusedBuffer.ptr); + + auto failedBuffer = allocator.requestBuffer(*testComm, 2 * 1024 * 1024); + EXPECT_FALSE(failedBuffer.isValid()); + EXPECT_EQ(allocator.getBufferCount(*testComm), 1); + + testComm.reset(); +} + +TEST_F(NCCLWindowAllocatorTest, FailureCacheKeepsSmallestFailureSize) +{ + auto& allocator = nccl_util::NCCLWindowAllocator::getInstance(); + auto testComm = createSplitComm(*mComm, 0, mRank); + + nccl_util::NCCLWindowAllocatorTestAccess::recordSymmetricFailure(allocator, *testComm, 2 * 1024 * 1024); + nccl_util::NCCLWindowAllocatorTestAccess::recordSymmetricFailure(allocator, *testComm, 1024 * 1024); + + auto smallBuffer = allocator.requestBuffer(*testComm, 768 * 1024); + ASSERT_TRUE(smallBuffer.isValid()); + EXPECT_EQ(allocator.getBufferCount(*testComm), 1); + + auto failedBuffer = allocator.requestBuffer(*testComm, 1536 * 1024); + EXPECT_FALSE(failedBuffer.isValid()); + EXPECT_EQ(allocator.getBufferCount(*testComm), 1); + + allocator.releaseBuffer(*testComm, smallBuffer.ptr); + testComm.reset(); +} + 
+TEST_F(NCCLWindowAllocatorTest, ClearsCudaErrorAfterLocalAllocationFailure) +{ + auto const clearCudaErrorIfFailed = [](int localAllocOk) + { + return nccl_util::NCCLWindowAllocatorTestAccess::clearCudaErrorIfSymmetricAllocationFailed( + localAllocOk, fakeCudaGetLastError); + }; + + gCudaGetLastErrorCallCount = 0; + EXPECT_EQ(clearCudaErrorIfFailed(1), cudaSuccess); + EXPECT_EQ(gCudaGetLastErrorCallCount, 0); + + EXPECT_EQ(clearCudaErrorIfFailed(0), cudaErrorLaunchFailure); + EXPECT_EQ(gCudaGetLastErrorCallCount, 1); +} + TEST_F(NCCLWindowAllocatorTest, MultipleBuffers) { auto& allocator = nccl_util::NCCLWindowAllocator::getInstance();