11#pragma once
22
3- #include < atomic>
43#include < limits>
5- #include < optional>
64#include < string>
75#include < memory>
86#include < chrono>
7+ #include < thread>
98#include < utility>
109#include < vector>
1110#include < functional>
1211#include < mutex>
12+ #include < stop_token>
1313
1414#include " thread_pool.hpp"
1515
@@ -82,7 +82,7 @@ class PeriodicSchedulerImpl : public std::enable_shared_from_this<PeriodicSchedu
8282
8383 void Reset () {
8484 id_ = kTaskIDMax ;
85- scheduler_ = nullptr ;
85+ scheduler_. reset () ;
8686 }
8787
8888 std::weak_ptr<PeriodicSchedulerImpl> scheduler_{};
@@ -91,21 +91,23 @@ class PeriodicSchedulerImpl : public std::enable_shared_from_this<PeriodicSchedu
9191
9292 auto Schedule (TaskFn&& task, Duration interval, TaskStopPolicy stop_policy = TaskStopPolicy::kWaitForCompletion )
9393 -> TaskHandle {
94- TaskID id = task_counter_.fetch_add (1 , std::memory_order_relaxed);
95- {
96- std::lock_guard lock{tasks_mtx_};
97- tasks_.emplace_back (std::move (task), id, stop_policy, interval, std::future<void >{}, Clock::now ());
98- }
94+ std::unique_lock lock{mtx_};
95+ TaskID id = task_counter_++;
96+ tasks_.emplace_back (std::move (task), id, stop_policy, interval, std::future<void >(), Clock::now ());
97+ lock.unlock ();
9998 cv_.notify_all ();
10099 return TaskHandle (std::enable_shared_from_this<PeriodicSchedulerImpl<Clock>>::weak_from_this (), id);
101100 }
102101
103102 auto GetNumTasks () const -> size_t {
104- std::lock_guard lock{tasks_mtx_ };
103+ std::lock_guard lock{mtx_ };
105104 return tasks_.size ();
106105 }
107106
108- auto TotalNumTasks () const -> size_t { return task_counter_.load (std::memory_order_relaxed); }
107+ auto GetTaskCounter () const -> size_t {
108+ std::lock_guard lock{mtx_};
109+ return task_counter_;
110+ }
109111
110112 static auto Create (ThreadPool& pool) -> std::shared_ptr<PeriodicSchedulerImpl> {
111113 return std::shared_ptr<PeriodicSchedulerImpl>(new PeriodicSchedulerImpl (pool));
@@ -126,46 +128,43 @@ class PeriodicSchedulerImpl : public std::enable_shared_from_this<PeriodicSchedu
126128 PeriodicSchedulerImpl (ThreadPool& pool)
127129 : pool_(pool),
128130 tasks_(),
129- tasks_mtx_ (),
131+ mtx_ (),
130132 cv_(),
131133 task_counter_(),
132134 worker_(&PeriodicSchedulerImpl<Clock>::ScheduleLoop, this ) {}
133135
134136 auto StopTask (TaskHandle& handle) -> bool {
135- std::optional<Task> task{};
136- {
137- std::lock_guard lock{tasks_mtx_};
138- auto it = std::ranges::find_if (tasks_, [&handle](const auto & t) { return handle.id_ == t.id ; });
139- if (it != tasks_.end ()) {
140- task.emplace (std::move (*it));
141- tasks_.erase (it);
142- }
143- }
144-
145- if (!task) {
137+ std::unique_lock lock{mtx_};
138+ auto it = std::ranges::find_if (tasks_, [&handle](const auto & task) { return handle.id_ == task.id ; });
139+ if (it == tasks_.end ()) {
146140 return false ;
147141 }
148142
149- if (task->stop_policy == TaskStopPolicy::kWaitForCompletion ) {
150- if (task->promise .valid ()) {
151- task->promise .wait ();
143+ auto task = std::move (*it);
144+ tasks_.erase (it);
145+ lock.unlock ();
146+
147+ if (task.stop_policy == TaskStopPolicy::kWaitForCompletion ) {
148+ if (task.promise .valid ()) {
149+ task.promise .wait ();
152150 }
153151 }
154-
155152 handle.Reset ();
156153 return true ;
157154 }
158155
159156 void ScheduleLoop (const std::stop_token& stoken) {
157+ std::stop_callback scb{stoken, [this ] { cv_.notify_all (); }};
158+
160159 while (!stoken.stop_requested ()) {
161- std::unique_lock lock{tasks_mtx_ };
160+ std::unique_lock lock{mtx_ };
162161 if (tasks_.empty ()) {
163- cv_.wait (lock, stoken, [& ] { return stoken. stop_requested () || !tasks_.empty (); });
162+ cv_.wait (lock, stoken, [this ] { return !tasks_.empty (); });
164163 continue ;
165164 }
166165
167166 auto now = Clock::now ();
168- TimePoint next_wake_time = TimePoint::max ();
167+ auto next_wake_time = TimePoint::max ();
169168 for (Task& task : tasks_) {
170169 if (now >= task.next_execution ) {
171170 bool is_running = task.promise .valid () &&
@@ -180,17 +179,17 @@ class PeriodicSchedulerImpl : public std::enable_shared_from_this<PeriodicSchedu
180179 }
181180
182181 if (next_wake_time != TimePoint::max ()) {
183- cv_. wait_until (lock, stoken, next_wake_time,
184- [&] { return stoken. stop_requested () || !tasks_. empty () ; });
182+ auto id = task_counter_;
183+ cv_. wait_until (lock, stoken, next_wake_time, [ this , id]() { return id < task_counter_ ; });
185184 }
186185 }
187186 }
188187
189188 ThreadPool& pool_;
190189 std::vector<Task> tasks_;
191- std::mutex tasks_mtx_ ;
190+ mutable std::mutex mtx_ ;
192191 std::condition_variable_any cv_;
193- std::atomic< TaskID> task_counter_;
192+ TaskID task_counter_;
194193 std::jthread worker_;
195194};
196195
0 commit comments