Skip to content

Commit ff1012d

Browse files
authored
[fix](be) Fix time-sharing executor queued task count (#63568)
### What problem does this PR solve? Issue Number: N/A Related PR: N/A Problem Summary: `TimeSharingTaskExecutor` uses `_total_queued_tasks` for queue-size metrics and capacity checks. When queued splits were removed before execution, for example when a task was cancelled or removed, the split queue removed those splits but `_total_queued_tasks` was not decremented. After repeated removals, `_total_queued_tasks` could become larger than the real queue size. This made the executor report a non-zero queue size even when there were no active or queued splits, and later submissions could be rejected as if the queue were full. This PR keeps queue offer/remove operations consistent by updating `_total_queued_tasks` together with the split queue and token state. ### Release note Fix a bug where the time-sharing scan executor queue size could become inaccurate after queued splits were removed before execution. ### Check List (For Author) - Test - [x] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [x] Yes. Queued splits removed before execution now decrement executor queued-task accounting, so queue-size metrics and capacity checks reflect the real queued split count. - Does this need documentation? - [x] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label
1 parent 647d5a7 commit ff1012d

3 files changed

Lines changed: 123 additions & 8 deletions

File tree

be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
276276
}
277277
{
278278
std::unique_lock<std::mutex> l(_lock);
279-
_tokenless->_entries->remove_all(splits_to_destroy);
279+
_remove_queued_splits_unlocked(splits_to_destroy);
280280
}
281281
}
282282

@@ -421,7 +421,7 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
421421
DCHECK(state == SplitThreadPoolToken::State::IDLE ||
422422
state == SplitThreadPoolToken::State::RUNNING);
423423
split->submit_time_watch().start();
424-
_tokenless->_entries->offer(std::move(split));
424+
_offer_split_unlocked(std::move(split));
425425
if (state == SplitThreadPoolToken::State::IDLE) {
426426
_tokenless->transition(SplitThreadPoolToken::State::RUNNING);
427427
}
@@ -433,8 +433,6 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
433433
// 1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
434434
// 2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
435435
// then submitted to the queue.
436-
_total_queued_tasks++;
437-
438436
// Wake up an idle thread for this task. Choosing the thread at the front of
439437
// the list ensures LIFO semantics as idling threads are also added to the front.
440438
//
@@ -570,7 +568,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
570568
lock.unlock();
571569
l.lock();
572570
if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING) {
573-
_tokenless->_entries->offer(split);
571+
split->submit_time_watch().reset();
572+
_offer_split_unlocked(split);
574573
}
575574
l.unlock();
576575
} else {
@@ -586,7 +585,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
586585
split->reset_level_priority();
587586
std::unique_lock<std::mutex> l(_lock);
588587
if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING) {
589-
_tokenless->_entries->offer(split);
588+
split->submit_time_watch().reset();
589+
_offer_split_unlocked(split);
590590
}
591591
} else {
592592
LOG(WARNING) << "blocked split is failed, split_id: "
@@ -770,7 +770,7 @@ Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_han
770770
}
771771
{
772772
std::unique_lock<std::mutex> l(_lock);
773-
_tokenless->_entries->remove_all(splits_to_destroy);
773+
_remove_queued_splits_unlocked(splits_to_destroy);
774774
}
775775
}
776776

@@ -846,6 +846,36 @@ Status TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> tas
846846
return _do_submit(prioritized_split);
847847
}
848848

849+
void TimeSharingTaskExecutor::_offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner> split) {
850+
_tokenless->_entries->offer(std::move(split));
851+
++_total_queued_tasks;
852+
}
853+
854+
void TimeSharingTaskExecutor::_remove_queued_splits_unlocked(
855+
const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits) {
856+
if (splits.empty()) {
857+
return;
858+
}
859+
860+
const size_t queue_size_before = _tokenless->_entries->size();
861+
_tokenless->_entries->remove_all(splits);
862+
const size_t queue_size_after = _tokenless->_entries->size();
863+
DCHECK_GE(queue_size_before, queue_size_after);
864+
865+
const auto removed = static_cast<int>(queue_size_before - queue_size_after);
866+
DCHECK_GE(_total_queued_tasks, removed);
867+
_total_queued_tasks -= removed;
868+
869+
if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING &&
870+
_tokenless->_active_threads == 0 && _tokenless->_entries->size() == 0) {
871+
_tokenless->transition(SplitThreadPoolToken::State::IDLE);
872+
}
873+
874+
if (_total_queued_tasks == 0 && _active_threads == 0) {
875+
_idle_cond.notify_all();
876+
}
877+
}
878+
849879
void TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner> split,
850880
const Status& status) {
851881
_completed_splits_per_level[split->priority().level()]++;

be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,15 @@ class TimeSharingTaskExecutor : public TaskExecutor {
294294
// // Submits a task to be run via token.
295295
Status _do_submit(std::shared_ptr<PrioritizedSplitRunner> split);
296296

297+
// Offer a split to the executor queue and keep _total_queued_tasks consistent.
298+
// REQUIRES: _lock is held.
299+
void _offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner> split);
300+
301+
// Remove queued splits and keep _total_queued_tasks/token state consistent.
302+
// REQUIRES: _lock is held.
303+
void _remove_queued_splits_unlocked(
304+
const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits);
305+
297306
//NOTE: not thread safe, caller should keep it thread-safe by using lock
298307
Status _try_create_thread(int thread_num, std::lock_guard<std::mutex>&);
299308

be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,13 +314,37 @@ class ThrowingSplitRunner : public SplitRunner {
314314
Status _status;
315315
};
316316

317+
class QueueOnlySplitRunner : public SplitRunner {
318+
public:
319+
Status init() override { return Status::OK(); }
320+
321+
Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) override {
322+
_started = true;
323+
_finished = true;
324+
return SharedListenableFuture<Void>::create_ready();
325+
}
326+
327+
void close(const Status& status) override {}
328+
329+
bool is_finished() override { return _finished.load(); }
330+
331+
Status finished_status() override { return Status::OK(); }
332+
333+
std::string get_info() const override { return "queue_only_split"; }
334+
335+
bool is_started() const { return _started.load(); }
336+
337+
private:
338+
std::atomic<bool> _started {false};
339+
std::atomic<bool> _finished {false};
340+
};
341+
317342
class TimeSharingTaskExecutorTest : public testing::Test {
318343
protected:
319344
void SetUp() override {}
320345

321346
void TearDown() override {}
322347

323-
private:
324348
template <typename Container>
325349
void assert_split_states(int end_index, const Container& splits) {
326350
for (int i = 0; i <= end_index; ++i) {
@@ -348,6 +372,58 @@ class TimeSharingTaskExecutorTest : public testing::Test {
348372
}
349373
};
350374

375+
TEST_F(TimeSharingTaskExecutorTest, test_remove_task_clears_queued_task_count) {
376+
auto ticker = std::make_shared<TestingTicker>();
377+
378+
TimeSharingTaskExecutor::ThreadConfig thread_config;
379+
thread_config.thread_name = "leak_repro";
380+
thread_config.workload_group = "normal";
381+
thread_config.max_thread_num = 0;
382+
thread_config.min_thread_num = 0;
383+
thread_config.max_queue_size = 2;
384+
TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
385+
ASSERT_TRUE(executor.init().ok());
386+
ASSERT_TRUE(executor.start().ok());
387+
388+
try {
389+
for (int i = 0; i < thread_config.max_queue_size; ++i) {
390+
auto task_handle = TEST_TRY(executor.create_task(
391+
TaskId("removed_task_" + std::to_string(i)), []() { return 0.0; }, 1,
392+
std::chrono::milliseconds(1), std::optional<int>(1)));
393+
auto split = std::make_shared<QueueOnlySplitRunner>();
394+
395+
auto enqueue_result = executor.enqueue_splits(task_handle, false, {split});
396+
ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
397+
EXPECT_EQ(executor.waiting_splits_size(), 1);
398+
399+
ASSERT_TRUE(executor.remove_task(task_handle).ok());
400+
EXPECT_FALSE(split->is_started());
401+
EXPECT_EQ(executor.waiting_splits_size(), 0);
402+
EXPECT_EQ(executor.get_queue_size(), 0);
403+
}
404+
405+
EXPECT_EQ(executor.num_active_threads(), 0);
406+
EXPECT_EQ(executor.waiting_splits_size(), 0);
407+
EXPECT_EQ(executor.get_queue_size(), 0);
408+
409+
auto task_handle = TEST_TRY(executor.create_task(
410+
TaskId("next_task"), []() { return 0.0; }, 1, std::chrono::milliseconds(1),
411+
std::optional<int>(1)));
412+
auto split = std::make_shared<QueueOnlySplitRunner>();
413+
414+
auto enqueue_result = executor.enqueue_splits(task_handle, false, {split});
415+
ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
416+
EXPECT_EQ(executor.waiting_splits_size(), 1);
417+
EXPECT_EQ(executor.get_queue_size(), 1);
418+
419+
static_cast<void>(executor.remove_task(task_handle));
420+
} catch (...) {
421+
executor.stop();
422+
throw;
423+
}
424+
executor.stop();
425+
}
426+
351427
TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
352428
auto ticker = std::make_shared<TestingTicker>();
353429

0 commit comments

Comments
 (0)