Skip to content

Commit 2b3ce10

Browse files
committed
Thread/ThreadPool: Finally function
1 parent 7a96a3f commit 2b3ce10

6 files changed

Lines changed: 53 additions & 29 deletions

File tree

modules/Container/MeshNetwork.mpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ export namespace CppUtils::Container
116116
using difference_type = std::ptrdiff_t;
117117
using pointer = value_type*;
118118
using reference = value_type&;
119-
using internal_iterator_type = std::vector<typename MeshNode<Key, Value>::WeakPtr>::const_iterator;
119+
using internal_iterator_type = std::conditional_t<
120+
std::is_const_v<std::remove_reference_t<NodePtrType>>,
121+
typename std::vector<typename MeshNode<Key, Value>::WeakPtr>::const_iterator,
122+
typename std::vector<typename MeshNode<Key, Value>::WeakPtr>::iterator>;
120123

121124
BranchIterator() = default;
122125
BranchIterator(internal_iterator_type iterator): m_iterator{iterator} {}
@@ -146,7 +149,7 @@ export namespace CppUtils::Container
146149

147150
[[nodiscard]] auto operator!=(const BranchIterator& other) const -> bool
148151
{
149-
return !(*this == other);
152+
return not(*this == other);
150153
}
151154

152155
private:
@@ -156,14 +159,14 @@ export namespace CppUtils::Container
156159
Branch(NodePtrType& self, const Key& key, ChildrenVector children): m_self(self), m_key(key), m_children(std::move(children))
157160
{}
158161

159-
[[nodiscard]] auto begin() const -> BranchIterator
162+
[[nodiscard]] auto begin(this auto&& self) -> BranchIterator
160163
{
161-
return BranchIterator{std::cbegin(m_children)};
164+
return BranchIterator{std::begin(self.m_children)};
162165
}
163166

164-
[[nodiscard]] auto end() const -> BranchIterator
167+
[[nodiscard]] auto end(this auto&& self) -> BranchIterator
165168
{
166-
return BranchIterator{std::cend(m_children)};
169+
return BranchIterator{std::end(self.m_children)};
167170
}
168171

169172
[[nodiscard]] auto size() const -> std::size_t

modules/Execution/ScopeGuard.mpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ export namespace CppUtils::Execution
1616

1717
~ScopeGuard() noexcept
1818
{
19-
m_onExit();
19+
if (m_onExit)
20+
m_onExit();
2021
}
2122

2223
private:

modules/Thread/Scheduler.mpp

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import CppUtils.Chrono.Concept;
55
import CppUtils.Thread.UniqueLocker;
66
import CppUtils.Thread.SharedLocker;
77
import CppUtils.Thread.ThreadLoop;
8-
import CppUtils.Execution.ScopeGuard;
8+
import CppUtils.Thread.ThreadPool;
99

1010
export namespace CppUtils::Thread
1111
{
@@ -47,20 +47,32 @@ export namespace CppUtils::Thread
4747
public:
4848
inline Scheduler(
4949
Clock::duration step = Clock::duration::zero(),
50-
std::function<void(std::exception_ptr)> onError = nullptr):
50+
std::size_t numberThreads = std::thread::hardware_concurrency(),
51+
std::function<void(std::exception_ptr)> onError = nullptr,
52+
std::function<void()> finally = nullptr):
5153
m_step{step},
52-
m_loop{
54+
m_finally{std::move(finally)},
55+
m_threadLoop{
5356
[this] { runLoop(); },
5457
[this] { m_condition.notify_all(); },
55-
std::move(onError)}
58+
onError},
59+
m_threadPool{
60+
numberThreads,
61+
onError,
62+
[this] {
63+
if (m_finally)
64+
m_finally();
65+
if (--m_processingTasks == 0)
66+
m_condition.notify_all();
67+
}}
5668
{
57-
m_loop.start();
69+
m_threadLoop.start();
5870
}
5971

6072
inline ~Scheduler() noexcept
6173
{
6274
cancelAll();
63-
m_loop.requestStop();
75+
m_threadLoop.requestStop();
6476
}
6577

6678
inline auto schedule(Task task, TimePoint when) -> std::size_t
@@ -120,9 +132,9 @@ export namespace CppUtils::Thread
120132
if (std::empty(set))
121133
{
122134
m_condition.wait(accessor.getLockGuard(), [this, &accessor] {
123-
return not std::empty(accessor.value().set) or m_loop.isStopRequested();
135+
return not std::empty(accessor.value().set) or m_threadLoop.isStopRequested();
124136
});
125-
if (m_loop.isStopRequested())
137+
if (m_threadLoop.isStopRequested())
126138
return;
127139
if (std::empty(set))
128140
{
@@ -136,7 +148,7 @@ export namespace CppUtils::Thread
136148
if (first->time > Clock::now())
137149
{
138150
m_condition.wait_until(accessor.getLockGuard(), first->time, [&, first] {
139-
return m_loop.isStopRequested() or first->cancelled or std::empty(set) or (*std::begin(set) != first) or first->time <= Clock::now();
151+
return m_threadLoop.isStopRequested() or first->cancelled or std::empty(set) or (*std::begin(set) != first) or first->time <= Clock::now();
140152
});
141153
return;
142154
}
@@ -161,14 +173,10 @@ export namespace CppUtils::Thread
161173
return;
162174

163175
m_processingTasks += std::size(tasksToRun);
164-
for (const auto& task : tasksToRun)
165-
{
166-
auto _ = Execution::ScopeGuard{[this] {
167-
if (--m_processingTasks == 0)
168-
m_condition.notify_all();
169-
}};
170-
std::invoke(task);
171-
}
176+
for (auto&& task : tasksToRun)
177+
m_threadPool.call([task = std::move(task)] {
178+
std::invoke(task);
179+
});
172180
}
173181

174182
private:
@@ -177,7 +185,10 @@ export namespace CppUtils::Thread
177185
UniqueLocker<Items> m_items;
178186
std::atomic_size_t m_processingTasks = 0;
179187

188+
std::function<void()> m_finally;
189+
180190
std::condition_variable m_condition;
181-
ThreadLoop m_loop;
191+
ThreadLoop m_threadLoop;
192+
ThreadPool m_threadPool;
182193
};
183194
}

modules/Thread/ThreadPool.mpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export module CppUtils.Thread.ThreadPool;
22

33
import std;
4+
import CppUtils.Execution.ScopeGuard;
45
import CppUtils.Logger;
56
import CppUtils.Thread.ThreadLoop;
67
import CppUtils.Thread.UniqueLocker;
@@ -13,8 +14,13 @@ export namespace CppUtils::Thread
1314
using Task = std::function<void()>;
1415

1516
public:
16-
explicit inline ThreadPool(std::size_t numberThreads = std::thread::hardware_concurrency()):
17-
m_numberThreads{numberThreads}
17+
explicit inline ThreadPool(
18+
std::size_t numberThreads = std::thread::hardware_concurrency(),
19+
std::function<void(std::exception_ptr)> onError = nullptr,
20+
std::function<void()> finally = nullptr):
21+
m_numberThreads{numberThreads},
22+
m_onError{std::move(onError)},
23+
m_finally{std::move(finally)}
1824
{
1925
m_workers.reserve(m_numberThreads);
2026
for (auto i = 0uz; i < m_numberThreads; ++i)
@@ -44,6 +50,7 @@ export namespace CppUtils::Thread
4450
{
4551
using ReturnType = std::invoke_result_t<std::decay_t<decltype(function)>>;
4652
auto task = std::make_shared<std::packaged_task<ReturnType()>>([this, function = std::forward<decltype(function)>(function)]() mutable {
53+
auto _ = Execution::ScopeGuard{m_finally};
4754
try
4855
{
4956
std::invoke(function);
@@ -146,5 +153,7 @@ export namespace CppUtils::Thread
146153
mutable std::shared_mutex m_onErrorMutex;
147154
std::function<void(std::exception_ptr)> m_onError;
148155
std::atomic<bool> m_stopRequested = false;
156+
157+
std::function<void()> m_finally;
149158
};
150159
}

tests/Execution/ScopeGuard.mpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
export module CppUtils.UnitTests.Execution.ScopeGuard;
22

3-
import CppUtils.Execution.ScopeGuard;
4-
import CppUtils.UnitTest;
53
import std;
4+
import CppUtils;
65

76
export namespace CppUtils::UnitTest::Execution::ScopeGuard
87
{

tests/Thread/Scheduler.mpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ export namespace CppUtils::UnitTest::Thread::Scheduler
206206
auto onErrorInvoked = std::atomic_bool{false};
207207
auto scheduler = CppUtils::Thread::Scheduler{
208208
0ms,
209+
std::thread::hardware_concurrency(),
209210
[&](std::exception_ptr) {
210211
onErrorInvoked = true;
211212
}};

0 commit comments

Comments
 (0)