Skip to content

Commit d8420c4

Browse files
[BugFix] Fix use-after-free in ThreadPool::do_submit when thread creation fails (#71276)
Signed-off-by: sevev <qiangzh95@gmail.com>
Co-authored-by: wanpengfei-git <wanpengfei91@163.com>
1 parent 7d01c25 commit d8420c4

2 files changed

Lines changed: 181 additions & 5 deletions

File tree

be/src/common/thread/threadpool.cpp

Lines changed: 33 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -426,10 +426,37 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
426426
int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
427427
int additional_threads = static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
428428
bool need_a_thread = false;
429+
bool sole_thread = false;
429430
if (additional_threads > 0 &&
430431
_num_threads + _num_threads_pending_start < _max_threads.load(std::memory_order_acquire)) {
431432
need_a_thread = true;
432433
_num_threads_pending_start++;
434+
sole_thread = (_num_threads + _num_threads_pending_start == 1);
435+
}
436+
437+
// When this submit needs to create the very first thread for the pool
438+
// (sole_thread == true), create it while still holding the lock and BEFORE
439+
// enqueuing the task.
440+
//
441+
// Why create before enqueue?
442+
// If thread creation fails and no threads exist, the caller receives an
443+
// error and performs its own cleanup (e.g. counting down a latch). If the
444+
// task were already in the queue, a later successful submit could spawn a
445+
// thread that picks up the orphaned task, executing it against already-
446+
// destroyed state — a use-after-free.
447+
//
448+
// Why hold the lock during thread creation?
449+
// If we released the lock, a concurrent submitter could see our
450+
// _num_threads_pending_start and assume a thread is available, enqueue
451+
// its task without creating a thread, and then have that task stranded
452+
// if our create_thread() fails. The pool has zero threads here, so no
453+
// worker is blocked waiting for this lock.
454+
if (sole_thread) {
455+
Status status = create_thread();
456+
if (!status.ok()) {
457+
_num_threads_pending_start--;
458+
return status;
459+
}
433460
}
434461

435462
TEST_SYNC_POINT_CALLBACK("ThreadPool::do_submit:replace_task", &r);
@@ -462,15 +489,11 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
462489
}
463490
unique_lock.unlock();
464491

465-
if (need_a_thread) {
492+
if (need_a_thread && !sole_thread) {
466493
Status status = create_thread();
467494
if (!status.ok()) {
468495
unique_lock.lock();
469496
_num_threads_pending_start--;
470-
if (_num_threads + _num_threads_pending_start == 0) {
471-
// If we have no threads, we can't do any work.
472-
return status;
473-
}
474497
// If we failed to create a thread, but there are still some other
475498
// worker threads, log a warning message and continue.
476499
LOG(ERROR) << "Thread pool failed to create thread: " << status.to_string() << "\n" << get_stack_trace();
@@ -733,6 +756,11 @@ void ThreadPool::dispatch_thread() {
733756
}
734757

735758
Status ThreadPool::create_thread() {
759+
Status status;
760+
TEST_SYNC_POINT_CALLBACK("ThreadPool::create_thread", &status);
761+
if (!status.ok()) {
762+
return status;
763+
}
736764
return Thread::create("thread pool", _name, &ThreadPool::dispatch_thread, this, nullptr);
737765
}
738766

be/test/common/thread/threadpool_test.cpp

Lines changed: 148 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141
#include "base/metrics.h"
4242
#include "base/random/random.h"
4343
#include "base/testutil/assert.h"
44+
#include "base/testutil/sync_point.h"
4445
#include "base/time/monotime.h"
4546
#include "base/utility/scoped_cleanup.h"
4647
#include "common/logging.h"
@@ -990,4 +991,151 @@ TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
990991
}
991992
*/
992993

994+
// Regression test for a use-after-free bug in do_submit().
995+
//
996+
// When the pool has min_threads=0 and all threads have exited, a new submit
997+
// must create a thread. The old code enqueued the task first, then attempted
998+
// thread creation. If creation failed (e.g. EAGAIN), it returned an error
999+
// WITHOUT removing the task from the queue. The caller, seeing the error,
1000+
// would perform its own cleanup (e.g. counting down a latch). A later
1001+
// successful submit could then create a thread that picks up the orphaned
1002+
// task, executing it against already-destroyed state (use-after-free).
1003+
//
1004+
// The fix creates the thread BEFORE enqueuing the task when the pool has
1005+
// zero threads. If thread creation fails, the task is never enqueued.
1006+
TEST_F(ThreadPoolTest, TestSubmitFailsCleanlyWhenNoThreadsExist) {
1007+
// Pool with min_threads=0 so all threads can idle-exit.
1008+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1009+
.set_min_threads(0)
1010+
.set_max_threads(4)
1011+
.set_idle_timeout(MonoDelta::FromMilliseconds(1)))
1012+
.ok());
1013+
1014+
// Expect no live threads at this point (min_threads=0); this asserts immediately rather than waiting.
1015+
ASSERT_EQ(0, _pool->num_threads());
1016+
1017+
// Inject create_thread failure to simulate EAGAIN (resource exhaustion).
1018+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [](void* arg) {
1019+
auto* status = static_cast<Status*>(arg);
1020+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1021+
});
1022+
SyncPoint::GetInstance()->EnableProcessing();
1023+
SCOPED_CLEANUP({
1024+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1025+
SyncPoint::GetInstance()->DisableProcessing();
1026+
});
1027+
1028+
std::atomic<int> run_count{0};
1029+
Status s = _pool->submit_func([&]() { run_count++; });
1030+
// Submit should fail because thread creation failed.
1031+
ASSERT_FALSE(s.ok());
1032+
// The task must NOT have been enqueued.
1033+
ASSERT_EQ(0, _pool->_total_queued_tasks);
1034+
ASSERT_EQ(0, run_count);
1035+
1036+
// Now disable the failure injection and verify normal operation resumes.
1037+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1038+
SyncPoint::GetInstance()->DisableProcessing();
1039+
1040+
CountDownLatch latch(1);
1041+
s = _pool->submit_func([&]() {
1042+
run_count++;
1043+
latch.count_down();
1044+
});
1045+
ASSERT_TRUE(s.ok());
1046+
latch.wait();
1047+
ASSERT_EQ(1, run_count);
1048+
_pool->wait();
1049+
_pool->shutdown();
1050+
}
1051+
1052+
// Verify that when thread creation fails but other threads exist in the pool,
1053+
// the task is still enqueued and eventually processed (no error returned).
1054+
TEST_F(ThreadPoolTest, TestSubmitSucceedsWhenThreadCreationFailsButThreadsExist) {
1055+
// Pool with min_threads=1 so at least one thread always exists.
1056+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1057+
.set_min_threads(1)
1058+
.set_max_threads(4)
1059+
.set_idle_timeout(MonoDelta::FromMilliseconds(kThreadIdleTimeoutMs)))
1060+
.ok());
1061+
ASSERT_EQ(1, _pool->num_threads());
1062+
1063+
// Block the existing thread so a new submit requires a new thread.
1064+
CountDownLatch block_latch(1);
1065+
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&block_latch)).ok());
1066+
1067+
// Now inject create_thread failure.
1068+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [](void* arg) {
1069+
auto* status = static_cast<Status*>(arg);
1070+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1071+
});
1072+
SyncPoint::GetInstance()->EnableProcessing();
1073+
1074+
// Submit should succeed because existing thread can process the task.
1075+
std::atomic<int> run_count{0};
1076+
Status s = _pool->submit_func([&]() { run_count++; });
1077+
ASSERT_TRUE(s.ok());
1078+
1079+
// Disable failure injection and unblock.
1080+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1081+
SyncPoint::GetInstance()->DisableProcessing();
1082+
block_latch.count_down();
1083+
1084+
_pool->wait();
1085+
ASSERT_EQ(1, run_count);
1086+
_pool->shutdown();
1087+
}
1088+
1089+
// Verify the original crash scenario: a stack-allocated latch captured by
1090+
// reference in a task. If the task were orphaned in the queue after a failed
1091+
// submit, a later thread would execute it and count_down a destroyed latch.
1092+
// With the fix, the task is never enqueued, so the latch is safe.
1093+
TEST_F(ThreadPoolTest, TestSubmitFailureDoesNotCauseUseAfterFree) {
1094+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1095+
.set_min_threads(0)
1096+
.set_max_threads(4)
1097+
.set_idle_timeout(MonoDelta::FromMilliseconds(1)))
1098+
.ok());
1099+
ASSERT_EQ(0, _pool->num_threads());
1100+
1101+
// First call: fail. Subsequent calls: succeed.
1102+
std::atomic<int> create_call_count{0};
1103+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [&](void* arg) {
1104+
if (create_call_count.fetch_add(1) == 0) {
1105+
auto* status = static_cast<Status*>(arg);
1106+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1107+
}
1108+
});
1109+
SyncPoint::GetInstance()->EnableProcessing();
1110+
SCOPED_CLEANUP({
1111+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1112+
SyncPoint::GetInstance()->DisableProcessing();
1113+
});
1114+
1115+
// Simulate the crash scenario: stack-allocated latch, task captured by ref.
1116+
{
1117+
const int kNumTasks = 5;
1118+
CountDownLatch latch(kNumTasks);
1119+
int submit_failures = 0;
1120+
1121+
for (int i = 0; i < kNumTasks; i++) {
1122+
Status s = _pool->submit_func([&latch]() { latch.count_down(); });
1123+
if (!s.ok()) {
1124+
// Caller counts down on failure (like the original bug scenario).
1125+
latch.count_down();
1126+
submit_failures++;
1127+
}
1128+
}
1129+
// First submit should fail (injected), rest should succeed.
1130+
ASSERT_EQ(1, submit_failures);
1131+
// Wait for all count_downs (from both successful tasks and caller cleanup).
1132+
latch.wait();
1133+
}
1134+
// If the orphaned task bug existed, this point would crash (use-after-free
1135+
// on the destroyed latch). With the fix, we reach here safely.
1136+
1137+
_pool->wait();
1138+
_pool->shutdown();
1139+
}
1140+
9931141
} // namespace starrocks

0 commit comments

Comments
 (0)