Skip to content

Commit ffebd51

Browse files
committed
review & Support MACOS thread affinity
1 parent 4116fd7 commit ffebd51

2 files changed

Lines changed: 51 additions & 42 deletions

File tree

src/bthread/task_control.cpp

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
// Date: Tue Jul 10 17:40:58 CST 2012
2121

2222
#include <pthread.h>
23+
#include <set>
24+
#include <regex>
2325
#include <sys/syscall.h> // SYS_gettid
2426
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
2527
#include "butil/errno.h" // berror
@@ -34,6 +36,9 @@
3436
#include "bthread/timer_thread.h" // global_timer_thread
3537
#include <gflags/gflags.h>
3638
#include "bthread/log.h"
39+
#if defined(OS_MACOSX)
40+
#include <mach/mach.h>
41+
#endif
3742

3843
DEFINE_int32(task_group_delete_delay, 1,
3944
"delay deletion of TaskGroup for so many seconds");
@@ -43,7 +48,8 @@ DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
4348
DEFINE_bool(task_group_set_worker_name, true,
4449
"Whether to set the name of the worker thread");
4550
DEFINE_string(cpu_set, "",
46-
"Set of CPUs to which cores are bound, for example, 0-3,5,6-7; default: all");
51+
"Set of CPUs to which cores are bound. "
52+
"for example, 0-3,5,7; default: disable");
4753

4854
namespace bthread {
4955

@@ -60,7 +66,6 @@ extern pthread_mutex_t g_task_control_mutex;
6066
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
6167
void (*g_worker_startfn)() = NULL;
6268
void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL;
63-
std::vector<unsigned> TaskControl::_cpus;
6469

6570
// May be called in other modules to run startfn in non-worker pthreads.
6671
void run_worker_startfn() {
@@ -77,10 +82,8 @@ void run_tagged_worker_startfn(bthread_tag_t tag) {
7782

7883
struct WorkerThreadArgs {
7984
WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
80-
8185
TaskControl* c;
8286
bthread_tag_t tag;
83-
unsigned cpuId;
8487
};
8588

8689
void* TaskControl::worker_thread(void* arg) {
@@ -92,15 +95,6 @@ void* TaskControl::worker_thread(void* arg) {
9295
auto dummy = static_cast<WorkerThreadArgs*>(arg);
9396
auto c = dummy->c;
9497
auto tag = dummy->tag;
95-
if (!_cpus.empty()) {
96-
if (dummy->cpuId < _cpus.size()) {
97-
bind_thread(pthread_self(), _cpus[dummy->cpuId]);
98-
} else {
99-
LOG(ERROR) << "Failed to bind cpuId=" << dummy->cpuId
100-
<< " is out of bounds for _cpus (size="
101-
<< _cpus.size() << ")";
102-
}
103-
}
10498
delete dummy;
10599
run_tagged_worker_startfn(tag);
106100

@@ -113,10 +107,14 @@ void* TaskControl::worker_thread(void* arg) {
113107

114108
g->_tid = pthread_self();
115109

110+
int worker_id = c->_next_worker_id.fetch_add(
111+
1, butil::memory_order_relaxed);
112+
if (!c->_cpus.empty()) {
113+
bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]);
114+
}
116115
if (FLAGS_task_group_set_worker_name) {
117116
std::string worker_thread_name = butil::string_printf(
118-
"brpc_wkr:%d-%d", g->tag(),
119-
c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed));
117+
"brpc_wkr:%d-%d", g->tag(), worker_id);
120118
butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
121119
}
122120
BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
@@ -262,9 +260,6 @@ int TaskControl::init(int concurrency) {
262260
_workers.resize(_concurrency);
263261
for (int i = 0; i < _concurrency; ++i) {
264262
auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
265-
if (!_cpus.empty()) {
266-
arg->cpuId = i % _cpus.size();
267-
}
268263
const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
269264
if (rc) {
270265
delete arg;
@@ -308,9 +303,6 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) {
308303
// _concurrency before create a worker.
309304
_concurrency.fetch_add(1);
310305
auto arg = new WorkerThreadArgs(this, tag);
311-
if (!_cpus.empty()) {
312-
arg->cpuId = (i + old_concurency) % _cpus.size();
313-
}
314306
const int rc = pthread_create(
315307
&_workers[i + old_concurency], NULL, worker_thread, arg);
316308
if (rc) {
@@ -345,14 +337,14 @@ int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
345337
}
346338
if (std::regex_match(value, match, r)) {
347339
for (butil::StringSplitter split(value.data(), ','); split; ++split) {
348-
butil::StringPiece cpuIds(split.field(), split.length());
349-
cpuIds.trim_spaces();
350-
butil::StringPiece begin = cpuIds;
351-
butil::StringPiece end = cpuIds;
352-
auto dash = cpuIds.find('-');
353-
if (dash != cpuIds.npos) {
354-
begin = cpuIds.substr(0, dash);
355-
end = cpuIds.substr(dash + 1);
340+
butil::StringPiece cpu_ids(split.field(), split.length());
341+
cpu_ids.trim_spaces();
342+
butil::StringPiece begin = cpu_ids;
343+
butil::StringPiece end = cpu_ids;
344+
auto dash = cpu_ids.find('-');
345+
if (dash != cpu_ids.npos) {
346+
begin = cpu_ids.substr(0, dash);
347+
end = cpu_ids.substr(dash + 1);
356348
}
357349
unsigned first = UINT_MAX;
358350
unsigned last = 0;
@@ -372,6 +364,34 @@ int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
372364
return -1;
373365
}
374366

367+
void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
368+
#if defined(OS_LINUX)
369+
cpu_set_t cs;
370+
CPU_ZERO(&cs);
371+
CPU_SET(cpu_id, &cs);
372+
auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
373+
if (r != 0) {
374+
LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
375+
}
376+
(void)r;
377+
#elif defined(OS_MACOSX)
378+
thread_port_t mach_thread = pthread_mach_thread_np(pthread);
379+
if (mach_thread != MACH_PORT_NULL) {
380+
LOG(WARNING) << "mach_thread is null"
381+
<< "Failed to bind thread to cpu: " << cpu_id;
382+
return;
383+
}
384+
thread_affinity_policy_data_t policy;
385+
policy.affinity_tag = cpu_id;
386+
if (thread_policy_set(mach_thread,
387+
THREAD_AFFINITY_POLICY,
388+
(thread_policy_t)&policy,
389+
THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) {
390+
LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
391+
}
392+
#endif
393+
}
394+
375395
#ifdef BRPC_BTHREAD_TRACER
376396
void TaskControl::stack_trace(std::ostream& os, bthread_t tid) {
377397
_task_tracer.Trace(os, tid);

src/bthread/task_control.h

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
#include <vector>
3131
#include <array>
3232
#include <memory>
33-
#include <set>
34-
#include <regex>
3533
#include "butil/atomicops.h" // butil::atomic
3634
#include "bvar/bvar.h" // bvar::PassiveStatus
3735
#include "bthread/task_tracer.h"
@@ -95,16 +93,7 @@ friend bthread_t init_for_pthread_stack_trace();
9593

9694
static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
9795

98-
static inline void bind_thread(pthread_t pthread, unsigned cpuId) {
99-
cpu_set_t cs;
100-
CPU_ZERO(&cs);
101-
CPU_SET(cpuId, &cs);
102-
auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
103-
if (r != 0) {
104-
LOG(WARNING) << "Failed to bind thread to cpu: " << cpuId;
105-
}
106-
(void)r;
107-
}
96+
static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id);
10897

10998
#ifdef BRPC_BTHREAD_TRACER
11099
// A stacktrace of bthread can be helpful in debugging.
@@ -154,7 +143,7 @@ friend bthread_t init_for_pthread_stack_trace();
154143
bool _stop;
155144
butil::atomic<int> _concurrency;
156145
std::vector<pthread_t> _workers;
157-
static std::vector<unsigned> _cpus;
146+
std::vector<unsigned> _cpus;
158147
butil::atomic<int> _next_worker_id;
159148

160149
bvar::Adder<int64_t> _nworkers;

0 commit comments

Comments
 (0)