Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// Date: Tue Jul 10 17:40:58 CST 2012

#include <pthread.h>
#include <set>
#include <regex>
#include <sys/syscall.h> // SYS_gettid
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
#include "butil/errno.h" // berror
Expand All @@ -34,6 +36,9 @@
#include "bthread/timer_thread.h" // global_timer_thread
#include <gflags/gflags.h>
#include "bthread/log.h"
#if defined(OS_MACOSX)
#include <mach/mach.h>
#endif

DEFINE_int32(task_group_delete_delay, 1,
"delay deletion of TaskGroup for so many seconds");
Expand All @@ -42,6 +47,9 @@ DEFINE_int32(task_group_runqueue_capacity, 4096,
DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
DEFINE_bool(task_group_set_worker_name, true,
"Whether to set the name of the worker thread");
DEFINE_string(cpu_set, "",
"Set of CPUs to which cores are bound. "
"for example, 0-3,5,7; default: disable");

namespace bthread {

Expand Down Expand Up @@ -99,10 +107,14 @@ void* TaskControl::worker_thread(void* arg) {

g->_tid = pthread_self();

int worker_id = c->_next_worker_id.fetch_add(
1, butil::memory_order_relaxed);
if (!c->_cpus.empty()) {
bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]);
}
if (FLAGS_task_group_set_worker_name) {
std::string worker_thread_name = butil::string_printf(
"brpc_wkr:%d-%d", g->tag(),
c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed));
"brpc_wkr:%d-%d", g->tag(), worker_id);
butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
}
BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
Expand Down Expand Up @@ -209,6 +221,13 @@ int TaskControl::init(int concurrency) {
}
_concurrency = concurrency;

if (!FLAGS_cpu_set.empty()) {
if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
return -1;
}
}

// task group group by tags
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
_tagged_ngroup[i].store(0, std::memory_order_relaxed);
Expand Down Expand Up @@ -309,6 +328,70 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
return NULL;
}

int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
std::smatch match;
std::set<unsigned> cpuset;
if (value.empty()) {
return -1;
}
if (std::regex_match(value, match, r)) {
for (butil::StringSplitter split(value.data(), ','); split; ++split) {
butil::StringPiece cpu_ids(split.field(), split.length());
cpu_ids.trim_spaces();
butil::StringPiece begin = cpu_ids;
butil::StringPiece end = cpu_ids;
auto dash = cpu_ids.find('-');
if (dash != cpu_ids.npos) {
begin = cpu_ids.substr(0, dash);
end = cpu_ids.substr(dash + 1);
}
unsigned first = UINT_MAX;
unsigned last = 0;
int ret;
ret = butil::StringSplitter(begin, '\t').to_uint(&first);
ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
if (ret != 0 || first > last) {
return -1;
}
for (auto i = first; i <= last; ++i) {
cpuset.insert(i);
}
}
cpus.assign(cpuset.begin(), cpuset.end());
return 0;
}
return -1;
}

void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
#if defined(OS_LINUX)
cpu_set_t cs;
CPU_ZERO(&cs);
CPU_SET(cpu_id, &cs);
auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
if (r != 0) {
LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
}
(void)r;
#elif defined(OS_MACOSX)
thread_port_t mach_thread = pthread_mach_thread_np(pthread);
if (mach_thread != MACH_PORT_NULL) {
LOG(WARNING) << "mach_thread is null"
<< "Failed to bind thread to cpu: " << cpu_id;
return;
}
thread_affinity_policy_data_t policy;
policy.affinity_tag = cpu_id;
if (thread_policy_set(mach_thread,
THREAD_AFFINITY_POLICY,
(thread_policy_t)&policy,
THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) {
LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
}
#endif
}

#ifdef BRPC_BTHREAD_TRACER
void TaskControl::stack_trace(std::ostream& os, bthread_t tid) {
_task_tracer.Trace(os, tid);
Expand Down
5 changes: 5 additions & 0 deletions src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ friend bthread_t init_for_pthread_stack_trace();
// If this method is called after init(), it never returns NULL.
TaskGroup* choose_one_group(bthread_tag_t tag);

static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);

static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id);

#ifdef BRPC_BTHREAD_TRACER
// A stacktrace of bthread can be helpful in debugging.
void stack_trace(std::ostream& os, bthread_t tid);
Expand Down Expand Up @@ -139,6 +143,7 @@ friend bthread_t init_for_pthread_stack_trace();
bool _stop;
butil::atomic<int> _concurrency;
std::vector<pthread_t> _workers;
std::vector<unsigned> _cpus;
butil::atomic<int> _next_worker_id;

bvar::Adder<int64_t> _nworkers;
Expand Down
Loading