Skip to content

Commit 965f043

Browse files
author
m30070657
committed
Add pthread CPU affinity support
1 parent 5f1d893 commit 965f043

2 files changed

Lines changed: 95 additions & 0 deletions

File tree

src/bthread/task_control.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ DEFINE_int32(task_group_runqueue_capacity, 4096,
4242
DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
4343
DEFINE_bool(task_group_set_worker_name, true,
4444
"Whether to set the name of the worker thread");
45+
DEFINE_bool(thread_affinity, false, "Whether to Bind Cores");
46+
DEFINE_string(cpu_set , "",
47+
"Set of CPUs to which cores are bound, for example, 0-3,5,6-7; default: all");
4548

4649
namespace bthread {
4750

@@ -58,6 +61,7 @@ extern pthread_mutex_t g_task_control_mutex;
5861
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
5962
void (*g_worker_startfn)() = NULL;
6063
void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL;
64+
std::vector<unsigned> TaskControl::_cpus;
6165

6266
// May be called in other modules to run startfn in non-worker pthreads.
6367
void run_worker_startfn() {
@@ -74,8 +78,17 @@ void run_tagged_worker_startfn(bthread_tag_t tag) {
7478

7579
struct WorkerThreadArgs {
7680
WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
81+
82+
WorkerThreadArgs* set_cpuId(unsigned _cpuId) {
83+
if (FLAGS_thread_affinity) {
84+
cpuId = _cpuId;
85+
}
86+
return this;
87+
}
88+
7789
TaskControl* c;
7890
bthread_tag_t tag;
91+
unsigned cpuId;
7992
};
8093

8194
void* TaskControl::worker_thread(void* arg) {
@@ -87,6 +100,9 @@ void* TaskControl::worker_thread(void* arg) {
87100
auto dummy = static_cast<WorkerThreadArgs*>(arg);
88101
auto c = dummy->c;
89102
auto tag = dummy->tag;
103+
if (FLAGS_thread_affinity) {
104+
bind_thread(pthread_self(), _cpus[dummy->cpuId]);
105+
}
90106
delete dummy;
91107
run_tagged_worker_startfn(tag);
92108

@@ -209,6 +225,13 @@ int TaskControl::init(int concurrency) {
209225
}
210226
_concurrency = concurrency;
211227

228+
if (FLAGS_thread_affinity) {
229+
if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1 || _cpus.empty()) {
230+
LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
231+
return -1;
232+
}
233+
}
234+
212235
// task group group by tags
213236
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
214237
_tagged_ngroup[i].store(0, std::memory_order_relaxed);
@@ -241,6 +264,7 @@ int TaskControl::init(int concurrency) {
241264
_workers.resize(_concurrency);
242265
for (int i = 0; i < _concurrency; ++i) {
243266
auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
267+
arg->set_cpuId(i % _cpus.size());
244268
const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
245269
if (rc) {
246270
delete arg;
@@ -284,6 +308,7 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) {
284308
// _concurrency before create a worker.
285309
_concurrency.fetch_add(1);
286310
auto arg = new WorkerThreadArgs(this, tag);
311+
arg->set_cpuId((i + old_concurency) % _cpus.size());
287312
const int rc = pthread_create(
288313
&_workers[i + old_concurency], NULL, worker_thread, arg);
289314
if (rc) {
@@ -309,6 +334,43 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
309334
return NULL;
310335
}
311336

337+
int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
338+
static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
339+
std::smatch match;
340+
std::set<unsigned> cpuset;
341+
if (value.empty()) {
342+
cpus = get_current_cpus();
343+
return 0;
344+
}
345+
if (std::regex_match(value, match, r)) {
346+
for (butil::StringSplitter split(value.data(), ','); split; ++split) {
347+
butil::StringPiece cpuIds(split.field(), split.length());
348+
cpuIds.trim_spaces();
349+
butil::StringPiece beg = cpuIds;
350+
butil::StringPiece end = cpuIds;
351+
auto dash = cpuIds.find('-');
352+
if (dash != cpuIds.npos) {
353+
beg = cpuIds.substr(0, dash);
354+
end = cpuIds.substr(dash + 1);
355+
}
356+
unsigned b;
357+
unsigned e;
358+
int ret;
359+
ret = butil::StringSplitter(beg, '\t').to_uint(&b);
360+
ret = ret | butil::StringSplitter(end, '\t').to_uint(&e);
361+
if (ret != 0 || b > e) {
362+
return -1;
363+
}
364+
for (auto i = b; i <= e; ++i) {
365+
cpuset.insert(i);
366+
}
367+
}
368+
cpus.assign(cpuset.begin(), cpuset.end());
369+
return 0;
370+
}
371+
return -1;
372+
}
373+
312374
#ifdef BRPC_BTHREAD_TRACER
313375
void TaskControl::stack_trace(std::ostream& os, bthread_t tid) {
314376
_task_tracer.Trace(os, tid);

src/bthread/task_control.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
#include <vector>
3131
#include <array>
3232
#include <memory>
33+
#include <set>
34+
#include <regex>
3335
#include "butil/atomicops.h" // butil::atomic
3436
#include "bvar/bvar.h" // bvar::PassiveStatus
3537
#include "bthread/task_tracer.h"
@@ -91,6 +93,36 @@ friend bthread_t init_for_pthread_stack_trace();
9193
// If this method is called after init(), it never returns NULL.
9294
TaskGroup* choose_one_group(bthread_tag_t tag);
9395

96+
static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
97+
98+
static inline void bind_thread(pthread_t pthread, unsigned cpuId) {
99+
cpu_set_t cs;
100+
CPU_ZERO(&cs);
101+
CPU_SET(cpuId, &cs);
102+
auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
103+
if (r != 0) {
104+
LOG(WARNING) << "Failed to bind thread to cpu: " << cpuId;
105+
}
106+
(void)r;
107+
}
108+
109+
static inline std::vector<unsigned> get_current_cpus() {
110+
cpu_set_t cs;
111+
auto r = pthread_getaffinity_np(pthread_self(), sizeof(cs), &cs);
112+
if (r != 0) {
113+
LOG(ERROR) << "get thread affinity failed";
114+
exit(1);
115+
}
116+
std::vector<unsigned> cpus;
117+
unsigned nr = CPU_COUNT(&cs);
118+
for (int cpu = 0; cpu < CPU_SETSIZE && cpus.size() < nr; cpu++) {
119+
if (CPU_ISSET(cpu, &cs)) {
120+
cpus.push_back(cpu);
121+
}
122+
}
123+
return cpus;
124+
}
125+
94126
#ifdef BRPC_BTHREAD_TRACER
95127
// A stacktrace of bthread can be helpful in debugging.
96128
void stack_trace(std::ostream& os, bthread_t tid);
@@ -139,6 +171,7 @@ friend bthread_t init_for_pthread_stack_trace();
139171
bool _stop;
140172
butil::atomic<int> _concurrency;
141173
std::vector<pthread_t> _workers;
174+
static std::vector<unsigned> _cpus;
142175
butil::atomic<int> _next_worker_id;
143176

144177
bvar::Adder<int64_t> _nworkers;

0 commit comments

Comments
 (0)