Skip to content

Commit 2169ba1

Browse files
committed
review & Support MACOS thread affinity
1 parent 2771e6a commit 2169ba1

2 files changed

Lines changed: 37 additions & 30 deletions

File tree

src/bthread/task_control.cpp

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
4343
DEFINE_bool(task_group_set_worker_name, true,
4444
"Whether to set the name of the worker thread");
4545
DEFINE_string(cpu_set, "",
46-
"Set of CPUs to which cores are bound, for example, 0-3,5,6-7; default: all");
46+
"Set of CPUs to which cores are bound. "
47+
"for example, 0-3,5,7; default: disable");
4748

4849
namespace bthread {
4950

@@ -60,7 +61,6 @@ extern pthread_mutex_t g_task_control_mutex;
6061
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
6162
void (*g_worker_startfn)() = NULL;
6263
void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL;
63-
std::vector<unsigned> TaskControl::_cpus;
6464

6565
// May be called in other modules to run startfn in non-worker pthreads.
6666
void run_worker_startfn() {
@@ -77,10 +77,8 @@ void run_tagged_worker_startfn(bthread_tag_t tag) {
7777

7878
struct WorkerThreadArgs {
7979
WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
80-
8180
TaskControl* c;
8281
bthread_tag_t tag;
83-
unsigned cpuId;
8482
};
8583

8684
void* TaskControl::worker_thread(void* arg) {
@@ -92,15 +90,6 @@ void* TaskControl::worker_thread(void* arg) {
9290
auto dummy = static_cast<WorkerThreadArgs*>(arg);
9391
auto c = dummy->c;
9492
auto tag = dummy->tag;
95-
if (!_cpus.empty()) {
96-
if (dummy->cpuId < _cpus.size()) {
97-
bind_thread(pthread_self(), _cpus[dummy->cpuId]);
98-
} else {
99-
LOG(ERROR) << "Failed to bind cpuId=" << dummy->cpuId
100-
<< " is out of bounds for _cpus (size="
101-
<< _cpus.size() << ")";
102-
}
103-
}
10493
delete dummy;
10594
run_tagged_worker_startfn(tag);
10695

@@ -113,10 +102,14 @@ void* TaskControl::worker_thread(void* arg) {
113102

114103
g->_tid = pthread_self();
115104

105+
int worker_id = c->_next_worker_id.fetch_add(
106+
1, butil::memory_order_relaxed);
107+
if (!c->_cpus.empty()) {
108+
bind_thread(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]);
109+
}
116110
if (FLAGS_task_group_set_worker_name) {
117111
std::string worker_thread_name = butil::string_printf(
118-
"brpc_wkr:%d-%d", g->tag(),
119-
c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed));
112+
"brpc_wkr:%d-%d", g->tag(), worker_id);
120113
butil::PlatformThread::SetName(worker_thread_name.c_str());
121114
}
122115
BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
@@ -262,9 +255,6 @@ int TaskControl::init(int concurrency) {
262255
_workers.resize(_concurrency);
263256
for (int i = 0; i < _concurrency; ++i) {
264257
auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
265-
if (!_cpus.empty()) {
266-
arg->cpuId = i % _cpus.size();
267-
}
268258
const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
269259
if (rc) {
270260
delete arg;
@@ -308,9 +298,6 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) {
308298
// _concurrency before create a worker.
309299
_concurrency.fetch_add(1);
310300
auto arg = new WorkerThreadArgs(this, tag);
311-
if (!_cpus.empty()) {
312-
arg->cpuId = (i + old_concurency) % _cpus.size();
313-
}
314301
const int rc = pthread_create(
315302
&_workers[i + old_concurency], NULL, worker_thread, arg);
316303
if (rc) {
@@ -345,14 +332,14 @@ int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
345332
}
346333
if (std::regex_match(value, match, r)) {
347334
for (butil::StringSplitter split(value.data(), ','); split; ++split) {
348-
butil::StringPiece cpuIds(split.field(), split.length());
349-
cpuIds.trim_spaces();
350-
butil::StringPiece begin = cpuIds;
351-
butil::StringPiece end = cpuIds;
352-
auto dash = cpuIds.find('-');
353-
if (dash != cpuIds.npos) {
354-
begin = cpuIds.substr(0, dash);
355-
end = cpuIds.substr(dash + 1);
335+
butil::StringPiece cpu_ids(split.field(), split.length());
336+
cpu_ids.trim_spaces();
337+
butil::StringPiece begin = cpu_ids;
338+
butil::StringPiece end = cpu_ids;
339+
auto dash = cpu_ids.find('-');
340+
if (dash != cpu_ids.npos) {
341+
begin = cpu_ids.substr(0, dash);
342+
end = cpu_ids.substr(dash + 1);
356343
}
357344
unsigned first = UINT_MAX;
358345
unsigned last = 0;

src/bthread/task_control.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
#include <memory>
3333
#include <set>
3434
#include <regex>
35+
#if defined(OS_MACOSX)
36+
#include <mach/mach.h>
37+
#endif
3538
#include "butil/atomicops.h" // butil::atomic
3639
#include "bvar/bvar.h" // bvar::PassiveStatus
3740
#include "bthread/task_tracer.h"
@@ -96,6 +99,7 @@ friend bthread_t init_for_pthread_stack_trace();
9699
static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
97100

98101
static inline void bind_thread(pthread_t pthread, unsigned cpuId) {
102+
#if defined(OS_LINUX)
99103
cpu_set_t cs;
100104
CPU_ZERO(&cs);
101105
CPU_SET(cpuId, &cs);
@@ -104,6 +108,22 @@ friend bthread_t init_for_pthread_stack_trace();
104108
LOG(WARNING) << "Failed to bind thread to cpu: " << cpuId;
105109
}
106110
(void)r;
111+
#elif defined(OS_MACOSX)
112+
thread_port_t mach_thread = pthread_mach_thread_np(pthread);
113+
if (mach_thread != MACH_PORT_NULL) {
114+
LOG(WARNING) << "mach_thread is null"
115+
<< "Failed to bind thread to cpu: " << cpuId;
116+
return;
117+
}
118+
thread_affinity_policy_data_t policy;
119+
policy.affinity_tag = cpuId;
120+
if (thread_policy_set(mach_thread,
121+
THREAD_AFFINITY_POLICY,
122+
(thread_policy_t)&policy,
123+
THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) {
124+
LOG(WARNING) << "Failed to bind thread to cpu: " << cpuId;
125+
}
126+
#endif
107127
}
108128

109129
#ifdef BRPC_BTHREAD_TRACER
@@ -154,7 +174,7 @@ friend bthread_t init_for_pthread_stack_trace();
154174
bool _stop;
155175
butil::atomic<int> _concurrency;
156176
std::vector<pthread_t> _workers;
157-
static std::vector<unsigned> _cpus;
177+
std::vector<unsigned> _cpus;
158178
butil::atomic<int> _next_worker_id;
159179

160180
bvar::Adder<int64_t> _nworkers;

0 commit comments

Comments
 (0)