Skip to content

Commit d07c366

Browse files
committed
chore: add back window spill statistics
1 parent b0ce07c commit d07c366

3 files changed

Lines changed: 58 additions & 20 deletions

File tree

src/query/service/src/pipelines/processors/transforms/materialized_cte.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ use databend_common_storages_parquet::ReadSettings;
4040
use parquet::file::metadata::RowGroupMetaData;
4141

4242
use crate::sessions::QueryContext;
43+
use crate::spillers::Layout;
44+
use crate::spillers::Location;
45+
use crate::spillers::SpillAdapter;
4346
use crate::spillers::SpillTarget;
4447
use crate::spillers::SpillsBufferPool;
4548

@@ -81,6 +84,7 @@ impl MaterializedCteSpilledPayload {
8184
}
8285

8386
pub struct MaterializedCteSink {
87+
ctx: Arc<QueryContext>,
8488
input: Arc<InputPort>,
8589
senders: Vec<Sender<MaterializedCtePayload>>,
8690
prefix: String,
@@ -106,6 +110,7 @@ impl MaterializedCteSink {
106110
.get_spill_writer_memory_pool_size_mb()?
107111
.saturating_mul(1024 * 1024);
108112
Ok(ProcessorPtr::create(Box::new(Self {
113+
ctx,
109114
input,
110115
senders,
111116
prefix,
@@ -141,7 +146,14 @@ impl MaterializedCteSink {
141146
writer.write(block)?;
142147
ordinals.push(writer.flush_row_group()?);
143148
}
144-
let (_bytes_written, row_groups) = writer.close()?;
149+
let (bytes_written, row_groups) = writer.close()?;
150+
if bytes_written > 0 {
151+
self.ctx.add_spill_file(
152+
Location::Remote(path.clone()),
153+
Layout::Parquet,
154+
bytes_written,
155+
);
156+
}
145157
let row_groups = Arc::new(row_groups);
146158

147159
Ok(ordinals

src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
8383

8484
let sort_block_size = settings.get_window_partition_sort_block_size()? as usize;
8585
let buffer = WindowPartitionBuffer::new(
86+
ctx.clone(),
8687
prefix,
8788
writer_pool_bytes,
8889
partitions.len(),

src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,21 @@ use databend_common_exception::Result;
1919
use databend_common_expression::DataBlock;
2020
use databend_common_expression::DataSchemaRef;
2121
use databend_common_pipeline_transforms::MemorySettings;
22+
use databend_common_pipeline_transforms::traits::Location;
2223
use databend_common_storage::DataOperator;
2324
use databend_common_storages_parquet::ReadSettings;
2425
use parquet::file::metadata::RowGroupMetaData;
2526

27+
use crate::sessions::QueryContext;
28+
use crate::spillers::Layout;
29+
use crate::spillers::SpillAdapter;
30+
use crate::spillers::SpillTarget;
31+
use crate::spillers::SpillsBufferPool;
32+
use crate::spillers::SpillsDataWriter;
33+
34+
/// Maximum number of row groups per file before rotating to a new file.
35+
const MAX_ROW_GROUPS_PER_FILE: usize = 2 << 15;
36+
2637
fn concat_data_blocks(data_blocks: Vec<DataBlock>, target_size: usize) -> Result<Vec<DataBlock>> {
2738
let mut num_rows = 0;
2839
let mut result = Vec::new();
@@ -44,14 +55,6 @@ fn concat_data_blocks(data_blocks: Vec<DataBlock>, target_size: usize) -> Result
4455

4556
Ok(result)
4657
}
47-
use crate::spillers::SpillTarget;
48-
use crate::spillers::SpillsBufferPool;
49-
use crate::spillers::SpillsDataWriter;
50-
51-
/// Maximum number of row groups per file before rotating to a new file.
52-
const MAX_ROW_GROUPS_PER_FILE: usize = 2 << 15;
53-
54-
// --- Writer / Reader for a single partition file ---
5558

5659
struct PartitionFileWriter {
5760
path: String,
@@ -83,17 +86,32 @@ impl PartitionFileWriter {
8386
self.writer.flush_row_group()
8487
}
8588

86-
fn close(self) -> Result<PartitionFileReader> {
87-
let (bytes_written, row_groups) = self.writer.close()?;
89+
fn close(self, ctx: &Arc<QueryContext>) -> Result<PartitionFileReader> {
90+
let Self {
91+
path,
92+
writer,
93+
schema,
94+
} = self;
95+
let (bytes_written, row_groups) = writer.close()?;
8896
log::debug!(
89-
path = self.path,
97+
path = path,
9098
bytes_written;
9199
"partition file closed"
92100
);
101+
102+
if bytes_written > 0 {
103+
SpillAdapter::add_spill_file(
104+
ctx,
105+
Location::Remote(path.clone()),
106+
Layout::Parquet,
107+
bytes_written,
108+
);
109+
}
110+
93111
Ok(PartitionFileReader {
94-
path: self.path,
112+
path,
95113
row_groups,
96-
schema: self.schema.unwrap_or_else(|| Arc::new(Default::default())),
114+
schema: schema.unwrap_or_else(|| Arc::new(Default::default())),
97115
})
98116
}
99117
}
@@ -183,6 +201,7 @@ impl PartitionSlot {
183201
#[fastrace::trace(name = "PartitionSlot::spill_blocks")]
184202
fn spill_blocks(
185203
&mut self,
204+
ctx: &Arc<QueryContext>,
186205
prefix: &str,
187206
writer_pool_bytes: usize,
188207
blocks: Vec<DataBlock>,
@@ -213,7 +232,7 @@ impl PartitionSlot {
213232
row_groups.push(ordinal);
214233

215234
if ordinal >= MAX_ROW_GROUPS_PER_FILE {
216-
let reader = writer.close()?;
235+
let reader = writer.close(ctx)?;
217236
self.readers.push((reader, row_groups));
218237
} else {
219238
self.state = PartitionSpillState::Writing { writer, row_groups };
@@ -224,18 +243,22 @@ impl PartitionSlot {
224243
}
225244
}
226245

227-
fn take_readers(&mut self) -> Result<Vec<(PartitionFileReader, Vec<usize>)>> {
246+
fn take_readers(
247+
&mut self,
248+
ctx: &Arc<QueryContext>,
249+
) -> Result<Vec<(PartitionFileReader, Vec<usize>)>> {
228250
if let PartitionSpillState::Writing { writer, row_groups } =
229251
std::mem::replace(&mut self.state, PartitionSpillState::Reading)
230252
{
231-
let reader = writer.close()?;
253+
let reader = writer.close(ctx)?;
232254
self.readers.push((reader, row_groups));
233255
}
234256
Ok(std::mem::take(&mut self.readers))
235257
}
236258
}
237259

238260
pub(super) struct WindowPartitionBuffer {
261+
ctx: Arc<QueryContext>,
239262
prefix: String,
240263
writer_pool_bytes: usize,
241264
partitions: Vec<PartitionSlot>,
@@ -248,6 +271,7 @@ pub(super) struct WindowPartitionBuffer {
248271

249272
impl WindowPartitionBuffer {
250273
pub fn new(
274+
ctx: Arc<QueryContext>,
251275
prefix: String,
252276
writer_pool_bytes: usize,
253277
num_partitions: usize,
@@ -256,6 +280,7 @@ impl WindowPartitionBuffer {
256280
) -> Result<Self> {
257281
let partitions = (0..num_partitions).map(PartitionSlot::new).collect();
258282
Ok(Self {
283+
ctx,
259284
prefix,
260285
writer_pool_bytes,
261286
partitions,
@@ -296,7 +321,7 @@ impl WindowPartitionBuffer {
296321
continue;
297322
}
298323
if let Some(blocks) = partition.take_blocks(Some(spill_unit_size)) {
299-
partition.spill_blocks(&self.prefix, self.writer_pool_bytes, blocks)?;
324+
partition.spill_blocks(&self.ctx, &self.prefix, self.writer_pool_bytes, blocks)?;
300325
return Ok(());
301326
}
302327

@@ -314,7 +339,7 @@ impl WindowPartitionBuffer {
314339
&& size >= self.min_row_group_size
315340
{
316341
let blocks = partition.take_blocks(None).unwrap();
317-
partition.spill_blocks(&self.prefix, self.writer_pool_bytes, blocks)?;
342+
partition.spill_blocks(&self.ctx, &self.prefix, self.writer_pool_bytes, blocks)?;
318343
} else {
319344
self.can_spill = false;
320345
}
@@ -327,7 +352,7 @@ impl WindowPartitionBuffer {
327352
self.next_to_restore += 1;
328353

329354
let mut result = Vec::new();
330-
for (reader, row_groups) in partition.take_readers()? {
355+
for (reader, row_groups) in partition.take_readers(&self.ctx)? {
331356
debug_assert!(!row_groups.is_empty());
332357
let blocks = reader.restore(row_groups)?;
333358
result.extend(blocks);

0 commit comments

Comments
 (0)