Skip to content

Commit a3ae29f

Browse files
committed
Add Thread Pool
1 parent f2b78c1 commit a3ae29f

4 files changed

Lines changed: 451 additions & 2 deletions

File tree

include/alloc/pointer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ class sp {
151151
}
152152

153153
/**
154-
* @brief Static factory method to create a new `UNIQUE` managed object.
154+
* @brief Static factory method to create a new `SHARED` managed object.
155155
* @tparam Args Variadic template for constructor arguments.
156156
* @param args Arguments for `T`'s constructor.
157157
* @return A new `sp<T>` instance managing the created object.
@@ -160,7 +160,7 @@ class sp {
160160
static sp<T> create(Args&&... args) {
161161
sp_pointer_details_t* block = new sp_pointer_details_concrete_t<T>(std::forward<Args>(args)...);
162162

163-
return {block, UNIQUE};
163+
return {block, SHARED};
164164
}
165165

166166
/**

include/parallel/ThreadPool.h

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2023-2025 komozoi
3+
* Original Creation Date: 2026-4-9
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
#ifndef EXCESSIVE_THREADPOOL_H
20+
#define EXCESSIVE_THREADPOOL_H
21+
22+
#include <thread>
23+
#include <mutex>
24+
#include <condition_variable>
25+
#include <functional>
26+
27+
#include "alloc/pointer.h"
28+
#include "ds/ArrayList.h"
29+
#include "ds/Queue.h"
30+
31+
32+
class ThreadPoolTask {
33+
public:
34+
virtual void run() = 0;
35+
virtual bool isDone() const = 0;
36+
37+
virtual ~ThreadPoolTask() = default;
38+
};
39+
40+
41+
class ThreadPoolTaskFunction : public ThreadPoolTask {
42+
public:
43+
template<typename Func, typename = typename std::enable_if<!std::is_same<typename std::decay<Func>::type, ThreadPoolTaskFunction>::value>::type>
44+
ThreadPoolTaskFunction(Func&& func)
45+
: task(std::forward<Func>(func)) {}
46+
47+
void operator()() {
48+
run();
49+
}
50+
51+
void run() override {
52+
task();
53+
didRun = true;
54+
}
55+
56+
bool isDone() const override {
57+
return didRun;
58+
}
59+
60+
private:
61+
std::function<void()> task;
62+
bool didRun = false;
63+
};
64+
65+
66+
/**
67+
* @brief A thread pool for executing tasks in parallel.
68+
*
69+
* This class manages a pool of threads and a queue of tasks.
70+
*/
71+
class ThreadPool {
72+
public:
73+
/**
74+
* @brief Constructs a new ThreadPool with the specified number of threads.
75+
* @param threads The number of threads in the pool.
76+
*/
77+
explicit ThreadPool(int threads);
78+
79+
/**
80+
* @brief Destructor. Shuts down the thread pool.
81+
*/
82+
~ThreadPool();
83+
84+
/**
85+
* @brief Submits a task to the thread pool for execution.
86+
* @param task The task to execute.
87+
*/
88+
void submit(sp<ThreadPoolTask> task);
89+
90+
/**
91+
* @brief Submits a task to the thread pool for execution.
92+
* @param func The function to execute.
93+
*/
94+
template<typename Func, typename = typename std::enable_if<!std::is_base_of<ThreadPoolTask, typename std::remove_reference<Func>::type>::value &&
95+
!is_sp<typename std::remove_reference<Func>::type>::value>::type>
96+
sp<ThreadPoolTask> submit(Func&& func) {
97+
sp<ThreadPoolTask> task = sp<ThreadPoolTaskFunction>::create(std::forward<Func>(func));
98+
submit(task);
99+
return task;
100+
}
101+
102+
/**
103+
* @brief Submits a task to the thread pool for execution with arguments.
104+
* @param func The function to execute.
105+
* @param args The arguments to pass to the function.
106+
*/
107+
template<typename Func, typename... Args, typename = typename std::enable_if<(sizeof...(Args) > 0)>::type>
108+
sp<ThreadPoolTask> submit(Func&& func, Args&&... args) {
109+
sp<ThreadPoolTask> task = sp<ThreadPoolTaskFunction>::create(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
110+
submit(task);
111+
return task;
112+
}
113+
114+
/**
115+
* @brief Shuts down the thread pool gracefully.
116+
*/
117+
void shutdown();
118+
119+
/**
120+
* @brief Checks if the thread pool has been shut down.
121+
* @return True if the thread pool is shut down, false otherwise.
122+
*/
123+
bool isShutdown() const;
124+
125+
/**
126+
* @brief Gets the number of threads in the pool.
127+
* @return The number of threads.
128+
*/
129+
int getPoolSize() const;
130+
131+
/**
132+
* @brief Gets the number of tasks currently in the queue.
133+
* @return The number of tasks in the queue.
134+
*/
135+
int getQueueSize() const;
136+
137+
private:
138+
void workerLoop();
139+
140+
ArrayList<std::thread> workers;
141+
Queue<sp<ThreadPoolTask>> tasks;
142+
143+
mutable std::mutex queueMutex;
144+
std::condition_variable condition;
145+
bool stop;
146+
};
147+
148+
#endif // EXCESSIVE_THREADPOOL_H

src/parallel/ThreadPool.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2023-2025 komozoi
3+
* Original Creation Date: 2026-4-9
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
#include "parallel/ThreadPool.h"
20+
#include <stdexcept>
21+
22+
ThreadPool::ThreadPool(int threads) : stop(false) {
23+
for (int i = 0; i < threads; ++i) {
24+
workers.add(std::thread(&ThreadPool::workerLoop, this));
25+
}
26+
}
27+
28+
ThreadPool::~ThreadPool() {
29+
shutdown();
30+
}
31+
32+
void ThreadPool::workerLoop() {
33+
for (;;) {
34+
sp<ThreadPoolTask> task;
35+
{
36+
std::unique_lock<std::mutex> lock(queueMutex);
37+
while (!stop && tasks.empty()) {
38+
condition.wait(lock);
39+
}
40+
41+
if (stop && tasks.empty()) {
42+
return;
43+
}
44+
45+
task = tasks.pop();
46+
}
47+
task.mut().run();
48+
}
49+
}
50+
51+
void ThreadPool::submit(sp<ThreadPoolTask> task) {
52+
{
53+
std::lock_guard<std::mutex> lock(queueMutex);
54+
55+
if (stop) {
56+
throw std::runtime_error("submit on stopped ThreadPool");
57+
}
58+
59+
tasks.add(task);
60+
}
61+
condition.notify_one();
62+
}
63+
64+
void ThreadPool::shutdown() {
65+
{
66+
std::lock_guard<std::mutex> lock(queueMutex);
67+
if (stop) {
68+
return;
69+
}
70+
stop = true;
71+
}
72+
condition.notify_all();
73+
for (int i = 0; i < workers.size(); ++i) {
74+
if (workers.get(i).joinable()) {
75+
workers.get(i).join();
76+
}
77+
}
78+
}
79+
80+
bool ThreadPool::isShutdown() const {
81+
std::lock_guard<std::mutex> lock(queueMutex);
82+
return stop;
83+
}
84+
85+
int ThreadPool::getPoolSize() const {
86+
return workers.size();
87+
}
88+
89+
int ThreadPool::getQueueSize() const {
90+
std::lock_guard<std::mutex> lock(queueMutex);
91+
return tasks.size();
92+
}

0 commit comments

Comments
 (0)