Skip to content

Commit ac3d8a1

Browse files
committed
local resume_rq each task group
1 parent 2986f4d commit ac3d8a1

4 files changed

Lines changed: 16 additions & 31 deletions

File tree

src/bthread/task_control.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,10 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
362362
TaskGroup* g = _groups[s % ngroup];
363363
// g is possibly NULL because of concurrent _destroy_group
364364
if (g) {
365+
if (g->pop_resume_task(tid)) {
366+
stolen = true;
367+
break;
368+
}
365369
if (g->_rq.steal(tid)) {
366370
stolen = true;
367371
break;
@@ -432,7 +436,7 @@ void TaskControl::print_resume_q_sizes(std::ostream &os) {
432436
// ngroup > _ngroup: nums[_ngroup ... ngroup-1] = 0
433437
// ngroup < _ngroup: just ignore _groups[_ngroup ... ngroup-1]
434438
for (size_t i = 0; i < ngroup; ++i) {
435-
nums[i] = (_groups[i] ? _groups[i]->_resume_rq_cnt->load(std::memory_order_relaxed) : 0);
439+
nums[i] = (_groups[i] ? _groups[i]->_resume_rq_cnt.load(std::memory_order_relaxed) : 0);
436440
}
437441
}
438442
for (size_t i = 0; i < ngroup; ++i) {

src/bthread/task_group.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,9 @@ TaskGroup::TaskGroup(TaskControl* c)
197197
#ifndef NDEBUG
198198
, _sched_recursive_guard(0)
199199
#endif
200-
, _resume_rq_cnt(ResumeRunQueue::Instance().first)
201-
, _resume_rq(ResumeRunQueue::Instance().second)
202-
, _resume_consumer_token(*_resume_rq)
200+
, _resume_rq_cnt(0)
201+
, _resume_rq(1000)
202+
, _resume_consumer_token(_resume_rq)
203203
{
204204
_steal_seed = butil::fast_rand();
205205
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];

src/bthread/task_group.h

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,6 @@ class ExitException : public std::exception {
5252
void* _value;
5353
};
5454

55-
// Global resumed tasks.
56-
class ResumeRunQueue {
57-
public:
58-
static std::pair<std::shared_ptr<std::atomic<int>>,
59-
std::shared_ptr<moodycamel::ConcurrentQueue<bthread_t>>> Instance() {
60-
static ResumeRunQueue instance;
61-
return {instance.queue_size_, instance.concurrent_queue_};
62-
}
63-
64-
private:
65-
ResumeRunQueue() {
66-
queue_size_ = std::make_shared<std::atomic<int>>(0);
67-
concurrent_queue_ = std::make_shared<moodycamel::ConcurrentQueue<bthread_t>>(10000);
68-
}
69-
70-
std::shared_ptr<std::atomic<int>> queue_size_;
71-
std::shared_ptr<moodycamel::ConcurrentQueue<bthread_t>> concurrent_queue_;
72-
};
73-
7455
// Thread-local group of tasks.
7556
// Notice that most methods involving context switching are static otherwise
7657
// pointer `this' may change after wakeup. The **pg parameters in following
@@ -277,8 +258,8 @@ friend class TaskControl;
277258

278259
int _sched_recursive_guard;
279260

280-
std::shared_ptr<std::atomic<int>> _resume_rq_cnt;
281-
std::shared_ptr<moodycamel::ConcurrentQueue<bthread_t>> _resume_rq;
261+
std::atomic<int> _resume_rq_cnt;
262+
moodycamel::ConcurrentQueue<bthread_t> _resume_rq;
282263
moodycamel::ConsumerToken _resume_consumer_token;
283264
};
284265

src/bthread/task_group_inl.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,21 +98,21 @@ inline void TaskGroup::push_rq(bthread_t tid) {
9898
}
9999

100100
inline bool TaskGroup::pop_resume_task(bthread_t* tid) {
101-
int tmp_cnt = _resume_rq_cnt->load(std::memory_order_relaxed);
102-
if (tmp_cnt>0 && _resume_rq_cnt->compare_exchange_strong(tmp_cnt, tmp_cnt-1)){
103-
if(_resume_rq->try_dequeue(_resume_consumer_token, *tid)){
101+
int tmp_cnt = _resume_rq_cnt.load(std::memory_order_relaxed);
102+
if (tmp_cnt > 0 && _resume_rq_cnt.compare_exchange_strong(tmp_cnt, tmp_cnt-1)){
103+
if(_resume_rq.try_dequeue(_resume_consumer_token, *tid)){
104104
return true;
105105
}
106106
else {
107-
(*_resume_rq_cnt) ++;
107+
_resume_rq_cnt++;
108108
}
109109
}
110110
return false;
111111
}
112112

113113
inline bool TaskGroup::push_resume_task(bthread_t tid){
114-
if(_resume_rq->enqueue(tid)){
115-
(*_resume_rq_cnt) ++;
114+
if(_resume_rq.enqueue(tid)){
115+
_resume_rq_cnt++;
116116
return true;
117117
}
118118
return false;

0 commit comments

Comments
 (0)