Skip to content

Commit 5f005a9

Browse files
committed
Thread/Planner
1 parent 04ba74a commit 5f005a9

9 files changed

Lines changed: 227 additions & 98 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
- [`Vec2`](modules/Container/Vec2.mpp) / [`Vec3`](modules/Container/Vec3.mpp) - 2D/3D math vectors with operators and common functions
4040

4141
### 🚀 Execution
42+
- [`Event`](modules/Execution/Event.mpp) - An event for thread synchronization
4243
- [`EventSystem`](modules/Execution/EventSystem.mpp) - Event system to subscribe functions and trigger actions by event name
43-
- [`Planner`](modules/Execution/Planner.mpp) - Simple scheduler to run delayed functions on separate threads
4444

4545
### 📁 File System
4646
- [`Watcher`](modules/FileSystem/Watcher.mpp) - File modification watcher
@@ -81,7 +81,7 @@
8181
- [`Title`](modules/Terminal/Title.mpp) - Terminal title utilities
8282

8383
### 🚦 Multithreading & Synchronization
84-
- [`Event`](modules/Thread/Event.mpp) - An event for thread synchronization
84+
- [`Planner`](modules/Thread/Planner.mpp) - Simple scheduler to run delayed functions on separate threads
8585
- [`ThreadLoop`](modules/Thread/ThreadLoop.mpp) - Thread loop with exception handling
8686
- [`ThreadPool`](modules/Thread/ThreadPool.mpp) - Fixed-size thread pool for parallel task execution
8787
- [`TryAsync`](modules/Thread/TryAsync.mpp) - Launches a function asynchronously, forwards exception to caller

modules/Execution/Execution.mpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,3 @@ export module CppUtils.Execution;
22

33
export import CppUtils.Execution.Event;
44
export import CppUtils.Execution.EventSystem;
5-
export import CppUtils.Execution.Planner;

modules/Execution/Planner.mpp

Lines changed: 0 additions & 39 deletions
This file was deleted.

modules/Thread/Planner.mpp

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
export module CppUtils.Thread.Planner;
2+
3+
import std;
4+
import CppUtils.Chrono.Concept;
5+
import CppUtils.Thread.UniqueLocker;
6+
import CppUtils.Thread.SharedLocker;
7+
import CppUtils.Thread.ThreadLoop;
8+
9+
export namespace CppUtils::Thread
10+
{
11+
class Planner final
12+
{
13+
using Clock = std::chrono::steady_clock;
14+
using TimePoint = Clock::time_point;
15+
using Task = std::function<void()>;
16+
17+
struct Item final
18+
{
19+
TimePoint time;
20+
Task task;
21+
std::atomic_bool cancelled = false;
22+
};
23+
24+
struct Compare final
25+
{
26+
auto operator()(const std::shared_ptr<Item>& a,
27+
const std::shared_ptr<Item>& b) const noexcept -> bool
28+
{
29+
return a->time < b->time;
30+
}
31+
};
32+
33+
using Set = std::multiset<std::shared_ptr<Item>, Compare>;
34+
using Map = std::unordered_map<std::size_t, std::shared_ptr<Item>>;
35+
36+
public:
37+
inline Planner():
38+
m_loop{
39+
[this] { runLoop(); },
40+
[this] { m_condition.notify_all(); }}
41+
{
42+
m_loop.start();
43+
}
44+
45+
inline ~Planner() noexcept
46+
{
47+
cancelAll();
48+
m_loop.requestStop();
49+
}
50+
51+
inline auto schedule(Task task, TimePoint when) -> std::size_t
52+
{
53+
auto id = m_nextId++;
54+
auto item = std::make_shared<Item>(when, std::move(task));
55+
{
56+
auto mapAccessor = m_set.access();
57+
mapAccessor->insert(item);
58+
}
59+
{
60+
auto mapAccessor = m_map.access();
61+
mapAccessor->emplace(id, item);
62+
}
63+
m_condition.notify_all();
64+
return id;
65+
}
66+
67+
inline auto schedule(Task task, Chrono::Duration auto delay) -> std::size_t
68+
{
69+
return schedule(std::move(task), Clock::now() + delay);
70+
}
71+
72+
inline auto waitUntilFinished() -> void
73+
{
74+
auto setAccessor = m_set.access();
75+
m_condition.wait(setAccessor.getLockGuard(), [&setAccessor] {
76+
return std::empty(setAccessor.value());
77+
});
78+
}
79+
80+
inline auto cancel(std::size_t id) -> void
81+
{
82+
auto mapAccessor = m_map.access();
83+
if (auto it = mapAccessor->find(id); it != std::end(mapAccessor.value()))
84+
{
85+
it->second->cancelled = true;
86+
mapAccessor->erase(it);
87+
}
88+
}
89+
90+
inline auto cancelAll() -> void
91+
{
92+
auto mapAccessor = m_map.access();
93+
for (auto& [_, item] : mapAccessor.value())
94+
item->cancelled = true;
95+
mapAccessor->clear();
96+
{
97+
auto setAccessor = m_set.access();
98+
setAccessor->clear();
99+
}
100+
m_condition.notify_all();
101+
}
102+
103+
private:
104+
inline auto runLoop() -> void
105+
{
106+
auto setAccessor = m_set.access();
107+
108+
if (std::empty(setAccessor.value()))
109+
{
110+
m_condition.notify_all();
111+
m_condition.wait(setAccessor.getLockGuard(), [this, &setAccessor] {
112+
return not std::empty(setAccessor.value()) or m_loop.isStopRequested();
113+
});
114+
return;
115+
}
116+
117+
auto item = *std::begin(setAccessor.value());
118+
119+
if (item->cancelled)
120+
{
121+
setAccessor->erase(std::begin(setAccessor.value()));
122+
return;
123+
}
124+
125+
if (item->time > Clock::now())
126+
{
127+
m_condition.wait_until(setAccessor.getLockGuard(), item->time);
128+
return;
129+
}
130+
131+
setAccessor->erase(std::begin(setAccessor.value()));
132+
if (item->cancelled)
133+
return;
134+
135+
setAccessor.getLockGuard().unlock();
136+
std::invoke(item->task);
137+
setAccessor.getLockGuard().lock();
138+
}
139+
140+
private:
141+
std::atomic_size_t m_nextId = 0;
142+
UniqueLocker<Set> m_set;
143+
UniqueLocker<Map> m_map;
144+
145+
std::condition_variable m_condition;
146+
ThreadLoop m_loop;
147+
};
148+
}

modules/Thread/Thread.mpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export module CppUtils.Thread;
22

3+
export import CppUtils.Thread.Planner;
34
export import CppUtils.Thread.SharedLocker;
45
export import CppUtils.Thread.SharedPtr;
56
export import CppUtils.Thread.ThreadLoop;

tests/Execution/Planner.mpp

Lines changed: 0 additions & 54 deletions
This file was deleted.

tests/Terminal/Canvas.mpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export namespace CppUtils::UnitTest::Terminal::Canvas
4444

4545
suite.addTest("Fill with planner", [&] {
4646
auto canvas = CppUtils::Terminal::v2::Canvas{CppUtils::Container::Size2{10, 5}};
47-
auto planner = CppUtils::Execution::Planner{};
47+
auto planner = CppUtils::Thread::Planner{};
4848
const auto chars = "-\\|/"sv;
4949
auto loop = 0uz, frame = 0uz;
5050
auto updateFrame = [&](this const auto& self) -> void {

tests/Thread/Planner.mpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
export module CppUtils.UnitTests.Thread.Planner;
2+
3+
import std;
4+
import CppUtils;
5+
6+
export namespace CppUtils::UnitTest::Thread::Planner
7+
{
8+
inline auto _ = TestSuite{
9+
"Thread/Planner", {"UnitTest"}, []([[maybe_unused]] auto& suite) {
10+
using namespace std::chrono_literals;
11+
12+
suite.addTest("Initialize Planner", [&] {
13+
auto planner = CppUtils::Thread::Planner{};
14+
15+
planner.waitUntilFinished();
16+
});
17+
18+
suite.addTest("Schedule and execute", [&] {
19+
auto planner = CppUtils::Thread::Planner{};
20+
auto executed = std::atomic_bool{false};
21+
22+
planner.schedule([&executed] {
23+
executed = true;
24+
}, 100ms);
25+
26+
planner.waitUntilFinished();
27+
28+
suite.expect(executed.load());
29+
});
30+
31+
suite.addTest("Recursive scheduling", [&] {
32+
auto planner = CppUtils::Thread::Planner{};
33+
auto counter = std::atomic_size_t{0};
34+
35+
planner.schedule([&planner, &counter] {
36+
++counter;
37+
planner.schedule([&counter] {
38+
++counter;
39+
}, 100ms);
40+
}, 100ms);
41+
42+
planner.waitUntilFinished();
43+
44+
suite.expectEqual(counter.load(), 2uz);
45+
});
46+
47+
suite.addTest("Cancel tasks on destruction", [&] {
48+
auto executed = std::atomic_bool{false};
49+
50+
{
51+
auto planner = CppUtils::Thread::Planner{};
52+
53+
planner.schedule([&executed] {
54+
executed = true;
55+
}, 100ms);
56+
}
57+
58+
suite.expect(not executed.load());
59+
});
60+
61+
suite.addTest("Multiple tasks", [&] {
62+
auto planner = CppUtils::Thread::Planner{};
63+
auto counter = std::atomic_size_t{0};
64+
65+
planner.schedule([&counter] { ++counter; }, 100ms);
66+
planner.schedule([&counter] { ++counter; }, 200ms);
67+
planner.schedule([&counter] { ++counter; }, 300ms);
68+
69+
planner.waitUntilFinished();
70+
71+
suite.expectEqual(counter.load(), 3uz);
72+
});
73+
}};
74+
}

tests/UnitTests.mpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ export import CppUtils.UnitTests.Container.Tree;
1313
export import CppUtils.UnitTests.Container.TypedStack;
1414
export import CppUtils.UnitTests.Container.Vector;
1515
export import CppUtils.UnitTests.Execution.EventSystem;
16-
export import CppUtils.UnitTests.Execution.Planner;
1716
export import CppUtils.UnitTests.FileSystem.Directory;
1817
export import CppUtils.UnitTests.FileSystem.File;
1918
export import CppUtils.UnitTests.FileSystem.Watcher;
@@ -32,6 +31,7 @@ export import CppUtils.UnitTests.String.Utility;
3231
export import CppUtils.UnitTests.System.Error;
3332
export import CppUtils.UnitTests.Terminal.Canvas;
3433
export import CppUtils.UnitTests.Terminal;
34+
export import CppUtils.UnitTests.Thread.Planner;
3535
export import CppUtils.UnitTests.Thread.SharedLocker;
3636
export import CppUtils.UnitTests.Thread.ThreadLoop;
3737
export import CppUtils.UnitTests.Thread.ThreadPool;

0 commit comments

Comments
 (0)