Skip to content

Commit 17bbba4

Browse files
authored
[fix](pipeline) avoid data queue sink dependency lost wakeup (#63055)
### What problem does this PR solve? Issue Number: N/A Problem Summary: `DataQueueTest.MultiTest` could intermittently hang after DataQueue moved sink dependency notifications outside the per-sub-queue lock. Root cause: `SubQueue` queue state and `sink_dependency` state were no longer serialized by `queue_lock`, so a producer could observe its sink dependency as blocked even after the queue had already become empty, leaving no future push/pop to wake it. This patch updates `sink_dependency->set_ready()` and `sink_dependency->block()` while holding `queue_lock`, keeping queue occupancy and sink readiness transitions atomic with respect to each other. Related PR: #62947
1 parent 98d55d8 commit 17bbba4

1 file changed

Lines changed: 17 additions & 27 deletions

File tree

be/src/exec/operator/data_queue.cpp

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,38 +29,28 @@
2929
namespace doris {
3030

3131
void SubQueue::try_pop(std::unique_ptr<Block>* output_block) {
32-
bool need_notify_sink_ready = false;
33-
{
34-
LockGuard l(queue_lock);
35-
if (!blocks.empty()) {
36-
*output_block = std::move(blocks.front());
37-
blocks.pop_front();
38-
bytes_in_queue -= (*output_block)->allocated_bytes();
39-
blocks_in_queue -= 1;
40-
need_notify_sink_ready = blocks.empty();
32+
LockGuard l(queue_lock);
33+
if (!blocks.empty()) {
34+
*output_block = std::move(blocks.front());
35+
blocks.pop_front();
36+
bytes_in_queue -= (*output_block)->allocated_bytes();
37+
blocks_in_queue -= 1;
38+
if (blocks.empty()) {
39+
sink_dependency->set_ready();
4140
}
4241
}
43-
// Notify outside of queue_lock to avoid nested locks.
44-
if (need_notify_sink_ready) {
45-
sink_dependency->set_ready();
46-
}
4742
}
4843

4944
bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& total_counter) {
50-
bool need_block_sink = false;
51-
{
52-
LockGuard l(queue_lock);
53-
if (is_finished) {
54-
return false;
55-
}
56-
total_counter++;
57-
bytes_in_queue += block->allocated_bytes();
58-
blocks.emplace_back(std::move(block));
59-
blocks_in_queue += 1;
60-
need_block_sink = (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load());
61-
}
62-
// Notify outside of queue_lock to avoid nested locks.
63-
if (need_block_sink) {
45+
LockGuard l(queue_lock);
46+
if (is_finished) {
47+
return false;
48+
}
49+
total_counter++;
50+
bytes_in_queue += block->allocated_bytes();
51+
blocks.emplace_back(std::move(block));
52+
blocks_in_queue += 1;
53+
if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) {
6454
sink_dependency->block();
6555
}
6656
return true;

0 commit comments

Comments
 (0)