Skip to content

Commit db7e37b

Browse files
committed
Add support for checking all living bthreads
User can check all living bthreads by `curl ip:port/bthreads/all` or `curl ip:port/bthreads/all?st=1` to show bthread stack trace. This is an enhancement of the original /bthreads service which provides a method to check a specified bthread by designated bthread id, as user has no idea what the bthread id is. Condisering the performance cost brought by recording the bthread id on bthread startup and finish, currently this function is only enabled when BRPC_BTHREAD_TRACER is defined.
1 parent 7229c36 commit db7e37b

6 files changed

Lines changed: 216 additions & 22 deletions

File tree

src/brpc/builtin/bthreads_service.cpp

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
#include "brpc/builtin/bthreads_service.h"
2424

2525
namespace bthread {
26-
extern void print_task(std::ostream& os, bthread_t tid);
26+
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace);
27+
#ifdef BRPC_BTHREAD_TRACER
28+
extern void print_living_task(std::ostream& os, bool enable_trace);
29+
#endif // BRPC_BTHREAD_TRACER
2730
}
2831

2932

@@ -38,30 +41,42 @@ void BthreadsService::default_method(::google::protobuf::RpcController* cntl_bas
3841
cntl->http_response().set_content_type("text/plain");
3942
butil::IOBufBuilder os;
4043
const std::string& constraint = cntl->http_request().unresolved_path();
41-
4244
if (constraint.empty()) {
4345
#ifdef BRPC_BTHREAD_TRACER
44-
os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for stack trace";
46+
// when BRPC_BTHREAD_TRACER enabled, we can trace a specified bthread and
47+
// check all living bthread
48+
os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for stack trace\n";
49+
os << "To check all living bthread, use /bthreads/<all> or /bthreads/<all>?st=1 for stack trace\n";
4550
#else
46-
os << "Use /bthreads/<bthread_id>";
51+
os << "Use /bthreads/<bthread_id>\n";
4752
#endif // BRPC_BTHREAD_TRACER
4853
} else {
49-
char* endptr = NULL;
50-
bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
51-
if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
52-
::bthread::print_task(os, tid);
53-
54+
bool enable_trace = false;
5455
#ifdef BRPC_BTHREAD_TRACER
5556
const std::string* st = cntl->http_request().uri().GetQuery("st");
5657
if (NULL != st && *st == "1") {
57-
os << "\nbthread call stack:\n";
58-
::bthread::stack_trace(os, tid);
58+
enable_trace = true;
5959
}
6060
#endif // BRPC_BTHREAD_TRACER
61+
char* endptr = NULL;
62+
bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
63+
if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
64+
::bthread::print_task(os, tid, enable_trace);
65+
}
66+
#ifdef BRPC_BTHREAD_TRACER
67+
else if (constraint != "all" && constraint != "all?st=1") {
68+
cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id or all, or all?st=1\n",
69+
constraint.c_str());
6170
} else {
62-
cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id",
71+
::bthread::print_living_task(os, enable_trace);
72+
}
73+
#else
74+
else {
75+
cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id\n",
6376
constraint.c_str());
6477
}
78+
#endif // BRPC_BTHREAD_TRACER
79+
6580
}
6681
os.move_to(cntl->response_attachment());
6782
}

src/brpc/builtin/index_service.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,11 @@ void IndexService::default_method(::google::protobuf::RpcController* controller,
158158
<< Path("/health", html_addr) << " : Test healthy" << NL
159159
<< Path("/vlog", html_addr) << " : List all VLOG callsites" << NL
160160
<< Path("/sockets", html_addr) << " : Check status of a Socket" << NL
161+
#ifdef BRPC_BTHREAD_TRACER
162+
<< Path("/bthreads", html_addr) << " : Check status of a bthread or all living bthreads" << NL
163+
#else
161164
<< Path("/bthreads", html_addr) << " : Check status of a bthread" << NL
165+
#endif // BRPC_BTHREAD_TRACER
162166
<< Path("/ids", html_addr) << " : Check status of a bthread_id" << NL
163167
<< Path("/protobufs", html_addr) << " : List all protobuf services and messages" << NL
164168
<< Path("/list", html_addr) << " : json signature of methods" << NL

src/bthread/bthread.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "bthread/bthread.h"
3333

3434
namespace bthread {
35+
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace);
3536

3637
static bool validate_bthread_concurrency(const char*, int32_t val) {
3738
// bthread_setconcurrency sets the flag on success path which should
@@ -192,6 +193,23 @@ std::string stack_trace(bthread_t tid) {
192193
}
193194
return c->stack_trace(tid);
194195
}
196+
197+
// Print all living (started and not finished) bthread
198+
void print_living_task(std::ostream& os, bool enable_trace) {
199+
TaskControl* c = get_task_control();
200+
if (NULL == c) {
201+
os << "TaskControl has not been created";
202+
return;
203+
}
204+
std::set<bthread_t> tids = c->get_living_bthreads();
205+
if (tids.empty()) {
206+
os << "No living bthreads\n";
207+
return;
208+
}
209+
for (auto tid : tids) {
210+
print_task(os, tid, enable_trace);
211+
}
212+
}
195213
#endif // BRPC_BTHREAD_TRACER
196214

197215
static int add_workers_for_each_tag(int num) {

src/bthread/task_control.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <signal.h>
2929
#include <stddef.h> // size_t
3030
#include <vector>
31+
#include <set>
3132
#include <array>
3233
#include <memory>
3334
#include "butil/atomicops.h" // butil::atomic
@@ -40,6 +41,71 @@
4041
DECLARE_int32(task_group_ntags);
4142
namespace bthread {
4243

44+
#ifdef BRPC_BTHREAD_TRACER
45+
class ShardedBthreadSet {
46+
public:
47+
explicit ShardedBthreadSet(size_t shard_count = 16)
48+
: _shard_count(shard_count), _shards(shard_count) {}
49+
50+
void add(bthread_t id) {
51+
get_shard(id).add(id);
52+
}
53+
54+
void remove(bthread_t id) {
55+
get_shard(id).remove(id);
56+
}
57+
58+
std::set<bthread_t> get_all() {
59+
std::set<bthread_t> result;
60+
for (auto& shard : _shards) {
61+
shard.copy_to(result);
62+
}
63+
return result;
64+
}
65+
66+
private:
67+
class Shard {
68+
public:
69+
Shard() {
70+
pthread_spin_init(&_spinlock, PTHREAD_PROCESS_PRIVATE);
71+
}
72+
73+
~Shard() {
74+
pthread_spin_destroy(&_spinlock);
75+
}
76+
77+
Shard(const Shard&) = delete;
78+
Shard& operator=(const Shard&) = delete;
79+
80+
void add(bthread_t id) {
81+
BAIDU_SCOPED_LOCK(_spinlock);
82+
_bthread_ids.insert(id);
83+
}
84+
85+
void remove(bthread_t id) {
86+
BAIDU_SCOPED_LOCK(_spinlock);
87+
_bthread_ids.erase(id);
88+
}
89+
90+
void copy_to(std::set<bthread_t>& target) {
91+
BAIDU_SCOPED_LOCK(_spinlock);
92+
target.insert(_bthread_ids.begin(), _bthread_ids.end());
93+
}
94+
95+
private:
96+
pthread_spinlock_t _spinlock;
97+
std::set<bthread_t> _bthread_ids;
98+
};
99+
100+
Shard& get_shard(bthread_t id) {
101+
return _shards[id % _shard_count];
102+
}
103+
104+
const size_t _shard_count;
105+
std::vector<Shard> _shards;
106+
};
107+
#endif // BRPC_BTHREAD_TRACER
108+
43109
class TaskGroup;
44110

45111
// Control all task groups
@@ -95,6 +161,9 @@ friend bthread_t init_for_pthread_stack_trace();
95161
// A stacktrace of bthread can be helpful in debugging.
96162
void stack_trace(std::ostream& os, bthread_t tid);
97163
std::string stack_trace(bthread_t tid);
164+
std::set<bthread_t> get_living_bthreads() {
165+
return _living_bthreads.get_all();
166+
}
98167
#endif // BRPC_BTHREAD_TRACER
99168

100169
void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
@@ -130,6 +199,16 @@ friend bthread_t init_for_pthread_stack_trace();
130199
bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);
131200
bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);
132201

202+
#ifdef BRPC_BTHREAD_TRACER
203+
void record_bthread_start(bthread_t tid) {
204+
_living_bthreads.add(tid);
205+
}
206+
207+
void record_bthread_finish(bthread_t tid) {
208+
_living_bthreads.remove(tid);
209+
}
210+
#endif // BRPC_BTHREAD_TRACER
211+
133212
std::vector<butil::atomic<size_t>> _tagged_ngroup;
134213
std::vector<TaggedGroups> _tagged_groups;
135214
butil::Mutex _modify_group_mutex;
@@ -165,6 +244,7 @@ friend bthread_t init_for_pthread_stack_trace();
165244

166245
#ifdef BRPC_BTHREAD_TRACER
167246
TaskTracer _task_tracer;
247+
ShardedBthreadSet _living_bthreads;
168248
#endif // BRPC_BTHREAD_TRACER
169249

170250
};

src/bthread/task_group.cpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "bthread/task_control.h"
3737
#include "bthread/task_group.h"
3838
#include "bthread/timer_thread.h"
39+
#include "bthread/bthread.h"
3940

4041
#ifdef __x86_64__
4142
#include <x86intrin.h>
@@ -442,6 +443,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
442443
g->_control->_task_tracer.WaitForTracing(m);
443444
}
444445
g->_control->_task_tracer.set_status(TASK_STATUS_UNKNOWN, m);
446+
g->_control->record_bthread_finish(m->tid);
445447
#endif // BRPC_BTHREAD_TRACER
446448

447449
g->_control->_nbthreads << -1;
@@ -507,6 +509,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
507509
g->_control->tag_nbthreads(g->tag()) << 1;
508510
#ifdef BRPC_BTHREAD_TRACER
509511
g->_control->_task_tracer.set_status(TASK_STATUS_CREATED, m);
512+
g->_control->record_bthread_start(*th);
510513
#endif // BRPC_BTHREAD_TRACER
511514
if (g->is_current_pthread_task()) {
512515
// never create foreground task in pthread.
@@ -570,6 +573,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
570573
_control->tag_nbthreads(tag()) << 1;
571574
#ifdef BRPC_BTHREAD_TRACER
572575
_control->_task_tracer.set_status(TASK_STATUS_CREATED, m);
576+
_control->record_bthread_start(*th);
573577
#endif // BRPC_BTHREAD_TRACER
574578
if (REMOTE) {
575579
ready_to_run_remote(m, (using_attr.flags & BTHREAD_NOSIGNAL));
@@ -1088,10 +1092,10 @@ void TaskGroup::yield(TaskGroup** pg) {
10881092
sched(pg);
10891093
}
10901094

1091-
void print_task(std::ostream& os, bthread_t tid) {
1095+
void print_task(std::ostream& os, bthread_t tid, bool enable_trace) {
10921096
TaskMeta* const m = TaskGroup::address_meta(tid);
10931097
if (m == NULL) {
1094-
os << "bthread=" << tid << " : never existed";
1098+
os << "bthread=" << tid << " : never existed\n";
10951099
return;
10961100
}
10971101
const uint32_t given_ver = get_version(tid);
@@ -1127,7 +1131,7 @@ void print_task(std::ostream& os, bthread_t tid) {
11271131
}
11281132
}
11291133
if (!matched) {
1130-
os << "bthread=" << tid << " : not exist now";
1134+
os << "bthread=" << tid << " : not exist now\n";
11311135
} else {
11321136
os << "bthread=" << tid << " :\nstop=" << stop
11331137
<< "\ninterrupted=" << interrupted
@@ -1136,6 +1140,7 @@ void print_task(std::ostream& os, bthread_t tid) {
11361140
<< "\narg=" << (void*)arg
11371141
<< "\nattr={stack_type=" << attr.stack_type
11381142
<< " flags=" << attr.flags
1143+
<< " specified tag=" << attr.tag
11391144
<< " keytable_pool=" << attr.keytable_pool
11401145
<< "}\nhas_tls=" << has_tls
11411146
<< "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
@@ -1145,8 +1150,15 @@ void print_task(std::ostream& os, bthread_t tid) {
11451150
<< "\nstatus=" << status
11461151
<< "\ntraced=" << traced
11471152
<< "\nworker_tid=" << worker_tid;
1148-
#else
1149-
;
1153+
if (enable_trace) {
1154+
os << "\nbthread call stack:\n";
1155+
stack_trace(os, tid);
1156+
os << "\n";
1157+
} else {
1158+
os << "\n\n";
1159+
}
1160+
#else
1161+
<< "\n\n";
11501162
(void)status;(void)traced;(void)worker_tid;
11511163
#endif // BRPC_BTHREAD_TRACER
11521164
}

0 commit comments

Comments
 (0)