Skip to content

Commit b0ce07c

Browse files
committed
refactor(window): simplify TransformWindowPartitionCollect event loop
Reduce from 5-state machine (Collect/Spill/Process/Restore/Finish) to 3-state (Collect/Output/Finish). Since spill and restore are now both synchronous, the indirection through separate Spill/Restore steps with async fallback is unnecessary. - Collect: pull data, buffer, spill inline when memory pressure is high - Output: restore next partition, process (sort), push downstream - Remove next_step(), collect(), output() helper methods - Remove is_collect_finished, restored_data_blocks fields
1 parent d2c75b5 commit b0ce07c

4 files changed

Lines changed: 68 additions & 148 deletions

File tree

src/query/service/src/pipelines/processors/transforms/materialized_cte.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,8 @@ impl MaterializedCteSpilledPayload {
6969
let operator = data_operator.spill_operator();
7070
let buffer_pool = SpillsBufferPool::instance();
7171
let settings = ReadSettings::default();
72-
let mut reader = buffer_pool.reader(
73-
operator,
74-
self.path,
75-
self.schema,
76-
selected,
77-
target,
78-
settings,
79-
)?;
72+
let mut reader =
73+
buffer_pool.reader(operator, self.path, self.schema, selected, target, settings)?;
8074
match reader.read()? {
8175
Some(block) => Ok(block),
8276
None => Err(ErrorCode::Internal(

src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
mod data_processor_strategy;
1616
mod hilbert_partition_exchange;
1717
mod transform_window_partition_collect;
18-
mod window_partition_buffer_v2;
18+
mod window_partition_buffer;
1919
mod window_partition_exchange;
2020
mod window_partition_meta;
2121
mod window_partition_partial_top_n_exchange;

src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs

Lines changed: 61 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -31,36 +31,28 @@ use databend_common_pipeline_transforms::MemorySettings;
3131
use databend_common_settings::Settings;
3232

3333
use super::WindowPartitionMeta;
34-
use super::window_partition_buffer_v2::WindowPartitionBufferV2;
34+
use super::window_partition_buffer::WindowPartitionBuffer;
3535
use crate::pipelines::processors::transforms::DataProcessorStrategy;
3636
use crate::sessions::QueryContext;
3737

38+
/// Two-phase state machine:
39+
/// - Collect: pull input data into buffer, spill when memory pressure is high.
40+
/// - Output: restore partitions one by one, process (sort), and push downstream.
3841
#[derive(Debug, Clone, Copy)]
3942
enum Step {
4043
Collect,
41-
Process,
42-
Spill,
43-
Restore,
44-
Finish,
44+
Output,
4545
}
4646

4747
pub struct TransformWindowPartitionCollect<S: DataProcessorStrategy> {
4848
input: Arc<InputPort>,
4949
output: Arc<OutputPort>,
50-
51-
restored_data_blocks: Vec<DataBlock>,
5250
output_data_blocks: VecDeque<DataBlock>,
5351

54-
// The partition id is used to map the partition id to the new partition id.
5552
index_map: Vec<usize>,
56-
// The buffer is used to control the memory usage of the window operator.
57-
buffer: WindowPartitionBufferV2,
58-
53+
buffer: WindowPartitionBuffer,
5954
strategy: S,
60-
61-
// Event variables.
6255
step: Step,
63-
is_collect_finished: bool,
6456
}
6557

6658
impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
@@ -75,12 +67,10 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
7567
memory_settings: MemorySettings,
7668
strategy: S,
7769
) -> Result<Self> {
78-
// Calculate the partition ids collected by the processor.
7970
let partitions: Vec<usize> = (0..num_partitions)
8071
.filter(|&partition| partition % num_processors == processor_id)
8172
.collect();
8273

83-
// Map each partition id to new partition index.
8474
let mut index_map = vec![0; num_partitions];
8575
for (index, partition) in partitions.iter().enumerate() {
8676
index_map[*partition] = index;
@@ -91,9 +81,8 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
9181
.get_spill_writer_memory_pool_size_mb()?
9282
.saturating_mul(1024 * 1024);
9383

94-
// Create the window partition buffer.
9584
let sort_block_size = settings.get_window_partition_sort_block_size()? as usize;
96-
let buffer = WindowPartitionBufferV2::new(
85+
let buffer = WindowPartitionBuffer::new(
9786
prefix,
9887
writer_pool_bytes,
9988
partitions.len(),
@@ -104,83 +93,22 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
10493
Ok(Self {
10594
input,
10695
output,
96+
output_data_blocks: VecDeque::new(),
10797
index_map,
10898
buffer,
10999
strategy,
110-
is_collect_finished: false,
111-
output_data_blocks: VecDeque::new(),
112-
restored_data_blocks: Vec::new(),
113100
step: Step::Collect,
114101
})
115102
}
116103

117-
fn next_step(&mut self, step: Step) -> Result<Event> {
118-
let event = match step {
119-
Step::Finish => {
120-
self.input.finish();
121-
self.output.finish();
122-
Event::Finished
104+
fn collect_data_block(&mut self, data_block: DataBlock) {
105+
if let Some(meta) = data_block
106+
.get_owned_meta()
107+
.and_then(WindowPartitionMeta::downcast_from)
108+
{
109+
for (id, block) in meta.partitioned_data {
110+
self.buffer.add_data_block(self.index_map[id], block);
123111
}
124-
_ => Event::Sync,
125-
};
126-
self.step = step;
127-
Ok(event)
128-
}
129-
130-
fn collect(&mut self) -> Result<Event> {
131-
if self.output.is_finished() {
132-
self.input.finish();
133-
return self.next_step(Step::Finish);
134-
}
135-
136-
// First check. flush memory data to external storage if need
137-
if self.need_spill() {
138-
return self.next_step(Step::Spill);
139-
}
140-
141-
if self.input.has_data() {
142-
Self::collect_data_block(
143-
self.input.pull_data().unwrap()?,
144-
&self.index_map,
145-
&mut self.buffer,
146-
);
147-
}
148-
149-
// Check again. flush memory data to external storage if need
150-
if self.need_spill() {
151-
return self.next_step(Step::Spill);
152-
}
153-
154-
if self.input.is_finished() {
155-
self.is_collect_finished = true;
156-
return self.next_step(Step::Restore);
157-
}
158-
159-
self.input.set_need_data();
160-
Ok(Event::NeedData)
161-
}
162-
163-
fn output(&mut self) -> Result<Event> {
164-
if self.output.is_finished() {
165-
return self.next_step(Step::Finish);
166-
}
167-
168-
if !self.output.can_push() {
169-
return Ok(Event::NeedConsume);
170-
}
171-
172-
if self.need_spill() {
173-
return self.next_step(Step::Spill);
174-
}
175-
176-
if let Some(data_block) = self.output_data_blocks.pop_front() {
177-
self.output.push_data(Ok(data_block));
178-
return Ok(Event::NeedConsume);
179-
}
180-
181-
match self.buffer.is_empty() {
182-
true => self.next_step(Step::Finish),
183-
false => self.next_step(Step::Restore),
184112
}
185113
}
186114
}
@@ -195,66 +123,66 @@ impl<S: DataProcessorStrategy> Processor for TransformWindowPartitionCollect<S>
195123
}
196124

197125
fn event(&mut self) -> Result<Event> {
198-
// (collect <--> spill) -> (process <--> restore) -> finish
126+
if self.output.is_finished() {
127+
self.input.finish();
128+
return Ok(Event::Finished);
129+
}
130+
199131
match self.step {
200-
Step::Collect => self.collect(),
201-
Step::Spill => {
202-
if self.is_collect_finished {
203-
self.step = Step::Process;
204-
self.output()
205-
} else {
206-
// collect data again.
207-
self.step = Step::Collect;
208-
self.collect()
132+
Step::Collect => {
133+
if self.input.has_data() {
134+
return Ok(Event::Sync);
209135
}
136+
137+
if self.input.is_finished() {
138+
self.step = Step::Output;
139+
return Ok(Event::Sync);
140+
}
141+
142+
self.input.set_need_data();
143+
Ok(Event::NeedData)
210144
}
211-
Step::Process => self.output(),
212-
Step::Restore => {
213-
if self.restored_data_blocks.is_empty() {
214-
self.next_step(Step::Finish)
215-
} else {
216-
self.next_step(Step::Process)
145+
Step::Output => {
146+
if !self.output.can_push() {
147+
return Ok(Event::NeedConsume);
148+
}
149+
150+
if let Some(block) = self.output_data_blocks.pop_front() {
151+
self.output.push_data(Ok(block));
152+
return Ok(Event::NeedConsume);
217153
}
154+
155+
if self.buffer.is_empty() {
156+
self.output.finish();
157+
return Ok(Event::Finished);
158+
}
159+
160+
Ok(Event::Sync)
218161
}
219-
Step::Finish => Ok(Event::Finished),
220162
}
221163
}
222164

223165
fn process(&mut self) -> Result<()> {
224166
match self.step {
225-
Step::Process => {
226-
let restored_data_blocks = std::mem::take(&mut self.restored_data_blocks);
227-
let processed_blocks = self.strategy.process_data_blocks(restored_data_blocks)?;
228-
self.output_data_blocks.extend(processed_blocks);
167+
Step::Collect => {
168+
let data_block = self.input.pull_data().unwrap()?;
169+
self.collect_data_block(data_block);
170+
171+
// Spill if memory pressure is high.
172+
while self.buffer.need_spill() {
173+
self.buffer.spill()?;
174+
}
229175
Ok(())
230176
}
231-
Step::Spill => self.buffer.spill(),
232-
Step::Restore => {
233-
self.restored_data_blocks = self.buffer.restore()?;
177+
Step::Output => {
178+
// Restore next non-empty partition and process it.
179+
let restored = self.buffer.restore()?;
180+
if !restored.is_empty() {
181+
let processed = self.strategy.process_data_blocks(restored)?;
182+
self.output_data_blocks.extend(processed);
183+
}
234184
Ok(())
235185
}
236-
_ => unreachable!(),
237186
}
238187
}
239188
}
240-
241-
impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
242-
fn collect_data_block(
243-
data_block: DataBlock,
244-
index_map: &[usize],
245-
buffer: &mut WindowPartitionBufferV2,
246-
) {
247-
if let Some(meta) = data_block
248-
.get_owned_meta()
249-
.and_then(WindowPartitionMeta::downcast_from)
250-
{
251-
for (id, data_block) in meta.partitioned_data {
252-
buffer.add_data_block(index_map[id], data_block);
253-
}
254-
}
255-
}
256-
257-
fn need_spill(&mut self) -> bool {
258-
self.buffer.need_spill()
259-
}
260-
}

src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs renamed to src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use databend_common_storage::DataOperator;
2323
use databend_common_storages_parquet::ReadSettings;
2424
use parquet::file::metadata::RowGroupMetaData;
2525

26-
2726
fn concat_data_blocks(data_blocks: Vec<DataBlock>, target_size: usize) -> Result<Vec<DataBlock>> {
2827
let mut num_rows = 0;
2928
let mut result = Vec::new();
@@ -107,7 +106,10 @@ struct PartitionFileReader {
107106

108107
impl PartitionFileReader {
109108
fn restore(&self, indices: Vec<usize>) -> Result<Vec<DataBlock>> {
110-
let selected: Vec<_> = indices.iter().map(|&i| self.row_groups[i].clone()).collect();
109+
let selected: Vec<_> = indices
110+
.iter()
111+
.map(|&i| self.row_groups[i].clone())
112+
.collect();
111113
if selected.is_empty() {
112114
return Ok(Vec::new());
113115
}
@@ -233,10 +235,6 @@ impl PartitionSlot {
233235
}
234236
}
235237

236-
// --- Public PartitionBuffer (WindowPartitionBufferV2) ---
237-
238-
pub(super) type WindowPartitionBufferV2 = WindowPartitionBuffer;
239-
240238
pub(super) struct WindowPartitionBuffer {
241239
prefix: String,
242240
writer_pool_bytes: usize,

0 commit comments

Comments
 (0)