Skip to content

Commit 11a038d

Browse files
authored
[fix](be) Stop extra operator work after cancellation (#64077)
Problem Summary: Some BE operator paths could keep doing unnecessary work after query cancellation had already been observed by RuntimeState. This covers three focused cases: - Spill revoke and recovery paths could still enter spill or repartition work after cancellation. - Nested loop join build close could still process cross runtime filters after cancellation, scanning build blocks and evaluating filter expressions. - Analytic sink could advance multiple buffered output blocks in one sink call without checking cancellation between blocks. This PR adds cancellation checks at the retained public entry points or block boundaries. The analytic sink change is intentionally conservative: it stops before advancing to another buffered output block, while preserving current-block processing semantics.
1 parent 2867e9c commit 11a038d

19 files changed

Lines changed: 460 additions & 9 deletions

be/src/exec/operator/analytic_sink_operator.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,9 @@ bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t current_block_r
340340
return false;
341341
}
342342

343-
Status AnalyticSinkLocalState::_execute_impl() {
343+
Status AnalyticSinkLocalState::_execute_impl(RuntimeState* state) {
344344
while (_output_block_index < _input_blocks.size()) {
345+
RETURN_IF_CANCELLED(state);
345346
{
346347
_get_partition_by_end();
347348
// streaming_mode means no need get all parition data, could calculate data when it's arrived
@@ -753,7 +754,7 @@ Status AnalyticSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* input
753754
local_state._reserve_mem_size = 0;
754755
SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
755756
RETURN_IF_ERROR(_add_input_block(state, input_block));
756-
RETURN_IF_ERROR(local_state._execute_impl());
757+
RETURN_IF_ERROR(local_state._execute_impl(state));
757758
if (local_state._input_eos) {
758759
LockGuard lc(local_state._shared_state->sink_eos_lock);
759760
local_state._shared_state->sink_eos = true;

be/src/exec/operator/analytic_sink_operator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
7373

7474
private:
7575
friend class AnalyticSinkOperatorX;
76-
Status _execute_impl();
76+
Status _execute_impl(RuntimeState* state);
7777
// over(partition by k1 order by k2 range|rows unbounded preceding and unbounded following)
7878
bool _get_next_for_partition(int64_t current_block_rows, int64_t current_block_base_pos);
7979
// over(partition by k1 order by k2 range between unbounded preceding and current row)

be/src/exec/operator/nested_loop_join_build_operator.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
5353
}
5454

5555
Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
56-
RETURN_IF_ERROR(_runtime_filter_producer_helper->process(state, _shared_state->build_blocks));
56+
if (!state->is_cancelled()) {
57+
RETURN_IF_ERROR(
58+
_runtime_filter_producer_helper->process(state, _shared_state->build_blocks));
59+
}
5760
_runtime_filter_producer_helper->collect_realtime_profile(custom_profile());
5861
RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
5962
return Status::OK();

be/src/exec/operator/partitioned_aggregation_sink_operator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
356356
HashTableCtxType& context,
357357
HashTableType& hash_table,
358358
const size_t size_to_revoke, bool eos) {
359+
RETURN_IF_CANCELLED(state);
359360
Status status;
360361

361362
context.init_iterator();
@@ -427,6 +428,7 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
427428
}
428429

429430
Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
431+
RETURN_IF_CANCELLED(state);
430432
if (_eos) {
431433
return Status::OK();
432434
}

be/src/exec/operator/partitioned_aggregation_source_operator.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,9 @@ void PartitionedAggLocalState::_init_partition_queue() {
355355

356356
Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* state,
357357
AggSpillPartitionInfo& partition) {
358+
RETURN_IF_CANCELLED(state);
358359
size_t accumulated_bytes = 0;
359-
if (!partition.spill_file || state->is_cancelled()) {
360+
if (!partition.spill_file) {
360361
return Status::OK();
361362
}
362363

@@ -446,6 +447,7 @@ Status PartitionedAggLocalState::_flush_hash_table_to_sub_spill_files(RuntimeSta
446447
}
447448

448449
Status PartitionedAggLocalState::_flush_and_repartition(RuntimeState* state) {
450+
RETURN_IF_CANCELLED(state);
449451
auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
450452
const int new_level = _current_partition.level + 1;
451453

be/src/exec/operator/partitioned_hash_join_probe_operator.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ bool PartitionedHashJoinProbeLocalState::is_blockable() const {
333333

334334
Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
335335
RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
336+
RETURN_IF_CANCELLED(state);
336337
if (!partition_info.build_file) {
337338
// Build file is already exhausted for this partition.
338339
return Status::OK();
@@ -383,6 +384,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
383384

384385
Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
385386
RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
387+
RETURN_IF_CANCELLED(state);
386388
if (!partition_info.probe_file) {
387389
// Probe file is already exhausted for this partition.
388390
return Status::OK();
@@ -995,6 +997,7 @@ Status PartitionedHashJoinProbeLocalState::revoke_build_data(RuntimeState* state
995997
// repartitioned and pushed back to the queue so the hash table build can
996998
// proceed later with a smaller footprint.
997999
Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
1000+
RETURN_IF_CANCELLED(state);
9981001
auto& local_state = get_local_state(state);
9991002
VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, revoke_memory, child_eos:{}",
10001003
print_id(state->query_id()), node_id(), state->task_id(),

be/src/exec/operator/partitioned_hash_join_sink_operator.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ Status PartitionedHashJoinSinkLocalState::_finish_spilling(RuntimeState* state)
299299
/// because we use limit 1MB here. So we need to force spill all memory to disk to make sure we can make progress.
300300
Status PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState* state,
301301
bool force_spill) {
302+
RETURN_IF_CANCELLED(state);
302303
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
303304
auto status = Status::InternalError(
304305
"fault_inject partitioned_hash_join_sink revoke_memory canceled");

be/src/exec/operator/spill_iceberg_table_sink_operator.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* sta
7979
}
8080

8181
Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
82+
RETURN_IF_CANCELLED(state);
8283
if (!_writer) {
8384
return Status::OK();
8485
}

be/src/exec/operator/spill_sort_sink_operator.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
187187
}
188188

189189
Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
190+
RETURN_IF_CANCELLED(state);
190191
auto& parent = Base::_parent->template cast<Parent>();
191192
state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
192193
Defer defer {[&]() {
@@ -219,6 +220,7 @@ Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
219220
}
220221

221222
Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
223+
RETURN_IF_CANCELLED(state);
222224
auto& parent = Base::_parent->template cast<Parent>();
223225
if (!_shared_state->is_spilled) {
224226
_shared_state->is_spilled = true;
@@ -241,4 +243,4 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
241243
_shared_state->sorted_spill_groups.emplace_back(_spilling_file);
242244
return _execute_spill_sort(state);
243245
}
244-
} // namespace doris
246+
} // namespace doris

be/src/exec/operator/spill_sort_source_operator.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
8282
}
8383

8484
Status SpillSortLocalState::execute_merge_sort_spill_files(RuntimeState* state) {
85+
RETURN_IF_CANCELLED(state);
8586
auto& parent = Base::_parent->template cast<Parent>();
8687
SCOPED_TIMER(_spill_merge_sort_timer);
8788
Status status;
@@ -262,4 +263,4 @@ Status SpillSortSourceOperatorX::get_block_impl(RuntimeState* state, Block* bloc
262263
return Status::OK();
263264
}
264265

265-
} // namespace doris
266+
} // namespace doris

0 commit comments

Comments
 (0)