Skip to content

Commit 5f1d893

Browse files
authored
Add support for checking all living bthreads (#3096)
Users can list all living bthreads with `curl ip:port/bthreads/all`, or, when BRPC_BTHREAD_TRACER is enabled, with `curl ip:port/bthreads/all?st=1` to also show each bthread's stack trace. This enhances the original /bthreads service, which could only check a single bthread designated by its bthread id; that is of little use when the user has no idea what the bthread ids are. In addition, this commit fixes two bugs: `_enable_priority_queue` was not initialized, and the task status was incorrectly set to TASK_STATUS_FIRST_READY.
1 parent 0708333 commit 5f1d893

12 files changed

Lines changed: 155 additions & 30 deletions

src/brpc/builtin/bthreads_service.cpp

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
#include "brpc/builtin/bthreads_service.h"
2424

2525
namespace bthread {
26-
extern void print_task(std::ostream& os, bthread_t tid);
26+
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
27+
bool ignore_not_matched = false);
28+
extern void print_living_tasks(std::ostream& os, bool enable_trace);
2729
}
2830

2931

@@ -38,30 +40,34 @@ void BthreadsService::default_method(::google::protobuf::RpcController* cntl_bas
3840
cntl->http_response().set_content_type("text/plain");
3941
butil::IOBufBuilder os;
4042
const std::string& constraint = cntl->http_request().unresolved_path();
41-
4243
if (constraint.empty()) {
4344
#ifdef BRPC_BTHREAD_TRACER
44-
os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for stack trace";
45+
os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for stack trace\n";
46+
os << "To check all living bthread, use /bthreads/all or /bthreads/all?st=1 for stack trace\n";
4547
#else
46-
os << "Use /bthreads/<bthread_id>";
48+
os << "Use /bthreads/<bthread_id>\n";
49+
os << "To check all living bthread, use /bthreads/all\n";
4750
#endif // BRPC_BTHREAD_TRACER
4851
} else {
49-
char* endptr = NULL;
50-
bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
51-
if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
52-
::bthread::print_task(os, tid);
53-
52+
bool enable_trace = false;
5453
#ifdef BRPC_BTHREAD_TRACER
5554
const std::string* st = cntl->http_request().uri().GetQuery("st");
5655
if (NULL != st && *st == "1") {
57-
os << "\nbthread call stack:\n";
58-
::bthread::stack_trace(os, tid);
56+
enable_trace = true;
5957
}
6058
#endif // BRPC_BTHREAD_TRACER
61-
} else {
62-
cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id",
59+
char* endptr = NULL;
60+
bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
61+
if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
62+
::bthread::print_task(os, tid, enable_trace);
63+
}
64+
else if (constraint != "all" && constraint != "all?st=1") {
65+
cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id or all, or all?st=1\n",
6366
constraint.c_str());
67+
} else {
68+
::bthread::print_living_tasks(os, enable_trace);
6469
}
70+
6571
}
6672
os.move_to(cntl->response_attachment());
6773
}

src/brpc/builtin/index_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ void IndexService::default_method(::google::protobuf::RpcController* controller,
158158
<< Path("/health", html_addr) << " : Test healthy" << NL
159159
<< Path("/vlog", html_addr) << " : List all VLOG callsites" << NL
160160
<< Path("/sockets", html_addr) << " : Check status of a Socket" << NL
161-
<< Path("/bthreads", html_addr) << " : Check status of a bthread" << NL
161+
<< Path("/bthreads", html_addr) << " : Check status of a bthread or all living bthreads" << NL
162162
<< Path("/ids", html_addr) << " : Check status of a bthread_id" << NL
163163
<< Path("/protobufs", html_addr) << " : List all protobuf services and messages" << NL
164164
<< Path("/list", html_addr) << " : json signature of methods" << NL

src/bthread/bthread.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
#include "bthread/bthread.h"
3333

3434
namespace bthread {
35+
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
36+
bool ignore_not_matched = false);
3537

3638
static bool validate_bthread_concurrency(const char*, int32_t val) {
3739
// bthread_setconcurrency sets the flag on success path which should
@@ -192,8 +194,26 @@ std::string stack_trace(bthread_t tid) {
192194
}
193195
return c->stack_trace(tid);
194196
}
197+
195198
#endif // BRPC_BTHREAD_TRACER
196199

200+
// Print all living (started and not finished) bthreads
201+
void print_living_tasks(std::ostream& os, bool enable_trace) {
202+
TaskControl* c = get_task_control();
203+
if (NULL == c) {
204+
os << "TaskControl has not been created";
205+
return;
206+
}
207+
auto tids = c->get_living_bthreads();
208+
if (tids.empty()) {
209+
os << "No living bthreads\n";
210+
return;
211+
}
212+
for (auto tid : tids) {
213+
print_task(os, tid, enable_trace, true);
214+
}
215+
}
216+
197217
static int add_workers_for_each_tag(int num) {
198218
int added = 0;
199219
auto c = get_task_control();

src/bthread/task_control.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ TaskControl::TaskControl()
192192
, _signal_per_second(&_cumulated_signal_count)
193193
, _status(print_rq_sizes_in_the_tc, this)
194194
, _nbthreads("bthread_count")
195+
, _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
195196
, _priority_queues(FLAGS_task_group_ntags)
196197
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
197198
, _tagged_pl(FLAGS_task_group_ntags)
@@ -587,4 +588,23 @@ bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() {
587588
return pt;
588589
}
589590

591+
std::vector<bthread_t> TaskControl::get_living_bthreads() {
592+
std::vector<bthread_t> living_bthread_ids;
593+
living_bthread_ids.reserve(1024);
594+
butil::for_each_resource<TaskMeta>([&living_bthread_ids](TaskMeta* m) {
595+
// keep only those bthreads created by bthread_start* functions,
596+
// i.e. not those created internally to run main task as they are
597+
// opaque to user.
598+
if (m && m->fn) {
599+
// determine whether the bthread is living by checking version
600+
const uint32_t given_ver = get_version(m->tid);
601+
BAIDU_SCOPED_LOCK(m->version_lock);
602+
if (given_ver == *m->version_butex) {
603+
living_bthread_ids.push_back(m->tid);
604+
}
605+
}
606+
});
607+
return living_bthread_ids;
608+
}
609+
590610
} // namespace bthread

src/bthread/task_control.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ friend bthread_t init_for_pthread_stack_trace();
101101
_priority_queues[tag].push(tid);
102102
}
103103

104+
std::vector<bthread_t> get_living_bthreads();
104105
private:
105106
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
106107
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;

src/bthread/task_group.cpp

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "bthread/task_control.h"
3737
#include "bthread/task_group.h"
3838
#include "bthread/timer_thread.h"
39+
#include "bthread/bthread.h"
3940

4041
#ifdef __x86_64__
4142
#include <x86intrin.h>
@@ -678,7 +679,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
678679
}
679680
}
680681
}
681-
sched_to(pg, next_meta, true);
682+
sched_to(pg, next_meta);
682683
}
683684

684685
void TaskGroup::sched(TaskGroup** pg) {
@@ -699,7 +700,7 @@ void TaskGroup::sched(TaskGroup** pg) {
699700

700701
extern void CheckBthreadScheSafety();
701702

702-
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
703+
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
703704
TaskGroup* g = *pg;
704705
#ifndef NDEBUG
705706
if ((++g->_sched_recursive_guard) > 1) {
@@ -1088,10 +1089,11 @@ void TaskGroup::yield(TaskGroup** pg) {
10881089
sched(pg);
10891090
}
10901091

1091-
void print_task(std::ostream& os, bthread_t tid) {
1092+
void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
1093+
bool ignore_not_matched = false) {
10921094
TaskMeta* const m = TaskGroup::address_meta(tid);
10931095
if (m == NULL) {
1094-
os << "bthread=" << tid << " : never existed";
1096+
os << "bthread=" << tid << " : never existed\n";
10951097
return;
10961098
}
10971099
const uint32_t given_ver = get_version(tid);
@@ -1127,7 +1129,9 @@ void print_task(std::ostream& os, bthread_t tid) {
11271129
}
11281130
}
11291131
if (!matched) {
1130-
os << "bthread=" << tid << " : not exist now";
1132+
if (!ignore_not_matched) {
1133+
os << "bthread=" << tid << " : not exist now\n";
1134+
}
11311135
} else {
11321136
os << "bthread=" << tid << " :\nstop=" << stop
11331137
<< "\ninterrupted=" << interrupted
@@ -1136,6 +1140,7 @@ void print_task(std::ostream& os, bthread_t tid) {
11361140
<< "\narg=" << (void*)arg
11371141
<< "\nattr={stack_type=" << attr.stack_type
11381142
<< " flags=" << attr.flags
1143+
<< " specified_tag=" << attr.tag
11391144
<< " keytable_pool=" << attr.keytable_pool
11401145
<< "}\nhas_tls=" << has_tls
11411146
<< "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
@@ -1145,8 +1150,13 @@ void print_task(std::ostream& os, bthread_t tid) {
11451150
<< "\nstatus=" << status
11461151
<< "\ntraced=" << traced
11471152
<< "\nworker_tid=" << worker_tid;
1148-
#else
1149-
;
1153+
if (enable_trace) {
1154+
os << "\nbthread call stack:\n";
1155+
stack_trace(os, tid);
1156+
}
1157+
os << "\n\n";
1158+
#else
1159+
<< "\n\n";
11501160
(void)status;(void)traced;(void)worker_tid;
11511161
#endif // BRPC_BTHREAD_TRACER
11521162
}

src/bthread/task_group.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class TaskGroup {
111111
// Suspend caller and run bthread `next_tid' in TaskGroup *pg.
112112
// Purpose of this function is to avoid pushing `next_tid' to _rq and
113113
// then being popped by sched(pg), which is not necessary.
114-
static void sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending);
114+
static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
115115
static void sched_to(TaskGroup** pg, bthread_t next_tid);
116116
static void exchange(TaskGroup** pg, TaskMeta* next_meta);
117117

src/bthread/task_group_inl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta* next_meta) {
5656
? ready_to_run_in_worker_ignoresignal
5757
: ready_to_run_in_worker),
5858
&args);
59-
TaskGroup::sched_to(pg, next_meta, false);
59+
TaskGroup::sched_to(pg, next_meta);
6060
}
6161

6262
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
@@ -79,7 +79,7 @@ inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
7979
}
8080
}
8181
// Update now_ns only when wait_task did yield.
82-
sched_to(pg, next_meta, false);
82+
sched_to(pg, next_meta);
8383
}
8484

8585
inline void TaskGroup::push_rq(bthread_t tid) {

src/bthread/task_tracer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) {
147147

148148
tracing = m->traced;
149149
// bthread is scheduled for the first time.
150-
if (TASK_STATUS_READY == s || NULL == m->stack) {
150+
if (TASK_STATUS_READY == s && NULL == m->stack) {
151151
m->status = TASK_STATUS_FIRST_READY;
152152
} else {
153153
m->status = s;

src/butil/resource_pool.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ template <typename T> ResourcePoolInfo describe_resources() {
132132
return ResourcePool<T>::singleton()->describe_resources();
133133
}
134134

135+
// Traverse all allocated resources typed T and apply specified operation
136+
template <typename T, typename F>
137+
void for_each_resource(F const& f) {
138+
ResourcePool<T>::singleton()->for_each_resource(f);
139+
}
135140
} // namespace butil
136141

137142
#endif // BUTIL_RESOURCE_POOL_H

0 commit comments

Comments
 (0)