@@ -42,7 +42,6 @@ DEFINE_int32(task_group_runqueue_capacity, 4096,
4242DEFINE_int32 (task_group_ntags, 1 , " TaskGroup will be grouped by number ntags" );
4343DEFINE_bool (task_group_set_worker_name, true ,
4444 " Whether to set the name of the worker thread" );
45- DEFINE_bool (thread_affinity, false , " Whether to Bind Cores" );
4645DEFINE_string (cpu_set, " " ,
4746 " Set of CPUs to which cores are bound, for example, 0-3,5,6-7; default: all" );
4847
@@ -79,13 +78,6 @@ void run_tagged_worker_startfn(bthread_tag_t tag) {
7978struct WorkerThreadArgs {
8079 WorkerThreadArgs (TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
8180
82- WorkerThreadArgs* set_cpuId (unsigned _cpuId) {
83- if (FLAGS_thread_affinity) {
84- cpuId = _cpuId;
85- }
86- return this ;
87- }
88-
8981 TaskControl* c;
9082 bthread_tag_t tag;
9183 unsigned cpuId;
@@ -100,8 +92,14 @@ void* TaskControl::worker_thread(void* arg) {
10092 auto dummy = static_cast <WorkerThreadArgs*>(arg);
10193 auto c = dummy->c ;
10294 auto tag = dummy->tag ;
103- if (FLAGS_thread_affinity) {
104- bind_thread (pthread_self (), _cpus[dummy->cpuId ]);
95+ if (!_cpus.empty ()) {
96+ if (dummy->cpuId < _cpus.size ()) {
97+ bind_thread (pthread_self (), _cpus[dummy->cpuId ]);
98+ } else {
99+ LOG (ERROR) << " Failed to bind cpuId=" << dummy->cpuId
100+ << " is out of bounds for _cpus (size="
101+ << _cpus.size () << " )" ;
102+ }
105103 }
106104 delete dummy;
107105 run_tagged_worker_startfn (tag);
@@ -225,8 +223,8 @@ int TaskControl::init(int concurrency) {
225223 }
226224 _concurrency = concurrency;
227225
228- if (FLAGS_thread_affinity ) {
229- if (parse_cpuset (FLAGS_cpu_set, _cpus) == -1 || _cpus. empty () ) {
226+ if (!FLAGS_cpu_set. empty () ) {
227+ if (parse_cpuset (FLAGS_cpu_set, _cpus) == -1 ) {
230228 LOG (ERROR) << " invalid cpuset=" << FLAGS_cpu_set;
231229 return -1 ;
232230 }
@@ -264,7 +262,9 @@ int TaskControl::init(int concurrency) {
264262 _workers.resize (_concurrency);
265263 for (int i = 0 ; i < _concurrency; ++i) {
266264 auto arg = new WorkerThreadArgs (this , i % FLAGS_task_group_ntags);
267- arg->set_cpuId (i % _cpus.size ());
265+ if (!_cpus.empty ()) {
266+ arg->cpuId = i % _cpus.size ();
267+ }
268268 const int rc = pthread_create (&_workers[i], NULL , worker_thread, arg);
269269 if (rc) {
270270 delete arg;
@@ -308,7 +308,9 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) {
308308 // _concurrency before create a worker.
309309 _concurrency.fetch_add (1 );
310310 auto arg = new WorkerThreadArgs (this , tag);
311- arg->set_cpuId ((i + old_concurency) % _cpus.size ());
311+ if (!_cpus.empty ()) {
312+ arg->cpuId = (i + old_concurency) % _cpus.size ();
313+ }
312314 const int rc = pthread_create (
313315 &_workers[i + old_concurency], NULL , worker_thread, arg);
314316 if (rc) {
@@ -339,8 +341,7 @@ int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
339341 std::smatch match;
340342 std::set<unsigned > cpuset;
341343 if (value.empty ()) {
342- cpus = get_current_cpus ();
343- return 0 ;
344+ return -1 ;
344345 }
345346 if (std::regex_match (value, match, r)) {
346347 for (butil::StringSplitter split (value.data (), ' ,' ); split; ++split) {
0 commit comments