diff --git a/cudax/include/cuda/experimental/__stf/internal/async_resources_handle.cuh b/cudax/include/cuda/experimental/__stf/internal/async_resources_handle.cuh index 4cf56af9dd1..54ac94bf40e 100644 --- a/cudax/include/cuda/experimental/__stf/internal/async_resources_handle.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/async_resources_handle.cuh @@ -232,14 +232,11 @@ public: ::cuda::std::pair<::std::shared_ptr, bool> cached_graphs_query(cudaGraph_t g) { - size_t nedges; - size_t nnodes; - - cuda_safe_call(cudaGraphGetNodes(g, nullptr, &nnodes)); + const size_t nnodes = cuda_try(g, nullptr); #if _CCCL_CTK_AT_LEAST(13, 0) - cuda_safe_call(cudaGraphGetEdges(g, nullptr, nullptr, nullptr, &nedges)); + const size_t nedges = cuda_try(g, nullptr, nullptr, nullptr); #else // _CCCL_CTK_AT_LEAST(13, 0) - cuda_safe_call(cudaGraphGetEdges(g, nullptr, nullptr, &nedges)); + const size_t nedges = cuda_try(g, nullptr, nullptr); #endif // _CCCL_CTK_AT_LEAST(13, 0) _CCCL_ASSERT(pimpl, "async_resources_handle is not initialized"); diff --git a/cudax/include/cuda/experimental/__stf/internal/context.cuh b/cudax/include/cuda/experimental/__stf/internal/context.cuh index f99c0115c33..3e7468618fc 100644 --- a/cudax/include/cuda/experimental/__stf/internal/context.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/context.cuh @@ -1121,18 +1121,19 @@ UNITTEST("context resources released on finalize non blocking") } }; - cudaStream_t stream; - cuda_safe_call(cudaStreamCreate(&stream)); + const cudaStream_t stream = cuda_try(); + SCOPE(exit) + { + cuda_safe_call(cudaStreamDestroy(stream)); + }; bool released = false; context ctx(stream, async_resources_handle()); ctx.add_resource(::std::make_shared(&released)); ctx.finalize(); // non-blocking: context was created with user stream EXPECT(!released); // not yet, callback not run - cuda_safe_call(cudaStreamSynchronize(stream)); + cuda_try(stream); EXPECT(released); - - cuda_safe_call(cudaStreamDestroy(stream)); }; UNITTEST("context import_resources_from") @@ -1182,8 +1183,11 @@ UNITTEST("context graph and stage") UNITTEST("context with arguments") { - cudaStream_t stream; - cuda_safe_call(cudaStreamCreate(&stream)); + const cudaStream_t stream = cuda_try(); + SCOPE(exit) + { + cuda_safe_call(cudaStreamDestroy(stream)); + }; async_resources_handle h; @@ -1198,8 +1202,6 @@ UNITTEST("context with arguments") context ctx4 = graph_ctx(stream, h); ctx4.finalize(); - - cuda_safe_call(cudaStreamDestroy(stream)); }; # if !defined(CUDASTF_DISABLE_CODE_GENERATION) && _CCCL_CUDA_COMPILATION() @@ -1564,7 +1566,7 @@ UNITTEST("context task") ctx.task(la.read(), lb.write())->*[](auto s, auto a, auto b) { // no-op - cudaMemcpyAsync(&b(0), &a(0), sizeof(int), cudaMemcpyDeviceToDevice, s); + cuda_safe_call(cudaMemcpyAsync(&b(0), &a(0), sizeof(int), cudaMemcpyDeviceToDevice, s)); }; ctx.finalize(); @@ -1706,8 +1708,7 @@ UNITTEST("make_tuple_indexwise") UNITTEST("cuda stream place") { - cudaStream_t user_stream; - cuda_safe_call(cudaStreamCreate(&user_stream)); + const cudaStream_t user_stream = cuda_try(); context ctx; @@ -1726,16 +1727,14 @@ UNITTEST("cuda stream place") UNITTEST("cuda stream place multi-gpu") { - cudaStream_t user_stream; - // Create a CUDA stream in a different device (if available) - int ndevices = cuda_try(); + const int ndevices = cuda_try(); // use the last device - int target_dev_id = ndevices - 1; + const int target_dev_id = ndevices - 1; - cuda_safe_call(cudaSetDevice(target_dev_id)); - cuda_safe_call(cudaStreamCreate(&user_stream)); - cuda_safe_call(cudaSetDevice(0)); + cuda_try(target_dev_id); + const cudaStream_t user_stream = cuda_try(); + cuda_try(0); context ctx; @@ -1826,8 +1825,7 @@ UNITTEST("get_stream graph") cudaStream_t s = t.get_stream(); // We are not capturing so there is no stream associated EXPECT(s == nullptr); - cudaGraphNode_t n; - cuda_safe_call(cudaGraphAddEmptyNode(&n, t.get_graph(), nullptr, 0)); + ::std::ignore = cuda_try(t.get_graph(), nullptr, 0); t.end(); auto t2 = ctx.task(token.rw()); diff --git a/cudax/include/cuda/experimental/__stf/internal/ctx_resource.cuh b/cudax/include/cuda/experimental/__stf/internal/ctx_resource.cuh index d0cd3831b24..79eaa4a804f 100644 --- a/cudax/include/cuda/experimental/__stf/internal/ctx_resource.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/ctx_resource.cuh @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -87,43 +88,47 @@ public: { _CCCL_ASSERT(!resources_released, "Resources have already been released on this context"); - // Separate resources into stream-dependent and callback-batched - decltype(resources) callback_resources; - - for (auto& r : resources) + // Release stream-dependent resources and compact them out of `resources` by + // pulling the last element into each vacated slot. A resource leaves + // `resources` only after it has been released, so if release(stream) throws, + // `resources` still holds the failing resource plus everything not yet + // processed -- release() can be retried with nothing lost or double-released. + for (size_t i = 0; i < resources.size();) { - if (r->can_release_in_callback()) - { - callback_resources.push_back(mv(r)); - } - else + if (resources[i]->can_release_in_callback()) { - r->release(stream); + ++i; + continue; } + resources[i]->release(stream); // may throw -> resources[i] stays in place + resources[i] = mv(resources.back()); + resources.pop_back(); } - resources.clear(); - // Batch all callback resources into a single host callback for efficiency - if (!callback_resources.empty()) + if (!resources.empty()) { - // Transfer ownership of callback resources to the callback - auto* callback_list = new ::std::vector<::std::shared_ptr>(mv(callback_resources)); + // Transfer ownership of callback resources to the callback. Held in a + // unique_ptr until the callback is successfully enqueued so a throw from + // cudaStreamAddCallback does not leak the list. + auto callback_list = ::std::make_unique<::std::vector<::std::shared_ptr>>(mv(resources)); // Add a single host callback using lambda that will release all callback resources auto release_lambda = [](cudaStream_t /*stream*/, cudaError_t /*status*/, void* userData) -> void { - auto* resources = static_cast<::std::vector<::std::shared_ptr>*>(userData); + auto* resources = static_cast(userData); + SCOPE(exit) + { + delete resources; + }; // Release all callback resources for (auto& resource : *resources) { resource->release_in_callback(); } - - // Clean up the callback list itself - delete resources; }; - cuda_safe_call(cudaStreamAddCallback(stream, release_lambda, callback_list, 0)); + cuda_try(stream, release_lambda, callback_list.get(), 0); + callback_list.release(); } // Mark as released to prevent double release diff --git a/cudax/include/cuda/experimental/__stf/internal/slice.cuh b/cudax/include/cuda/experimental/__stf/internal/slice.cuh index ffa39b678f4..af97157da32 100644 --- a/cudax/include/cuda/experimental/__stf/internal/slice.cuh +++ b/cudax/include/cuda/experimental/__stf/internal/slice.cuh @@ -31,6 +31,7 @@ #include #include #include +#include #include @@ -803,83 +804,46 @@ UNITTEST("3D slice should be similar to 3D mdspan", (slice())) #endif // UNITTESTED_FILE /** - * @brief Pins a slice in host memory for efficient use with CUDA primitives + * @brief Invokes `f(base, n)` once for each maximal contiguous hunk of `s`. * - * @tparam T memory type - * @tparam dimensions slice dimension - * @param s slice to pin + * `base` points to the first element of the hunk and `n` is the number of + * contiguous elements in it. The leading `contiguous_dims(s)` dimensions form a + * single contiguous run; the remaining dimensions are enumerated recursively, + * yielding one hunk per index tuple. Together the hunks cover every element of + * `s` exactly once, assuming the STF slice convention that dimension 0 is + * unit-stride. */ -template -bool pin(mdspan& s) +namespace reserved +{ +template +void for_each_contiguous_hunk(const mdspan& s, F&& f) { - // We need the rank as a constexpr value constexpr auto rank = mdspan::extents_type::rank(); + const size_t c = contiguous_dims(s); - if (address_is_pinned(s.data_handle())) + // The contiguous prefix [0, c) is a single run of this many elements. + size_t hunk = 1; + for (size_t d = 0; d < c; ++d) { - return false; + hunk *= s.extent(d); } - if constexpr (rank == 0) - { - cuda_safe_call(pin_memory(s.data_handle(), 1)); - } - else if constexpr (rank == 1) - { - cuda_safe_call(pin_memory(s.data_handle(), s.extent(0))); - } - else if constexpr (rank == 2) - { - switch (contiguous_dims(s)) + // Walk the Cartesian product of the trailing, non-contiguous dims [c, rank); + // each index tuple is the base of one contiguous hunk. + auto rec = [&](auto&& self, T* base, size_t dim) -> void { + if (dim == rank) { - case 1: - for (size_t index_1 = 0; index_1 < s.extent(1); index_1++) - { - cuda_safe_call(pin_memory(&s(0, index_1) + index_1 * s.stride(1), s.extent(0))); - } - break; - case 2: - // fprintf(stderr, "PIN 2D - contiguous\n"); - cuda_safe_call(pin_memory(s.data_handle(), s.extent(0) * s.extent(1))); - break; - default: - assert(false); - abort(); + f(base, hunk); + return; } - } - else - { - static_assert(rank == 3, "Dimensionality not supported."); - switch (contiguous_dims(s)) + for (size_t i = 0; i < s.extent(dim); ++i) { - case 1: - for (size_t index_2 = 0; index_2 < s.extent(2); index_2++) - { - for (size_t index_1 = 0; index_1 < s.extent(1); index_1++) - { - // fprintf(stderr, "ADDR %d,%d,0 = %p \n", index_2, index_1, &s(index_2, index_1, 0)); - cuda_safe_call(pin_memory(&s(0, index_1, index_2), s.extent(0))); - } - } - break; - case 2: - for (size_t index_2 = 0; index_2 < s.extent(2); index_2++) - { - cuda_safe_call(pin_memory(&s(0, 0, index_2), s.extent(0) * s.extent(1))); - } - break; - case 3: - // fprintf(stderr, "PIN 3D - contiguous\n"); - cuda_safe_call(pin_memory(s.data_handle(), s.extent(0) * s.extent(1) * s.extent(2))); - break; - default: - assert(false); - abort(); + self(self, base + i * s.stride(dim), dim + 1); } - } - - return true; + }; + rec(rec, s.data_handle(), c); } +} // namespace reserved /** * @brief Unpin the memory associated with an mdspan object. @@ -887,70 +851,48 @@ bool pin(mdspan& s) * @tparam T The type of elements in the mdspan. * @tparam P The properties of the mdspan. * @param s The mdspan object to unpin memory for. + * + * `unpin_memory` silently ignores regions that are not currently registered, so + * this is safe on fully- *or* partially-pinned slices (e.g. a `pin()` that + * failed partway and rolled back, or an already-unpinned slice). */ template void unpin(mdspan& s) { - // We need the rank as a constexpr value - constexpr auto rank = mdspan::extents_type::rank(); + reserved::for_each_contiguous_hunk(s, [](T* base, size_t /*n*/) { + unpin_memory(base); + }); +} - if constexpr (rank == 0) - { - unpin_memory(s.data_handle()); - } - else if constexpr (rank == 1) - { - unpin_memory(s.data_handle()); - } - else if constexpr (rank == 2) +/** + * @brief Pins a slice in host memory for efficient use with CUDA primitives + * + * @tparam T memory type + * @tparam P slice properties + * @param s slice to pin + * @return true if the slice was newly pinned, false if it was already pinned + */ +template +bool pin(mdspan& s) +{ + if (address_is_pinned(s.data_handle())) { - switch (contiguous_dims(s)) - { - case 1: - for (size_t index_1 = 0; index_1 < s.extent(1); index_1++) - { - unpin_memory(&s(0, index_1) + index_1 * s.extent(0)); - } - break; - case 2: - // fprintf(stderr, "PIN 2D - contiguous\n"); - unpin_memory(s.data_handle()); - break; - default: - assert(false); - abort(); - } + return false; } - else + + // Roll back on any failure. unpin() tolerates hunks that were never pinned + // (the one that threw, plus the ones we never reached), so this leaves the + // slice fully unpinned -- consistent with the address_is_pinned() proxy above. + SCOPE(fail) { - static_assert(rank == 3, "Dimensionality not supported."); - switch (contiguous_dims(s)) - { - case 1: - for (size_t index_2 = 0; index_2 < s.extent(2); index_2++) - { - for (size_t index_1 = 0; index_1 < s.extent(1); index_1++) - { - // fprintf(stderr, "ADDR %d,%d,0 = %p \n", index_2, index_1, &s(index_2, index_1, 0)); - unpin_memory(&s(0, index_1, index_2)); - } - } - break; - case 2: - for (size_t index_2 = 0; index_2 < s.extent(2); index_2++) - { - unpin_memory(&s(0, 0, index_2)); - } - break; - case 3: - // fprintf(stderr, "PIN 3D - contiguous\n"); - unpin_memory(s.data_handle()); - break; - default: - assert(false); - abort(); - } - } + unpin(s); + }; + + reserved::for_each_contiguous_hunk(s, [](T* base, size_t n) { + cuda_try(pin_memory(base, n)); + }); + + return true; } _CCCL_DIAG_PUSH