Skip to content

Commit 04a39ac

Browse files
committed
fix(query): rotate parquet spill row groups
1 parent 0cfbd97 commit 04a39ac

10 files changed

Lines changed: 472 additions & 185 deletions

File tree

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::Arc;
1717
use std::time::Duration;
1818
use std::time::Instant;
1919

20-
use databend_base::uniq_id::GlobalUniq;
2120
use databend_common_base::base::ProgressValues;
2221
use databend_common_exception::ErrorCode;
2322
use databend_common_exception::Result;
@@ -31,7 +30,6 @@ use databend_common_storages_parquet::ReadSettings;
3130
use log::debug;
3231
use log::info;
3332
use parking_lot::Mutex;
34-
use parquet::file::metadata::RowGroupMetaData;
3533

3634
use crate::pipelines::memory_settings::MemorySettingsExt;
3735
use crate::pipelines::processors::transforms::aggregator::NewSpilledPayload;
@@ -43,27 +41,33 @@ use crate::sessions::TableContextSpillProgress;
4341
use crate::spillers::Layout;
4442
use crate::spillers::SpillAdapter;
4543
use crate::spillers::SpillTarget;
44+
use crate::spillers::SpilledDataFile;
4645
use crate::spillers::SpillsBufferPool;
4746
use crate::spillers::SpillsDataWriter;
4847

4948
const BYTES_PER_MIB: usize = 1024 * 1024;
5049

5150
struct PayloadWriter {
52-
path: String,
5351
writer: SpillsDataWriter,
5452
}
5553

5654
impl PayloadWriter {
57-
fn try_create(prefix: &str, writer_pool_bytes: usize) -> Result<Self> {
55+
fn try_create(
56+
prefix: &str,
57+
writer_pool_bytes: usize,
58+
max_row_groups_per_file: usize,
59+
) -> Result<Self> {
5860
let data_operator = DataOperator::instance();
5961
let operator = data_operator.spill_operator();
6062
let buffer_pool = SpillsBufferPool::instance();
61-
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
62-
let spills_data_writer =
63-
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
63+
let spills_data_writer = buffer_pool.writer(
64+
operator,
65+
prefix.to_string(),
66+
writer_pool_bytes,
67+
max_row_groups_per_file,
68+
)?;
6469

6570
Ok(PayloadWriter {
66-
path: file_path,
6771
writer: spills_data_writer,
6872
})
6973
}
@@ -77,9 +81,8 @@ impl PayloadWriter {
7781
self.writer.flush()
7882
}
7983

80-
fn close(self) -> Result<(String, usize, Vec<RowGroupMetaData>)> {
81-
let (bytes_written, row_groups) = self.writer.close()?;
82-
Ok((self.path, bytes_written, row_groups))
84+
fn close(self) -> Result<Vec<SpilledDataFile>> {
85+
self.writer.close()
8386
}
8487
}
8588

@@ -114,6 +117,7 @@ struct AggregatePayloadWriters {
114117
spill_prefix: String,
115118
partition_count: usize,
116119
writer_pool_bytes: usize,
120+
max_row_groups_per_file: usize,
117121
writers: Vec<Option<PayloadWriter>>,
118122
write_stats: WriteStats,
119123
ctx: Arc<QueryContext>,
@@ -124,12 +128,14 @@ impl AggregatePayloadWriters {
124128
prefix: &str,
125129
partition_count: usize,
126130
writer_pool_bytes: usize,
131+
max_row_groups_per_file: usize,
127132
ctx: Arc<QueryContext>,
128133
) -> Self {
129134
AggregatePayloadWriters {
130135
spill_prefix: prefix.to_string(),
131136
partition_count,
132137
writer_pool_bytes,
138+
max_row_groups_per_file,
133139
writers: Self::empty_writers(partition_count),
134140
write_stats: WriteStats::default(),
135141
ctx,
@@ -147,6 +153,7 @@ impl AggregatePayloadWriters {
147153
self.writers[bucket] = Some(PayloadWriter::try_create(
148154
&self.spill_prefix,
149155
self.writer_pool_bytes,
156+
self.max_row_groups_per_file,
150157
)?);
151158
}
152159

@@ -187,40 +194,46 @@ impl AggregatePayloadWriters {
187194
continue;
188195
};
189196

190-
let (path, written_size, row_groups) = writer.close()?;
191-
192-
if written_size != 0 {
193-
info!(
194-
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
195-
partition_id,
197+
for file in writer.close()? {
198+
let SpilledDataFile {
196199
path,
197-
written_size,
198-
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
199-
row_groups.len()
200-
);
201-
}
200+
bytes_written,
201+
row_groups,
202+
} = file;
203+
204+
if bytes_written != 0 {
205+
info!(
206+
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
207+
partition_id,
208+
path,
209+
bytes_written,
210+
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
211+
row_groups.len()
212+
);
213+
}
202214

203-
self.ctx.add_spill_file(
204-
Location::Remote(path.clone()),
205-
Layout::Aggregate,
206-
written_size,
207-
);
215+
self.ctx.add_spill_file(
216+
Location::Remote(path.clone()),
217+
Layout::Aggregate,
218+
bytes_written,
219+
);
208220

209-
if row_groups.is_empty() {
210-
continue;
211-
}
221+
if row_groups.is_empty() {
222+
continue;
223+
}
212224

213-
if written_size > 0 {
214-
self.write_stats.add_bytes(written_size);
215-
}
225+
if bytes_written > 0 {
226+
self.write_stats.add_bytes(bytes_written);
227+
}
216228

217-
for row_group in row_groups {
218-
self.write_stats.add_rows(row_group.num_rows() as usize);
219-
spilled_payloads.push(NewSpilledPayload {
220-
bucket: partition_id as isize,
221-
location: path.clone(),
222-
row_group,
223-
});
229+
for row_group in row_groups {
230+
self.write_stats.add_rows(row_group.num_rows() as usize);
231+
spilled_payloads.push(NewSpilledPayload {
232+
bucket: partition_id as isize,
233+
location: path.clone(),
234+
row_group,
235+
});
236+
}
224237
}
225238
}
226239

@@ -353,9 +366,17 @@ impl<P: PartitionStream> NewAggregateSpiller<P> {
353366
.get_settings()
354367
.get_spill_writer_memory_pool_size_mb()?
355368
.saturating_mul(BYTES_PER_MIB);
369+
let max_row_groups_per_file = ctx
370+
.get_settings()
371+
.get_max_parquet_spill_row_groups_per_file()?;
356372

357-
let payload_writers =
358-
AggregatePayloadWriters::create(&spill_prefix, partition_count, writer_pool_bytes, ctx);
373+
let payload_writers = AggregatePayloadWriters::create(
374+
&spill_prefix,
375+
partition_count,
376+
writer_pool_bytes,
377+
max_row_groups_per_file,
378+
ctx,
379+
);
359380

360381
Ok(Self {
361382
memory_settings,

src/query/service/src/pipelines/processors/transforms/materialized_cte.rs

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::sync::Arc;
1818

1919
use async_channel::Receiver;
2020
use async_channel::Sender;
21-
use databend_base::uniq_id::GlobalUniq;
2221
use databend_common_base::base::Progress;
2322
use databend_common_base::base::ProgressValues;
2423
use databend_common_base::runtime::profile::Profile;
@@ -47,8 +46,6 @@ use crate::spillers::SpillTarget;
4746
use crate::spillers::SpillsBufferPool;
4847

4948
const MATERIALIZED_CTE_SPILL_UNIT_SIZE: usize = 8 * 1024 * 1024;
50-
/// Maximum number of row groups per file.
51-
const MAX_ROW_GROUPS_PER_FILE: usize = 2 << 15;
5249

5350
#[derive(Clone)]
5451
pub enum MaterializedCtePayload {
@@ -106,6 +103,7 @@ pub struct MaterializedCteSink {
106103
senders: Vec<Sender<MaterializedCtePayload>>,
107104
prefix: String,
108105
writer_pool_bytes: usize,
106+
max_row_groups_per_file: usize,
109107
memory_settings: MemorySettings,
110108
input_data: Option<DataBlock>,
111109
pending_payloads: VecDeque<MaterializedCtePayload>,
@@ -126,12 +124,14 @@ impl MaterializedCteSink {
126124
let writer_pool_bytes = settings
127125
.get_spill_writer_memory_pool_size_mb()?
128126
.saturating_mul(1024 * 1024);
127+
let max_row_groups_per_file = settings.get_max_parquet_spill_row_groups_per_file()?;
129128
Ok(ProcessorPtr::create(Box::new(Self {
130129
ctx,
131130
input,
132131
senders,
133132
prefix,
134133
writer_pool_bytes,
134+
max_row_groups_per_file,
135135
memory_settings,
136136
input_data: None,
137137
pending_payloads: VecDeque::new(),
@@ -154,36 +154,37 @@ impl MaterializedCteSink {
154154
let data_operator = DataOperator::instance();
155155
let operator = data_operator.spill_operator();
156156
let buffer_pool = SpillsBufferPool::instance();
157-
let path = format!("{}/{}", self.prefix, GlobalUniq::unique());
158-
let mut writer = buffer_pool.writer(operator, path.clone(), self.writer_pool_bytes)?;
157+
let mut writer = buffer_pool.writer(
158+
operator,
159+
self.prefix.clone(),
160+
self.writer_pool_bytes,
161+
self.max_row_groups_per_file,
162+
)?;
159163

160164
let schema = Arc::new(data_blocks[0].infer_schema());
161-
let mut row_group_ranges = Vec::with_capacity(data_blocks.len());
162-
let mut next_row_group = 0;
165+
let mut payloads = Vec::with_capacity(data_blocks.len());
163166
for block in data_blocks {
164167
writer.write(block)?;
165-
let row_group_count = writer.flush_row_groups()?;
166-
row_group_ranges.push(next_row_group..row_group_count);
167-
next_row_group = row_group_count;
168+
for row_groups in writer.flush_row_groups()? {
169+
payloads.push(MaterializedCtePayload::Spilled(
170+
MaterializedCteSpilledPayload {
171+
path: row_groups.path,
172+
row_groups: Arc::new(row_groups.row_groups),
173+
schema: schema.clone(),
174+
},
175+
));
176+
}
168177
}
169-
let (bytes_written, row_groups) = writer.close()?;
170-
if bytes_written > 0 {
171-
self.ctx.add_spill_file(
172-
Location::Remote(path.clone()),
173-
Layout::Parquet,
174-
bytes_written,
175-
);
178+
for file in writer.close()? {
179+
if file.bytes_written > 0 {
180+
self.ctx.add_spill_file(
181+
Location::Remote(file.path),
182+
Layout::Parquet,
183+
file.bytes_written,
184+
);
185+
}
176186
}
177-
Ok(row_group_ranges
178-
.into_iter()
179-
.map(|range| {
180-
MaterializedCtePayload::Spilled(MaterializedCteSpilledPayload {
181-
path: path.clone(),
182-
row_groups: Arc::new(row_groups[range].to_vec()),
183-
schema: schema.clone(),
184-
})
185-
})
186-
.collect())
187+
Ok(payloads)
187188
}
188189

189190
fn consume_block(&mut self, data_block: DataBlock) -> Result<()> {
@@ -192,9 +193,7 @@ impl MaterializedCteSink {
192193
self.spilling_bytes += data_block.memory_size();
193194
self.spilling_blocks.push(data_block);
194195

195-
if self.spilling_bytes >= self.spill_unit_size()
196-
|| self.spilling_blocks.len() >= MAX_ROW_GROUPS_PER_FILE
197-
{
196+
if self.spilling_bytes >= self.spill_unit_size() {
198197
self.flush_spilling_blocks()?;
199198
}
200199
} else {

0 commit comments

Comments
 (0)