Skip to content

Commit 5140f36

Browse files
sevev authored and mergify[bot] committed
[BugFix] Fix use-after-free in ThreadPool::do_submit when thread creation fails (#71276)
Signed-off-by: sevev <qiangzh95@gmail.com>
Co-authored-by: wanpengfei-git <wanpengfei91@163.com>
(cherry picked from commit d8420c4)

# Conflicts:
#	be/test/util/threadpool_test.cpp
1 parent 7334e24 commit 5140f36

2 files changed

Lines changed: 192 additions & 5 deletions

File tree

be/src/util/threadpool.cpp

Lines changed: 33 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -431,10 +431,37 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
431431
int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
432432
int additional_threads = static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
433433
bool need_a_thread = false;
434+
bool sole_thread = false;
434435
if (additional_threads > 0 &&
435436
_num_threads + _num_threads_pending_start < _max_threads.load(std::memory_order_acquire)) {
436437
need_a_thread = true;
437438
_num_threads_pending_start++;
439+
sole_thread = (_num_threads + _num_threads_pending_start == 1);
440+
}
441+
442+
// When this submit needs to create the very first thread for the pool
443+
// (sole_thread == true), create it while still holding the lock and BEFORE
444+
// enqueuing the task.
445+
//
446+
// Why create before enqueue?
447+
// If thread creation fails and no threads exist, the caller receives an
448+
// error and performs its own cleanup (e.g. counting down a latch). If the
449+
// task were already in the queue, a later successful submit could spawn a
450+
// thread that picks up the orphaned task, executing it against already-
451+
// destroyed state — a use-after-free.
452+
//
453+
// Why hold the lock during thread creation?
454+
// If we released the lock, a concurrent submitter could see our
455+
// _num_threads_pending_start and assume a thread is available, enqueue
456+
// its task without creating a thread, and then have that task stranded
457+
// if our create_thread() fails. The pool has zero threads here, so no
458+
// worker is blocked waiting for this lock.
459+
if (sole_thread) {
460+
Status status = create_thread();
461+
if (!status.ok()) {
462+
_num_threads_pending_start--;
463+
return status;
464+
}
438465
}
439466

440467
TEST_SYNC_POINT_CALLBACK("ThreadPool::do_submit:replace_task", &r);
@@ -467,15 +494,11 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
467494
}
468495
unique_lock.unlock();
469496

470-
if (need_a_thread) {
497+
if (need_a_thread && !sole_thread) {
471498
Status status = create_thread();
472499
if (!status.ok()) {
473500
unique_lock.lock();
474501
_num_threads_pending_start--;
475-
if (_num_threads + _num_threads_pending_start == 0) {
476-
// If we have no threads, we can't do any work.
477-
return status;
478-
}
479502
// If we failed to create a thread, but there are still some other
480503
// worker threads, log a warning message and continue.
481504
LOG(ERROR) << "Thread pool failed to create thread: " << status.to_string() << "\n" << get_stack_trace();
@@ -738,6 +761,11 @@ void ThreadPool::dispatch_thread() {
738761
}
739762

740763
// Asks Thread::create to start a thread named after this pool (_name) that
// runs ThreadPool::dispatch_thread on `this`.
//
// Returns the Status produced by Thread::create, unless the
// "ThreadPool::create_thread" sync point has set a non-OK status, in which
// case that status is returned and Thread::create is not called.
Status ThreadPool::create_thread() {
764+
Status status;
765+
// Sync-point hook: hands a pointer to `status` to any registered callback so
// it can overwrite the value (the tests in this commit use it to simulate a
// thread-creation failure).
TEST_SYNC_POINT_CALLBACK("ThreadPool::create_thread", &status);
766+
// A non-OK status here short-circuits the real thread creation below.
if (!status.ok()) {
767+
return status;
768+
}
741769
return Thread::create("thread pool", _name, &ThreadPool::dispatch_thread, this, nullptr);
742770
}
743771

be/test/util/threadpool_test.cpp

Lines changed: 159 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,18 @@
3535
#include <utility>
3636
#include <vector>
3737

38+
<<<<<<< HEAD:be/test/util/threadpool_test.cpp
39+
=======
40+
#include "base/concurrency/await.h"
41+
#include "base/concurrency/countdown_latch.h"
42+
#include "base/concurrency/spinlock.h"
43+
#include "base/metrics.h"
44+
#include "base/random/random.h"
45+
#include "base/testutil/assert.h"
46+
#include "base/testutil/sync_point.h"
47+
#include "base/time/monotime.h"
48+
#include "base/utility/scoped_cleanup.h"
49+
>>>>>>> d8420c4601 ([BugFix] Fix use-after-free in ThreadPool::do_submit when thread creation fails (#71276)):be/test/common/thread/threadpool_test.cpp
3850
#include "common/logging.h"
3951
#include "common/status.h"
4052
#include "gutil/atomicops.h"
@@ -987,4 +999,151 @@ TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
987999
}
9881000
*/
9891001

1002+
// Regression test for a use-after-free bug in do_submit().
1003+
//
1004+
// When the pool has min_threads=0 and all threads have exited, a new submit
1005+
// must create a thread. The old code enqueued the task first, then attempted
1006+
// thread creation. If creation failed (e.g. EAGAIN), it returned an error
1007+
// WITHOUT removing the task from the queue. The caller, seeing the error,
1008+
// would perform its own cleanup (e.g. counting down a latch). A later
1009+
// successful submit could then create a thread that picks up the orphaned
1010+
// task, executing it against already-destroyed state (use-after-free).
1011+
//
1012+
// The fix creates the thread BEFORE enqueuing the task when the pool has
1013+
// zero threads. If thread creation fails, the task is never enqueued.
1014+
// Flow: with zero threads in the pool, inject a create_thread failure and
// expect submit to fail without queuing or running the task; then remove the
// injection and expect a normal submit to run exactly once.
TEST_F(ThreadPoolTest, TestSubmitFailsCleanlyWhenNoThreadsExist) {
1015+
// Pool with min_threads=0 so all threads can idle-exit.
1016+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1017+
.set_min_threads(0)
1018+
.set_max_threads(4)
1019+
.set_idle_timeout(MonoDelta::FromMilliseconds(1)))
1020+
.ok());
1021+
1022+
// No explicit wait happens here: the pool was just rebuilt with
// min_threads=0 and is expected to report zero threads immediately.
1023+
ASSERT_EQ(0, _pool->num_threads());
1024+
1025+
// Inject create_thread failure to simulate EAGAIN (resource exhaustion).
// The callback receives the Status* that create_thread() passes to the
// "ThreadPool::create_thread" sync point and overwrites it with an error.
1026+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [](void* arg) {
1027+
auto* status = static_cast<Status*>(arg);
1028+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1029+
});
1030+
SyncPoint::GetInstance()->EnableProcessing();
// Safety net: removes the injection on every exit path, including the early
// return taken by a failing ASSERT_* below.
1031+
SCOPED_CLEANUP({
1032+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1033+
SyncPoint::GetInstance()->DisableProcessing();
1034+
});
1035+
1036+
std::atomic<int> run_count{0};
1037+
Status s = _pool->submit_func([&]() { run_count++; });
1038+
// Submit should fail because thread creation failed.
1039+
ASSERT_FALSE(s.ok());
1040+
// The task must NOT have been enqueued.
// Checked two ways: the pool's _total_queued_tasks counter is still zero and
// the task body never incremented run_count.
1041+
ASSERT_EQ(0, _pool->_total_queued_tasks);
1042+
ASSERT_EQ(0, run_count);
1043+
1044+
// Now disable the failure injection and verify normal operation resumes.
// (The SCOPED_CLEANUP above repeats these two calls at scope exit.)
1045+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1046+
SyncPoint::GetInstance()->DisableProcessing();
1047+
1048+
CountDownLatch latch(1);
1049+
s = _pool->submit_func([&]() {
1050+
run_count++;
1051+
latch.count_down();
1052+
});
1053+
ASSERT_TRUE(s.ok());
// Block until the task has run so run_count can be checked deterministically.
1054+
latch.wait();
1055+
ASSERT_EQ(1, run_count);
1056+
_pool->wait();
1057+
_pool->shutdown();
1058+
}
1059+
1060+
// Verify that when thread creation fails but other threads exist in the pool,
1061+
// the task is still enqueued and eventually processed (no error returned).
1062+
// Flow: keep the pool's single worker busy, inject a create_thread failure,
// and expect submit to still return OK because the task can wait in the queue
// for the existing worker. The task must run once the worker is unblocked.
TEST_F(ThreadPoolTest, TestSubmitSucceedsWhenThreadCreationFailsButThreadsExist) {
1063+
// Pool with min_threads=1 so at least one thread always exists.
1064+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1065+
.set_min_threads(1)
1066+
.set_max_threads(4)
1067+
.set_idle_timeout(MonoDelta::FromMilliseconds(kThreadIdleTimeoutMs)))
1068+
.ok());
1069+
ASSERT_EQ(1, _pool->num_threads());
1070+
1071+
// Block the existing thread so a new submit requires a new thread.
1072+
CountDownLatch block_latch(1);
1073+
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&block_latch)).ok());
1074+
1075+
// Now inject create_thread failure.
// The callback receives the Status* that create_thread() passes to the
// "ThreadPool::create_thread" sync point and overwrites it with an error.
1076+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [](void* arg) {
1077+
auto* status = static_cast<Status*>(arg);
1078+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1079+
});
1080+
SyncPoint::GetInstance()->EnableProcessing();
// Safety net matching the sibling tests: a failing ASSERT_* returns from the
// test body immediately, which would otherwise leave the injected failure
// installed on the shared SyncPoint instance for whatever test runs next.
SCOPED_CLEANUP({
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
SyncPoint::GetInstance()->DisableProcessing();
});
1081+
1082+
// Submit should succeed because existing thread can process the task.
1083+
std::atomic<int> run_count{0};
1084+
Status s = _pool->submit_func([&]() { run_count++; });
// NOTE(review): if this assertion fails, block_latch is never counted down,
// so the blocked task presumably keeps the worker busy during fixture
// teardown — confirm TearDown copes with that.
1085+
ASSERT_TRUE(s.ok());
1086+
1087+
// Disable failure injection and unblock.
// (The SCOPED_CLEANUP above repeats the two SyncPoint calls at scope exit.)
1088+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1089+
SyncPoint::GetInstance()->DisableProcessing();
1090+
block_latch.count_down();
1091+
1092+
// Wait for the pool to drain before checking that the queued task ran once.
_pool->wait();
1093+
ASSERT_EQ(1, run_count);
1094+
_pool->shutdown();
1095+
}
1096+
1097+
// Verify the original crash scenario: a stack-allocated latch captured by
1098+
// reference in a task. If the task were orphaned in the queue after a failed
1099+
// submit, a later thread would execute it and count_down a destroyed latch.
1100+
// With the fix, the task is never enqueued, so the latch is safe.
1101+
// Flow: fail only the first create_thread call, submit five tasks that count
// down a stack-allocated latch (the caller counts down itself when a submit
// fails), and expect exactly one failed submit and a latch that reaches zero.
TEST_F(ThreadPoolTest, TestSubmitFailureDoesNotCauseUseAfterFree) {
1102+
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
1103+
.set_min_threads(0)
1104+
.set_max_threads(4)
1105+
.set_idle_timeout(MonoDelta::FromMilliseconds(1)))
1106+
.ok());
// No wait here: the freshly rebuilt pool is expected to report zero threads.
1107+
ASSERT_EQ(0, _pool->num_threads());
1108+
1109+
// First call: fail. Subsequent calls: succeed.
// fetch_add returns the previous value, so only the very first invocation of
// the callback (previous value 0) overwrites the status with an error.
1110+
std::atomic<int> create_call_count{0};
1111+
SyncPoint::GetInstance()->SetCallBack("ThreadPool::create_thread", [&](void* arg) {
1112+
if (create_call_count.fetch_add(1) == 0) {
1113+
auto* status = static_cast<Status*>(arg);
1114+
*status = Status::RuntimeError("Could not create thread: Resource temporarily unavailable");
1115+
}
1116+
});
1117+
SyncPoint::GetInstance()->EnableProcessing();
// Removes the callback on every exit path. It is declared after
// create_call_count; presumably SCOPED_CLEANUP creates a local guard object,
// in which case it runs before the counter the callback captures by
// reference is destroyed — confirm against the macro definition.
1118+
SCOPED_CLEANUP({
1119+
SyncPoint::GetInstance()->ClearCallBack("ThreadPool::create_thread");
1120+
SyncPoint::GetInstance()->DisableProcessing();
1121+
});
1122+
1123+
// Simulate the crash scenario: stack-allocated latch, task captured by ref.
1124+
{
1125+
const int kNumTasks = 5;
1126+
CountDownLatch latch(kNumTasks);
1127+
int submit_failures = 0;
1128+
1129+
for (int i = 0; i < kNumTasks; i++) {
1130+
Status s = _pool->submit_func([&latch]() { latch.count_down(); });
1131+
if (!s.ok()) {
1132+
// Caller counts down on failure (like the original bug scenario).
1133+
latch.count_down();
1134+
submit_failures++;
1135+
}
1136+
}
1137+
// First submit should fail (injected), rest should succeed.
// NOTE(review): a failure of this assertion returns before latch.wait(),
// so tasks still queued or running would reference `latch` after it goes
// out of scope; the test is only safe on the passing path.
1138+
ASSERT_EQ(1, submit_failures);
1139+
// Wait for all count_downs (from both successful tasks and caller cleanup).
1140+
latch.wait();
1141+
}
1142+
// If the orphaned task bug existed, this point would crash (use-after-free
1143+
// on the destroyed latch). With the fix, we reach here safely.
1144+
1145+
_pool->wait();
1146+
_pool->shutdown();
1147+
}
1148+
9901149
} // namespace starrocks

0 commit comments

Comments
 (0)