Skip to content

Commit 4f73bbe

Browse files
committed
Add semaphore-aware Sequence evalAsync overloads
Allow submit-level GPU synchronization by letting Sequence submissions wait on and signal Vulkan semaphores without CPU-side waits. This keeps existing evalAsync behavior while adding validation and coverage for the new overload path. Signed-off-by: Michael Tsukanov <tsukanovWOW@ya.ru>
1 parent 6160e78 commit 4f73bbe

3 files changed

Lines changed: 147 additions & 3 deletions

File tree

src/Sequence.cpp

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ Sequence::eval(std::shared_ptr<OpBase> op)
110110

111111
std::shared_ptr<Sequence>
112112
Sequence::evalAsync()
113+
{
114+
return this->evalAsync({}, {}, {});
115+
}
116+
117+
std::shared_ptr<Sequence>
118+
Sequence::evalAsync(const std::vector<vk::Semaphore>& waitSemaphores,
119+
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
120+
const std::vector<vk::Semaphore>& signalSemaphores)
113121
{
114122
if (this->isRecording()) {
115123
this->end();
@@ -127,8 +135,35 @@ Sequence::evalAsync()
127135
this->mOperations[i]->preEval(*this->mCommandBuffer);
128136
}
129137

138+
if (!waitDstStageMasks.empty() &&
139+
waitSemaphores.size() != waitDstStageMasks.size()) {
140+
throw std::runtime_error("Kompute Sequence evalAsync wait semaphore "
141+
"count must match wait dst stage mask count");
142+
}
143+
144+
std::vector<vk::PipelineStageFlags> resolvedWaitDstStageMasks =
145+
waitDstStageMasks;
146+
if (resolvedWaitDstStageMasks.empty() && !waitSemaphores.empty()) {
147+
resolvedWaitDstStageMasks.resize(waitSemaphores.size(),
148+
vk::PipelineStageFlagBits::eAllCommands);
149+
}
150+
151+
const vk::Semaphore* waitSemaphoresPtr =
152+
waitSemaphores.empty() ? nullptr : waitSemaphores.data();
153+
const vk::PipelineStageFlags* waitDstStageMasksPtr =
154+
resolvedWaitDstStageMasks.empty() ? nullptr
155+
: resolvedWaitDstStageMasks.data();
156+
const vk::Semaphore* signalSemaphoresPtr =
157+
signalSemaphores.empty() ? nullptr : signalSemaphores.data();
158+
130159
vk::SubmitInfo submitInfo(
131-
0, nullptr, nullptr, 1, this->mCommandBuffer.get());
160+
static_cast<uint32_t>(waitSemaphores.size()),
161+
waitSemaphoresPtr,
162+
waitDstStageMasksPtr,
163+
1,
164+
this->mCommandBuffer.get(),
165+
static_cast<uint32_t>(signalSemaphores.size()),
166+
signalSemaphoresPtr);
132167

133168
KP_LOG_DEBUG(
134169
"Kompute sequence submitting command buffer into compute queue");
@@ -153,11 +188,20 @@ Sequence::submitCommandBuffer(const vk::SubmitInfo& submitInfo)
153188

154189
std::shared_ptr<Sequence>
155190
Sequence::evalAsync(std::shared_ptr<OpBase> op)
191+
{
192+
return this->evalAsync(op, {}, {}, {});
193+
}
194+
195+
std::shared_ptr<Sequence>
196+
Sequence::evalAsync(std::shared_ptr<OpBase> op,
197+
const std::vector<vk::Semaphore>& waitSemaphores,
198+
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
199+
const std::vector<vk::Semaphore>& signalSemaphores)
156200
{
157201
this->clear();
158202
this->record(op);
159-
this->evalAsync();
160-
return shared_from_this();
203+
return this->evalAsync(
204+
waitSemaphores, waitDstStageMasks, signalSemaphores);
161205
}
162206

163207
std::shared_ptr<Sequence>

src/include/kompute/Sequence.hpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,25 @@ class Sequence : public std::enable_shared_from_this<Sequence>
164164
* @return Boolean stating whether execution was successful.
165165
*/
166166
std::shared_ptr<Sequence> evalAsync();
167+
/**
168+
* Eval Async sends all recorded operations as a submit job and allows
169+
* submit-level GPU synchronization by providing wait and signal semaphores.
170+
* EvalAwait() must ALWAYS be called after to ensure the sequence is
171+
* terminated correctly.
172+
*
173+
* @param waitSemaphores Semaphores that must be signaled before this submit
174+
* starts executing.
175+
* @param waitDstStageMasks Pipeline stages at which to wait for each
176+
* semaphore. If empty and waitSemaphores is not empty, defaults to
177+
* vk::PipelineStageFlagBits::eAllCommands for each wait semaphore.
178+
* @param signalSemaphores Semaphores that this submit will signal when it
179+
* completes.
180+
* @return shared_ptr<Sequence> of the Sequence class itself
181+
*/
182+
std::shared_ptr<Sequence> evalAsync(
183+
const std::vector<vk::Semaphore>& waitSemaphores,
184+
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
185+
const std::vector<vk::Semaphore>& signalSemaphores);
167186
/**
168187
* Clears currnet operations to record provided one in the vector of
169188
* operations into the gpu as a submit job without a barrier. EvalAwait()
@@ -173,6 +192,27 @@ class Sequence : public std::enable_shared_from_this<Sequence>
173192
* @return Boolean stating whether execution was successful.
174193
*/
175194
std::shared_ptr<Sequence> evalAsync(std::shared_ptr<OpBase> op);
195+
/**
196+
* Clears current operations, records the provided one and submits with
197+
* optional wait/signal semaphores for submit-level GPU synchronization.
198+
* EvalAwait() must ALWAYS be called after to ensure the sequence is
199+
* terminated correctly.
200+
*
201+
* @param op Operation to record prior to submit.
202+
* @param waitSemaphores Semaphores that must be signaled before this submit
203+
* starts executing.
204+
* @param waitDstStageMasks Pipeline stages at which to wait for each
205+
* semaphore. If empty and waitSemaphores is not empty, defaults to
206+
* vk::PipelineStageFlagBits::eAllCommands for each wait semaphore.
207+
* @param signalSemaphores Semaphores that this submit will signal when it
208+
* completes.
209+
* @return shared_ptr<Sequence> of the Sequence class itself
210+
*/
211+
std::shared_ptr<Sequence> evalAsync(
212+
std::shared_ptr<OpBase> op,
213+
const std::vector<vk::Semaphore>& waitSemaphores,
214+
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
215+
const std::vector<vk::Semaphore>& signalSemaphores);
176216
/**
177217
* Eval sends all the recorded and stored operations in the vector of
178218
* operations into the gpu as a submit job with a barrier.

test/TestSequence.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,63 @@ TEST(TestSequence, CorrectSequenceRunningError)
243243

244244
EXPECT_EQ(tensorOut->vector(), std::vector<float>({ 2, 4, 6 }));
245245
}
246+
247+
TEST(TestSequence, EvalAsyncSemaphoreOverloadSupportsEmptySyncLists)
248+
{
249+
kp::Manager mgr;
250+
251+
std::shared_ptr<kp::Sequence> sq = mgr.sequence();
252+
253+
std::shared_ptr<kp::TensorT<float>> tensorA = mgr.tensor({ 1, 2, 3 });
254+
std::shared_ptr<kp::TensorT<float>> tensorB = mgr.tensor({ 2, 2, 2 });
255+
std::shared_ptr<kp::TensorT<float>> tensorOut = mgr.tensor({ 0, 0, 0 });
256+
257+
sq->eval<kp::OpSyncDevice>({ tensorA, tensorB, tensorOut });
258+
259+
std::vector<uint32_t> spirv = compileSource(R"(
260+
#version 450
261+
262+
layout (local_size_x = 1) in;
263+
264+
layout(set = 0, binding = 0) buffer bina { float tina[]; };
265+
layout(set = 0, binding = 1) buffer binb { float tinb[]; };
266+
layout(set = 0, binding = 2) buffer bout { float tout[]; };
267+
268+
void main() {
269+
uint index = gl_GlobalInvocationID.x;
270+
tout[index] = tina[index] * tinb[index];
271+
}
272+
)");
273+
274+
std::shared_ptr<kp::Algorithm> algo =
275+
mgr.algorithm({ tensorA, tensorB, tensorOut }, spirv);
276+
277+
sq->record<kp::OpAlgoDispatch>(algo)->record<kp::OpSyncLocal>(
278+
{ tensorA, tensorB, tensorOut });
279+
280+
EXPECT_NO_THROW(sq->evalAsync({}, {}, {}));
281+
EXPECT_NO_THROW(sq->evalAwait());
282+
283+
EXPECT_EQ(tensorOut->vector(), std::vector<float>({ 2, 4, 6 }));
284+
}
285+
286+
TEST(TestSequence, EvalAsyncSemaphoreOverloadValidatesWaitMaskCount)
287+
{
288+
kp::Manager mgr;
289+
290+
std::shared_ptr<kp::Sequence> sq = mgr.sequence();
291+
292+
std::shared_ptr<kp::TensorT<float>> tensorA = mgr.tensor({ 1, 2, 3 });
293+
294+
sq->record<kp::OpSyncDevice>({ tensorA });
295+
296+
std::vector<vk::Semaphore> waitSemaphores = { vk::Semaphore{} };
297+
std::vector<vk::PipelineStageFlags> waitDstStageMasks = {
298+
vk::PipelineStageFlagBits::eComputeShader,
299+
vk::PipelineStageFlagBits::eTransfer
300+
};
301+
std::vector<vk::Semaphore> signalSemaphores = {};
302+
303+
EXPECT_ANY_THROW(
304+
sq->evalAsync(waitSemaphores, waitDstStageMasks, signalSemaphores));
305+
}

0 commit comments

Comments
 (0)