diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 9b0f45991d..5bcda94dcd 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -274,7 +274,7 @@ start_from_non_worker(bthread_t* __restrict tid, if (NULL == c) { return ENOMEM; } - auto tag = BTHREAD_TAG_DEFAULT; + bthread_tag_t tag = BTHREAD_TAG_DEFAULT; if (attr != NULL && attr->tag != BTHREAD_TAG_INVALID) { tag = attr->tag; } @@ -283,7 +283,7 @@ start_from_non_worker(bthread_t* __restrict tid, // 1. NOSIGNAL is often for creating many bthreads in batch, // inserting into the same TaskGroup maximizes the batch. // 2. bthread_flush() needs to know which TaskGroup to flush. - auto g = tls_task_group_nosignal; + TaskGroup* g = tls_task_group_nosignal; if (NULL == g) { g = c->choose_one_group(tag); tls_task_group_nosignal = g; @@ -298,7 +298,7 @@ start_from_non_worker(bthread_t* __restrict tid, // tag equal to thread local // tag equal to BTHREAD_TAG_INVALID BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) { - return attr == nullptr || attr->tag == bthread::tls_task_group->tag() || + return attr == nullptr || attr->tag == tls_task_group->tag() || attr->tag == BTHREAD_TAG_INVALID; } @@ -331,7 +331,7 @@ int bthread_start_urgent(bthread_t* __restrict tid, const bthread_attr_t* __restrict attr, void * (*fn)(void*), void* __restrict arg) { - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g) { // if attribute is null use thread local task group if (bthread::can_run_thread_local(attr)) { @@ -345,7 +345,7 @@ int bthread_start_background(bthread_t* __restrict tid, const bthread_attr_t* __restrict attr, void * (*fn)(void*), void* __restrict arg) { - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g) { // if attribute is null use thread local task group if (bthread::can_run_thread_local(attr)) { @@ -356,7 +356,7 @@ int bthread_start_background(bthread_t* __restrict tid, } void bthread_flush() { - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g) { return g->flush_nosignal_tasks(); } @@ -368,8 +368,8 @@ void bthread_flush() { } } -int bthread_interrupt(bthread_t tid, bthread_tag_t tag) { - return bthread::TaskGroup::interrupt(tid, bthread::get_task_control(), tag); +int bthread_interrupt(bthread_t tid, bthread_tag_t /*tag*/) { + return bthread::TaskGroup::interrupt(tid, bthread::get_task_control()); } int bthread_stop(bthread_t tid) { @@ -382,7 +382,7 @@ int bthread_stopped(bthread_t tid) { } bthread_t bthread_self(void) { - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); // note: return 0 for main tasks now, which include main thread and // all work threads. So that we can identify main tasks from logs // more easily. This is probably questionable in the future. @@ -397,7 +397,7 @@ int bthread_equal(bthread_t t1, bthread_t t2) { } void bthread_exit(void* retval) { - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g != NULL && !g->is_current_main_task()) { throw bthread::ExitException(retval); } else { @@ -511,7 +511,7 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) { } int bthread_about_to_quit() { - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g != NULL) { bthread::TaskMeta* current_task = g->current_task(); if(!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) { @@ -640,12 +640,12 @@ int bthread_list_join(bthread_list_t* list) { } bthread_tag_t bthread_self_tag(void) { - return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag() - : BTHREAD_TAG_DEFAULT; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); + return g != NULL ? g->tag() : BTHREAD_TAG_DEFAULT; } uint64_t bthread_cpu_clock_ns(void) { - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g != NULL && !g->is_current_main_task()) { return g->current_task_cpu_clock_ns(); } diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp index aca1281670..92de900cf0 100644 --- a/src/bthread/butex.cpp +++ b/src/bthread/butex.cpp @@ -97,7 +97,6 @@ struct ButexBthreadWaiter : public ButexWaiter { Butex* initial_butex; TaskControl* control; const timespec* abstime; - bthread_tag_t tag; }; // pthread_task or main_task allocates this structure on stack and queue it @@ -320,7 +319,7 @@ int butex_wake(void* arg, bool nosignal) { } ButexBthreadWaiter* bbw = static_cast(front); unsleep_if_necessary(bbw, get_global_timer_thread()); - TaskGroup* g = get_task_group(bbw->control, bbw->tag); + TaskGroup* g = get_task_group(bbw->control, bbw->task_meta->attr.tag); if (g == tls_task_group) { run_in_local_task_group(g, bbw->task_meta, nosignal); } else { @@ -373,7 +372,7 @@ int butex_wake_n(void* arg, size_t n, bool nosignal) { bthread_waiters.tail()->value()); w->RemoveFromList(); unsleep_if_necessary(w, get_global_timer_thread()); - auto g = get_task_group(w->control, w->tag); + auto g = get_task_group(w->control, w->task_meta->attr.tag); g->ready_to_run_general(w->task_meta, true); nwakeups[g->tag()] = g; ++nwakeup; @@ -384,7 +383,7 @@ int butex_wake_n(void* arg, size_t n, bool nosignal) { g->flush_nosignal_tasks_general(); } } - auto g = get_task_group(next->control, next->tag); + auto g = get_task_group(next->control, next->task_meta->attr.tag); if (g == tls_task_group) { run_in_local_task_group(g, next->task_meta, nosignal); } else { @@ -446,7 +445,7 @@ int butex_wake_except(void* arg, bthread_t excluded_bthread) { ButexBthreadWaiter* w = static_cast(bthread_waiters.tail()->value()); w->RemoveFromList(); unsleep_if_necessary(w, get_global_timer_thread()); - auto g = get_task_group(w->control, w->tag); + auto g = get_task_group(w->control, w->task_meta->attr.tag); g->ready_to_run_general(w->task_meta, true); nwakeups[g->tag()] = g; ++nwakeup; @@ -489,11 +488,12 @@ int butex_requeue(void* arg, void* arg2) { } ButexBthreadWaiter* bbw = static_cast(front); unsleep_if_necessary(bbw, get_global_timer_thread()); - auto g = is_same_tag(bbw->tag) ? tls_task_group : NULL; + auto g = is_same_tag(bbw->task_meta->attr.tag) ? tls_task_group : NULL; if (g) { TaskGroup::exchange(&g, bbw->task_meta); } else { - bbw->control->choose_one_group(bbw->tag)->ready_to_run_remote(bbw->task_meta); + g = bbw->control->choose_one_group(bbw->task_meta->attr.tag); + g->ready_to_run_remote(bbw->task_meta); } return 1; } @@ -531,7 +531,8 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) { if (erased && wakeup) { if (bw->tid) { ButexBthreadWaiter* bbw = static_cast(bw); - get_task_group(bbw->control, bbw->tag)->ready_to_run_general(bbw->task_meta); + auto g = get_task_group(bbw->control, bbw->task_meta->attr.tag); + g->ready_to_run_general(bbw->task_meta); } else { ButexPthreadWaiter* pw = static_cast(bw); wakeup_pthread(pw); @@ -691,7 +692,6 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prep bbw.initial_butex = b; bbw.control = g->control(); bbw.abstime = abstime; - bbw.tag = g->tag(); if (abstime != NULL) { // Schedule timer before queueing. If the timer is triggered before diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 347dbd24b4..1aa69c2c35 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -340,7 +340,7 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) { } TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) { - CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags); + CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags) << tag; auto& groups = tag_group(tag); const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire); if (ngroup != 0) { diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index c0804c9a63..0fd0995564 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -351,7 +351,7 @@ void TaskGroup::asan_task_runner(intptr_t) { void TaskGroup::task_runner(intptr_t skip_remained) { // NOTE: tls_task_group is volatile since tasks are moved around // different groups. - TaskGroup* g = tls_task_group; + TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); #ifdef BRPC_BTHREAD_TRACER TaskTracer::set_running_status(g->tid(), g->_cur_meta); #endif // BRPC_BTHREAD_TRACER @@ -526,13 +526,15 @@ int TaskGroup::start_foreground(TaskGroup** pg, m->cpuwide_start_ns = start_ns; m->stat = EMPTY_STAT; m->tid = make_tid(*m->version_butex, slot); - m->priority_index = pg ? (*pg)->_cur_meta->priority_index : -1; + + TaskGroup* g = *pg; + m->priority_index = g->_cur_meta->priority_index; + m->attr.tag = g->tag(); *th = m->tid; if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) { LOG(INFO) << "Started bthread " << m->tid; } - TaskGroup* g = *pg; g->_control->_nbthreads << 1; g->_control->tag_nbthreads(g->tag()) << 1; #ifdef BRPC_BTHREAD_TRACER @@ -601,6 +603,7 @@ int TaskGroup::start_background(bthread_t* __restrict th, if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) { LOG(INFO) << "Started bthread " << m->tid; } + m->attr.tag = tag(); _control->_nbthreads << 1; _control->tag_nbthreads(tag()) << 1; #ifdef BRPC_BTHREAD_TRACER @@ -635,7 +638,7 @@ int TaskGroup::join(bthread_t tid, void** return_value) { // The bthread is not created yet, this join is definitely wrong. return EINVAL; } - TaskGroup* g = tls_task_group; + TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g != NULL && g->current_tid() == tid) { // joining self causes indefinite waiting. return EINVAL; @@ -912,14 +915,14 @@ void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) { } void TaskGroup::ready_to_run_general(TaskMeta* meta, bool nosignal) { - if (tls_task_group == this) { + if (BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == this) { return ready_to_run(meta, nosignal); } return ready_to_run_remote(meta, nosignal); } void TaskGroup::flush_nosignal_tasks_general() { - if (tls_task_group == this) { + if (BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == this) { return flush_nosignal_tasks(); } return flush_nosignal_tasks_remote(); @@ -927,28 +930,30 @@ void TaskGroup::flush_nosignal_tasks_general() { void TaskGroup::ready_to_run_in_worker(void* args_in) { ReadyToRunArgs* args = static_cast(args_in); - return tls_task_group->ready_to_run(args->meta, args->nosignal); + return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group)-> + ready_to_run(args->meta, args->nosignal); } void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) { ReadyToRunArgs* args = static_cast(args_in); + TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); + #ifdef BRPC_BTHREAD_TRACER - tls_task_group->_control->_task_tracer.set_status( - TASK_STATUS_READY, args->meta); + g->_control->_task_tracer.set_status(TASK_STATUS_READY, args->meta); #endif // BRPC_BTHREAD_TRACER - return tls_task_group->push_rq(args->meta->tid); + return g->push_rq(args->meta->tid); } void TaskGroup::priority_to_run(void* args_in) { ReadyToRunArgs* args = static_cast(args_in); + TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); #ifdef BRPC_BTHREAD_TRACER - tls_task_group->_control->_task_tracer.set_status( - TASK_STATUS_READY, args->meta); + g->_control->_task_tracer.set_status(TASK_STATUS_READY, args->meta); #endif // BRPC_BTHREAD_TRACER if (args->meta->priority_index < 0) { - return tls_task_group->push_rq(args->meta->tid); + return g->push_rq(args->meta->tid); } - return tls_task_group->control()->push_ed_priority_queue( + return g->control()->push_ed_priority_queue( args->tag, args->meta->priority_index, args->meta->tid); } @@ -960,10 +965,10 @@ struct SleepArgs { }; static void ready_to_run_from_timer_thread(void* arg) { - CHECK(tls_task_group == NULL); + CHECK(BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == NULL); const SleepArgs* e = static_cast(arg); - auto g = e->group; - auto tag = g->tag(); + TaskGroup* g = e->group; + bthread_tag_t tag = g->tag(); g->control()->choose_one_group(tag)->ready_to_run_remote(e->meta); } @@ -1089,7 +1094,7 @@ static int set_butex_waiter(bthread_t tid, ButexWaiter* w) { // by race conditions. // TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep() // can't be interrupted. -int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) { +int TaskGroup::interrupt(bthread_t tid, TaskControl* c) { // Consume current_waiter in the TaskMeta, wake it up then set it back. ButexWaiter* w = NULL; uint64_t sleep_id = 0; @@ -1110,7 +1115,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) { } } else if (sleep_id != 0) { if (get_global_timer_thread()->unschedule(sleep_id) == 0) { - TaskGroup* g = tls_task_group; + TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); TaskMeta* m = address_meta(tid); if (g) { g->ready_to_run(m); @@ -1118,7 +1123,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) { if (!c) { return EINVAL; } - c->choose_one_group(tag)->ready_to_run_remote(m); + c->choose_one_group(m->attr.tag)->ready_to_run_remote(m); } } } diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index 54140c0dc2..fc0c5cb469 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -203,7 +203,7 @@ class TaskGroup { // Wake up blocking ops in the thread. // Returns 0 on success, errno otherwise. - static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag); + static int interrupt(bthread_t tid, TaskControl* c); // Get the meta associate with the task. static TaskMeta* address_meta(bthread_t tid);