Skip to content

Commit 5622ce6

Browse files
committed
Implmeneted option for mutex in Sequence and Manager to ensure thread-safety
Signed-off-by: Troels Ynddal <troels@ynddal.dk>
1 parent 6e5380d commit 5622ce6

5 files changed

Lines changed: 56 additions & 8 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ kompute_option(KOMPUTE_OPT_ANDROID_BUILD "Enable android compilation flags requi
101101
kompute_option(KOMPUTE_OPT_DISABLE_VK_DEBUG_LAYERS "Explicitly disable debug layers even on debug." OFF)
102102
kompute_option(KOMPUTE_OPT_DISABLE_VULKAN_VERSION_CHECK "Whether to check if your driver supports the Vulkan Header version you are linking against. This might be useful in case you build shared on a different system than you run later." OFF)
103103
kompute_option(KOMPUTE_OPT_BUILD_SHADERS "Rebuilds all compute shaders during compilation and does not use the already precompiled versions. Requires glslangValidator to be installed on your system." OFF)
104+
kompute_option(KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE "Enable manager-owned mutex protection around sequence queue submissions." OFF)
104105

105106
# External components
106107
kompute_option(KOMPUTE_OPT_USE_BUILT_IN_SPDLOG "Use the built-in version of Spdlog. Requires 'KOMPUTE_OPT_USE_SPDLOG' to be set to ON in order to have any effect." ON)

src/Manager.cpp

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ Manager::Manager(uint32_t physicalDeviceIndex,
5555
{
5656
this->mManageResources = true;
5757

58+
#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE
59+
this->mSequenceSubmitMutex = std::make_shared<std::mutex>();
60+
#endif
61+
5862
// Make sure the logger is setup
5963
#if !KOMPUTE_OPT_LOG_LEVEL_DISABLED
6064
logger::setupLogger();
@@ -71,6 +75,10 @@ Manager::Manager(std::shared_ptr<vk::Instance> instance,
7175
{
7276
this->mManageResources = false;
7377

78+
#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE
79+
this->mSequenceSubmitMutex = std::make_shared<std::mutex>();
80+
#endif
81+
7482
this->mInstance = instance;
7583
this->mPhysicalDevice = physicalDevice;
7684
this->mDevice = device;
@@ -495,12 +503,18 @@ Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps)
495503
{
496504
KP_LOG_DEBUG("Kompute Manager sequence() with queueIndex: {}", queueIndex);
497505

506+
std::shared_ptr<std::mutex> submitMutex = nullptr;
507+
#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE
508+
submitMutex = this->mSequenceSubmitMutex;
509+
#endif
510+
498511
std::shared_ptr<Sequence> sq{ new kp::Sequence(
499-
this->mPhysicalDevice,
500-
this->mDevice,
501-
this->mComputeQueues[queueIndex],
502-
this->mComputeQueueFamilyIndices[queueIndex],
503-
totalTimestamps) };
512+
this->mPhysicalDevice,
513+
this->mDevice,
514+
this->mComputeQueues[queueIndex],
515+
this->mComputeQueueFamilyIndices[queueIndex],
516+
totalTimestamps,
517+
submitMutex) };
504518

505519
if (this->mManageResources) {
506520
this->mManagedSequences.push_back(sq);

src/Sequence.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ Sequence::Sequence(std::shared_ptr<vk::PhysicalDevice> physicalDevice,
88
std::shared_ptr<vk::Device> device,
99
std::shared_ptr<vk::Queue> computeQueue,
1010
uint32_t queueIndex,
11-
uint32_t totalTimestamps) noexcept
11+
uint32_t totalTimestamps,
12+
std::shared_ptr<std::mutex> submitMutex) noexcept
1213
{
1314
KP_LOG_DEBUG("Kompute Sequence Constructor with existing device & queue");
1415

@@ -17,6 +18,7 @@ Sequence::Sequence(std::shared_ptr<vk::PhysicalDevice> physicalDevice,
1718
this->mComputeQueue = computeQueue;
1819
this->mQueueIndex = queueIndex;
1920
this->mFence = this->mDevice->createFence(vk::FenceCreateInfo());
21+
this->mSubmitMutex = submitMutex;
2022

2123
this->createCommandPool();
2224
this->createCommandBuffer();
@@ -133,11 +135,22 @@ Sequence::evalAsync()
133135

134136
this->mDevice->resetFences({ this->mFence });
135137

136-
this->mComputeQueue->submit(1, &submitInfo, this->mFence);
138+
this->submitCommandBuffer(submitInfo);
137139

138140
return shared_from_this();
139141
}
140142

143+
void
144+
Sequence::submitCommandBuffer(const vk::SubmitInfo& submitInfo)
145+
{
146+
if (this->mSubmitMutex) {
147+
std::lock_guard<std::mutex> submitLock(*this->mSubmitMutex);
148+
this->mComputeQueue->submit(1, &submitInfo, this->mFence);
149+
} else {
150+
this->mComputeQueue->submit(1, &submitInfo, this->mFence);
151+
}
152+
}
153+
141154
std::shared_ptr<Sequence>
142155
Sequence::evalAsync(std::shared_ptr<OpBase> op)
143156
{

src/include/kompute/Manager.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
#include "kompute/Sequence.hpp"
88
#include "logger/Logger.hpp"
99

10+
#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE
11+
#include <mutex>
12+
#endif
13+
1014
#define KP_DEFAULT_SESSION "DEFAULT"
1115

1216
namespace kp {
@@ -546,6 +550,10 @@ class Manager
546550
std::vector<uint32_t> mComputeQueueFamilyIndices;
547551
std::vector<std::shared_ptr<vk::Queue>> mComputeQueues;
548552

553+
#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE
554+
std::shared_ptr<std::mutex> mSequenceSubmitMutex = nullptr;
555+
#endif
556+
549557
bool mManageResources = false;
550558

551559
#ifndef KOMPUTE_DISABLE_VK_DEBUG_LAYERS

src/include/kompute/Sequence.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
#include "kompute/operations/OpAlgoDispatch.hpp"
77
#include "kompute/operations/OpBase.hpp"
8+
#include <mutex>
9+
#include <functional>
810

911
namespace kp {
1012

@@ -28,7 +30,8 @@ class Sequence : public std::enable_shared_from_this<Sequence>
2830
std::shared_ptr<vk::Device> device,
2931
std::shared_ptr<vk::Queue> computeQueue,
3032
uint32_t queueIndex,
31-
uint32_t totalTimestamps = 0) noexcept;
33+
uint32_t totalTimestamps = 0,
34+
std::shared_ptr<std::mutex> submitMutex = nullptr) noexcept;
3235

3336
/**
3437
* @brief Make Sequence uncopyable
@@ -293,11 +296,20 @@ class Sequence : public std::enable_shared_from_this<Sequence>
293296
vk::Fence mFence;
294297
std::vector<std::shared_ptr<OpBase>> mOperations{};
295298
std::shared_ptr<vk::QueryPool> timestampQueryPool = nullptr;
299+
std::shared_ptr<std::mutex> mSubmitMutex = nullptr;
296300

297301
// State
298302
bool mRecording = false;
299303
bool mIsRunning = false;
300304

305+
/**
306+
* Submits the command buffer to the compute queue, acquiring the submit
307+
* mutex beforehand if one was provided.
308+
*
309+
* @param submitInfo The Vulkan submit info to pass to the queue.
310+
*/
311+
void submitCommandBuffer(const vk::SubmitInfo& submitInfo);
312+
301313
private:
302314
// Create functions
303315
void createCommandPool();

0 commit comments

Comments
 (0)