Skip to content

Commit 020ce69

Browse files
committed
Thread/ThreadPool: Fix data race
1 parent 1eae9f5 commit 020ce69

2 files changed

Lines changed: 20 additions & 14 deletions

File tree

modules/Thread/ThreadPool.mpp

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ export namespace CppUtils::Thread
7171

7272
inline auto waitUntilFinished() -> void
7373
{
74-
if (auto lockGuard = std::unique_lock{m_waitingMutex};
75-
m_activeWorkers != 0 or not isTasksQueueEmpty())
76-
m_waitUntilFinishedCondition.wait(lockGuard, [this] {
77-
return m_activeWorkers == 0 and isTasksQueueEmpty();
74+
auto tasksQueueAccessor = m_tasksQueue.access();
75+
if (m_activeWorkers != 0 or not std::empty(tasksQueueAccessor.value()))
76+
m_waitUntilFinishedCondition.wait(tasksQueueAccessor.getLockGuard(), [this, &tasksQueueAccessor] {
77+
return m_activeWorkers == 0 and std::empty(tasksQueueAccessor.value());
7878
});
7979
}
8080

@@ -85,12 +85,6 @@ export namespace CppUtils::Thread
8585
}
8686

8787
private:
88-
[[nodiscard]] inline auto isTasksQueueEmpty() const -> bool
89-
{
90-
auto tasksQueueAccessor = m_tasksQueue.access();
91-
return std::empty(tasksQueueAccessor.value());
92-
}
93-
9488
inline auto workerThread() -> void
9589
{
9690
auto task = Task{};
@@ -104,7 +98,6 @@ export namespace CppUtils::Thread
10498
if (std::empty(tasksQueueAccessor.value()))
10599
return;
106100

107-
auto lockGuard = std::lock_guard{m_waitingMutex};
108101
m_activeWorkers.fetch_add(1, std::memory_order_relaxed);
109102
task = std::move(tasksQueueAccessor->front());
110103
tasksQueueAccessor->pop();
@@ -113,10 +106,10 @@ export namespace CppUtils::Thread
113106
task();
114107

115108
{
116-
auto lockGuard = std::lock_guard{m_waitingMutex};
109+
auto tasksQueueAccessor = m_tasksQueue.access();
117110
m_activeWorkers.fetch_sub(1, std::memory_order_relaxed);
118111

119-
if (m_activeWorkers.load(std::memory_order_relaxed) == 0 and isTasksQueueEmpty())
112+
if (m_activeWorkers.load(std::memory_order_relaxed) == 0 and std::empty(tasksQueueAccessor.value()))
120113
m_waitUntilFinishedCondition.notify_all();
121114
}
122115
}
@@ -148,7 +141,6 @@ export namespace CppUtils::Thread
148141
std::vector<ThreadLoop> m_workers;
149142
UniqueLocker<std::queue<Task>> m_tasksQueue;
150143

151-
std::mutex m_waitingMutex;
152144
std::condition_variable m_waitUntilFinishedCondition;
153145

154146
mutable std::shared_mutex m_onErrorMutex;

tests/Thread/ThreadPool.mpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,19 @@ namespace CppUtils::UnitTest::Thread::ThreadPool
9999
});
100100
suite.expectEqual(future.get(), 42);
101101
});
102+
103+
suite.addTest("Stress test", [&] {
104+
auto threadPool = CppUtils::Thread::ThreadPool{};
105+
auto counter = std::atomic_size_t{0};
106+
const auto numTasks = 10'000uz;
107+
108+
for (auto i = 0uz; i < numTasks; ++i)
109+
threadPool.call([&counter] {
110+
++counter;
111+
});
112+
113+
threadPool.waitUntilFinished();
114+
suite.expectEqual(counter.load(), numTasks);
115+
});
102116
}};
103117
}

0 commit comments

Comments
 (0)