-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathThreadPool.hpp
More file actions
149 lines (121 loc) · 2.8 KB
/
ThreadPool.hpp
File metadata and controls
149 lines (121 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright (c) 2018 Ethan Margaillan <contact@ethan.jp>.
// Licensed under the MIT Licence - https://raw.githubusercontent.com/Ethan13310/Thread-Pool-Cpp/master/LICENSE
#pragma once
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
// Fixed-size pool of worker threads consuming tasks from a shared FIFO queue.
//
// Tasks are submitted with push(), which returns a std::future for the
// task's result. Workers are started by the constructor and stopped by
// join() (also invoked by the destructor). Once join() has been called the
// pool has no workers left, so tasks pushed afterwards are never executed.
//
// The pool is neither copyable nor movable: every worker thread holds a
// pointer to the pool it was created by.
class thread_pool
{
    // Type-erased unit of work stored in the queue
    using task_type = std::function<void()>;

    // Default number of workers: the hardware concurrency hint, or 1 when the
    // hint is unavailable (hardware_concurrency() is allowed to return 0, which
    // would otherwise create a pool that never runs anything).
    static std::size_t default_thread_count() noexcept
    {
        const auto hint = std::thread::hardware_concurrency();
        return hint > 0 ? static_cast<std::size_t>(hint) : std::size_t{ 1 };
    }

public:
    // Start `thread_count` worker threads.
    // An explicit count of 0 is honoured and yields a pool without workers.
    // If a thread cannot be started, the workers already running are stopped
    // and joined before the exception is rethrown.
    explicit thread_pool(std::size_t thread_count = default_thread_count())
    {
        try {
            m_workers.reserve(thread_count);
            for (std::size_t i{ 0 }; i < thread_count; ++i) {
                m_workers.emplace_back([this]() { thread_loop(); });
            }
        }
        catch (...) {
            // Destroying a joinable std::thread calls std::terminate, so the
            // workers spawned so far must be joined before propagating.
            join();
            throw;
        }
    }

    // Run the remaining queued tasks, then stop and join every worker
    ~thread_pool()
    {
        if (!m_workers.empty()) {
            join();
        }
    }

    thread_pool(thread_pool const &) = delete;
    thread_pool &operator=(thread_pool const &) = delete;

    // Moving is not supported: the mutex, condition variable and atomics are
    // not movable, and the workers capture `this`.
    thread_pool(thread_pool &&) = delete;
    thread_pool &operator=(thread_pool &&) = delete;

    // Push a new task into the queue.
    //
    // `fn` and `args` are bound together with std::bind (so they are copied or
    // moved into the task and `fn` is later invoked with the stored arguments).
    // Returns a std::future holding the result of the call; an exception
    // thrown by the task is delivered through that future.
    template <class Func, class... Args>
    auto push(Func &&fn, Args &&...args)
    {
        auto bound = std::bind(std::forward<Func>(fn), std::forward<Args>(args)...);

        // Deduce the result from the call that will really be made
        using return_type = decltype(bound());

        // std::function requires a copyable target while std::packaged_task is
        // move-only, hence the shared_ptr indirection.
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::move(bound));
        auto future = task->get_future();

        {
            std::lock_guard<std::mutex> lock{ m_mutex };
            m_tasks.emplace([task]() { (*task)(); });
        }

        // Notify after releasing the lock so the woken worker can take it at once
        m_notifier.notify_one();
        return future;
    }

    // Remove all pending tasks from the queue.
    // Tasks already picked up by a worker are not affected. The futures of the
    // removed tasks report std::future_errc::broken_promise.
    void clear()
    {
        std::queue<task_type> dropped;
        {
            std::lock_guard<std::mutex> lock{ m_mutex };
            m_tasks.swap(dropped);
        }
        // `dropped` is destroyed here, outside the critical section
    }

    // Ask the workers to stop and wait for all of them to finish.
    // Tasks still in the queue are executed before the workers exit.
    void join()
    {
        {
            // The flag must change under the mutex: otherwise a worker could
            // test the wait predicate, miss the notification below and then
            // block forever.
            std::lock_guard<std::mutex> lock{ m_mutex };
            m_stop = true;
        }
        m_notifier.notify_all();

        for (auto &worker : m_workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
        m_workers.clear();
    }

    // Get the number of active and waiting workers
    std::size_t thread_count() const
    {
        return m_workers.size();
    }

    // Get the number of workers currently executing a task
    std::size_t active_count() const
    {
        return m_active.load();
    }

private:
    // Thread main loop: run tasks until the pool is stopped and the queue is empty
    void thread_loop()
    {
        for (;;) {
            task_type task = next_task();
            if (!task) {
                // next_task() only returns an empty task once stop was
                // requested and nothing is left to run
                return;
            }
            ++m_active;
            task();
            --m_active;
        }
    }

    // Block until a task is available or stop is requested.
    // Returns the next pending task, or an empty task when the queue is empty
    // (which, given the wait predicate, means stop was requested).
    task_type next_task()
    {
        std::unique_lock<std::mutex> lock{ m_mutex };
        m_notifier.wait(lock, [this]() {
            return m_stop || !m_tasks.empty();
        });

        if (m_tasks.empty()) {
            return {};
        }

        task_type task = std::move(m_tasks.front());
        m_tasks.pop();
        return task;
    }

    std::atomic<bool> m_stop{ false };       // Set once by join(); never reset
    std::atomic<std::size_t> m_active{ 0 };  // Workers currently running a task
    std::condition_variable m_notifier;      // Signals new tasks and stop requests
    std::mutex m_mutex;                      // Guards m_tasks and changes of m_stop
    std::vector<std::thread> m_workers;
    std::queue<task_type> m_tasks;
};