Skip to content

Commit e08498e

Browse files
authored
[fix](pipeline) Fix wake up early without terminate call (#63539)
### What problem does this PR solve?

Issue Number: None

Related PR: #61679

Problem Summary: Backport #61679 to branch-4.0.

A pipeline task can race with a downstream early wake-up: one thread may observe `_wake_up_early` as false, then another thread sets `_wake_up_early` and unblocks the finish dependencies, and the first thread later sees `_is_pending_finish()` as false and finishes without calling operator `terminate()`. For hash join build tasks this can leave runtime filter producers in `WAITING_FOR_SYNCED_SIZE`; during close/build, `insert()` expects `WAITING_FOR_DATA` and reports an invalid runtime filter producer state.

This change moves operator termination after the pending-finish check so the second `_wake_up_early` read observes the early wake-up and terminates operators before close.

### Release note

None

### Check List (For Author)

- Test: Unit Test
    - `build-support/check-format.sh be/src/pipeline/pipeline_task.cpp be/test/pipeline/pipeline_task_test.cpp`
    - `ninja -C be/build_Release src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o`
    - `ninja -C be/ut_build_ASAN src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o test/CMakeFiles/doris_be_test.dir/exec/pipeline/pipeline_task_test.cpp.o test/doris_be_test`
    - `be/ut_build_ASAN/test/doris_be_test --gtest_filter=PipelineTaskTest.TEST_TERMINATE_RACE_FIX --gtest_print_time=true`
- Behavior changed: No
- Does this need documentation: No
1 parent 6151829 commit e08498e

2 files changed

Lines changed: 108 additions & 3 deletions

File tree

be/src/pipeline/pipeline_task.cpp

Lines changed: 29 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -409,15 +409,41 @@ Status PipelineTask::execute(bool* done) {
409409

410410
// If the task was woken up early, we should terminate all operators, and this task can be closed immediately.
411411
if (_wake_up_early) {
412-
terminate();
413-
THROW_IF_ERROR(_root->terminate(_state));
414-
THROW_IF_ERROR(_sink->terminate(_state));
415412
_eos = true;
416413
*done = true;
417414
} else if (_eos && !_spilling &&
418415
(fragment_context->is_canceled() || !_is_pending_finish())) {
416+
// Debug point for testing the race condition fix: inject set_wake_up_early() +
417+
// terminate() here to simulate Thread B writing A then B between Thread A's two
418+
// reads of _wake_up_early.
419+
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
420+
set_wake_up_early();
421+
terminate();
422+
});
419423
*done = true;
420424
}
425+
426+
// NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
427+
// above, not before. This ordering is critical to avoid a race condition:
428+
//
429+
// Pipeline::make_all_runnable() writes in this order:
430+
// (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
431+
//
432+
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
433+
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
434+
// then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
435+
// then set *done=true without ever calling operator terminate(), causing close() to run
436+
// on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
437+
// WAITING_FOR_SYNCED_SIZE state when insert() is called).
438+
//
439+
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
440+
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
441+
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
442+
if (_wake_up_early) {
443+
terminate();
444+
THROW_IF_ERROR(_root->terminate(_state));
445+
THROW_IF_ERROR(_sink->terminate(_state));
446+
}
421447
}};
422448
const auto query_id = _state->query_id();
423449
// If this task is already EOS and block is empty (which means we already output all blocks),

be/test/pipeline/pipeline_task_test.cpp

Lines changed: 79 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
#include <glog/logging.h>
1919
#include <gtest/gtest.h>
2020

21+
#include "common/config.h"
2122
#include "common/status.h"
2223
#include "dummy_task_queue.h"
2324
#include "pipeline/dependency.h"
@@ -30,6 +31,7 @@
3031
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
3132
#include "testutil/mock/mock_workload_group_mgr.h"
3233
#include "thrift_builder.h"
34+
#include "util/debug_points.h"
3335

3436
namespace doris::pipeline {
3537

@@ -1186,4 +1188,81 @@ TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) {
11861188
}
11871189
}
11881190

1191+
// Test for the race condition fix between _wake_up_early and _is_pending_finish().
1192+
//
1193+
// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
1194+
// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
1195+
// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
1196+
// _always_ready from B), Thread A would set *done=true without calling operator terminate().
1197+
//
1198+
// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
1199+
// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
1200+
// read, ensuring terminate() is always called.
1201+
//
1202+
// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
1203+
// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
1204+
// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
1205+
TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
1206+
auto num_instances = 1;
1207+
auto pip_id = 0;
1208+
auto task_id = 0;
1209+
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
1210+
{
1211+
OperatorPtr source_op;
1212+
source_op.reset(new DummyOperator());
1213+
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
1214+
1215+
int op_id = 1;
1216+
int node_id = 2;
1217+
int dest_id = 3;
1218+
DataSinkOperatorPtr sink_op;
1219+
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
1220+
EXPECT_TRUE(pip->set_sink(sink_op).ok());
1221+
}
1222+
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
1223+
std::map<int,
1224+
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
1225+
shared_state_map;
1226+
_runtime_state->resize_op_id_to_local_state(-1);
1227+
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
1228+
profile.get(), shared_state_map, task_id);
1229+
task->_exec_time_slice = 10'000'000'000ULL;
1230+
{
1231+
std::vector<TScanRangeParams> scan_range;
1232+
int sender_id = 0;
1233+
TDataSink tsink;
1234+
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
1235+
}
1236+
_query_ctx->get_execution_dependency()->set_ready();
1237+
1238+
auto* sink_finish_dep =
1239+
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
1240+
EXPECT_NE(sink_finish_dep, nullptr);
1241+
sink_finish_dep->block();
1242+
1243+
task->_operators.front()->cast<DummyOperator>()._eos = true;
1244+
{
1245+
bool done = false;
1246+
EXPECT_TRUE(task->execute(&done).ok());
1247+
EXPECT_TRUE(task->_eos);
1248+
EXPECT_FALSE(done);
1249+
EXPECT_FALSE(task->_wake_up_early);
1250+
}
1251+
1252+
sink_finish_dep->set_ready();
1253+
config::enable_debug_points = true;
1254+
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
1255+
{
1256+
bool done = false;
1257+
EXPECT_TRUE(task->execute(&done).ok());
1258+
EXPECT_TRUE(task->_eos);
1259+
EXPECT_TRUE(done);
1260+
EXPECT_TRUE(task->_wake_up_early);
1261+
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
1262+
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
1263+
}
1264+
DebugPoints::instance()->clear();
1265+
config::enable_debug_points = false;
1266+
}
1267+
11891268
} // namespace doris::pipeline

0 commit comments

Comments
 (0)