forked from OP-Engineering/op-sqlite
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathOPThreadPool.cpp
More file actions
138 lines (111 loc) · 3.57 KB
/
OPThreadPool.cpp
File metadata and controls
138 lines (111 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include "OPThreadPool.h"
namespace opsqlite {
ThreadPool::ThreadPool() : done(false) {
// This returns the number of threads supported by the system. If the
// function can't figure out this information, it returns 0. 0 is not good,
// so we create at least 1
// auto numberOfThreads = std::thread::hardware_concurrency();
// if (numberOfThreads == 0) {
// numberOfThreads = 1;
// }
auto numberOfThreads = 1;
for (unsigned i = 0; i < numberOfThreads; ++i) {
// The threads will execute the private member `doWork`. Note that we
// need to pass a reference to the function (namespaced with the class
// name) as the first argument, and the current object as second
// argument
threads.emplace_back(&ThreadPool::doWork, this);
}
}
// The destructor joins all the threads so the program can exit gracefully.
// This will be executed if there is any exception (e.g. creating the threads)
ThreadPool::~ThreadPool() {
// So threads know it's time to shut down
done = true;
// Wake up all the threads, so they can finish and be joined
workQueueConditionVariable.notify_all();
for (auto &thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
threads.clear();
}
// This function will be called by the server every time there is a request
// that needs to be processed by the thread pool
void ThreadPool::queueWork(const std::function<void(void)> &task) {
// Grab the mutex
std::lock_guard<std::mutex> g(workQueueMutex);
// Push the request to the queue
workQueue.push(task);
// Notify one thread that there are requests to process
workQueueConditionVariable.notify_one();
}
// Function used by the threads to grab work from the queue
void ThreadPool::doWork() {
// Loop while the queue is not destructing
while (!done) {
std::function<void(void)> task;
// Create a scope, so we don't lock the queue for longer than necessary
{
std::unique_lock<std::mutex> g(workQueueMutex);
workQueueConditionVariable.wait(g, [&] {
// Only wake up if there are elements in the queue or the
// program is shutting down
return !workQueue.empty() || done;
});
// If we are shutting down exit without trying to process more work
if (done) {
break;
}
task = workQueue.front();
workQueue.pop();
}
++busy;
task();
--busy;
}
}
void ThreadPool::waitFinished() {
std::unique_lock<std::mutex> g(workQueueMutex);
workQueueConditionVariable.wait(
g, [&] { return workQueue.empty() && (busy == 0); });
}
void ThreadPool::restartPool() {
// So threads know it's time to shut down
done = true;
// Wake up all the threads, so they can finish and be joined
workQueueConditionVariable.notify_all();
for (auto &thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
threads.clear();
auto numberOfThreads = std::thread::hardware_concurrency();
if (numberOfThreads == 0) {
numberOfThreads = 1;
}
for (unsigned i = 0; i < numberOfThreads; ++i) {
// The threads will execute the private member `doWork`. Note that we
// need to pass a reference to the function (namespaced with the class
// name) as the first argument, and the current object as second
// argument
threads.emplace_back(&ThreadPool::doWork, this);
}
done = false;
}
void ThreadPool::shutdown() {
if (done) {
return;
}
done = true;
workQueueConditionVariable.notify_all();
for (auto &thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
threads.clear();
}
} // namespace opsqlite