Skip to content

Commit 40852ab

Browse files
mergify[bot], sevev, wanpengfei-git
authored
[BugFix] Fix use-after-free in ThreadPool::do_submit when thread creation fails (backport #71276) (#71351)
Signed-off-by: zhangqiang <qiangzh95@gmail.com> Co-authored-by: zhangqiang <qiangzh95@gmail.com> Co-authored-by: wanpengfei-git <wanpengfei91@163.com>
1 parent 4020e6c commit 40852ab

2 files changed

Lines changed: 181 additions & 5 deletions

File tree

be/src/util/threadpool.cpp

Lines changed: 33 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -431,10 +431,37 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
431431
int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
432432
int additional_threads = static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
433433
bool need_a_thread = false;
434+
bool sole_thread = false;
434435
if (additional_threads > 0 &&
435436
_num_threads + _num_threads_pending_start < _max_threads.load(std::memory_order_acquire)) {
436437
need_a_thread = true;
437438
_num_threads_pending_start++;
439+
sole_thread = (_num_threads + _num_threads_pending_start == 1);
440+
}
441+
442+
// When this submit needs to create the very first thread for the pool
443+
// (sole_thread == true), create it while still holding the lock and BEFORE
444+
// enqueuing the task.
445+
//
446+
// Why create before enqueue?
447+
// If thread creation fails and no threads exist, the caller receives an
448+
// error and performs its own cleanup (e.g. counting down a latch). If the
449+
// task were already in the queue, a later successful submit could spawn a
450+
// thread that picks up the orphaned task, executing it against already-
451+
// destroyed state — a use-after-free.
452+
//
453+
// Why hold the lock during thread creation?
454+
// If we released the lock, a concurrent submitter could see our
455+
// _num_threads_pending_start and assume a thread is available, enqueue
456+
// its task without creating a thread, and then have that task stranded
457+
// if our create_thread() fails. The pool has zero threads here, so no
458+
// worker is blocked waiting for this lock.
459+
if (sole_thread) {
460+
Status status = create_thread();
461+
if (!status.ok()) {
462+
_num_threads_pending_start--;
463+
return status;
464+
}
438465
}
439466

440467
TEST_SYNC_POINT_CALLBACK("ThreadPool::do_submit:replace_task", &r);
@@ -467,15 +494,11 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
467494
}
468495
unique_lock.unlock();
469496

470-
if (need_a_thread) {
497+
if (need_a_thread && !sole_thread) {
471498
Status status = create_thread();
472499
if (!status.ok()) {
473500
unique_lock.lock();
474501
_num_threads_pending_start--;
475-
if (_num_threads + _num_threads_pending_start == 0) {
476-
// If we have no threads, we can't do any work.
477-
return status;
478-
}
479502
// If we failed to create a thread, but there are still some other
480503
// worker threads, log a warning message and continue.
481504
LOG(ERROR) << "Thread pool failed to create thread: " << status.to_string() << "\n" << get_stack_trace();
@@ -738,6 +761,11 @@ void ThreadPool::dispatch_thread() {
738761
}
739762

740763
// Asks Thread::create() to start a thread whose entry point is
// ThreadPool::dispatch_thread on this pool.
//
// Before doing so, the "ThreadPool::create_thread" sync point is given a
// pointer to a local Status. If a registered callback stores a non-OK value
// there, that value is returned and Thread::create() is not called.
Status ThreadPool::create_thread() {
    Status injected;
    TEST_SYNC_POINT_CALLBACK("ThreadPool::create_thread", &injected);
    if (injected.ok()) {
        return Thread::create("thread pool", _name, &ThreadPool::dispatch_thread, this, nullptr);
    }
    return injected;
}
743771

be/test/util/threadpool_test.cpp

Lines changed: 148 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@
4444
#include "gutil/sysinfo.h"
4545
#include "gutil/walltime.h"
4646
#include "testutil/assert.h"
47+
#include "testutil/sync_point.h"
4748
#include "util/await.h"
4849
#include "util/countdown_latch.h"
4950
#include "util/metrics.h"
@@ -987,4 +988,151 @@ TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
987988
}
988989
*/
989990

991+
// Regression test for a use-after-free bug in do_submit().
992+
//
993+
// When the pool has min_threads=0 and all threads have exited, a new submit
994+
// must create a thread. The old code enqueued the task first, then attempted
995+
// thread creation. If creation failed (e.g. EAGAIN), it returned an error
996+
// WITHOUT removing the task from the queue. The caller, seeing the error,
997+
// would perform its own cleanup (e.g. counting down a latch). A later
998+
// successful submit could then create a thread that picks up the orphaned
999+
// task, executing it against already-destroyed state (use-after-free).
1000+
//
1001+
// The fix creates the thread BEFORE enqueuing the task when the pool has
1002+
// zero threads. If thread creation fails, the task is never enqueued.
1003+
TEST_F(ThreadPoolTest, TestSubmitFailsCleanlyWhenNoThreadsExist) {
1004+
// Pool with min_threads=0 so all threads can idle-exit.
1005+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1006+
.set_min_threads(0)
1007+
.set_max_threads(4)
1008+
.set_idle_timeout(MonoDelta::FromMilliseconds(1)))
1009+
.ok());
1010+
1011+
// Wait for threads to exit (min_threads=0, short idle timeout).
1012+
ASSERT_EQ(0, _pool->num_threads());
1013+
1014+
// Inject create_thread failure to simulate EAGAIN (resource exhaustion).
1015+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [](void* arg) {
1016+
auto* status = static_cast<Status*>(arg);
1017+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1018+
});
1019+
SyncPoint::GetInstance()->EnableProcessing();
1020+
SCOPED_CLEANUP({
1021+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1022+
SyncPoint::GetInstance()->DisableProcessing();
1023+
});
1024+
1025+
std::atomic<int> run_count{0};
1026+
Status s = _pool->submit_func([&]() { run_count++; });
1027+
// Submit should fail because thread creation failed.
1028+
ASSERT_FALSE(s.ok());
1029+
// The task must NOT have been enqueued.
1030+
ASSERT_EQ(0, _pool->_total_queued_tasks);
1031+
ASSERT_EQ(0, run_count);
1032+
1033+
// Now disable the failure injection and verify normal operation resumes.
1034+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1035+
SyncPoint::GetInstance()->DisableProcessing();
1036+
1037+
CountDownLatch latch(1);
1038+
s = _pool->submit_func([&]() {
1039+
run_count++;
1040+
latch.count_down();
1041+
});
1042+
ASSERT_TRUE(s.ok());
1043+
latch.wait();
1044+
ASSERT_EQ(1, run_count);
1045+
_pool->wait();
1046+
_pool->shutdown();
1047+
}
1048+
1049+
// Verify that when thread creation fails but other threads exist in the pool,
1050+
// the task is still enqueued and eventually processed (no error returned).
1051+
TEST_F(ThreadPoolTest, TestSubmitSucceedsWhenThreadCreationFailsButThreadsExist) {
1052+
// Pool with min_threads=1 so at least one thread always exists.
1053+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1054+
.set_min_threads(1)
1055+
.set_max_threads(4)
1056+
.set_idle_timeout(MonoDelta::FromMilliseconds(kThreadIdleTimeoutMs)))
1057+
.ok());
1058+
ASSERT_EQ(1, _pool->num_threads());
1059+
1060+
// Block the existing thread so a new submit requires a new thread.
1061+
CountDownLatch block_latch(1);
1062+
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&block_latch)).ok());
1063+
1064+
// Now inject create_thread failure.
1065+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [](void* arg) {
1066+
auto* status = static_cast<Status*>(arg);
1067+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1068+
});
1069+
SyncPoint::GetInstance()->EnableProcessing();
1070+
1071+
// Submit should succeed because existing thread can process the task.
1072+
std::atomic<int> run_count{0};
1073+
Status s = _pool->submit_func([&]() { run_count++; });
1074+
ASSERT_TRUE(s.ok());
1075+
1076+
// Disable failure injection and unblock.
1077+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1078+
SyncPoint::GetInstance()->DisableProcessing();
1079+
block_latch.count_down();
1080+
1081+
_pool->wait();
1082+
ASSERT_EQ(1, run_count);
1083+
_pool->shutdown();
1084+
}
1085+
1086+
// Verify the original crash scenario: a stack-allocated latch captured by
1087+
// reference in a task. If the task were orphaned in the queue after a failed
1088+
// submit, a later thread would execute it and count_down a destroyed latch.
1089+
// With the fix, the task is never enqueued, so the latch is safe.
1090+
TEST_F(ThreadPoolTest, TestSubmitFailureDoesNotCauseUseAfterFree) {
1091+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1092+
.set_min_threads(0)
1093+
.set_max_threads(4)
1094+
.set_idle_timeout(MonoDelta::FromMilliseconds(1)))
1095+
.ok());
1096+
ASSERT_EQ(0, _pool->num_threads());
1097+
1098+
// First call: fail. Subsequent calls: succeed.
1099+
std::atomic<int> create_call_count{0};
1100+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [&](void* arg) {
1101+
if (create_call_count.fetch_add(1) == 0) {
1102+
auto* status = static_cast<Status*>(arg);
1103+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1104+
}
1105+
});
1106+
SyncPoint::GetInstance()->EnableProcessing();
1107+
SCOPED_CLEANUP({
1108+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1109+
SyncPoint::GetInstance()->DisableProcessing();
1110+
});
1111+
1112+
// Simulate the crash scenario: stack-allocated latch, task captured by ref.
1113+
{
1114+
const int kNumTasks = 5;
1115+
CountDownLatch latch(kNumTasks);
1116+
int submit_failures = 0;
1117+
1118+
for (int i = 0; i < kNumTasks; i++) {
1119+
Status s = _pool->submit_func([&latch]() { latch.count_down(); });
1120+
if (!s.ok()) {
1121+
// Caller counts down on failure (like the original bug scenario).
1122+
latch.count_down();
1123+
submit_failures++;
1124+
}
1125+
}
1126+
// First submit should fail (injected), rest should succeed.
1127+
ASSERT_EQ(1, submit_failures);
1128+
// Wait for all count_downs (from both successful tasks and caller cleanup).
1129+
latch.wait();
1130+
}
1131+
// If the orphaned task bug existed, this point would crash (use-after-free
1132+
// on the destroyed latch). With the fix, we reach here safely.
1133+
1134+
_pool->wait();
1135+
_pool->shutdown();
1136+
}
1137+
9901138
} // namespace starrocks

0 commit comments

Comments
 (0)