-
-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathOPThreadPool.cpp
More file actions
124 lines (102 loc) · 3.45 KB
/
OPThreadPool.cpp
File metadata and controls
124 lines (102 loc) · 3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include "OPThreadPool.h"

#include <utility>
namespace opsqlite {
// Creates the pool and starts its worker thread(s). Every worker runs the
// private member `doWork`.
ThreadPool::ThreadPool() : done(false) {
  // The pool is currently fixed at a single worker. Sizing it from the
  // hardware is disabled; note that hardware_concurrency() may return 0, in
  // which case at least one thread would still have to be created:
  // auto numberOfThreads = std::thread::hardware_concurrency();
  // if (numberOfThreads == 0) {
  //   numberOfThreads = 1;
  // }
  // Declared unsigned so the comparison with the unsigned loop index below
  // does not mix signed and unsigned operands.
  constexpr unsigned numberOfThreads = 1;
  for (unsigned i = 0; i < numberOfThreads; ++i) {
    // std::thread receives the pointer to the member function as its first
    // argument and the object to invoke it on as its second argument.
    threads.emplace_back(&ThreadPool::doWork, this);
  }
}
// The destructor tells the workers to stop and joins all of them so the
// program can exit gracefully. Note that it only runs for a fully constructed
// pool; it is not invoked if the constructor itself throws.
ThreadPool::~ThreadPool() {
  {
    // Set the flag while holding the queue mutex. A worker evaluates its wait
    // predicate under this same mutex, so it either observes `done` before it
    // blocks or is already blocked when the notification below is sent.
    // Writing the flag without the lock leaves a window in which the
    // notification is missed and join() never returns.
    std::lock_guard<std::mutex> g(workQueueMutex);
    done = true;
  }
  // Wake up all the threads, so they can finish and be joined
  workQueueConditionVariable.notify_all();
  for (auto &thread : threads) {
    if (thread.joinable()) {
      thread.join();
    }
  }
  threads.clear();
}
// Adds `task` to the work queue. Called every time there is a request that
// needs to be processed by the thread pool.
void ThreadPool::queueWork(const std::function<void(void)> &task) {
  // Grab the mutex that protects the queue
  std::lock_guard<std::mutex> g(workQueueMutex);
  // Store a copy of the request in the queue
  workQueue.push(task);
  // Workers and waitFinished() callers block on the same condition variable,
  // so notify_one could be delivered to a waitFinished() caller and leave an
  // idle worker asleep. Wake every waiter; each one re-checks its own
  // predicate and goes back to sleep if it has nothing to do.
  workQueueConditionVariable.notify_all();
}
// Function used by the threads to grab work from the queue
void ThreadPool::doWork() {
// Loop while the queue is not destructing
while (!done) {
std::function<void(void)> task;
// Create a scope, so we don't lock the queue for longer than necessary
{
std::unique_lock<std::mutex> g(workQueueMutex);
workQueueConditionVariable.wait(g, [&] {
// Only wake up if there are elements in the queue or the
// program is shutting down
return !workQueue.empty() || done;
});
// If we are shutting down exit without trying to process more work
if (done) {
break;
}
task = workQueue.front();
workQueue.pop();
++busy;
}
task();
{
std::lock_guard<std::mutex> g(workQueueMutex);
--busy;
}
workQueueConditionVariable.notify_one();
}
}
// Blocks the calling thread until the work queue is empty and no worker is
// currently running a task (`busy` is zero).
void ThreadPool::waitFinished() {
  std::unique_lock<std::mutex> lock(workQueueMutex);
  // Explicit form of a predicate wait: keep sleeping while there is still
  // queued or in-flight work, re-checking after every wake-up.
  while (!workQueue.empty() || busy != 0) {
    workQueueConditionVariable.wait(lock);
  }
}
void ThreadPool::restartPool() {
// So threads know it's time to shut down
done = true;
// Wake up all the threads, so they can finish and be joined
workQueueConditionVariable.notify_all();
for (auto &thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
threads.clear();
auto numberOfThreads = std::thread::hardware_concurrency();
if (numberOfThreads == 0) {
numberOfThreads = 1;
}
for (unsigned i = 0; i < numberOfThreads; ++i) {
// The threads will execute the private member `doWork`. Note that we
// need to pass a reference to the function (namespaced with the class
// name) as the first argument, and the current object as second
// argument
threads.emplace_back(&ThreadPool::doWork, this);
}
done = false;
}
} // namespace opsqlite