Skip to content

Commit 463b407

Browse files
committed
fix(query): rotate parquet spill row groups
1 parent cbe3fb2 commit 463b407

8 files changed

Lines changed: 345 additions & 107 deletions

File tree

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 47 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@ use std::sync::Arc;
1717
use std::time::Duration;
1818
use std::time::Instant;
1919

20-
use databend_base::uniq_id::GlobalUniq;
2120
use databend_common_base::base::ProgressValues;
2221
use databend_common_exception::ErrorCode;
2322
use databend_common_exception::Result;
@@ -31,7 +30,6 @@ use databend_common_storages_parquet::ReadSettings;
3130
use log::debug;
3231
use log::info;
3332
use parking_lot::Mutex;
34-
use parquet::file::metadata::RowGroupMetaData;
3533

3634
use crate::pipelines::memory_settings::MemorySettingsExt;
3735
use crate::pipelines::processors::transforms::aggregator::NewSpilledPayload;
@@ -41,45 +39,40 @@ use crate::sessions::TableContext;
4139
use crate::sessions::TableContextSettings;
4240
use crate::sessions::TableContextSpillProgress;
4341
use crate::spillers::Layout;
42+
use crate::spillers::RollingSpillsDataWriter;
4443
use crate::spillers::SpillAdapter;
4544
use crate::spillers::SpillTarget;
45+
use crate::spillers::SpilledDataFile;
4646
use crate::spillers::SpillsBufferPool;
47-
use crate::spillers::SpillsDataWriter;
4847

4948
const BYTES_PER_MIB: usize = 1024 * 1024;
5049

5150
struct PayloadWriter {
52-
path: String,
53-
writer: SpillsDataWriter,
51+
writer: RollingSpillsDataWriter,
5452
}
5553

5654
impl PayloadWriter {
5755
fn try_create(prefix: &str, writer_pool_bytes: usize) -> Result<Self> {
5856
let data_operator = DataOperator::instance();
5957
let operator = data_operator.spill_operator();
6058
let buffer_pool = SpillsBufferPool::instance();
61-
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
62-
let spills_data_writer =
63-
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
6459

6560
Ok(PayloadWriter {
66-
path: file_path,
67-
writer: spills_data_writer,
61+
writer: RollingSpillsDataWriter::create(
62+
operator,
63+
prefix.to_string(),
64+
writer_pool_bytes,
65+
buffer_pool,
66+
),
6867
})
6968
}
7069

7170
fn write_block(&mut self, block: DataBlock) -> Result<()> {
72-
if block.is_empty() {
73-
return Ok(());
74-
}
75-
76-
self.writer.write(block)?;
77-
self.writer.flush()
71+
self.writer.write_and_flush(block)
7872
}
7973

80-
fn close(self) -> Result<(String, usize, Vec<RowGroupMetaData>)> {
81-
let (bytes_written, row_groups) = self.writer.close()?;
82-
Ok((self.path, bytes_written, row_groups))
74+
fn close(self) -> Result<Vec<SpilledDataFile>> {
75+
self.writer.close()
8376
}
8477
}
8578

@@ -187,40 +180,46 @@ impl AggregatePayloadWriters {
187180
continue;
188181
};
189182

190-
let (path, written_size, row_groups) = writer.close()?;
191-
192-
if written_size != 0 {
193-
info!(
194-
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
195-
partition_id,
183+
for file in writer.close()? {
184+
let SpilledDataFile {
196185
path,
197-
written_size,
198-
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
199-
row_groups.len()
200-
);
201-
}
186+
bytes_written,
187+
row_groups,
188+
} = file;
189+
190+
if bytes_written != 0 {
191+
info!(
192+
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
193+
partition_id,
194+
path,
195+
bytes_written,
196+
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
197+
row_groups.len()
198+
);
199+
}
202200

203-
self.ctx.add_spill_file(
204-
Location::Remote(path.clone()),
205-
Layout::Aggregate,
206-
written_size,
207-
);
201+
self.ctx.add_spill_file(
202+
Location::Remote(path.clone()),
203+
Layout::Aggregate,
204+
bytes_written,
205+
);
208206

209-
if row_groups.is_empty() {
210-
continue;
211-
}
207+
if row_groups.is_empty() {
208+
continue;
209+
}
212210

213-
if written_size > 0 {
214-
self.write_stats.add_bytes(written_size);
215-
}
211+
if bytes_written > 0 {
212+
self.write_stats.add_bytes(bytes_written);
213+
}
216214

217-
for row_group in row_groups {
218-
self.write_stats.add_rows(row_group.num_rows() as usize);
219-
spilled_payloads.push(NewSpilledPayload {
220-
bucket: partition_id as isize,
221-
location: path.clone(),
222-
row_group,
223-
});
215+
for row_group in row_groups {
216+
self.write_stats.add_rows(row_group.num_rows() as usize);
217+
spilled_payloads.push(NewSpilledPayload {
218+
bucket: partition_id as isize,
219+
location: path.clone(),
220+
row_group,
221+
});
222+
}
224223
}
225224
}
226225

src/query/service/src/pipelines/processors/transforms/materialized_cte.rs

Lines changed: 46 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -151,30 +151,61 @@ impl MaterializedCteSink {
151151
&self,
152152
data_blocks: Vec<DataBlock>,
153153
) -> Result<Vec<MaterializedCtePayload>> {
154-
let local_file_size = data_blocks
155-
.iter()
156-
.map(DataBlock::memory_size)
157-
.sum::<usize>()
158-
.max(self.spill_unit_size())
159-
.max(1);
160154
let schema = Arc::new(data_blocks[0].infer_schema());
161155
let mut writer_creator = self.spiller.new_writer_creator(schema)?;
156+
let mut pending_blocks = Vec::new();
157+
let mut pending_size = 0;
158+
let mut payloads = Vec::with_capacity(data_blocks.len());
159+
160+
for data_block in data_blocks {
161+
pending_size += data_block.memory_size();
162+
pending_blocks.push(data_block);
163+
164+
if pending_blocks.len() >= SpillWriter::MAX_ROW_GROUPS_PER_FILE {
165+
self.spill_data_block_chunk(
166+
&mut writer_creator,
167+
std::mem::take(&mut pending_blocks),
168+
pending_size,
169+
&mut payloads,
170+
)?;
171+
pending_size = 0;
172+
}
173+
}
174+
175+
if !pending_blocks.is_empty() {
176+
self.spill_data_block_chunk(
177+
&mut writer_creator,
178+
pending_blocks,
179+
pending_size,
180+
&mut payloads,
181+
)?;
182+
}
183+
184+
Ok(payloads)
185+
}
186+
187+
fn spill_data_block_chunk(
188+
&self,
189+
writer_creator: &mut crate::spillers::WriterCreator,
190+
data_blocks: Vec<DataBlock>,
191+
data_size: usize,
192+
payloads: &mut Vec<MaterializedCtePayload>,
193+
) -> Result<()> {
194+
let local_file_size = data_size.max(self.spill_unit_size()).max(1);
162195
let mut writer = writer_creator.open(Some(local_file_size))?;
163196
let mut row_groups = Vec::with_capacity(data_blocks.len());
164197
for data_block in data_blocks {
165198
row_groups.push(writer.add_row_group(vec![data_block])?);
166199
}
167200
let reader = writer.close()?;
168201

169-
Ok(row_groups
170-
.into_iter()
171-
.map(|row_group| {
172-
MaterializedCtePayload::Spilled(MaterializedCteSpilledPayload {
173-
reader: reader.clone(),
174-
row_group,
175-
})
202+
payloads.extend(row_groups.into_iter().map(|row_group| {
203+
MaterializedCtePayload::Spilled(MaterializedCteSpilledPayload {
204+
reader: reader.clone(),
205+
row_group,
176206
})
177-
.collect())
207+
}));
208+
Ok(())
178209
}
179210

180211
fn consume_block(&mut self, data_block: DataBlock) -> Result<()> {
@@ -184,7 +215,7 @@ impl MaterializedCteSink {
184215
self.spilling_blocks.push(data_block);
185216

186217
if self.spilling_bytes >= self.spill_unit_size()
187-
|| self.spilling_blocks.len() >= SpillWriter::MAX_ORDINAL
218+
|| self.spilling_blocks.len() >= SpillWriter::MAX_ROW_GROUPS_PER_FILE
188219
{
189220
self.flush_spilling_blocks()?;
190221
}

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs

Lines changed: 52 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@ use std::collections::btree_map::Entry;
1717
use std::sync::Arc;
1818
use std::sync::PoisonError;
1919

20-
use databend_base::uniq_id::GlobalUniq;
2120
use databend_common_base::base::ProgressValues;
2221
use databend_common_exception::Result;
2322
use databend_common_expression::BlockPartitionStream;
@@ -40,11 +39,12 @@ use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream;
4039
use crate::sessions::QueryContext;
4140
use crate::sessions::TableContextSettings;
4241
use crate::spillers::Layout;
42+
use crate::spillers::RollingSpillsDataWriter;
4343
use crate::spillers::SpillAdapter;
4444
use crate::spillers::SpillTarget;
45+
use crate::spillers::SpilledDataFile;
4546
use crate::spillers::SpillsBufferPool;
4647
use crate::spillers::SpillsDataReader;
47-
use crate::spillers::SpillsDataWriter;
4848

4949
const BYTES_PER_MIB: usize = 1024 * 1024;
5050

@@ -84,8 +84,7 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
8484
};
8585

8686
for (id, data_block) in ready_partitions {
87-
self.partitions[id].writer.write(data_block)?;
88-
self.partitions[id].writer.flush()?;
87+
self.partitions[id].write_block(data_block)?;
8988
}
9089

9190
Ok(())
@@ -104,15 +103,22 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
104103
}
105104

106105
for (id, partition) in ready_partitions.into_iter().enumerate() {
107-
let path = partition.path;
108-
let (written, row_groups) = partition.writer.close()?;
109-
110-
self.state
111-
.ctx
112-
.add_spill_file(Location::Remote(path.clone()), Layout::Parquet, written);
113-
114-
if !row_groups.is_empty() {
115-
partitions_meta.push((id, SpillMetadata { path, row_groups }));
106+
for file in partition.close()? {
107+
let SpilledDataFile {
108+
path,
109+
bytes_written,
110+
row_groups,
111+
} = file;
112+
113+
self.state.ctx.add_spill_file(
114+
Location::Remote(path.clone()),
115+
Layout::Parquet,
116+
bytes_written,
117+
);
118+
119+
if !row_groups.is_empty() {
120+
partitions_meta.push((id, SpillMetadata { path, row_groups }));
121+
}
116122
}
117123
}
118124

@@ -139,8 +145,7 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
139145
let ready_partitions = self.partition_probe_data(data)?;
140146

141147
for (id, data_block) in ready_partitions {
142-
self.partitions[id].writer.write(data_block)?;
143-
self.partitions[id].writer.flush()?;
148+
self.partitions[id].write_block(data_block)?;
144149
}
145150

146151
Ok(Box::new(EmptyJoinStream))
@@ -358,24 +363,30 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
358363

359364
for id in ready_partitions_id {
360365
if let Some(data_block) = self.probe_partition_stream.finalize_partition(id) {
361-
self.partitions[id].writer.write(data_block)?;
362-
self.partitions[id].writer.flush()?;
366+
self.partitions[id].write_block(data_block)?;
363367
}
364368
}
365369

366370
let ready_partitions = std::mem::take(&mut self.partitions);
367371
let mut partitions_meta = Vec::with_capacity(self.partitions.len());
368372

369373
for (id, partition) in ready_partitions.into_iter().enumerate() {
370-
let path = partition.path;
371-
let (written, row_groups) = partition.writer.close()?;
372-
373-
self.state
374-
.ctx
375-
.add_spill_file(Location::Remote(path.clone()), Layout::Parquet, written);
376-
377-
if !row_groups.is_empty() {
378-
partitions_meta.push((id, SpillMetadata { path, row_groups }));
374+
for file in partition.close()? {
375+
let SpilledDataFile {
376+
path,
377+
bytes_written,
378+
row_groups,
379+
} = file;
380+
381+
self.state.ctx.add_spill_file(
382+
Location::Remote(path.clone()),
383+
Layout::Parquet,
384+
bytes_written,
385+
);
386+
387+
if !row_groups.is_empty() {
388+
partitions_meta.push((id, SpillMetadata { path, row_groups }));
389+
}
379390
}
380391
}
381392

@@ -422,8 +433,7 @@ pub enum RestoreStage {
422433
}
423434

424435
pub struct GraceJoinPartition {
425-
path: String,
426-
writer: SpillsDataWriter,
436+
writer: RollingSpillsDataWriter,
427437
}
428438

429439
impl GraceJoinPartition {
@@ -432,15 +442,24 @@ impl GraceJoinPartition {
432442

433443
let operator = data_operator.spill_operator();
434444
let buffer_pool = SpillsBufferPool::instance();
435-
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
436-
let spills_data_writer =
437-
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
438445

439446
Ok(GraceJoinPartition {
440-
path: file_path,
441-
writer: spills_data_writer,
447+
writer: RollingSpillsDataWriter::create(
448+
operator,
449+
prefix.to_string(),
450+
writer_pool_bytes,
451+
buffer_pool,
452+
),
442453
})
443454
}
455+
456+
fn write_block(&mut self, block: DataBlock) -> Result<()> {
457+
self.writer.write_and_flush(block)
458+
}
459+
460+
fn close(self) -> Result<Vec<SpilledDataFile>> {
461+
self.writer.close()
462+
}
444463
}
445464

446465
pub struct RestoreProbeStream<'a, T: GraceMemoryJoin> {

0 commit comments

Comments
 (0)