Skip to content

Commit b5fab0b

Browse files
committed
Bugfix: TaskTracer deadlocks due to ABA problem
1 parent 4eeb5e2 commit b5fab0b

4 files changed

Lines changed: 41 additions & 67 deletions

File tree

src/bthread/task_meta.h

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -112,6 +112,8 @@ struct TaskMeta {
112112
TaskStatus status{TASK_STATUS_UNKNOWN};
113113
// Whether bthread is traced?
114114
bool traced{false};
115+
// [Not Reset] guarantee tracing completion before jumping.
116+
pthread_mutex_t trace_lock{};
115117
// Worker thread id.
116118
pthread_t worker_tid{};
117119

@@ -122,9 +124,11 @@ struct TaskMeta {
122124
pthread_spin_init(&version_lock, 0);
123125
version_butex = butex_create_checked<uint32_t>();
124126
*version_butex = 1;
127+
pthread_mutex_init(&trace_lock, NULL);
125128
}
126129

127130
~TaskMeta() {
131+
pthread_mutex_destroy(&trace_lock);
128132
butex_destroy(version_butex);
129133
version_butex = NULL;
130134
pthread_spin_destroy(&version_lock);

src/bthread/task_tracer.cpp

Lines changed: 34 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,7 @@ void TaskTracer::Result::OutputToStream(std::ostream& os) const {
119119

120120
bool TaskTracer::Init() {
121121
if (_trace_time.expose("bthread_trace_time") != 0) {
122+
LOG(ERROR) << "Fail to expose bthread_trace_time";
122123
return false;
123124
}
124125
if (!RegisterSignalHandler()) {
@@ -136,7 +137,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) {
136137
CHECK_NE(TASK_STATUS_RUNNING, s) << "Use `set_running_status' instead";
137138
CHECK_NE(TASK_STATUS_END, s) << "Use `set_end_status_unsafe' instead";
138139

139-
bool tracing;
140+
bool tracing = false;
140141
{
141142
BAIDU_SCOPED_LOCK(m->version_lock);
142143
if (TASK_STATUS_UNKNOWN == m->status && TASK_STATUS_JUMPING == s) {
@@ -182,31 +183,8 @@ void TaskTracer::Trace(std::ostream& os, bthread_t tid) {
182183
}
183184

184185
void TaskTracer::WaitForTracing(TaskMeta* m) {
185-
BAIDU_SCOPED_LOCK(_mutex);
186-
while (m->traced) {
187-
_cond.Wait();
188-
}
189-
}
190-
191-
TaskStatus TaskTracer::WaitForJumping(TaskMeta* m) {
192-
// Reasons for not using locks here:
193-
// 1. It is necessary to lock before jump_stack, unlock after jump_stack,
194-
// which involves two different bthread and is prone to errors.
195-
// 2. jump_stack is fast.
196-
int i = 0;
197-
do {
198-
// The bthread is jumping now, spin until it finishes.
199-
if (i++ < 30) {
200-
cpu_relax();
201-
} else {
202-
sched_yield();
203-
}
204-
205-
BAIDU_SCOPED_LOCK(m->version_lock);
206-
if (TASK_STATUS_JUMPING != m->status) {
207-
return m->status;
208-
}
209-
} while (true);
186+
BAIDU_SCOPED_LOCK(m->trace_lock);
187+
// Acquiring trace_lock means tracing is done.
210188
}
211189

212190
TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
@@ -224,25 +202,44 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
224202
// Make sure only one bthread is traced at a time.
225203
BAIDU_SCOPED_LOCK(_trace_request_mutex);
226204

205+
// The chance to remove unused SignalSyncs.
206+
auto iter = std::remove_if(
207+
_inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
208+
[](butil::intrusive_ptr<SignalSync>& sync) {
209+
return sync->ref_count() == 1;
210+
});
211+
_inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());
212+
227213
TaskMeta* m = TaskGroup::address_meta(tid);
228214
if (NULL == m) {
229215
return Result::MakeErrorResult("bthread=%d never existed", tid);
230216
}
231217

232-
BAIDU_SCOPED_LOCK(_mutex);
218+
BAIDU_SCOPED_LOCK(m->trace_lock);
233219
TaskStatus status;
234220
pthread_t worker_tid;
235221
const uint32_t given_version = get_version(tid);
236222
{
237223
BAIDU_SCOPED_LOCK(m->version_lock);
238-
if (given_version == *m->version_butex) {
239-
// Start tracing.
240-
m->traced = true;
241-
worker_tid = m->worker_tid;
242-
status = m->status;
243-
} else {
224+
if (given_version != *m->version_butex) {
244225
return Result::MakeErrorResult("bthread=%d not exist now", tid);
245226
}
227+
228+
status = m->status;
229+
if (TASK_STATUS_UNKNOWN == status) {
230+
return Result::MakeErrorResult("bthread=%d not exist now", tid);
231+
} else if (TASK_STATUS_CREATED == status) {
232+
return Result::MakeErrorResult("bthread=%d has just been created", tid);
233+
} else if (TASK_STATUS_FIRST_READY == status) {
234+
return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid);
235+
} else if (TASK_STATUS_END == status) {
236+
return Result::MakeErrorResult("bthread=%d has ended", tid);
237+
} else if (TASK_STATUS_JUMPING == status) {
238+
return Result::MakeErrorResult("bthread=%d is jumping stack", tid);
239+
}
240+
// Start tracing.
241+
m->traced = true;
242+
worker_tid = m->worker_tid;
246243
}
247244

248245
BRPC_SCOPE_EXIT {
@@ -252,31 +249,16 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
252249
// tracing completion, so given_version != *m->version_butex is OK.
253250
m->traced = false;
254251
}
255-
// Wake up the waiting worker thread to jump.
256-
_cond.Signal();
257252
};
258253

259-
if (TASK_STATUS_UNKNOWN == status) {
260-
return Result::MakeErrorResult("bthread=%d not exist now", tid);
261-
} else if (TASK_STATUS_CREATED == status) {
262-
return Result::MakeErrorResult("bthread=%d has just been created", tid);
263-
} else if (TASK_STATUS_FIRST_READY == status) {
264-
return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid);
265-
} else if (TASK_STATUS_END == status) {
266-
return Result::MakeErrorResult("bthread=%d has ended", tid);
267-
} else if (TASK_STATUS_JUMPING == status) {
268-
// Wait for jumping completion.
269-
status = WaitForJumping(m);
270-
}
271-
272-
// After jumping, the status may be RUNNING, SUSPENDED, or READY, which is traceable.
254+
// The status may be RUNNING, SUSPENDED, or READY, which is traceable.
273255
if (TASK_STATUS_RUNNING == status) {
274256
return SignalTrace(worker_tid);
275257
} else if (TASK_STATUS_SUSPENDED == status || TASK_STATUS_READY == status) {
276258
return ContextTrace(m->stack->context);
277259
}
278260

279-
return Result::MakeErrorResult("Invalid TaskStatus=%d", status);
261+
return Result::MakeErrorResult("Invalid TaskStatus=%d of bthread=%d", status, tid);
280262
}
281263

282264
// Instruct ASan to ignore this function.
@@ -408,14 +390,6 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
408390
return Result::MakeErrorResult("Forbid to trace self");
409391
}
410392

411-
// Remove unused SignalSyncs.
412-
auto iter = std::remove_if(
413-
_inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
414-
[](butil::intrusive_ptr<SignalSync>& sync) {
415-
return sync->ref_count() == 1;
416-
});
417-
_inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());
418-
419393
// Each signal trace has an independent SignalSync to
420394
// prevent the previous SignalHandler from affecting the new SignalTrace.
421395
butil::intrusive_ptr<SignalSync> signal_sync(new SignalSync());
@@ -465,6 +439,8 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
465439
}
466440
break;
467441
}
442+
// Remove the successful SignalSync.
443+
_inuse_signal_syncs.pop_back();
468444

469445
return signal_sync->result;
470446
}

src/bthread/task_tracer.h

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ class TaskTracer {
4747
void Trace(std::ostream& os, bthread_t tid);
4848

4949
// When the worker is jumping stack from a bthread to another,
50-
void WaitForTracing(TaskMeta* m);
50+
static void WaitForTracing(TaskMeta* m);
5151

5252
private:
5353
// Error number guard used in signal handler.
@@ -94,7 +94,6 @@ class TaskTracer {
9494
Result result;
9595
};
9696

97-
static TaskStatus WaitForJumping(TaskMeta* m);
9897
Result TraceImpl(bthread_t tid);
9998

10099
unw_cursor_t MakeCursor(bthread_fcontext_t fcontext);
@@ -108,11 +107,6 @@ class TaskTracer {
108107
// Make sure only one bthread is traced at a time.
109108
Mutex _trace_request_mutex;
110109

111-
// For signal trace.
112-
// Make sure bthread does not jump stack when it is being traced.
113-
butil::Mutex _mutex;
114-
butil::ConditionVariable _cond{&_mutex};
115-
116110
// For context trace.
117111
unw_context_t _context{};
118112

test/brpc_http_rpc_protocol_unittest.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1701,9 +1701,9 @@ TEST_F(HttpTest, spring_protobuf_content_type) {
17011701
res.Clear();
17021702
cntl2.http_request().set_content_type("application/x-protobuf");
17031703
stub.Echo(&cntl2, &req, &res, nullptr);
1704-
ASSERT_FALSE(cntl.Failed());
1704+
ASSERT_FALSE(cntl2.Failed());
17051705
ASSERT_EQ(EXP_RESPONSE, res.message());
1706-
ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
1706+
ASSERT_EQ("application/x-protobuf", cntl2.http_response().content_type());
17071707
}
17081708

17091709
TEST_F(HttpTest, dump_http_request) {

0 commit comments

Comments
 (0)