Skip to content

Commit ca0afa9

Browse files
committed
Thread/Scheduler: Fix race condition
1 parent d597077 commit ca0afa9

1 file changed

Lines changed: 54 additions & 39 deletions

File tree

modules/Thread/Scheduler.mpp

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ export namespace CppUtils::Thread
7878
inline auto waitUntilFinished() -> void
7979
{
8080
auto accessor = m_items.access();
81-
m_condition.wait(accessor.getLockGuard(), [&accessor] {
82-
return std::empty(accessor.value().set);
81+
m_condition.wait(accessor.getLockGuard(), [&] {
82+
return std::empty(accessor.value().set) and m_processingTasks == 0;
8383
});
8484
}
8585

@@ -108,59 +108,74 @@ export namespace CppUtils::Thread
108108
private:
109109
inline auto runLoop() -> void
110110
{
111-
auto accessor = m_items.access();
112-
auto& [set, map] = accessor.value();
113-
114-
if (std::empty(set))
111+
auto tasksToRun = std::vector<Task>{};
115112
{
116-
m_condition.notify_all();
117-
m_condition.wait(accessor.getLockGuard(), [this, &accessor] {
118-
return not std::empty(accessor.value().set) or m_loop.isStopRequested();
119-
});
120-
return;
121-
}
113+
auto accessor = m_items.access();
114+
auto& [set, map] = accessor.value();
122115

123-
auto first = *std::begin(set);
116+
if (std::empty(set))
117+
{
118+
m_condition.wait(accessor.getLockGuard(), [this, &accessor] {
119+
return not std::empty(accessor.value().set) or m_loop.isStopRequested();
120+
});
121+
if (m_loop.isStopRequested())
122+
return;
123+
if (std::empty(set))
124+
{
125+
m_condition.notify_all();
126+
return;
127+
}
128+
}
124129

125-
if (first->time > Clock::now())
126-
{
127-
m_condition.wait_until(accessor.getLockGuard(), first->time, [&] {
128-
return m_loop.isStopRequested() or first->cancelled or first->time <= Clock::now();
129-
});
130-
return;
131-
}
130+
auto first = *std::begin(set);
132131

133-
map.erase(first->id);
134-
set.erase(first);
135-
if (not first->cancelled)
136-
{
137-
accessor.getLockGuard().unlock();
138-
std::invoke(first->task);
139-
accessor.getLockGuard().lock();
132+
if (first->time > Clock::now())
133+
{
134+
m_condition.wait_until(accessor.getLockGuard(), first->time, [&] {
135+
return m_loop.isStopRequested() or first->cancelled or first->time <= Clock::now();
136+
});
137+
return;
138+
}
139+
140+
const auto endTime = (m_step == Clock::duration::zero()) ? first->time : first->time + m_step;
141+
142+
for (auto it = std::begin(set); it != std::end(set);)
143+
{
144+
if ((*it)->time <= endTime)
145+
{
146+
if (not(*it)->cancelled)
147+
tasksToRun.push_back(std::move((*it)->task));
148+
map.erase((*it)->id);
149+
it = set.erase(it);
150+
}
151+
else
152+
++it;
153+
}
140154
}
141155

142-
if (m_step == Clock::duration::zero())
156+
if (tasksToRun.empty())
143157
return;
144158

145-
const auto endTime = first->time + m_step;
146-
for (auto it = std::begin(set); it != std::end(set) and (*it)->time <= endTime;)
159+
m_processingTasks += std::size(tasksToRun);
160+
for (const auto& task : tasksToRun)
147161
{
148-
auto item = *it;
149-
it = set.erase(it);
150-
map.erase(item->id);
151-
if (not item->cancelled)
152-
{
153-
accessor.getLockGuard().unlock();
154-
std::invoke(item->task);
155-
accessor.getLockGuard().lock();
156-
}
162+
std::invoke(task);
163+
--m_processingTasks;
164+
}
165+
166+
if (m_processingTasks == 0)
167+
{
168+
auto accessor = m_items.access();
169+
if (std::empty(accessor.value().set))
170+
m_condition.notify_all();
157171
}
158172
}
159173

160174
private:
161175
std::atomic_size_t m_nextId = 0;
162176
Clock::duration m_step;
163177
UniqueLocker<Items> m_items;
178+
std::atomic_size_t m_processingTasks = 0;
164179

165180
std::condition_variable m_condition;
166181
ThreadLoop m_loop;

0 commit comments

Comments
 (0)