Skip to content

Commit e3f3661

Browse files
authored
[Chore](pipeline) some minor fix and refactor (#61768)
1. 调整了一些不必要的terminate调用 2. 修复set_thread_id处理的不对的问题,可能导致metric或者profile不对 3. 修复了_wake_by不是原子变量,debug string可能读的不对的问题 4. 把pipeline task的terminate方法改名为更直观的名字,避免与operator的terminate混淆 5. 一些注释更新 This pull request refactors the pipeline task termination logic to improve thread safety and clarify responsibilities between dependency unblocking and operator termination. The main change is replacing the `terminate()` method of `PipelineTask` with a new method, `unblock_all_dependencies()`, which is now responsible solely for unblocking dependencies when a task is woken up early or canceled. Operator termination remains the responsibility of the worker thread within `execute()`, ensuring thread safety. This refactor also updates related method calls and documentation for clarity. **Refactoring of pipeline task termination and dependency management:** * Replaced `PipelineTask::terminate()` with `PipelineTask::unblock_all_dependencies()`, which now unblocks all dependencies without terminating operators; updated all relevant call sites and documentation to reflect this separation of concerns. [[1]](diffhunk://#diff-b1379f2a0e6db93d3aaeec091b10d8a13e9461322f30af2c3f4a54df5bee702fL347-R347) [[2]](diffhunk://#diff-b1379f2a0e6db93d3aaeec091b10d8a13e9461322f30af2c3f4a54df5bee702fL366-R367) [[3]](diffhunk://#diff-b1379f2a0e6db93d3aaeec091b10d8a13e9461322f30af2c3f4a54df5bee702fL481-R496) [[4]](diffhunk://#diff-b1379f2a0e6db93d3aaeec091b10d8a13e9461322f30af2c3f4a54df5bee702fL505-L507) [[5]](diffhunk://#diff-b1379f2a0e6db93d3aaeec091b10d8a13e9461322f30af2c3f4a54df5bee702fL711-R714) [[6]](diffhunk://#diff-b1379f2a0e6db93d3aaeec091b10d8a13e9461322f30af2c3f4a54df5bee702fL762-R768) [[7]](diffhunk://#diff-b1379f2a0e6db93d3aaeec091b10d8a13e9461322f30af2c3f4a54df5bee702fL877-R881) [[8]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L260-R260) [[9]](diffhunk://#diff-2aac108aa389c146aff5acf00160da6d60f736d9a941768b62cb594d8f4f2b39L128-R128) [[10]](diffhunk://#diff-262afd1bf43b83333335fec0b00b65ab0b0241315fd3ceb98c5b3d568971052fL438-R438) * Updated the comment and signature for `unblock_all_dependencies()` in `pipeline_task.h` to clarify its purpose and thread-safety considerations, emphasizing that operator termination must occur on the worker thread. **Thread safety improvements:** * Changed `_wake_by` in `PipelineTask` from a plain `int` to `std::atomic<int>` to ensure thread-safe updates when waking up tasks early. * Fixed the logic in `set_thread_id()` to only update `_thread_id` and increment the counter when the thread ID actually changes.
1 parent 9443e31 commit e3f3661

5 files changed

Lines changed: 30 additions & 26 deletions

File tree

be/src/exec/pipeline/pipeline.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,7 @@ void Pipeline::make_all_runnable(PipelineId wake_by) {
125125
for (auto* task : _tasks) {
126126
if (task) {
127127
task->set_wake_up_early(wake_by);
128-
}
129-
}
130-
for (auto* task : _tasks) {
131-
if (task) {
132-
task->terminate();
128+
task->unblock_all_dependencies();
133129
}
134130
}
135131
}

be/src/exec/pipeline/pipeline_fragment_context.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ void PipelineFragmentContext::cancel(const Status reason) {
257257

258258
for (auto& tasks : _tasks) {
259259
for (auto& task : tasks) {
260-
task.first->terminate();
260+
task.first->unblock_all_dependencies();
261261
}
262262
}
263263
}

be/src/exec/pipeline/pipeline_task.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co
160160
}
161161
if (auto fragment = _fragment_context.lock()) {
162162
if (fragment->get_query_ctx()->is_cancelled()) {
163-
terminate();
163+
unblock_all_dependencies();
164164
return fragment->get_query_ctx()->exec_status();
165165
}
166166
} else {
@@ -344,7 +344,7 @@ bool PipelineTask::_is_blocked() {
344344
});
345345
}
346346

347-
void PipelineTask::terminate() {
347+
void PipelineTask::unblock_all_dependencies() {
348348
// We use a lock to assure all dependencies are not deconstructed here.
349349
std::unique_lock<std::mutex> lc(_dependency_lock);
350350
auto fragment = _fragment_context.lock();
@@ -363,7 +363,8 @@ void PipelineTask::terminate() {
363363
[&](Dependency* dep) { dep->set_ready(); });
364364
_memory_sufficient_dependency->set_ready();
365365
} catch (const doris::Exception& e) {
366-
LOG(WARNING) << "Terminate failed: " << e.code() << ", " << e.to_string();
366+
LOG(WARNING) << "unblock_all_dependencies failed: " << e.code() << ", "
367+
<< e.to_string();
367368
}
368369
}
369370
}
@@ -478,20 +479,21 @@ Status PipelineTask::execute(bool* done) {
478479
} else if (_eos && !_spilling &&
479480
(fragment_context->is_canceled() || !_is_pending_finish())) {
480481
// Debug point for testing the race condition fix: inject set_wake_up_early() +
481-
// terminate() here to simulate Thread B writing A then B between Thread A's two
482-
// reads of _wake_up_early.
482+
// unblock_all_dependencies() here to simulate Thread B writing A then B between
483+
// Thread A's two reads of _wake_up_early.
483484
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
484485
set_wake_up_early();
485-
terminate();
486+
unblock_all_dependencies();
486487
});
487488
*done = true;
488489
}
489490

490-
// NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
491-
// above, not before. This ordering is critical to avoid a race condition:
491+
// NOTE: The operator terminate() call is intentionally placed AFTER the
492+
// _is_pending_finish() check above, not before. This ordering is critical to avoid a race
493+
// condition with the seq_cst memory ordering guarantee:
492494
//
493495
// Pipeline::make_all_runnable() writes in this order:
494-
// (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
496+
// (A) set_wake_up_early() -> (B) unblock_all_dependencies() [sets finish_dep._always_ready]
495497
//
496498
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
497499
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
@@ -502,9 +504,10 @@ Status PipelineTask::execute(bool* done) {
502504
//
503505
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
504506
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
505-
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
507+
// A's effect (_wake_up_early=true) on this second read, ensuring operator terminate() is
508+
// called. This relies on _wake_up_early and _always_ready both being std::atomic with the
509+
// default seq_cst ordering — do not weaken them to relaxed or acq/rel.
506510
if (_wake_up_early) {
507-
terminate();
508511
THROW_IF_ERROR(_root->terminate(_state));
509512
THROW_IF_ERROR(_sink->terminate(_state));
510513
}
@@ -708,7 +711,7 @@ Status PipelineTask::execute(bool* done) {
708711
if (required_pipeline_id == pipeline_id() && required_task_id == task_id() &&
709712
fragment_context->get_fragment_id() == required_fragment_id) {
710713
_wake_up_early = true;
711-
terminate();
714+
unblock_all_dependencies();
712715
} else if (required_pipeline_id == pipeline_id() &&
713716
fragment_context->get_fragment_id() == required_fragment_id) {
714717
LOG(WARNING) << "PipelineTask::execute.terminate sleep 5s";
@@ -759,9 +762,10 @@ Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>& spill
759762
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
760763
delta_cpu_time);
761764

762-
// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
765+
// If task is woke up early, unblock all dependencies and terminate all operators,
766+
// so this task could be closed immediately.
763767
if (_wake_up_early) {
764-
terminate();
768+
unblock_all_dependencies();
765769
THROW_IF_ERROR(_root->terminate(_state));
766770
THROW_IF_ERROR(_sink->terminate(_state));
767771
_eos = true;
@@ -874,7 +878,7 @@ void PipelineTask::stop_if_finished() {
874878
if (auto sink = _sink) {
875879
if (sink->is_finished(_state)) {
876880
set_wake_up_early();
877-
terminate();
881+
unblock_all_dependencies();
878882
}
879883
}
880884
}

be/src/exec/pipeline/pipeline_task.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
7171
}
7272

7373
virtual PipelineTask& set_thread_id(int thread_id) {
74-
_thread_id = thread_id;
7574
if (thread_id != _thread_id) {
7675
COUNTER_UPDATE(_core_change_times, 1);
76+
_thread_id = thread_id;
7777
}
7878
return *this;
7979
}
@@ -130,8 +130,12 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
130130
_wake_by = wake_by;
131131
}
132132

133-
// Execution phase should be terminated. This is called if this task is canceled or waken up early.
134-
void terminate();
133+
// Unblock all dependencies so this task can never be blocked again.
134+
// This is called when the task is woken up early or the fragment is canceled.
135+
//
136+
// NOTE: This does NOT call operator-level terminate() — operator terminate must run
137+
// inside execute() on the worker thread because operator state is not thread-safe.
138+
void unblock_all_dependencies();
135139

136140
// 1 used for update priority queue
137141
// note(wb) an ugly implementation, need refactor later
@@ -317,7 +321,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
317321
MonotonicStopWatch _state_change_watcher;
318322
std::atomic<bool> _spilling = false;
319323
const std::string _pipeline_name;
320-
int _wake_by = -1;
324+
std::atomic<int> _wake_by = -1;
321325
};
322326

323327
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;

be/test/exec/pipeline/pipeline_task_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ TEST_F(PipelineTaskTest, TEST_TERMINATE) {
435435
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 5000));
436436
terminated = true;
437437
task->set_wake_up_early();
438-
task->terminate();
438+
task->unblock_all_dependencies();
439439
};
440440

441441
std::thread exec_thread(exec_func);

0 commit comments

Comments
 (0)