Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 81 additions & 19 deletions cpp/tensorrt_llm/common/ncclUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,7 @@
namespace
{

// RAII guard for cudaMalloc — frees the pointer on destruction, logging a warning on failure.
// RAII guard for cudaMalloc. Frees the pointer on destruction, logging a warning on failure.
struct CudaMallocGuard
{
void* ptr{nullptr};
Expand Down Expand Up @@ -56,7 +56,7 @@ struct CudaMallocGuard
CudaMallocGuard& operator=(CudaMallocGuard const&) = delete;
};

// RAII guard for ncclMemAlloc — frees the pointer on destruction, logging a warning on failure.
// RAII guard for ncclMemAlloc. Frees the pointer on destruction, logging a warning on failure.
struct NcclMemGuard
{
void* ptr{nullptr};
Expand Down Expand Up @@ -344,6 +344,17 @@ NCCLWindowBuffer NCCLWindowAllocator::requestBuffer(ncclComm_t comm, size_t size
return bestFit->buffer;
}

// If a previous allocateAndRegisterBuffer call collectively failed for this comm at a size
// no larger than this request, do not retry the known-failing new allocation path. Smaller
// requests and already-pooled buffers can still use NCCL windows.
auto const failureIt = mMinSymmetricFailureSize.find(comm);
if (failureIt != mMinSymmetricFailureSize.end() && size >= failureIt->second)
{
TLLM_LOG_DEBUG("[NCCLUtil] Skipping NCCL window allocation for comm %p, size=%zu; known failure threshold=%zu",
static_cast<void*>(comm), size, failureIt->second);
return NCCLWindowBuffer();
}

// No available buffer found, avoid registration during CUDA graph capture
auto stream = at::cuda::getCurrentCUDAStream();
cudaStreamCaptureStatus capture_status = cudaStreamCaptureStatusNone;
Expand All @@ -364,11 +375,47 @@ NCCLWindowBuffer NCCLWindowAllocator::requestBuffer(ncclComm_t comm, size_t size
"[NCCLUtil] Allocating new NCCL window buffer for comm %p, size=%zu", static_cast<void*>(comm), size);
int handle = static_cast<int>(commBuffers.size());
NCCLWindowBuffer buffer = allocateAndRegisterBuffer(comm, size, handle);
commBuffers.push_back({buffer, true});
// Only cache valid buffers. allocateAndRegisterBuffer returns an empty buffer when any rank
// failed ncclMemAlloc (collective fallback to plain allreduce); caching it would leak a
// permanently "in use" empty entry per request because releaseBuffer is a no-op for nullptr.
if (buffer.isValid())
{
commBuffers.push_back({buffer, true});
}
else
{
// The collective allreduce inside allocateAndRegisterBuffer agreed that this request
// cannot use symmetric memory on at least one rank. Remember the smallest failing
// request size so repeated too-large autotuner probes do not keep stressing this path.
recordSymmetricFailureLocked(comm, size);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

return buffer;
}

// Remember the smallest request size that has been recorded as failing for `comm`.
// If the communicator has no entry yet, `size` is stored; otherwise the stored value is
// lowered only when `size` is strictly smaller. This function takes no lock itself; per the
// "Locked" naming it is meant to be called with mMutex already held.
void NCCLWindowAllocator::recordSymmetricFailureLocked(ncclComm_t comm, size_t size)
{
    // try_emplace leaves an existing entry untouched and reports whether it inserted.
    auto [entry, inserted] = mMinSymmetricFailureSize.try_emplace(comm, size);
    if (!inserted && size < entry->second)
    {
        entry->second = size;
    }
}

// Query the CUDA last-error state only when the local symmetric allocation failed.
//   localAllocOk - 0 means the local allocation failed; any other value means it succeeded.
//   getLastError - callback used to fetch the last CUDA error; it is invoked at most once and
//                  only on the failure path.
// Returns whatever getLastError() reports on the failure path, and cudaSuccess (without
// invoking the callback) otherwise.
cudaError_t NCCLWindowAllocator::clearCudaErrorIfSymmetricAllocationFailed(
    int localAllocOk, CudaGetLastErrorFunc getLastError) noexcept
{
    bool const allocationFailed = (localAllocOk == 0);
    return allocationFailed ? getLastError() : cudaSuccess;
}

NCCLWindowBuffer NCCLWindowAllocator::searchBuffer(ncclComm_t comm, void* ptr) const
{
if (!comm || !ptr)
Expand Down Expand Up @@ -466,24 +513,37 @@ bool NCCLWindowAllocator::isCommValid(ncclComm_t comm) const noexcept

NCCLWindowBuffer NCCLWindowAllocator::allocateAndRegisterBuffer(ncclComm_t comm, size_t size, int handle)
{
// Step 1: Allocate symmetric memory (per-rank, non-collective — can fail asymmetrically).
// Step 1: Pre-allocate the rank-sync flag before ncclMemAlloc. ncclMemAlloc can fail
// asymmetrically with ncclUnhandledCudaError on configurations where the symmetric/VMM path
// is unavailable; that failure may leave a sticky CUDA last-error on the device. If we
// deferred this cudaMalloc until after the failure, the sticky error would propagate into
// cudaMalloc, TLLM_CUDA_CHECK would throw, and the failing rank would never reach the
// collective ncclAllReduce(min) below, hanging every other rank that did succeed.
int* rankSyncFlag = nullptr;
TLLM_CUDA_CHECK(cudaMalloc(&rankSyncFlag, sizeof(int)));
CudaMallocGuard flagGuard{rankSyncFlag}; // frees rankSyncFlag on any early return or exception
auto stream = at::cuda::getCurrentCUDAStream().stream();
TLLM_CUDA_CHECK(cudaMemsetAsync(rankSyncFlag, 0, sizeof(int), stream));

// Step 2: Allocate symmetric memory. This per-rank, non-collective call can fail
// asymmetrically. When it fails, NCCL may leave a sticky CUDA error behind; clear it before
// the stream-ordered flag copy and collective fallback so the failing rank still reaches
// ncclAllReduce with the other ranks.
void* ncclPtr = nullptr;
TLLM_NCCL_CHECK_WARN(ncclMemAlloc(&ncclPtr, size));
int const localAllocOk = (ncclPtr != nullptr) ? 1 : 0;
NcclMemGuard ncclGuard{ncclPtr}; // frees ncclPtr on any early return or exception
clearCudaErrorIfSymmetricAllocationFailed(localAllocOk);

// Step 2: ncclCommWindowRegister is collective — if any rank skips it, all other ranks hang.
// Synchronize the per-rank alloc status using a small cudaMalloc flag (not ncclMemAlloc, so
// OOM on symmetric memory does not prevent us from allocating the flag).
int* rankSyncFlag = nullptr;
TLLM_CUDA_CHECK(cudaMalloc(&rankSyncFlag, sizeof(int)));
CudaMallocGuard flagGuard{rankSyncFlag}; // frees rankSyncFlag on any early return or exception

// Step 3: Populate flag, reduce with min across ranks (0 if any rank failed), then read back.
// H2D failure is non-fatal: warn and continue — device flag may be stale but the allreduce
// must still be reached by all ranks. allreduce and D2H failures are catastrophic (throw).
auto stream = at::cuda::getCurrentCUDAStream().stream();
TLLM_CUDA_CHECK_WARN(cudaMemcpy(rankSyncFlag, &localAllocOk, sizeof(int), cudaMemcpyHostToDevice));
// Step 3: ncclCommWindowRegister is collective. If any rank skips it, all other ranks hang.
// Populate flag, reduce with min across ranks (0 if any rank failed), then read back.
// The flag is initialized to 0, so H2D failure is non-fatal and conservatively falls back
// to regular NCCL while still reaching the collective. allreduce and D2H failures throw.
if (localAllocOk != 0)
{
TLLM_CUDA_CHECK_WARN(
cudaMemcpyAsync(rankSyncFlag, &localAllocOk, sizeof(localAllocOk), cudaMemcpyHostToDevice, stream));
}
TLLM_NCCL_CHECK(ncclAllReduce(rankSyncFlag, rankSyncFlag, 1, ncclInt32, ncclMin, comm, stream));
TLLM_CUDA_CHECK_WARN(cudaStreamSynchronize(stream));

Expand All @@ -503,7 +563,7 @@ NCCLWindowBuffer NCCLWindowAllocator::allocateAndRegisterBuffer(ncclComm_t comm,
return NCCLWindowBuffer{}; // ncclGuard frees ncclPtr
}

// Step 4: Register with NCCL as a window (collective all ranks must reach this call).
// Step 4: Register with NCCL as a window. This is collective, so all ranks must reach it.
// Failure here is non-fatal: warn and fall back to regular allreduce.
// ncclGuard frees ncclPtr on return.
ncclWindow_t window = nullptr;
Expand All @@ -514,7 +574,7 @@ NCCLWindowBuffer NCCLWindowAllocator::allocateAndRegisterBuffer(ncclComm_t comm,
return NCCLWindowBuffer{};
}

// Step 5: Success — transfer ownership to the returned buffer.
// Step 5: Success. Transfer ownership to the returned buffer.
ncclGuard.release();
NCCLWindowBuffer buffer{ncclPtr, handle, size, window};
TLLM_LOG_TRACE("[NCCLUtil] Allocated and registered NCCL window buffer: handle=%d, ptr=%p, size=%zu, window=%p",
Expand Down Expand Up @@ -587,6 +647,7 @@ void NCCLWindowAllocator::cleanupBuffersForComm(ncclComm_t comm) noexcept
{
// No buffers to clean up, but mark as cleaned
mRegisteredComms.erase(comm);
mMinSymmetricFailureSize.erase(comm);
return;
}

Expand Down Expand Up @@ -662,6 +723,7 @@ void NCCLWindowAllocator::cleanupBuffersForComm(ncclComm_t comm) noexcept

mBufferPool.erase(commIt);
mRegisteredComms.erase(comm);
mMinSymmetricFailureSize.erase(comm);
}

#endif // NCCL_VERSION_CODE >= NCCL_VERSION(2, 28, 0)
Expand Down
17 changes: 16 additions & 1 deletion cpp/tensorrt_llm/common/ncclUtils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -265,12 +265,23 @@ class NCCLWindowAllocator
NCCLWindowAllocator& operator=(NCCLWindowAllocator&&) = delete;

private:
friend class NCCLWindowAllocatorTestAccess;

NCCLWindowAllocator() = default;
~NCCLWindowAllocator() = default;

// Allocate a new buffer and register it with NCCL as a window
NCCLWindowBuffer allocateAndRegisterBuffer(ncclComm_t comm, size_t size, int handle);

// Record a failed new symmetric allocation (assumes mMutex is already locked).
void recordSymmetricFailureLocked(ncclComm_t comm, size_t size);

using CudaGetLastErrorFunc = cudaError_t (*)();

// Drain the sticky CUDA error left by a failed symmetric allocation.
static cudaError_t clearCudaErrorIfSymmetricAllocationFailed(
int localAllocOk, CudaGetLastErrorFunc getLastError = cudaGetLastError) noexcept;

// Search for a buffer by pointer (assumes mMutex is already locked)
NCCLWindowBuffer searchBufferLocked(ncclComm_t comm, void* ptr) const;

Expand All @@ -289,6 +300,10 @@ class NCCLWindowAllocator
mutable std::mutex mMutex;
std::unordered_map<ncclComm_t, std::vector<BufferEntry>> mBufferPool;
std::unordered_set<ncclComm_t> mRegisteredComms;
// Smallest request size that is known to fail collectively for each communicator.
// Requests below the recorded size may still succeed and already-pooled buffers are always
// reused before consulting this cache.
std::unordered_map<ncclComm_t, size_t> mMinSymmetricFailureSize;
};

// RAII wrapper for NCCL window buffers
Expand Down
114 changes: 113 additions & 1 deletion cpp/tests/unit_tests/multi_gpu/ncclUtilsTest.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
#include "tensorrt_llm/runtime/utils/mpiUtils.h"

#include <gtest/gtest.h>
#include <mutex>
#include <nccl.h>
#include <thread>
#include <vector>
Expand All @@ -38,6 +39,36 @@ namespace nccl_util = tensorrt_llm::common::nccl_util;

using tensorrt_llm::getComm;

namespace tensorrt_llm::common::nccl_util
{
// Test-only accessor that reaches into NCCLWindowAllocator's non-public members so the unit
// tests below can seed the failure cache and exercise the error-clearing helper directly.
class NCCLWindowAllocatorTestAccess
{
public:
    // Acquire the allocator's mutex, then forward to recordSymmetricFailureLocked, which
    // itself does no locking.
    static void recordSymmetricFailure(NCCLWindowAllocator& allocator, ncclComm_t comm, size_t size)
    {
        std::scoped_lock<std::mutex> guard(allocator.mMutex);
        allocator.recordSymmetricFailureLocked(comm, size);
    }

    // Forward to NCCLWindowAllocator::clearCudaErrorIfSymmetricAllocationFailed, letting the
    // caller substitute the function used to read the last CUDA error.
    static cudaError_t clearCudaErrorIfSymmetricAllocationFailed(
        int localAllocOk, NCCLWindowAllocator::CudaGetLastErrorFunc getLastError = cudaGetLastError)
    {
        cudaError_t const reported
            = NCCLWindowAllocator::clearCudaErrorIfSymmetricAllocationFailed(localAllocOk, getLastError);
        return reported;
    }
};
} // namespace tensorrt_llm::common::nccl_util

namespace
{
// Number of times fakeCudaGetLastError has been invoked; tests reset it to 0 before use.
int gCudaGetLastErrorCallCount = 0;

// Stand-in for the last-error query: counts each call and always reports
// cudaErrorLaunchFailure, so a test can tell both whether it ran and what it returned.
cudaError_t fakeCudaGetLastError()
{
    gCudaGetLastErrorCallCount += 1;
    return cudaErrorLaunchFailure;
}
} // namespace

// Helper function to create a split communicator for testing
// This allows us to test cleanup behavior explicitly by controlling the lifetime
std::shared_ptr<ncclComm_t> createSplitComm(ncclComm_t parentComm, int color, int key)
Expand Down Expand Up @@ -321,6 +352,87 @@ TEST_F(NCCLWindowAllocatorTest, BestFitReuse)
allocator.releaseBuffer(*mComm, buffer768KB.ptr);
}

// With a failure recorded at 1 MiB, a request for half that size must still allocate a new
// buffer, while a request at exactly the recorded size must come back invalid and leave the
// pool size unchanged.
TEST_F(NCCLWindowAllocatorTest, FailureCacheIsSizeAwareForNewAllocations)
{
    using TestAccess = nccl_util::NCCLWindowAllocatorTestAccess;

    constexpr size_t kFailureThreshold = 1024 * 1024;
    constexpr size_t kBelowThreshold = kFailureThreshold / 2;

    auto& allocator = nccl_util::NCCLWindowAllocator::getInstance();
    auto testComm = createSplitComm(*mComm, 0, mRank);

    // Seed the failure cache directly instead of provoking a real allocation failure.
    TestAccess::recordSymmetricFailure(allocator, *testComm, kFailureThreshold);

    // Below the threshold: the new-allocation path is still taken.
    auto belowThreshold = allocator.requestBuffer(*testComm, kBelowThreshold);
    ASSERT_TRUE(belowThreshold.isValid());
    EXPECT_EQ(allocator.getBufferCount(*testComm), 1);

    // At the threshold: the request is rejected and nothing is added to the pool.
    auto atThreshold = allocator.requestBuffer(*testComm, kFailureThreshold);
    EXPECT_FALSE(atThreshold.isValid());
    EXPECT_EQ(allocator.getBufferCount(*testComm), 1);

    allocator.releaseBuffer(*testComm, belowThreshold.ptr);
    testComm.reset();
}

// A buffer that is already pooled must remain reusable even when the recorded failure size
// is smaller than the request; only requests that need a brand-new allocation are rejected.
TEST_F(NCCLWindowAllocatorTest, FailureCacheDoesNotDisableReusableBuffers)
{
    using TestAccess = nccl_util::NCCLWindowAllocatorTestAccess;

    constexpr size_t kPooledSize = 1024 * 1024;
    constexpr size_t kRecordedFailure = 512 * 1024;
    constexpr size_t kReuseRequest = 768 * 1024;
    constexpr size_t kOversizedRequest = 2 * 1024 * 1024;

    auto& allocator = nccl_util::NCCLWindowAllocator::getInstance();
    auto testComm = createSplitComm(*mComm, 0, mRank);

    // Put one 1 MiB buffer in the pool and hand it back so it is available for reuse.
    auto pooled = allocator.requestBuffer(*testComm, kPooledSize);
    ASSERT_TRUE(pooled.isValid());
    void* pooledPtr = pooled.ptr;
    allocator.releaseBuffer(*testComm, pooledPtr);

    // Record a failure below the size of the next request.
    TestAccess::recordSymmetricFailure(allocator, *testComm, kRecordedFailure);

    // 768 KiB fits in the pooled 1 MiB buffer, so the same pointer is expected back.
    auto recycled = allocator.requestBuffer(*testComm, kReuseRequest);
    ASSERT_TRUE(recycled.isValid());
    EXPECT_EQ(recycled.ptr, pooledPtr);
    EXPECT_EQ(allocator.getBufferCount(*testComm), 1);
    allocator.releaseBuffer(*testComm, recycled.ptr);

    // 2 MiB cannot be served from the pool and is above the recorded failure size.
    auto rejected = allocator.requestBuffer(*testComm, kOversizedRequest);
    EXPECT_FALSE(rejected.isValid());
    EXPECT_EQ(allocator.getBufferCount(*testComm), 1);

    testComm.reset();
}

// Recording a 2 MiB failure followed by a 1 MiB failure must leave 1 MiB as the effective
// threshold: 768 KiB still allocates, 1.5 MiB is rejected.
TEST_F(NCCLWindowAllocatorTest, FailureCacheKeepsSmallestFailureSize)
{
    using TestAccess = nccl_util::NCCLWindowAllocatorTestAccess;

    constexpr size_t kFirstFailure = 2 * 1024 * 1024;
    constexpr size_t kSecondFailure = 1024 * 1024;
    constexpr size_t kUnderBoth = 768 * 1024;
    constexpr size_t kBetween = 1536 * 1024;

    auto& allocator = nccl_util::NCCLWindowAllocator::getInstance();
    auto testComm = createSplitComm(*mComm, 0, mRank);

    // Larger failure first, then the smaller one that should replace it.
    TestAccess::recordSymmetricFailure(allocator, *testComm, kFirstFailure);
    TestAccess::recordSymmetricFailure(allocator, *testComm, kSecondFailure);

    auto accepted = allocator.requestBuffer(*testComm, kUnderBoth);
    ASSERT_TRUE(accepted.isValid());
    EXPECT_EQ(allocator.getBufferCount(*testComm), 1);

    // Between the two recorded sizes: rejected only if the smaller value was kept.
    auto rejected = allocator.requestBuffer(*testComm, kBetween);
    EXPECT_FALSE(rejected.isValid());
    EXPECT_EQ(allocator.getBufferCount(*testComm), 1);

    allocator.releaseBuffer(*testComm, accepted.ptr);
    testComm.reset();
}

// The error-clearing helper must call the injected last-error function only when the local
// allocation flag is 0, and must pass that function's result back to the caller.
TEST_F(NCCLWindowAllocatorTest, ClearsCudaErrorAfterLocalAllocationFailure)
{
    using TestAccess = nccl_util::NCCLWindowAllocatorTestAccess;

    gCudaGetLastErrorCallCount = 0;

    // Success flag: the fake must not be invoked and cudaSuccess is returned.
    cudaError_t const onSuccess = TestAccess::clearCudaErrorIfSymmetricAllocationFailed(1, fakeCudaGetLastError);
    EXPECT_EQ(onSuccess, cudaSuccess);
    EXPECT_EQ(gCudaGetLastErrorCallCount, 0);

    // Failure flag: the fake is invoked exactly once and its error code is propagated.
    cudaError_t const onFailure = TestAccess::clearCudaErrorIfSymmetricAllocationFailed(0, fakeCudaGetLastError);
    EXPECT_EQ(onFailure, cudaErrorLaunchFailure);
    EXPECT_EQ(gCudaGetLastErrorCallCount, 1);
}

TEST_F(NCCLWindowAllocatorTest, MultipleBuffers)
{
auto& allocator = nccl_util::NCCLWindowAllocator::getInstance();
Expand Down
Loading