Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions docs/overview/advanced-examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ Async/Await Example

A simple example of asynchronous submission can be found below.

You can also use async submissions with Vulkan semaphores to synchronize
Kompute-generated submits with user-managed queue submits.

First we are able to create the manager as we normally would.

.. code-block:: cpp
Expand Down Expand Up @@ -231,17 +234,27 @@ The parameter provided is the maximum amount of time to wait in nanoseconds. Whe
.. code-block:: cpp
:linenos:

auto sq = mgr.sequence();

// Run Async Kompute operation on the parameters provided
sq->evalAsync<kp::OpAlgoDispatch>(algo);
// Optional: pass submit-level synchronization primitives once when
// creating the sequence so every submit waits/signals alongside
// user-managed queue work
std::vector<vk::Semaphore> waitSemaphores = { externalWaitSemaphore };
std::vector<vk::PipelineStageFlags> waitDstStageMasks = {
vk::PipelineStageFlagBits::eComputeShader
};
std::vector<vk::Semaphore> signalSemaphores = { externalSignalSemaphore };
auto sq = mgr.sequence(0, 0, waitSemaphores, waitDstStageMasks, signalSemaphores);
auto opAlgo = std::make_shared<kp::OpAlgoDispatch>(algo);
sq->evalAsync(opAlgo);

// Here we can do other work

// When we're ready we can wait
// When we're ready we can wait
// The default wait time is UINT64_MAX
sq->evalAwait();

``evalAwait()`` must be called before invoking ``evalAsync()`` again on the
same ``Sequence``.


Finally, below you can see that we can also run syncrhonous commands without having to change anything.

Expand Down
2 changes: 1 addition & 1 deletion docs/overview/custom-operations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Below you
* - preEval()
- When the Sequence is Evaluated this preEval is called across all operations before dispatching the batch of recorded commands to the GPU. This is useful for example if you need to copy data from local to host memory.
* - postEval()
- After the sequence is Evaluated this postEval is called across all operations. When running asynchronously the postEval is called when you call `evalAwait()`, which is why it's important to always run evalAwait() to ensure the process doesn't go into inconsistent state.
- After the sequence is Evaluated this postEval is called across all operations. In asynchronous flows postEval is called when you run `evalAwait()`, and `evalAwait()` must be called before triggering `evalAsync()` again on the same sequence to avoid inconsistent state.


Simple Operation Extending OpAlgoBase
Expand Down
4 changes: 3 additions & 1 deletion python/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@ PYBIND11_MODULE(kp, m)
py::arg("desired_extensions") = std::vector<std::string>())
.def("destroy", &kp::Manager::destroy, DOC(kp, Manager, destroy))
.def("sequence",
&kp::Manager::sequence,
[](kp::Manager& self, uint32_t queueIndex, uint32_t totalTimestamps) {
return self.sequence(queueIndex, totalTimestamps, {}, {}, {});
},
DOC(kp, Manager, sequence),
py::arg("queue_index") = 0,
py::arg("total_timestamps") = 0)
Expand Down
15 changes: 14 additions & 1 deletion src/Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,20 @@ Manager::createDevice(const std::vector<uint32_t>& familyQueueIndices,
}

std::shared_ptr<Sequence>
Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps)
Manager::sequence(uint32_t queueIndex,
uint32_t totalTimestamps,
const std::vector<vk::Semaphore>& waitSemaphores,
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
const std::vector<vk::Semaphore>& signalSemaphores)
{
KP_LOG_DEBUG("Kompute Manager sequence() with queueIndex: {}", queueIndex);

if (!waitDstStageMasks.empty() &&
waitSemaphores.size() != waitDstStageMasks.size()) {
throw std::runtime_error("Kompute Manager sequence() wait semaphore "
"count must match wait dst stage mask count");
}

std::shared_ptr<std::mutex> submitMutex = nullptr;
#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE
submitMutex = this->mSequenceSubmitMutex;
Expand All @@ -514,6 +524,9 @@ Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps)
this->mComputeQueues[queueIndex],
this->mComputeQueueFamilyIndices[queueIndex],
totalTimestamps,
waitSemaphores,
waitDstStageMasks,
signalSemaphores,
submitMutex) };

if (this->mManageResources) {
Expand Down
35 changes: 32 additions & 3 deletions src/Sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Sequence::Sequence(std::shared_ptr<vk::PhysicalDevice> physicalDevice,
std::shared_ptr<vk::Queue> computeQueue,
uint32_t queueIndex,
uint32_t totalTimestamps,
const std::vector<vk::Semaphore>& waitSemaphores,
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
const std::vector<vk::Semaphore>& signalSemaphores,
std::shared_ptr<std::mutex> submitMutex) noexcept
{
KP_LOG_DEBUG("Kompute Sequence Constructor with existing device & queue");
Expand All @@ -19,6 +22,14 @@ Sequence::Sequence(std::shared_ptr<vk::PhysicalDevice> physicalDevice,
this->mQueueIndex = queueIndex;
this->mFence = this->mDevice->createFence(vk::FenceCreateInfo());
this->mSubmitMutex = submitMutex;
this->mWaitSemaphores = waitSemaphores;
this->mWaitDstStageMasks = waitDstStageMasks;
this->mSignalSemaphores = signalSemaphores;

if (this->mWaitDstStageMasks.empty() && !this->mWaitSemaphores.empty()) {
this->mWaitDstStageMasks.resize(this->mWaitSemaphores.size(),
vk::PipelineStageFlagBits::eAllCommands);
}

this->createCommandPool();
this->createCommandBuffer();
Expand Down Expand Up @@ -127,8 +138,27 @@ Sequence::evalAsync()
this->mOperations[i]->preEval(*this->mCommandBuffer);
}

if (!this->mWaitDstStageMasks.empty() &&
this->mWaitSemaphores.size() != this->mWaitDstStageMasks.size()) {
throw std::runtime_error("Kompute Sequence evalAsync wait semaphore "
"count must match wait dst stage mask count");
}

const vk::Semaphore* waitSemaphoresPtr =
this->mWaitSemaphores.empty() ? nullptr : this->mWaitSemaphores.data();
const vk::PipelineStageFlags* waitDstStageMasksPtr =
this->mWaitDstStageMasks.empty() ? nullptr : this->mWaitDstStageMasks.data();
const vk::Semaphore* signalSemaphoresPtr =
this->mSignalSemaphores.empty() ? nullptr : this->mSignalSemaphores.data();

vk::SubmitInfo submitInfo(
0, nullptr, nullptr, 1, this->mCommandBuffer.get());
static_cast<uint32_t>(this->mWaitSemaphores.size()),
waitSemaphoresPtr,
waitDstStageMasksPtr,
1,
this->mCommandBuffer.get(),
static_cast<uint32_t>(this->mSignalSemaphores.size()),
signalSemaphoresPtr);

KP_LOG_DEBUG(
"Kompute sequence submitting command buffer into compute queue");
Expand Down Expand Up @@ -156,8 +186,7 @@ Sequence::evalAsync(std::shared_ptr<OpBase> op)
{
this->clear();
this->record(op);
this->evalAsync();
return shared_from_this();
return this->evalAsync();
}

std::shared_ptr<Sequence>
Expand Down
15 changes: 14 additions & 1 deletion src/include/kompute/Manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,23 @@ class Manager
* @param queueIndex The queue to use from the available queues
* @param nrOfTimestamps The maximum number of timestamps to allocate.
* If zero (default), disables latching of timestamps.
* @param waitSemaphores Semaphores to wait on before each submit from this
* sequence.
* @param waitDstStageMasks Pipeline stages to use for each wait semaphore.
* If empty and waitSemaphores is not empty, defaults to
* vk::PipelineStageFlagBits::eAllCommands for every wait semaphore.
* @param signalSemaphores Semaphores to signal after each submit from this
* sequence.
* @returns Shared pointer with initialised sequence
*/
std::shared_ptr<Sequence> sequence(uint32_t queueIndex = 0,
uint32_t totalTimestamps = 0);
uint32_t totalTimestamps = 0,
const std::vector<vk::Semaphore>&
waitSemaphores = {},
const std::vector<vk::PipelineStageFlags>&
waitDstStageMasks = {},
const std::vector<vk::Semaphore>&
signalSemaphores = {});

/**
* Create a managed tensor that will be destroyed by this manager
Expand Down
24 changes: 18 additions & 6 deletions src/include/kompute/Sequence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class Sequence : public std::enable_shared_from_this<Sequence>
std::shared_ptr<vk::Queue> computeQueue,
uint32_t queueIndex,
uint32_t totalTimestamps = 0,
const std::vector<vk::Semaphore>& waitSemaphores = {},
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks = {},
const std::vector<vk::Semaphore>& signalSemaphores = {},
std::shared_ptr<std::mutex> submitMutex = nullptr) noexcept;

/**
Expand Down Expand Up @@ -157,18 +160,24 @@ class Sequence : public std::enable_shared_from_this<Sequence>

/**
* Eval Async sends all the recorded and stored operations in the vector of
* operations into the gpu as a submit job without a barrier. EvalAwait()
* must ALWAYS be called after to ensure the sequence is terminated
* correctly.
* operations into the gpu as a submit job without a barrier.
* Submit-level wait/signal semaphores are configured when creating the
* Sequence.
*
* evalAwait() must be called before invoking evalAsync() again on this same
* Sequence to complete the previous async run and reset internal state.
*
* @return Boolean stating whether execution was successful.
*/
std::shared_ptr<Sequence> evalAsync();
/**
* Clears currnet operations to record provided one in the vector of
* operations into the gpu as a submit job without a barrier. EvalAwait()
* must ALWAYS be called after to ensure the sequence is terminated
* correctly.
* operations into the gpu as a submit job without a barrier.
* Submit-level wait/signal semaphores are configured when creating the
* Sequence.
*
* evalAwait() must be called before invoking evalAsync() again on this same
* Sequence to complete the previous async run and reset internal state.
*
* @return Boolean stating whether execution was successful.
*/
Expand Down Expand Up @@ -297,6 +306,9 @@ class Sequence : public std::enable_shared_from_this<Sequence>
std::vector<std::shared_ptr<OpBase>> mOperations{};
std::shared_ptr<vk::QueryPool> timestampQueryPool = nullptr;
std::shared_ptr<std::mutex> mSubmitMutex = nullptr;
std::vector<vk::Semaphore> mWaitSemaphores{};
std::vector<vk::PipelineStageFlags> mWaitDstStageMasks{};
std::vector<vk::Semaphore> mSignalSemaphores{};

// State
bool mRecording = false;
Expand Down
54 changes: 54 additions & 0 deletions test/TestSequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,57 @@ TEST(TestSequence, CorrectSequenceRunningError)

EXPECT_EQ(tensorOut->vector(), std::vector<float>({ 2, 4, 6 }));
}

TEST(TestSequence, SequenceSubmitSyncSupportsEmptySyncLists)
{
kp::Manager mgr;

std::shared_ptr<kp::Sequence> sq = mgr.sequence();

std::shared_ptr<kp::TensorT<float>> tensorA = mgr.tensor({ 1, 2, 3 });
std::shared_ptr<kp::TensorT<float>> tensorB = mgr.tensor({ 2, 2, 2 });
std::shared_ptr<kp::TensorT<float>> tensorOut = mgr.tensor({ 0, 0, 0 });

sq->eval<kp::OpSyncDevice>({ tensorA, tensorB, tensorOut });

std::vector<uint32_t> spirv = compileSource(R"(
#version 450

layout (local_size_x = 1) in;

layout(set = 0, binding = 0) buffer bina { float tina[]; };
layout(set = 0, binding = 1) buffer binb { float tinb[]; };
layout(set = 0, binding = 2) buffer bout { float tout[]; };

void main() {
uint index = gl_GlobalInvocationID.x;
tout[index] = tina[index] * tinb[index];
}
)");

std::shared_ptr<kp::Algorithm> algo =
mgr.algorithm({ tensorA, tensorB, tensorOut }, spirv);

sq->record<kp::OpAlgoDispatch>(algo)->record<kp::OpSyncLocal>(
{ tensorA, tensorB, tensorOut });

EXPECT_NO_THROW(sq->evalAsync());
EXPECT_NO_THROW(sq->evalAwait());

EXPECT_EQ(tensorOut->vector(), std::vector<float>({ 2, 4, 6 }));
}

TEST(TestSequence, SequenceSubmitSyncValidatesWaitMaskCount)
{
kp::Manager mgr;

std::vector<vk::Semaphore> waitSemaphores = { vk::Semaphore{} };
std::vector<vk::PipelineStageFlags> waitDstStageMasks = {
vk::PipelineStageFlagBits::eComputeShader,
vk::PipelineStageFlagBits::eTransfer
};
std::vector<vk::Semaphore> signalSemaphores = {};

EXPECT_ANY_THROW(mgr.sequence(
0, 0, waitSemaphores, waitDstStageMasks, signalSemaphores));
}