-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy paththreadpool.cc
More file actions
92 lines (75 loc) · 2.42 KB
/
threadpool.cc
File metadata and controls
92 lines (75 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include "threadpool.hpp"
#include "workerthread.hpp"
#include <QDebug>
class ThreadPool::ThreadPoolPrivate
{
public:
explicit ThreadPoolPrivate(ThreadPool *q)
: q_ptr(q)
{}
~ThreadPoolPrivate()
{
for (auto &thread : threads) {
if (thread->isRunning()) {
thread->quit();
thread->wait();
}
}
threads.clear();
qInfo() << "Total connected clients: " << totalConnectedClients.load();
}
void updateClientCount(int change)
{
if (change > 0) {
totalConnectedClients.fetch_add(change);
}
qint32 oldCount = totalClientCount.fetch_add(change);
qint32 newCount = oldCount + change;
// 使用CAS循环来确保正确更新最大值
qint32 currentMax = maxClientCount.load();
while (newCount > currentMax) {
if (maxClientCount.compare_exchange_weak(currentMax, newCount)) {
emit q_ptr->maxClientCount(newCount);
break;
}
}
emit q_ptr->clientCountChanged(newCount);
}
ThreadPool *q_ptr;
WorkerThreadList threads;
int currentIndex = 0;
std::atomic<qint32> totalClientCount{0};
std::atomic<qint32> maxClientCount{0};
std::atomic<qint32> totalConnectedClients{0};
};
ThreadPool::ThreadPool(int count, const ConnectionCallbacks &callbacks, QObject *parent)
: QObject(parent)
, d_ptr(new ThreadPoolPrivate(this))
{
for (int i = 0; i < count; ++i) {
auto *thread = new WorkerThread(callbacks);
connect(thread, &WorkerThread::message, this, &ThreadPool::message);
connect(thread, &WorkerThread::clientConnected, this, [this]() {
d_ptr->updateClientCount(1);
});
connect(thread, &WorkerThread::clientDisconnected, this, [this]() {
d_ptr->updateClientCount(-1);
});
thread->start();
d_ptr->threads.emplace_back(thread);
}
}
ThreadPool::~ThreadPool() {}
void ThreadPool::dispatchConnection(qintptr socketDescriptor)
{
if (d_ptr->threads.empty())
return;
// Simple round-robin load balancing
auto &thread = d_ptr->threads[d_ptr->currentIndex];
d_ptr->currentIndex = (d_ptr->currentIndex + 1) % d_ptr->threads.size();
thread->handleConnection(socketDescriptor);
}
int ThreadPool::activeThreadCount() const
{
return static_cast<int>(d_ptr->threads.size());
}