Skip to content

Commit 895102b

Browse files
authored
[Chore](be) Stop spill hash join repartition on cancel (#63456)
Problem Summary: Partitioned hash join spill recovery could continue normal repartition progress after cancellation because some loops stopped on `state->is_cancelled()` but then fell through to completion handling. This could mark partially recovered or repartitioned spill data as complete. This PR returns the cancellation status before advancing partition state, clears recovered build data during close, and replaces a debug-only child EOS assertion with a runtime error.
1 parent 1b5fa61 commit 895102b

2 files changed

Lines changed: 54 additions & 4 deletions

File tree

be/src/exec/operator/partitioned_hash_join_probe_operator.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "exec/spill/spill_repartitioner.h"
3737
#include "runtime/fragment_mgr.h"
3838
#include "runtime/runtime_profile.h"
39+
#include "runtime/runtime_state.h"
3940

4041
namespace doris {
4142

@@ -215,6 +216,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
215216
}
216217
_current_probe_reader.reset();
217218
}
219+
_recovered_build_block.reset();
218220

219221
// Clean up any remaining spill partition queue entries
220222
for (auto& entry : _spill_partition_queue) {
@@ -347,7 +349,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
347349
RETURN_IF_ERROR(_current_build_reader->open());
348350
}
349351
bool eos = false;
350-
while (!eos) {
352+
while (!eos && !state->is_cancelled()) {
351353
Block block;
352354
RETURN_IF_ERROR(_current_build_reader->read(&block, &eos));
353355
COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -371,6 +373,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
371373
return Status::OK(); // yield — buffer full, more data may remain
372374
}
373375
}
376+
RETURN_IF_CANCELLED(state);
374377
// Build file fully consumed.
375378
RETURN_IF_ERROR(_current_build_reader->close());
376379
_current_build_reader.reset();
@@ -407,13 +410,15 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
407410
return Status::OK(); // yield — enough data read
408411
}
409412
}
413+
RETURN_IF_CANCELLED(state);
410414
// Probe file fully consumed.
411415
RETURN_IF_ERROR(_current_probe_reader->close());
412416
_current_probe_reader.reset();
413417
partition_info.probe_file.reset();
414418
return Status::OK();
415419
}
416420

421+
// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill repartition state machine handles build/probe phases together.
417422
Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
418423
RuntimeState* state, JoinSpillPartitionInfo& partition) {
419424
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
@@ -472,6 +477,7 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
472477
}
473478
}
474479
RETURN_IF_ERROR(_repartitioner.finalize());
480+
RETURN_IF_CANCELLED(state);
475481
_recovered_build_block.reset();
476482
_current_build_reader.reset(); // clear any leftover reader state
477483
partition.build_file.reset();
@@ -495,9 +501,9 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
495501
while (!done && !state->is_cancelled()) {
496502
RETURN_IF_ERROR(_repartitioner.repartition(state, partition.probe_file, &done));
497503
}
498-
partition.probe_file.reset();
499-
500504
RETURN_IF_ERROR(_repartitioner.finalize());
505+
RETURN_IF_CANCELLED(state);
506+
partition.probe_file.reset();
501507
_current_probe_reader.reset();
502508
}
503509

@@ -696,7 +702,11 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
696702
// per-partition build and probe spill streams. After this point every partition
697703
// (including the original "level-0" ones) is accessed uniformly via the queue.
698704
if (!local_state._spill_queue_initialized) {
699-
DCHECK(local_state._child_eos) << "pull() with is_spilled=true called before child EOS";
705+
if (UNLIKELY(!local_state._child_eos)) {
706+
return Status::InternalError(
707+
"query:{}, node:{}, pull() with is_spilled=true called before child EOS",
708+
print_id(state->query_id()), node_id());
709+
}
700710
// There maybe some blocks still in partitioned block or probe blocks. Flush them to disk.
701711
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
702712
// Close all probe writers so that SpillFile metadata (part_count, etc.)
@@ -729,6 +739,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
729739
return _pull_from_spill_queue(local_state, state, output_block, eos);
730740
}
731741

742+
// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill queue pull handles setup, recovery, and probing phases.
732743
Status PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
733744
PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state, Block* output_block,
734745
bool* eos) const {

be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,45 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskEmpty) {
11391139
ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
11401140
}
11411141

1142+
TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskCancelledBeforeEmptyEos) {
1143+
auto [probe_operator, sink_operator] = _helper.create_operators();
1144+
1145+
std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
1146+
auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
1147+
probe_operator.get(), shared_state);
1148+
1149+
SpillFileSPtr spill_file;
1150+
auto relative_path = fmt::format(
1151+
"{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
1152+
probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
1153+
ASSERT_TRUE(ExecEnv::GetInstance()
1154+
->spill_file_mgr()
1155+
->create_spill_file(relative_path, spill_file)
1156+
.ok());
1157+
1158+
{
1159+
SpillFileWriterSPtr writer;
1160+
ASSERT_TRUE(spill_file
1161+
->create_writer(_helper.runtime_state.get(),
1162+
local_state->operator_profile(), writer)
1163+
.ok());
1164+
ASSERT_TRUE(writer->close().ok());
1165+
}
1166+
1167+
_helper.runtime_state->cancel(Status::Cancelled("test cancel"));
1168+
1169+
JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
1170+
auto status = local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
1171+
partition_info);
1172+
ASSERT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
1173+
ASSERT_NE(partition_info.build_file, nullptr);
1174+
ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
1175+
1176+
ASSERT_TRUE(local_state->close(_helper.runtime_state.get()).ok());
1177+
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition_info.build_file);
1178+
partition_info.build_file.reset();
1179+
}
1180+
11421181
TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData) {
11431182
// Similar setup as above...
11441183
auto [probe_operator, sink_operator] = _helper.create_operators();

0 commit comments

Comments
 (0)