Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,11 @@ public:

::cuda::std::pair<::std::shared_ptr<cudaGraphExec_t>, bool> cached_graphs_query(cudaGraph_t g)
{
size_t nedges;
size_t nnodes;

cuda_safe_call(cudaGraphGetNodes(g, nullptr, &nnodes));
const size_t nnodes = cuda_try<cudaGraphGetNodes>(g, nullptr);
#if _CCCL_CTK_AT_LEAST(13, 0)
cuda_safe_call(cudaGraphGetEdges(g, nullptr, nullptr, nullptr, &nedges));
const size_t nedges = cuda_try<cudaGraphGetEdges>(g, nullptr, nullptr, nullptr);
#else // _CCCL_CTK_AT_LEAST(13, 0)
cuda_safe_call(cudaGraphGetEdges(g, nullptr, nullptr, &nedges));
const size_t nedges = cuda_try<cudaGraphGetEdges>(g, nullptr, nullptr);
#endif // _CCCL_CTK_AT_LEAST(13, 0)

_CCCL_ASSERT(pimpl, "async_resources_handle is not initialized");
Expand Down
40 changes: 19 additions & 21 deletions cudax/include/cuda/experimental/__stf/internal/context.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1121,18 +1121,19 @@ UNITTEST("context resources released on finalize non blocking")
}
};

cudaStream_t stream;
cuda_safe_call(cudaStreamCreate(&stream));
const cudaStream_t stream = cuda_try<cudaStreamCreate>();
SCOPE(exit)
{
cuda_safe_call(cudaStreamDestroy(stream));
};

bool released = false;
context ctx(stream, async_resources_handle());
ctx.add_resource(::std::make_shared<dummy_released_resource>(&released));
ctx.finalize(); // non-blocking: context was created with user stream
EXPECT(!released); // not yet, callback not run
cuda_safe_call(cudaStreamSynchronize(stream));
cuda_try<cudaStreamSynchronize>(stream);
EXPECT(released);

cuda_safe_call(cudaStreamDestroy(stream));
};

UNITTEST("context import_resources_from")
Expand Down Expand Up @@ -1182,8 +1183,11 @@ UNITTEST("context graph and stage")

UNITTEST("context with arguments")
{
cudaStream_t stream;
cuda_safe_call(cudaStreamCreate(&stream));
const cudaStream_t stream = cuda_try<cudaStreamCreate>();
SCOPE(exit)
{
cuda_safe_call(cudaStreamDestroy(stream));
};

async_resources_handle h;

Expand All @@ -1198,8 +1202,6 @@ UNITTEST("context with arguments")

context ctx4 = graph_ctx(stream, h);
ctx4.finalize();

cuda_safe_call(cudaStreamDestroy(stream));
};

# if !defined(CUDASTF_DISABLE_CODE_GENERATION) && _CCCL_CUDA_COMPILATION()
Expand Down Expand Up @@ -1564,7 +1566,7 @@ UNITTEST("context task")

ctx.task(la.read(), lb.write())->*[](auto s, auto a, auto b) {
// no-op
cudaMemcpyAsync(&b(0), &a(0), sizeof(int), cudaMemcpyDeviceToDevice, s);
cuda_safe_call(cudaMemcpyAsync(&b(0), &a(0), sizeof(int), cudaMemcpyDeviceToDevice, s));
};

ctx.finalize();
Expand Down Expand Up @@ -1706,8 +1708,7 @@ UNITTEST("make_tuple_indexwise")

UNITTEST("cuda stream place")
{
cudaStream_t user_stream;
cuda_safe_call(cudaStreamCreate(&user_stream));
const cudaStream_t user_stream = cuda_try<cudaStreamCreate>();
Comment thread
coderabbitai[bot] marked this conversation as resolved.

context ctx;

Expand All @@ -1726,16 +1727,14 @@ UNITTEST("cuda stream place")

UNITTEST("cuda stream place multi-gpu")
{
cudaStream_t user_stream;

// Create a CUDA stream in a different device (if available)
int ndevices = cuda_try<cudaGetDeviceCount>();
const int ndevices = cuda_try<cudaGetDeviceCount>();
// use the last device
int target_dev_id = ndevices - 1;
const int target_dev_id = ndevices - 1;

cuda_safe_call(cudaSetDevice(target_dev_id));
cuda_safe_call(cudaStreamCreate(&user_stream));
cuda_safe_call(cudaSetDevice(0));
cuda_try<cudaSetDevice>(target_dev_id);
const cudaStream_t user_stream = cuda_try<cudaStreamCreate>();
cuda_try<cudaSetDevice>(0);

context ctx;

Expand Down Expand Up @@ -1826,8 +1825,7 @@ UNITTEST("get_stream graph")
cudaStream_t s = t.get_stream();
// We are not capturing so there is no stream associated
EXPECT(s == nullptr);
cudaGraphNode_t n;
cuda_safe_call(cudaGraphAddEmptyNode(&n, t.get_graph(), nullptr, 0));
::std::ignore = cuda_try<cudaGraphAddEmptyNode>(t.get_graph(), nullptr, 0);
t.end();

auto t2 = ctx.task(token.rw());
Expand Down
45 changes: 25 additions & 20 deletions cudax/include/cuda/experimental/__stf/internal/ctx_resource.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <cuda/experimental/__stf/utility/core.cuh>
#include <cuda/experimental/__stf/utility/cuda_safe_call.cuh>
#include <cuda/experimental/__stf/utility/scope_guard.cuh>

#include <functional>
#include <memory>
Expand Down Expand Up @@ -87,43 +88,47 @@ public:
{
_CCCL_ASSERT(!resources_released, "Resources have already been released on this context");

// Separate resources into stream-dependent and callback-batched
decltype(resources) callback_resources;

for (auto& r : resources)
// Release stream-dependent resources and compact them out of `resources` by
// pulling the last element into each vacated slot. A resource leaves
// `resources` only after it has been released, so if release(stream) throws,
// `resources` still holds the failing resource plus everything not yet
// processed -- release() can be retried with nothing lost or double-released.
for (size_t i = 0; i < resources.size();)
{
if (r->can_release_in_callback())
{
callback_resources.push_back(mv(r));
}
else
if (resources[i]->can_release_in_callback())
{
r->release(stream);
++i;
continue;
}
resources[i]->release(stream); // may throw -> resources[i] stays in place
resources[i] = mv(resources.back());
resources.pop_back();
}
resources.clear();

// Batch all callback resources into a single host callback for efficiency
if (!callback_resources.empty())
if (!resources.empty())
{
// Transfer ownership of callback resources to the callback
auto* callback_list = new ::std::vector<::std::shared_ptr<ctx_resource>>(mv(callback_resources));
// Transfer ownership of callback resources to the callback. Held in a
// unique_ptr until the callback is successfully enqueued so a throw from
// cudaStreamAddCallback does not leak the list.
auto callback_list = ::std::make_unique<::std::vector<::std::shared_ptr<ctx_resource>>>(mv(resources));

// Add a single host callback using lambda that will release all callback resources
auto release_lambda = [](cudaStream_t /*stream*/, cudaError_t /*status*/, void* userData) -> void {
auto* resources = static_cast<::std::vector<::std::shared_ptr<ctx_resource>>*>(userData);
auto* resources = static_cast<decltype(callback_list.get())>(userData);
SCOPE(exit)
{
delete resources;
};

// Release all callback resources
for (auto& resource : *resources)
{
resource->release_in_callback();
}

// Clean up the callback list itself
delete resources;
};

cuda_safe_call(cudaStreamAddCallback(stream, release_lambda, callback_list, 0));
cuda_try<cudaStreamAddCallback>(stream, release_lambda, callback_list.get(), 0);
callback_list.release();
}

// Mark as released to prevent double release
Expand Down
Loading
Loading