From 4f73bbe18e4a94d02253d3913e5ca8f7d67cad15 Mon Sep 17 00:00:00 2001 From: Michael Tsukanov Date: Mon, 27 Apr 2026 02:33:48 +0300 Subject: [PATCH 1/3] Add semaphore-aware Sequence evalAsync overloads Allow submit-level GPU synchronization by letting Sequence submissions wait on and signal Vulkan semaphores without CPU-side waits. This keeps existing evalAsync behavior while adding validation and coverage for the new overload path. Signed-off-by: Michael Tsukanov --- src/Sequence.cpp | 50 ++++++++++++++++++++++++-- src/include/kompute/Sequence.hpp | 40 +++++++++++++++++++++ test/TestSequence.cpp | 60 ++++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 3 deletions(-) diff --git a/src/Sequence.cpp b/src/Sequence.cpp index 120f9c99..ed199d1f 100644 --- a/src/Sequence.cpp +++ b/src/Sequence.cpp @@ -110,6 +110,14 @@ Sequence::eval(std::shared_ptr op) std::shared_ptr Sequence::evalAsync() +{ + return this->evalAsync({}, {}, {}); +} + +std::shared_ptr +Sequence::evalAsync(const std::vector& waitSemaphores, + const std::vector& waitDstStageMasks, + const std::vector& signalSemaphores) { if (this->isRecording()) { this->end(); @@ -127,8 +135,35 @@ Sequence::evalAsync() this->mOperations[i]->preEval(*this->mCommandBuffer); } + if (!waitDstStageMasks.empty() && + waitSemaphores.size() != waitDstStageMasks.size()) { + throw std::runtime_error("Kompute Sequence evalAsync wait semaphore " + "count must match wait dst stage mask count"); + } + + std::vector resolvedWaitDstStageMasks = + waitDstStageMasks; + if (resolvedWaitDstStageMasks.empty() && !waitSemaphores.empty()) { + resolvedWaitDstStageMasks.resize(waitSemaphores.size(), + vk::PipelineStageFlagBits::eAllCommands); + } + + const vk::Semaphore* waitSemaphoresPtr = + waitSemaphores.empty() ? nullptr : waitSemaphores.data(); + const vk::PipelineStageFlags* waitDstStageMasksPtr = + resolvedWaitDstStageMasks.empty() ? nullptr + : resolvedWaitDstStageMasks.data(); + const vk::Semaphore* signalSemaphoresPtr = + signalSemaphores.empty() ? nullptr : signalSemaphores.data(); + vk::SubmitInfo submitInfo( - 0, nullptr, nullptr, 1, this->mCommandBuffer.get()); + static_cast(waitSemaphores.size()), + waitSemaphoresPtr, + waitDstStageMasksPtr, + 1, + this->mCommandBuffer.get(), + static_cast(signalSemaphores.size()), + signalSemaphoresPtr); KP_LOG_DEBUG( "Kompute sequence submitting command buffer into compute queue"); @@ -153,11 +188,20 @@ Sequence::submitCommandBuffer(const vk::SubmitInfo& submitInfo) std::shared_ptr Sequence::evalAsync(std::shared_ptr op) +{ + return this->evalAsync(op, {}, {}, {}); +} + +std::shared_ptr +Sequence::evalAsync(std::shared_ptr op, + const std::vector& waitSemaphores, + const std::vector& waitDstStageMasks, + const std::vector& signalSemaphores) { this->clear(); this->record(op); - this->evalAsync(); - return shared_from_this(); + return this->evalAsync( + waitSemaphores, waitDstStageMasks, signalSemaphores); } std::shared_ptr diff --git a/src/include/kompute/Sequence.hpp b/src/include/kompute/Sequence.hpp index bea23fc2..39eb9a81 100644 --- a/src/include/kompute/Sequence.hpp +++ b/src/include/kompute/Sequence.hpp @@ -164,6 +164,25 @@ class Sequence : public std::enable_shared_from_this * @return Boolean stating whether execution was successful. */ std::shared_ptr evalAsync(); + /** + * Eval Async sends all recorded operations as a submit job and allows + * submit-level GPU synchronization by providing wait and signal semaphores. + * EvalAwait() must ALWAYS be called after to ensure the sequence is + * terminated correctly. + * + * @param waitSemaphores Semaphores that must be signaled before this submit + * starts executing. + * @param waitDstStageMasks Pipeline stages at which to wait for each + * semaphore. If empty and waitSemaphores is not empty, defaults to + * vk::PipelineStageFlagBits::eAllCommands for each wait semaphore. + * @param signalSemaphores Semaphores that this submit will signal when it + * completes. + * @return shared_ptr of the Sequence class itself + */ + std::shared_ptr evalAsync( + const std::vector& waitSemaphores, + const std::vector& waitDstStageMasks, + const std::vector& signalSemaphores); /** * Clears currnet operations to record provided one in the vector of * operations into the gpu as a submit job without a barrier. EvalAwait() @@ -173,6 +192,27 @@ class Sequence : public std::enable_shared_from_this * @return Boolean stating whether execution was successful. */ std::shared_ptr evalAsync(std::shared_ptr op); + /** + * Clears current operations, records the provided one and submits with + * optional wait/signal semaphores for submit-level GPU synchronization. + * EvalAwait() must ALWAYS be called after to ensure the sequence is + * terminated correctly. + * + * @param op Operation to record prior to submit. + * @param waitSemaphores Semaphores that must be signaled before this submit + * starts executing. + * @param waitDstStageMasks Pipeline stages at which to wait for each + * semaphore. If empty and waitSemaphores is not empty, defaults to + * vk::PipelineStageFlagBits::eAllCommands for each wait semaphore. + * @param signalSemaphores Semaphores that this submit will signal when it + * completes. + * @return shared_ptr of the Sequence class itself + */ + std::shared_ptr evalAsync( + std::shared_ptr op, + const std::vector& waitSemaphores, + const std::vector& waitDstStageMasks, + const std::vector& signalSemaphores); /** * Eval sends all the recorded and stored operations in the vector of * operations into the gpu as a submit job with a barrier. diff --git a/test/TestSequence.cpp b/test/TestSequence.cpp index 3a4cce66..6892a7a3 100644 --- a/test/TestSequence.cpp +++ b/test/TestSequence.cpp @@ -243,3 +243,63 @@ TEST(TestSequence, CorrectSequenceRunningError) EXPECT_EQ(tensorOut->vector(), std::vector({ 2, 4, 6 })); } + +TEST(TestSequence, EvalAsyncSemaphoreOverloadSupportsEmptySyncLists) +{ + kp::Manager mgr; + + std::shared_ptr sq = mgr.sequence(); + + std::shared_ptr> tensorA = mgr.tensor({ 1, 2, 3 }); + std::shared_ptr> tensorB = mgr.tensor({ 2, 2, 2 }); + std::shared_ptr> tensorOut = mgr.tensor({ 0, 0, 0 }); + + sq->eval({ tensorA, tensorB, tensorOut }); + + std::vector spirv = compileSource(R"( + #version 450 + + layout (local_size_x = 1) in; + + layout(set = 0, binding = 0) buffer bina { float tina[]; }; + layout(set = 0, binding = 1) buffer binb { float tinb[]; }; + layout(set = 0, binding = 2) buffer bout { float tout[]; }; + + void main() { + uint index = gl_GlobalInvocationID.x; + tout[index] = tina[index] * tinb[index]; + } + )"); + + std::shared_ptr algo = + mgr.algorithm({ tensorA, tensorB, tensorOut }, spirv); + + sq->record(algo)->record( + { tensorA, tensorB, tensorOut }); + + EXPECT_NO_THROW(sq->evalAsync({}, {}, {})); + EXPECT_NO_THROW(sq->evalAwait()); + + EXPECT_EQ(tensorOut->vector(), std::vector({ 2, 4, 6 })); +} + +TEST(TestSequence, EvalAsyncSemaphoreOverloadValidatesWaitMaskCount) +{ + kp::Manager mgr; + + std::shared_ptr sq = mgr.sequence(); + + std::shared_ptr> tensorA = mgr.tensor({ 1, 2, 3 }); + + sq->record({ tensorA }); + + std::vector waitSemaphores = { vk::Semaphore{} }; + std::vector waitDstStageMasks = { + vk::PipelineStageFlagBits::eComputeShader, + vk::PipelineStageFlagBits::eTransfer + }; + std::vector signalSemaphores = {}; + + EXPECT_ANY_THROW( + sq->evalAsync(waitSemaphores, waitDstStageMasks, signalSemaphores)); +} From 7a928ff9abb958eecaf042d62ba98bca2d66b927 Mon Sep 17 00:00:00 2001 From: Michael Tsukanov Date: Mon, 27 Apr 2026 02:55:59 +0300 Subject: [PATCH 2/3] Update docs for semaphore-aware Sequence evalAsync overload Signed-off-by: Michael Tsukanov --- docs/overview/advanced-examples.rst | 19 ++++++++++++++++--- docs/overview/custom-operations.rst | 2 +- src/include/kompute/Sequence.hpp | 28 ++++++++++++++++++---------- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/docs/overview/advanced-examples.rst b/docs/overview/advanced-examples.rst index 8c8407f5..315ce167 100644 --- a/docs/overview/advanced-examples.rst +++ b/docs/overview/advanced-examples.rst @@ -162,6 +162,9 @@ Async/Await Example A simple example of asynchronous submission can be found below. +You can also use async submissions with Vulkan semaphores to synchronize +Kompute-generated submits with user-managed queue submits. + First we are able to create the manager as we normally would. .. code-block:: cpp @@ -233,15 +236,25 @@ The parameter provided is the maximum amount of time to wait in nanoseconds. Whe auto sq = mgr.sequence(); - // Run Async Kompute operation on the parameters provided - sq->evalAsync(algo); + // Optional: pass submit-level synchronization primitives so this submit + // waits/signals alongside user-managed queue work + std::vector waitSemaphores = { externalWaitSemaphore }; + std::vector waitDstStageMasks = { + vk::PipelineStageFlagBits::eComputeShader + }; + std::vector signalSemaphores = { externalSignalSemaphore }; + auto opAlgo = std::make_shared(algo); + sq->evalAsync(opAlgo, waitSemaphores, waitDstStageMasks, signalSemaphores); // Here we can do other work - // When we're ready we can wait + // When we're ready we can wait // The default wait time is UINT64_MAX sq->evalAwait(); +``evalAwait()`` must be called before invoking ``evalAsync()`` again on the +same ``Sequence``. + Finally, below you can see that we can also run syncrhonous commands without having to change anything. diff --git a/docs/overview/custom-operations.rst b/docs/overview/custom-operations.rst index d5ac07b6..ed4bb93f 100644 --- a/docs/overview/custom-operations.rst +++ b/docs/overview/custom-operations.rst @@ -33,7 +33,7 @@ Below you * - preEval() - When the Sequence is Evaluated this preEval is called across all operations before dispatching the batch of recorded commands to the GPU. This is useful for example if you need to copy data from local to host memory. * - postEval() - - After the sequence is Evaluated this postEval is called across all operations. When running asynchronously the postEval is called when you call `evalAwait()`, which is why it's important to always run evalAwait() to ensure the process doesn't go into inconsistent state. + - After the sequence is Evaluated this postEval is called across all operations. In asynchronous flows postEval is called when you run `evalAwait()`, and `evalAwait()` must be called before triggering `evalAsync()` again on the same sequence to avoid inconsistent state. Simple Operation Extending OpAlgoBase diff --git a/src/include/kompute/Sequence.hpp b/src/include/kompute/Sequence.hpp index 39eb9a81..8064a7e3 100644 --- a/src/include/kompute/Sequence.hpp +++ b/src/include/kompute/Sequence.hpp @@ -157,9 +157,10 @@ class Sequence : public std::enable_shared_from_this /** * Eval Async sends all the recorded and stored operations in the vector of - * operations into the gpu as a submit job without a barrier. EvalAwait() - * must ALWAYS be called after to ensure the sequence is terminated - * correctly. + * operations into the gpu as a submit job without a barrier. + * + * evalAwait() must be called before invoking evalAsync() again on this same + * Sequence to complete the previous async run and reset internal state. * * @return Boolean stating whether execution was successful. */ @@ -167,8 +168,11 @@ class Sequence : public std::enable_shared_from_this /** * Eval Async sends all recorded operations as a submit job and allows * submit-level GPU synchronization by providing wait and signal semaphores. - * EvalAwait() must ALWAYS be called after to ensure the sequence is - * terminated correctly. + * + * This overload is useful for synchronizing Kompute submissions with + * user-managed queue submissions without forcing CPU-side synchronization. + * evalAwait() must be called before invoking evalAsync() again on this same + * Sequence to complete the previous async run and reset internal state. * * @param waitSemaphores Semaphores that must be signaled before this submit * starts executing. @@ -185,9 +189,10 @@ class Sequence : public std::enable_shared_from_this const std::vector& signalSemaphores); /** * Clears currnet operations to record provided one in the vector of - * operations into the gpu as a submit job without a barrier. EvalAwait() - * must ALWAYS be called after to ensure the sequence is terminated - * correctly. + * operations into the gpu as a submit job without a barrier. + * + * evalAwait() must be called before invoking evalAsync() again on this same + * Sequence to complete the previous async run and reset internal state. * * @return Boolean stating whether execution was successful. */ @@ -195,8 +200,11 @@ class Sequence : public std::enable_shared_from_this /** * Clears current operations, records the provided one and submits with * optional wait/signal semaphores for submit-level GPU synchronization. - * EvalAwait() must ALWAYS be called after to ensure the sequence is - * terminated correctly. + * + * This overload is useful for synchronizing Kompute submissions with + * user-managed queue submissions without forcing CPU-side synchronization. + * evalAwait() must be called before invoking evalAsync() again on this same + * Sequence to complete the previous async run and reset internal state. * * @param op Operation to record prior to submit. * @param waitSemaphores Semaphores that must be signaled before this submit From 0857de65615b184dfd876b97ff3fe5ed2025e7d1 Mon Sep 17 00:00:00 2001 From: Michael Tsukanov Date: Wed, 13 May 2026 14:32:18 +0300 Subject: [PATCH 3/3] Move semaphore passing to sequence constructor --- docs/overview/advanced-examples.rst | 10 +++--- python/src/main.cpp | 4 ++- src/Manager.cpp | 15 +++++++- src/Sequence.cpp | 53 ++++++++++----------------- src/include/kompute/Manager.hpp | 15 +++++++- src/include/kompute/Sequence.hpp | 56 ++++++----------------------- test/TestSequence.cpp | 16 +++------ 7 files changed, 70 insertions(+), 99 deletions(-) diff --git a/docs/overview/advanced-examples.rst b/docs/overview/advanced-examples.rst index 315ce167..7611a2ee 100644 --- a/docs/overview/advanced-examples.rst +++ b/docs/overview/advanced-examples.rst @@ -234,17 +234,17 @@ The parameter provided is the maximum amount of time to wait in nanoseconds. Whe .. code-block:: cpp :linenos: - auto sq = mgr.sequence(); - - // Optional: pass submit-level synchronization primitives so this submit - // waits/signals alongside user-managed queue work + // Optional: pass submit-level synchronization primitives once when + // creating the sequence so every submit waits/signals alongside + // user-managed queue work std::vector waitSemaphores = { externalWaitSemaphore }; std::vector waitDstStageMasks = { vk::PipelineStageFlagBits::eComputeShader }; std::vector signalSemaphores = { externalSignalSemaphore }; + auto sq = mgr.sequence(0, 0, waitSemaphores, waitDstStageMasks, signalSemaphores); auto opAlgo = std::make_shared(algo); - sq->evalAsync(opAlgo, waitSemaphores, waitDstStageMasks, signalSemaphores); + sq->evalAsync(opAlgo); // Here we can do other work diff --git a/python/src/main.cpp b/python/src/main.cpp index 8bec2970..92b9b48b 100644 --- a/python/src/main.cpp +++ b/python/src/main.cpp @@ -336,7 +336,9 @@ PYBIND11_MODULE(kp, m) py::arg("desired_extensions") = std::vector()) .def("destroy", &kp::Manager::destroy, DOC(kp, Manager, destroy)) .def("sequence", - &kp::Manager::sequence, + [](kp::Manager& self, uint32_t queueIndex, uint32_t totalTimestamps) { + return self.sequence(queueIndex, totalTimestamps, {}, {}, {}); + }, DOC(kp, Manager, sequence), py::arg("queue_index") = 0, py::arg("total_timestamps") = 0) diff --git a/src/Manager.cpp b/src/Manager.cpp index e5a56d0e..720baa9d 100644 --- a/src/Manager.cpp +++ b/src/Manager.cpp @@ -499,10 +499,20 @@ Manager::createDevice(const std::vector& familyQueueIndices, } std::shared_ptr -Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps) +Manager::sequence(uint32_t queueIndex, + uint32_t totalTimestamps, + const std::vector& waitSemaphores, + const std::vector& waitDstStageMasks, + const std::vector& signalSemaphores) { KP_LOG_DEBUG("Kompute Manager sequence() with queueIndex: {}", queueIndex); + if (!waitDstStageMasks.empty() && + waitSemaphores.size() != waitDstStageMasks.size()) { + throw std::runtime_error("Kompute Manager sequence() wait semaphore " + "count must match wait dst stage mask count"); + } + std::shared_ptr submitMutex = nullptr; #ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE submitMutex = this->mSequenceSubmitMutex; @@ -514,6 +524,9 @@ Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps) this->mComputeQueues[queueIndex], this->mComputeQueueFamilyIndices[queueIndex], totalTimestamps, + waitSemaphores, + waitDstStageMasks, + signalSemaphores, submitMutex) }; if (this->mManageResources) { diff --git a/src/Sequence.cpp b/src/Sequence.cpp index ed199d1f..5b9e5eb3 100644 --- a/src/Sequence.cpp +++ b/src/Sequence.cpp @@ -9,6 +9,9 @@ Sequence::Sequence(std::shared_ptr physicalDevice, std::shared_ptr computeQueue, uint32_t queueIndex, uint32_t totalTimestamps, + const std::vector& waitSemaphores, + const std::vector& waitDstStageMasks, + const std::vector& signalSemaphores, std::shared_ptr submitMutex) noexcept { KP_LOG_DEBUG("Kompute Sequence Constructor with existing device & queue"); @@ -19,6 +22,14 @@ Sequence::Sequence(std::shared_ptr physicalDevice, this->mQueueIndex = queueIndex; this->mFence = this->mDevice->createFence(vk::FenceCreateInfo()); this->mSubmitMutex = submitMutex; + this->mWaitSemaphores = waitSemaphores; + this->mWaitDstStageMasks = waitDstStageMasks; + this->mSignalSemaphores = signalSemaphores; + + if (this->mWaitDstStageMasks.empty() && !this->mWaitSemaphores.empty()) { + this->mWaitDstStageMasks.resize(this->mWaitSemaphores.size(), + vk::PipelineStageFlagBits::eAllCommands); + } this->createCommandPool(); this->createCommandBuffer(); @@ -110,14 +121,6 @@ Sequence::eval(std::shared_ptr op) std::shared_ptr Sequence::evalAsync() -{ - return this->evalAsync({}, {}, {}); -} - -std::shared_ptr -Sequence::evalAsync(const std::vector& waitSemaphores, - const std::vector& waitDstStageMasks, - const std::vector& signalSemaphores) { if (this->isRecording()) { this->end(); @@ -135,34 +138,26 @@ Sequence::evalAsync(const std::vector& waitSemaphores, this->mOperations[i]->preEval(*this->mCommandBuffer); } - if (!waitDstStageMasks.empty() && - waitSemaphores.size() != waitDstStageMasks.size()) { + if (!this->mWaitDstStageMasks.empty() && + this->mWaitSemaphores.size() != this->mWaitDstStageMasks.size()) { throw std::runtime_error("Kompute Sequence evalAsync wait semaphore " "count must match wait dst stage mask count"); } - std::vector resolvedWaitDstStageMasks = - waitDstStageMasks; - if (resolvedWaitDstStageMasks.empty() && !waitSemaphores.empty()) { - resolvedWaitDstStageMasks.resize(waitSemaphores.size(), - vk::PipelineStageFlagBits::eAllCommands); - } - const vk::Semaphore* waitSemaphoresPtr = - waitSemaphores.empty() ? nullptr : waitSemaphores.data(); + this->mWaitSemaphores.empty() ? nullptr : this->mWaitSemaphores.data(); const vk::PipelineStageFlags* waitDstStageMasksPtr = - resolvedWaitDstStageMasks.empty() ? nullptr - : resolvedWaitDstStageMasks.data(); + this->mWaitDstStageMasks.empty() ? nullptr : this->mWaitDstStageMasks.data(); const vk::Semaphore* signalSemaphoresPtr = - signalSemaphores.empty() ? nullptr : signalSemaphores.data(); + this->mSignalSemaphores.empty() ? nullptr : this->mSignalSemaphores.data(); vk::SubmitInfo submitInfo( - static_cast(waitSemaphores.size()), + static_cast(this->mWaitSemaphores.size()), waitSemaphoresPtr, waitDstStageMasksPtr, 1, this->mCommandBuffer.get(), - static_cast(signalSemaphores.size()), + static_cast(this->mSignalSemaphores.size()), signalSemaphoresPtr); KP_LOG_DEBUG( @@ -188,20 +183,10 @@ Sequence::submitCommandBuffer(const vk::SubmitInfo& submitInfo) std::shared_ptr Sequence::evalAsync(std::shared_ptr op) -{ - return this->evalAsync(op, {}, {}, {}); -} - -std::shared_ptr -Sequence::evalAsync(std::shared_ptr op, - const std::vector& waitSemaphores, - const std::vector& waitDstStageMasks, - const std::vector& signalSemaphores) { this->clear(); this->record(op); - return this->evalAsync( - waitSemaphores, waitDstStageMasks, signalSemaphores); + return this->evalAsync(); } std::shared_ptr diff --git a/src/include/kompute/Manager.hpp b/src/include/kompute/Manager.hpp index d76b10d0..49f2e1b4 100644 --- a/src/include/kompute/Manager.hpp +++ b/src/include/kompute/Manager.hpp @@ -77,10 +77,23 @@ class Manager * @param queueIndex The queue to use from the available queues * @param nrOfTimestamps The maximum number of timestamps to allocate. * If zero (default), disables latching of timestamps. + * @param waitSemaphores Semaphores to wait on before each submit from this + * sequence. + * @param waitDstStageMasks Pipeline stages to use for each wait semaphore. + * If empty and waitSemaphores is not empty, defaults to + * vk::PipelineStageFlagBits::eAllCommands for every wait semaphore. + * @param signalSemaphores Semaphores to signal after each submit from this + * sequence. * @returns Shared pointer with initialised sequence */ std::shared_ptr sequence(uint32_t queueIndex = 0, - uint32_t totalTimestamps = 0); + uint32_t totalTimestamps = 0, + const std::vector& + waitSemaphores = {}, + const std::vector& + waitDstStageMasks = {}, + const std::vector& + signalSemaphores = {}); /** * Create a managed tensor that will be destroyed by this manager diff --git a/src/include/kompute/Sequence.hpp b/src/include/kompute/Sequence.hpp index 8064a7e3..489030a8 100644 --- a/src/include/kompute/Sequence.hpp +++ b/src/include/kompute/Sequence.hpp @@ -31,6 +31,9 @@ class Sequence : public std::enable_shared_from_this std::shared_ptr computeQueue, uint32_t queueIndex, uint32_t totalTimestamps = 0, + const std::vector& waitSemaphores = {}, + const std::vector& waitDstStageMasks = {}, + const std::vector& signalSemaphores = {}, std::shared_ptr submitMutex = nullptr) noexcept; /** @@ -158,6 +161,8 @@ class Sequence : public std::enable_shared_from_this /** * Eval Async sends all the recorded and stored operations in the vector of * operations into the gpu as a submit job without a barrier. + * Submit-level wait/signal semaphores are configured when creating the + * Sequence. * * evalAwait() must be called before invoking evalAsync() again on this same * Sequence to complete the previous async run and reset internal state. @@ -165,31 +170,11 @@ class Sequence : public std::enable_shared_from_this * @return Boolean stating whether execution was successful. */ std::shared_ptr evalAsync(); - /** - * Eval Async sends all recorded operations as a submit job and allows - * submit-level GPU synchronization by providing wait and signal semaphores. - * - * This overload is useful for synchronizing Kompute submissions with - * user-managed queue submissions without forcing CPU-side synchronization. - * evalAwait() must be called before invoking evalAsync() again on this same - * Sequence to complete the previous async run and reset internal state. - * - * @param waitSemaphores Semaphores that must be signaled before this submit - * starts executing. - * @param waitDstStageMasks Pipeline stages at which to wait for each - * semaphore. If empty and waitSemaphores is not empty, defaults to - * vk::PipelineStageFlagBits::eAllCommands for each wait semaphore. - * @param signalSemaphores Semaphores that this submit will signal when it - * completes. - * @return shared_ptr of the Sequence class itself - */ - std::shared_ptr evalAsync( - const std::vector& waitSemaphores, - const std::vector& waitDstStageMasks, - const std::vector& signalSemaphores); /** * Clears currnet operations to record provided one in the vector of * operations into the gpu as a submit job without a barrier. + * Submit-level wait/signal semaphores are configured when creating the + * Sequence. * * evalAwait() must be called before invoking evalAsync() again on this same * Sequence to complete the previous async run and reset internal state. @@ -197,30 +182,6 @@ class Sequence : public std::enable_shared_from_this * @return Boolean stating whether execution was successful. */ std::shared_ptr evalAsync(std::shared_ptr op); - /** - * Clears current operations, records the provided one and submits with - * optional wait/signal semaphores for submit-level GPU synchronization. - * - * This overload is useful for synchronizing Kompute submissions with - * user-managed queue submissions without forcing CPU-side synchronization. - * evalAwait() must be called before invoking evalAsync() again on this same - * Sequence to complete the previous async run and reset internal state. - * - * @param op Operation to record prior to submit. - * @param waitSemaphores Semaphores that must be signaled before this submit - * starts executing. - * @param waitDstStageMasks Pipeline stages at which to wait for each - * semaphore. If empty and waitSemaphores is not empty, defaults to - * vk::PipelineStageFlagBits::eAllCommands for each wait semaphore. - * @param signalSemaphores Semaphores that this submit will signal when it - * completes. - * @return shared_ptr of the Sequence class itself - */ - std::shared_ptr evalAsync( - std::shared_ptr op, - const std::vector& waitSemaphores, - const std::vector& waitDstStageMasks, - const std::vector& signalSemaphores); /** * Eval sends all the recorded and stored operations in the vector of * operations into the gpu as a submit job with a barrier. @@ -345,6 +306,9 @@ class Sequence : public std::enable_shared_from_this std::vector> mOperations{}; std::shared_ptr timestampQueryPool = nullptr; std::shared_ptr mSubmitMutex = nullptr; + std::vector mWaitSemaphores{}; + std::vector mWaitDstStageMasks{}; + std::vector mSignalSemaphores{}; // State bool mRecording = false; diff --git a/test/TestSequence.cpp b/test/TestSequence.cpp index 6892a7a3..26641bd4 100644 --- a/test/TestSequence.cpp +++ b/test/TestSequence.cpp @@ -244,7 +244,7 @@ TEST(TestSequence, CorrectSequenceRunningError) EXPECT_EQ(tensorOut->vector(), std::vector({ 2, 4, 6 })); } -TEST(TestSequence, EvalAsyncSemaphoreOverloadSupportsEmptySyncLists) +TEST(TestSequence, SequenceSubmitSyncSupportsEmptySyncLists) { kp::Manager mgr; @@ -277,22 +277,16 @@ TEST(TestSequence, EvalAsyncSemaphoreOverloadSupportsEmptySyncLists) sq->record(algo)->record( { tensorA, tensorB, tensorOut }); - EXPECT_NO_THROW(sq->evalAsync({}, {}, {})); + EXPECT_NO_THROW(sq->evalAsync()); EXPECT_NO_THROW(sq->evalAwait()); EXPECT_EQ(tensorOut->vector(), std::vector({ 2, 4, 6 })); } -TEST(TestSequence, EvalAsyncSemaphoreOverloadValidatesWaitMaskCount) +TEST(TestSequence, SequenceSubmitSyncValidatesWaitMaskCount) { kp::Manager mgr; - std::shared_ptr sq = mgr.sequence(); - - std::shared_ptr> tensorA = mgr.tensor({ 1, 2, 3 }); - - sq->record({ tensorA }); - std::vector waitSemaphores = { vk::Semaphore{} }; std::vector waitDstStageMasks = { vk::PipelineStageFlagBits::eComputeShader, @@ -300,6 +294,6 @@ TEST(TestSequence, EvalAsyncSemaphoreOverloadValidatesWaitMaskCount) }; std::vector signalSemaphores = {}; - EXPECT_ANY_THROW( - sq->evalAsync(waitSemaphores, waitDstStageMasks, signalSemaphores)); + EXPECT_ANY_THROW(mgr.sequence( + 0, 0, waitSemaphores, waitDstStageMasks, signalSemaphores)); }