Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ start_from_non_worker(bthread_t* __restrict tid,
if (NULL == c) {
return ENOMEM;
}
auto tag = BTHREAD_TAG_DEFAULT;
bthread_tag_t tag = BTHREAD_TAG_DEFAULT;
if (attr != NULL && attr->tag != BTHREAD_TAG_INVALID) {
tag = attr->tag;
}
Expand All @@ -283,7 +283,7 @@ start_from_non_worker(bthread_t* __restrict tid,
// 1. NOSIGNAL is often for creating many bthreads in batch,
// inserting into the same TaskGroup maximizes the batch.
// 2. bthread_flush() needs to know which TaskGroup to flush.
auto g = tls_task_group_nosignal;
TaskGroup* g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group(tag);
tls_task_group_nosignal = g;
Expand All @@ -298,7 +298,7 @@ start_from_non_worker(bthread_t* __restrict tid,
// tag equal to thread local
// tag equal to BTHREAD_TAG_INVALID
BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) {
return attr == nullptr || attr->tag == bthread::tls_task_group->tag() ||
return attr == nullptr || attr->tag == tls_task_group->tag() ||
attr->tag == BTHREAD_TAG_INVALID;
}

Expand Down Expand Up @@ -331,7 +331,7 @@ int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
// if attribute is null use thread local task group
if (bthread::can_run_thread_local(attr)) {
Expand All @@ -345,7 +345,7 @@ int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
// if attribute is null use thread local task group
if (bthread::can_run_thread_local(attr)) {
Expand All @@ -356,7 +356,7 @@ int bthread_start_background(bthread_t* __restrict tid,
}

void bthread_flush() {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
return g->flush_nosignal_tasks();
}
Expand All @@ -368,8 +368,8 @@ void bthread_flush() {
}
}

int bthread_interrupt(bthread_t tid, bthread_tag_t tag) {
return bthread::TaskGroup::interrupt(tid, bthread::get_task_control(), tag);
int bthread_interrupt(bthread_t tid, bthread_tag_t /*tag*/) {
return bthread::TaskGroup::interrupt(tid, bthread::get_task_control());
}

int bthread_stop(bthread_t tid) {
Expand All @@ -382,7 +382,7 @@ int bthread_stopped(bthread_t tid) {
}

bthread_t bthread_self(void) {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
// note: return 0 for main tasks now, which include main thread and
// all work threads. So that we can identify main tasks from logs
// more easily. This is probably questionable in the future.
Expand All @@ -397,7 +397,7 @@ int bthread_equal(bthread_t t1, bthread_t t2) {
}

void bthread_exit(void* retval) {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL && !g->is_current_main_task()) {
throw bthread::ExitException(retval);
} else {
Expand Down Expand Up @@ -511,7 +511,7 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
}

int bthread_about_to_quit() {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL) {
bthread::TaskMeta* current_task = g->current_task();
if(!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) {
Expand Down Expand Up @@ -640,12 +640,12 @@ int bthread_list_join(bthread_list_t* list) {
}

bthread_tag_t bthread_self_tag(void) {
return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag()
: BTHREAD_TAG_DEFAULT;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
return g != NULL ? g->tag() : BTHREAD_TAG_DEFAULT;
}

uint64_t bthread_cpu_clock_ns(void) {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL && !g->is_current_main_task()) {
return g->current_task_cpu_clock_ns();
}
Expand Down
18 changes: 9 additions & 9 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ struct ButexBthreadWaiter : public ButexWaiter {
Butex* initial_butex;
TaskControl* control;
const timespec* abstime;
bthread_tag_t tag;
};

// pthread_task or main_task allocates this structure on stack and queue it
Expand Down Expand Up @@ -320,7 +319,7 @@ int butex_wake(void* arg, bool nosignal) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup* g = get_task_group(bbw->control, bbw->tag);
TaskGroup* g = get_task_group(bbw->control, bbw->task_meta->attr.tag);
if (g == tls_task_group) {
Comment thread
chenBright marked this conversation as resolved.
run_in_local_task_group(g, bbw->task_meta, nosignal);
} else {
Expand Down Expand Up @@ -373,7 +372,7 @@ int butex_wake_n(void* arg, size_t n, bool nosignal) {
bthread_waiters.tail()->value());
w->RemoveFromList();
unsleep_if_necessary(w, get_global_timer_thread());
auto g = get_task_group(w->control, w->tag);
auto g = get_task_group(w->control, w->task_meta->attr.tag);
g->ready_to_run_general(w->task_meta, true);
nwakeups[g->tag()] = g;
Comment thread
chenBright marked this conversation as resolved.
++nwakeup;
Expand All @@ -384,7 +383,7 @@ int butex_wake_n(void* arg, size_t n, bool nosignal) {
g->flush_nosignal_tasks_general();
}
}
auto g = get_task_group(next->control, next->tag);
auto g = get_task_group(next->control, next->task_meta->attr.tag);
if (g == tls_task_group) {
run_in_local_task_group(g, next->task_meta, nosignal);
} else {
Expand Down Expand Up @@ -446,7 +445,7 @@ int butex_wake_except(void* arg, bthread_t excluded_bthread) {
ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(bthread_waiters.tail()->value());
w->RemoveFromList();
unsleep_if_necessary(w, get_global_timer_thread());
auto g = get_task_group(w->control, w->tag);
auto g = get_task_group(w->control, w->task_meta->attr.tag);
g->ready_to_run_general(w->task_meta, true);
Comment thread
chenBright marked this conversation as resolved.
nwakeups[g->tag()] = g;
++nwakeup;
Expand Down Expand Up @@ -489,11 +488,12 @@ int butex_requeue(void* arg, void* arg2) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
auto g = is_same_tag(bbw->tag) ? tls_task_group : NULL;
auto g = is_same_tag(bbw->task_meta->attr.tag) ? tls_task_group : NULL;
if (g) {
TaskGroup::exchange(&g, bbw->task_meta);
} else {
bbw->control->choose_one_group(bbw->tag)->ready_to_run_remote(bbw->task_meta);
g = bbw->control->choose_one_group(bbw->task_meta->attr.tag);
g->ready_to_run_remote(bbw->task_meta);
Comment thread
chenBright marked this conversation as resolved.
}
return 1;
}
Expand Down Expand Up @@ -531,7 +531,8 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
if (erased && wakeup) {
if (bw->tid) {
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
get_task_group(bbw->control, bbw->tag)->ready_to_run_general(bbw->task_meta);
auto g = get_task_group(bbw->control, bbw->task_meta->attr.tag);
g->ready_to_run_general(bbw->task_meta);
Comment thread
chenBright marked this conversation as resolved.
} else {
ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
wakeup_pthread(pw);
Expand Down Expand Up @@ -691,7 +692,6 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prep
bbw.initial_butex = b;
bbw.control = g->control();
bbw.abstime = abstime;
bbw.tag = g->tag();

if (abstime != NULL) {
// Schedule timer before queueing. If the timer is triggered before
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) {
}

TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags);
CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags) << tag;
auto& groups = tag_group(tag);
const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
if (ngroup != 0) {
Expand Down
45 changes: 25 additions & 20 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ void TaskGroup::asan_task_runner(intptr_t) {
void TaskGroup::task_runner(intptr_t skip_remained) {
// NOTE: tls_task_group is volatile since tasks are moved around
// different groups.
TaskGroup* g = tls_task_group;
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
#ifdef BRPC_BTHREAD_TRACER
TaskTracer::set_running_status(g->tid(), g->_cur_meta);
#endif // BRPC_BTHREAD_TRACER
Expand Down Expand Up @@ -526,13 +526,15 @@ int TaskGroup::start_foreground(TaskGroup** pg,
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
m->priority_index = pg ? (*pg)->_cur_meta->priority_index : -1;

TaskGroup* g = *pg;
m->priority_index = g->_cur_meta->priority_index;
m->attr.tag = g->tag();
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}

TaskGroup* g = *pg;
g->_control->_nbthreads << 1;
g->_control->tag_nbthreads(g->tag()) << 1;
#ifdef BRPC_BTHREAD_TRACER
Expand Down Expand Up @@ -601,6 +603,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}
m->attr.tag = tag();
_control->_nbthreads << 1;
_control->tag_nbthreads(tag()) << 1;
#ifdef BRPC_BTHREAD_TRACER
Expand Down Expand Up @@ -635,7 +638,7 @@ int TaskGroup::join(bthread_t tid, void** return_value) {
// The bthread is not created yet, this join is definitely wrong.
return EINVAL;
}
TaskGroup* g = tls_task_group;
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL && g->current_tid() == tid) {
// joining self causes indefinite waiting.
return EINVAL;
Expand Down Expand Up @@ -912,43 +915,45 @@ void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
}

void TaskGroup::ready_to_run_general(TaskMeta* meta, bool nosignal) {
if (tls_task_group == this) {
if (BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == this) {
return ready_to_run(meta, nosignal);
}
return ready_to_run_remote(meta, nosignal);
}

void TaskGroup::flush_nosignal_tasks_general() {
if (tls_task_group == this) {
if (BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == this) {
return flush_nosignal_tasks();
}
return flush_nosignal_tasks_remote();
}

void TaskGroup::ready_to_run_in_worker(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
return tls_task_group->ready_to_run(args->meta, args->nosignal);
return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group)->
ready_to_run(args->meta, args->nosignal);
}

void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);

#ifdef BRPC_BTHREAD_TRACER
tls_task_group->_control->_task_tracer.set_status(
TASK_STATUS_READY, args->meta);
g->_control->_task_tracer.set_status(TASK_STATUS_READY, args->meta);
#endif // BRPC_BTHREAD_TRACER
return tls_task_group->push_rq(args->meta->tid);
return g->push_rq(args->meta->tid);
}

void TaskGroup::priority_to_run(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
#ifdef BRPC_BTHREAD_TRACER
tls_task_group->_control->_task_tracer.set_status(
TASK_STATUS_READY, args->meta);
g->_control->_task_tracer.set_status(TASK_STATUS_READY, args->meta);
#endif // BRPC_BTHREAD_TRACER
if (args->meta->priority_index < 0) {
return tls_task_group->push_rq(args->meta->tid);
return g->push_rq(args->meta->tid);
}
return tls_task_group->control()->push_ed_priority_queue(
return g->control()->push_ed_priority_queue(
args->tag, args->meta->priority_index, args->meta->tid);
}

Expand All @@ -960,10 +965,10 @@ struct SleepArgs {
};

static void ready_to_run_from_timer_thread(void* arg) {
CHECK(tls_task_group == NULL);
CHECK(BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == NULL);
const SleepArgs* e = static_cast<const SleepArgs*>(arg);
auto g = e->group;
auto tag = g->tag();
TaskGroup* g = e->group;
bthread_tag_t tag = g->tag();
g->control()->choose_one_group(tag)->ready_to_run_remote(e->meta);
}

Expand Down Expand Up @@ -1089,7 +1094,7 @@ static int set_butex_waiter(bthread_t tid, ButexWaiter* w) {
// by race conditions.
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
// can't be interrupted.
int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
int TaskGroup::interrupt(bthread_t tid, TaskControl* c) {
// Consume current_waiter in the TaskMeta, wake it up then set it back.
ButexWaiter* w = NULL;
uint64_t sleep_id = 0;
Expand All @@ -1110,15 +1115,15 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
}
} else if (sleep_id != 0) {
if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
TaskGroup* g = tls_task_group;
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
TaskMeta* m = address_meta(tid);
if (g) {
g->ready_to_run(m);
} else {
if (!c) {
return EINVAL;
}
c->choose_one_group(tag)->ready_to_run_remote(m);
c->choose_one_group(m->attr.tag)->ready_to_run_remote(m);
}
Comment thread
chenBright marked this conversation as resolved.
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class TaskGroup {

// Wake up blocking ops in the thread.
// Returns 0 on success, errno otherwise.
static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag);
static int interrupt(bthread_t tid, TaskControl* c);

// Get the meta associate with the task.
static TaskMeta* address_meta(bthread_t tid);
Expand Down
Loading