Skip to content

Commit f50d39f

Browse files
committed
fix(query): rotate parquet spill row groups
1 parent 0cfbd97 commit f50d39f

7 files changed

Lines changed: 371 additions & 175 deletions

File tree

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::Arc;
1717
use std::time::Duration;
1818
use std::time::Instant;
1919

20-
use databend_base::uniq_id::GlobalUniq;
2120
use databend_common_base::base::ProgressValues;
2221
use databend_common_exception::ErrorCode;
2322
use databend_common_exception::Result;
@@ -31,7 +30,6 @@ use databend_common_storages_parquet::ReadSettings;
3130
use log::debug;
3231
use log::info;
3332
use parking_lot::Mutex;
34-
use parquet::file::metadata::RowGroupMetaData;
3533

3634
use crate::pipelines::memory_settings::MemorySettingsExt;
3735
use crate::pipelines::processors::transforms::aggregator::NewSpilledPayload;
@@ -43,13 +41,13 @@ use crate::sessions::TableContextSpillProgress;
4341
use crate::spillers::Layout;
4442
use crate::spillers::SpillAdapter;
4543
use crate::spillers::SpillTarget;
44+
use crate::spillers::SpilledDataFile;
4645
use crate::spillers::SpillsBufferPool;
4746
use crate::spillers::SpillsDataWriter;
4847

4948
const BYTES_PER_MIB: usize = 1024 * 1024;
5049

5150
struct PayloadWriter {
52-
path: String,
5351
writer: SpillsDataWriter,
5452
}
5553

@@ -58,12 +56,10 @@ impl PayloadWriter {
5856
let data_operator = DataOperator::instance();
5957
let operator = data_operator.spill_operator();
6058
let buffer_pool = SpillsBufferPool::instance();
61-
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
6259
let spills_data_writer =
63-
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
60+
buffer_pool.writer(operator, prefix.to_string(), writer_pool_bytes)?;
6461

6562
Ok(PayloadWriter {
66-
path: file_path,
6763
writer: spills_data_writer,
6864
})
6965
}
@@ -77,9 +73,8 @@ impl PayloadWriter {
7773
self.writer.flush()
7874
}
7975

80-
fn close(self) -> Result<(String, usize, Vec<RowGroupMetaData>)> {
81-
let (bytes_written, row_groups) = self.writer.close()?;
82-
Ok((self.path, bytes_written, row_groups))
76+
fn close(self) -> Result<Vec<SpilledDataFile>> {
77+
self.writer.close()
8378
}
8479
}
8580

@@ -187,40 +182,46 @@ impl AggregatePayloadWriters {
187182
continue;
188183
};
189184

190-
let (path, written_size, row_groups) = writer.close()?;
191-
192-
if written_size != 0 {
193-
info!(
194-
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
195-
partition_id,
185+
for file in writer.close()? {
186+
let SpilledDataFile {
196187
path,
197-
written_size,
198-
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
199-
row_groups.len()
200-
);
201-
}
188+
bytes_written,
189+
row_groups,
190+
} = file;
191+
192+
if bytes_written != 0 {
193+
info!(
194+
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
195+
partition_id,
196+
path,
197+
bytes_written,
198+
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
199+
row_groups.len()
200+
);
201+
}
202202

203-
self.ctx.add_spill_file(
204-
Location::Remote(path.clone()),
205-
Layout::Aggregate,
206-
written_size,
207-
);
203+
self.ctx.add_spill_file(
204+
Location::Remote(path.clone()),
205+
Layout::Aggregate,
206+
bytes_written,
207+
);
208208

209-
if row_groups.is_empty() {
210-
continue;
211-
}
209+
if row_groups.is_empty() {
210+
continue;
211+
}
212212

213-
if written_size > 0 {
214-
self.write_stats.add_bytes(written_size);
215-
}
213+
if bytes_written > 0 {
214+
self.write_stats.add_bytes(bytes_written);
215+
}
216216

217-
for row_group in row_groups {
218-
self.write_stats.add_rows(row_group.num_rows() as usize);
219-
spilled_payloads.push(NewSpilledPayload {
220-
bucket: partition_id as isize,
221-
location: path.clone(),
222-
row_group,
223-
});
217+
for row_group in row_groups {
218+
self.write_stats.add_rows(row_group.num_rows() as usize);
219+
spilled_payloads.push(NewSpilledPayload {
220+
bucket: partition_id as isize,
221+
location: path.clone(),
222+
row_group,
223+
});
224+
}
224225
}
225226
}
226227

src/query/service/src/pipelines/processors/transforms/materialized_cte.rs

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::sync::Arc;
1818

1919
use async_channel::Receiver;
2020
use async_channel::Sender;
21-
use databend_base::uniq_id::GlobalUniq;
2221
use databend_common_base::base::Progress;
2322
use databend_common_base::base::ProgressValues;
2423
use databend_common_base::runtime::profile::Profile;
@@ -47,8 +46,6 @@ use crate::spillers::SpillTarget;
4746
use crate::spillers::SpillsBufferPool;
4847

4948
const MATERIALIZED_CTE_SPILL_UNIT_SIZE: usize = 8 * 1024 * 1024;
50-
/// Maximum number of row groups per file.
51-
const MAX_ROW_GROUPS_PER_FILE: usize = 2 << 15;
5249

5350
#[derive(Clone)]
5451
pub enum MaterializedCtePayload {
@@ -154,36 +151,33 @@ impl MaterializedCteSink {
154151
let data_operator = DataOperator::instance();
155152
let operator = data_operator.spill_operator();
156153
let buffer_pool = SpillsBufferPool::instance();
157-
let path = format!("{}/{}", self.prefix, GlobalUniq::unique());
158-
let mut writer = buffer_pool.writer(operator, path.clone(), self.writer_pool_bytes)?;
154+
let mut writer =
155+
buffer_pool.writer(operator, self.prefix.clone(), self.writer_pool_bytes)?;
159156

160157
let schema = Arc::new(data_blocks[0].infer_schema());
161-
let mut row_group_ranges = Vec::with_capacity(data_blocks.len());
162-
let mut next_row_group = 0;
158+
let mut payloads = Vec::with_capacity(data_blocks.len());
163159
for block in data_blocks {
164160
writer.write(block)?;
165-
let row_group_count = writer.flush_row_groups()?;
166-
row_group_ranges.push(next_row_group..row_group_count);
167-
next_row_group = row_group_count;
161+
for row_groups in writer.flush_row_groups()? {
162+
payloads.push(MaterializedCtePayload::Spilled(
163+
MaterializedCteSpilledPayload {
164+
path: row_groups.path,
165+
row_groups: Arc::new(row_groups.row_groups),
166+
schema: schema.clone(),
167+
},
168+
));
169+
}
168170
}
169-
let (bytes_written, row_groups) = writer.close()?;
170-
if bytes_written > 0 {
171-
self.ctx.add_spill_file(
172-
Location::Remote(path.clone()),
173-
Layout::Parquet,
174-
bytes_written,
175-
);
171+
for file in writer.close()? {
172+
if file.bytes_written > 0 {
173+
self.ctx.add_spill_file(
174+
Location::Remote(file.path),
175+
Layout::Parquet,
176+
file.bytes_written,
177+
);
178+
}
176179
}
177-
Ok(row_group_ranges
178-
.into_iter()
179-
.map(|range| {
180-
MaterializedCtePayload::Spilled(MaterializedCteSpilledPayload {
181-
path: path.clone(),
182-
row_groups: Arc::new(row_groups[range].to_vec()),
183-
schema: schema.clone(),
184-
})
185-
})
186-
.collect())
180+
Ok(payloads)
187181
}
188182

189183
fn consume_block(&mut self, data_block: DataBlock) -> Result<()> {
@@ -192,9 +186,7 @@ impl MaterializedCteSink {
192186
self.spilling_bytes += data_block.memory_size();
193187
self.spilling_blocks.push(data_block);
194188

195-
if self.spilling_bytes >= self.spill_unit_size()
196-
|| self.spilling_blocks.len() >= MAX_ROW_GROUPS_PER_FILE
197-
{
189+
if self.spilling_bytes >= self.spill_unit_size() {
198190
self.flush_spilling_blocks()?;
199191
}
200192
} else {

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::collections::btree_map::Entry;
1717
use std::sync::Arc;
1818
use std::sync::PoisonError;
1919

20-
use databend_base::uniq_id::GlobalUniq;
2120
use databend_common_base::base::ProgressValues;
2221
use databend_common_exception::Result;
2322
use databend_common_expression::BlockPartitionStream;
@@ -42,6 +41,7 @@ use crate::sessions::TableContextSettings;
4241
use crate::spillers::Layout;
4342
use crate::spillers::SpillAdapter;
4443
use crate::spillers::SpillTarget;
44+
use crate::spillers::SpilledDataFile;
4545
use crate::spillers::SpillsBufferPool;
4646
use crate::spillers::SpillsDataReader;
4747
use crate::spillers::SpillsDataWriter;
@@ -84,8 +84,7 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
8484
};
8585

8686
for (id, data_block) in ready_partitions {
87-
self.partitions[id].writer.write(data_block)?;
88-
self.partitions[id].writer.flush()?;
87+
self.partitions[id].write_block(data_block)?;
8988
}
9089

9190
Ok(())
@@ -104,15 +103,22 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
104103
}
105104

106105
for (id, partition) in ready_partitions.into_iter().enumerate() {
107-
let path = partition.path;
108-
let (written, row_groups) = partition.writer.close()?;
109-
110-
self.state
111-
.ctx
112-
.add_spill_file(Location::Remote(path.clone()), Layout::Parquet, written);
113-
114-
if !row_groups.is_empty() {
115-
partitions_meta.push((id, SpillMetadata { path, row_groups }));
106+
for file in partition.close()? {
107+
let SpilledDataFile {
108+
path,
109+
bytes_written,
110+
row_groups,
111+
} = file;
112+
113+
self.state.ctx.add_spill_file(
114+
Location::Remote(path.clone()),
115+
Layout::Parquet,
116+
bytes_written,
117+
);
118+
119+
if !row_groups.is_empty() {
120+
partitions_meta.push((id, SpillMetadata { path, row_groups }));
121+
}
116122
}
117123
}
118124

@@ -139,8 +145,7 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
139145
let ready_partitions = self.partition_probe_data(data)?;
140146

141147
for (id, data_block) in ready_partitions {
142-
self.partitions[id].writer.write(data_block)?;
143-
self.partitions[id].writer.flush()?;
148+
self.partitions[id].write_block(data_block)?;
144149
}
145150

146151
Ok(Box::new(EmptyJoinStream))
@@ -358,24 +363,30 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
358363

359364
for id in ready_partitions_id {
360365
if let Some(data_block) = self.probe_partition_stream.finalize_partition(id) {
361-
self.partitions[id].writer.write(data_block)?;
362-
self.partitions[id].writer.flush()?;
366+
self.partitions[id].write_block(data_block)?;
363367
}
364368
}
365369

366370
let ready_partitions = std::mem::take(&mut self.partitions);
367371
let mut partitions_meta = Vec::with_capacity(self.partitions.len());
368372

369373
for (id, partition) in ready_partitions.into_iter().enumerate() {
370-
let path = partition.path;
371-
let (written, row_groups) = partition.writer.close()?;
372-
373-
self.state
374-
.ctx
375-
.add_spill_file(Location::Remote(path.clone()), Layout::Parquet, written);
376-
377-
if !row_groups.is_empty() {
378-
partitions_meta.push((id, SpillMetadata { path, row_groups }));
374+
for file in partition.close()? {
375+
let SpilledDataFile {
376+
path,
377+
bytes_written,
378+
row_groups,
379+
} = file;
380+
381+
self.state.ctx.add_spill_file(
382+
Location::Remote(path.clone()),
383+
Layout::Parquet,
384+
bytes_written,
385+
);
386+
387+
if !row_groups.is_empty() {
388+
partitions_meta.push((id, SpillMetadata { path, row_groups }));
389+
}
379390
}
380391
}
381392

@@ -422,7 +433,6 @@ pub enum RestoreStage {
422433
}
423434

424435
pub struct GraceJoinPartition {
425-
path: String,
426436
writer: SpillsDataWriter,
427437
}
428438

@@ -432,15 +442,22 @@ impl GraceJoinPartition {
432442

433443
let operator = data_operator.spill_operator();
434444
let buffer_pool = SpillsBufferPool::instance();
435-
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
436445
let spills_data_writer =
437-
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
446+
buffer_pool.writer(operator, prefix.to_string(), writer_pool_bytes)?;
438447

439448
Ok(GraceJoinPartition {
440-
path: file_path,
441449
writer: spills_data_writer,
442450
})
443451
}
452+
453+
fn write_block(&mut self, block: DataBlock) -> Result<()> {
454+
self.writer.write(block)?;
455+
self.writer.flush()
456+
}
457+
458+
fn close(self) -> Result<Vec<SpilledDataFile>> {
459+
self.writer.close()
460+
}
444461
}
445462

446463
pub struct RestoreProbeStream<'a, T: GraceMemoryJoin> {

0 commit comments

Comments
 (0)