Skip to content

Commit 114f643

Browse files
committed
refactor(spill): unify window/cte spill to async_buffer path
Migrate WindowPartitionBufferV2 and MaterializedCTE from the BackpressureSpiller -> WriterCreator -> SpillWriter -> AnyFileWriter chain to the SpillsBufferPool -> SpillsDataWriter/SpillsDataReader path used by the aggregate and join operators.

Key changes:
- Rewrite window_partition_buffer_v2 to use SpillsDataWriter directly
- Rewrite materialized_cte to use SpillsBufferPool.writer()
- Remove BackpressureSpiller, WriterCreator, SpillWriter, SpillReader
- Remove row_group_encoder.rs (FileWriter, LocalWriter, AnyFileWriter, RowGroupEncoder, Properties, RangeFetchPlan, FileReader)
- Remove V1 WindowPartitionBuffer (async Spiller path)
- Drop local DMA support for window/cte spill
- Add SpillsDataWriter::flush_row_group() for explicit row group control

Reduce the state machine from 5 states (Collect/Spill/Process/Restore/Finish) to 3 states (Collect/Output/Finish). Since spill and restore are now both synchronous, the indirection through separate Spill/Restore steps with async fallback is unnecessary.
- Collect: pull data, buffer, spill inline when memory pressure is high
- Output: restore next partition, process (sort), push downstream
- Remove next_step(), collect(), output() helper methods
- Remove is_collect_finished, restored_data_blocks fields
1 parent db6b04e commit 114f643

15 files changed

Lines changed: 462 additions & 1794 deletions

src/query/service/src/physical_plans/physical_materialized_cte.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use crate::physical_plans::format::PhysicalFormat;
3232
use crate::pipelines::PipelineBuilder;
3333
use crate::pipelines::memory_settings::MemorySettingsExt;
3434
use crate::pipelines::processors::transforms::MaterializedCteSink;
35-
use crate::pipelines::processors::transforms::create_materialized_cte_spiller;
3635

3736
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
3837
pub struct MaterializedCTE {
@@ -121,15 +120,19 @@ impl IPhysicalPlan for MaterializedCTE {
121120
self.ref_count,
122121
self.channel_size,
123122
);
124-
let spiller =
125-
create_materialized_cte_spiller(builder.ctx.clone(), builder.settings.clone())?;
126123
let memory_settings =
127124
databend_common_pipeline_transforms::MemorySettings::from_materialized_cte_settings(
128125
&builder.ctx,
129126
&builder.settings,
130127
)?;
131128
builder.main_pipeline.add_sink(|input| {
132-
MaterializedCteSink::create(input, tx.clone(), spiller.clone(), memory_settings.clone())
129+
MaterializedCteSink::create(
130+
builder.ctx.clone(),
131+
input,
132+
tx.clone(),
133+
&builder.settings,
134+
memory_settings.clone(),
135+
)
133136
})
134137
}
135138
}

src/query/service/src/physical_plans/physical_recluster.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use databend_common_catalog::plan::DataSourceInfo;
2121
use databend_common_catalog::plan::DataSourcePlan;
2222
use databend_common_catalog::plan::ReclusterTask;
2323
use databend_common_catalog::table::Table;
24-
use databend_common_config::GlobalConfig;
2524
use databend_common_exception::ErrorCode;
2625
use databend_common_exception::Result;
2726
use databend_common_expression::DataSchemaRef;
@@ -45,7 +44,6 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
4544
use databend_common_storages_fuse::FuseTable;
4645
use databend_common_storages_fuse::operations::TransformSerializeBlock;
4746
use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
48-
use databend_storages_common_cache::TempDirManager;
4947
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
5048

5149
use crate::physical_plans::physical_plan::IPhysicalPlan;
@@ -58,9 +56,7 @@ use crate::pipelines::processors::transforms::CompactStrategy;
5856
use crate::pipelines::processors::transforms::HilbertPartitionExchange;
5957
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
6058
use crate::sessions::TableContextPartitionStats;
61-
use crate::sessions::TableContextQueryIdentity;
6259
use crate::sessions::TableContextSettings;
63-
use crate::spillers::SpillerDiskConfig;
6460

6561
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
6662
pub struct Recluster {
@@ -334,17 +330,6 @@ impl IPhysicalPlan for HilbertPartition {
334330
)?;
335331

336332
let settings = builder.settings.clone();
337-
let temp_dir_manager = TempDirManager::instance();
338-
339-
let disk_bytes_limit = GlobalConfig::instance()
340-
.spill
341-
.window_partition_spill_bytes_limit();
342-
343-
let enable_dio = settings.get_enable_dio()?;
344-
let disk_spill = temp_dir_manager
345-
.get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id())
346-
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
347-
.transpose()?;
348333

349334
let window_spill_settings = MemorySettings::from_window_settings(&builder.ctx)?;
350335
let processor_id = AtomicUsize::new(0);
@@ -366,7 +351,6 @@ impl IPhysicalPlan for HilbertPartition {
366351
num_processors,
367352
self.num_partitions,
368353
window_spill_settings.clone(),
369-
disk_spill.clone(),
370354
CompactStrategy::new(self.rows_per_block, max_bytes_per_block),
371355
)?,
372356
)))

src/query/service/src/physical_plans/physical_window.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::atomic;
1717
use std::sync::atomic::AtomicUsize;
1818

1919
use databend_common_catalog::plan::DataSourcePlan;
20-
use databend_common_config::GlobalConfig;
2120
use databend_common_exception::ErrorCode;
2221
use databend_common_exception::Result;
2322
use databend_common_expression::Constant;
@@ -48,7 +47,6 @@ use databend_common_sql::optimizer::ir::SExpr;
4847
use databend_common_sql::plans::WindowFuncFrame;
4948
use databend_common_sql::plans::WindowFuncFrameBound;
5049
use databend_common_sql::plans::WindowFuncType;
51-
use databend_storages_common_cache::TempDirManager;
5250

5351
use super::LagLeadDefault;
5452
use super::LagLeadFunctionDesc;
@@ -79,8 +77,6 @@ use crate::pipelines::processors::transforms::WindowFunctionInfo;
7977
use crate::pipelines::processors::transforms::WindowPartitionExchange;
8078
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
8179
use crate::pipelines::processors::transforms::WindowSortDesc;
82-
use crate::sessions::TableContextQueryIdentity;
83-
use crate::spillers::SpillerDiskConfig;
8480

8581
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
8682
pub struct Window {
@@ -473,16 +469,6 @@ fn apply_window_partition(
473469
)?;
474470
}
475471

476-
let temp_dir_manager = TempDirManager::instance();
477-
let disk_bytes_limit = GlobalConfig::instance()
478-
.spill
479-
.window_partition_spill_bytes_limit();
480-
let enable_dio = settings.get_enable_dio()?;
481-
let disk_spill = temp_dir_manager
482-
.get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id())
483-
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
484-
.transpose()?;
485-
486472
let window_spill_settings = MemorySettings::from_window_settings(&builder.ctx)?;
487473
let plan_schema = DataSchemaRefExt::create(input_schema.fields().clone());
488474
let processor_id = AtomicUsize::new(0);
@@ -498,7 +484,6 @@ fn apply_window_partition(
498484
num_processors,
499485
num_partitions,
500486
window_spill_settings.clone(),
501-
disk_spill.clone(),
502487
strategy,
503488
)?,
504489
)))

src/query/service/src/physical_plans/physical_window_partition.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@ use std::sync::atomic;
1717
use std::sync::atomic::AtomicUsize;
1818

1919
use databend_common_catalog::plan::DataSourcePlan;
20-
use databend_common_config::GlobalConfig;
2120
use databend_common_exception::Result;
2221
use databend_common_expression::SortColumnDescription;
2322
use databend_common_pipeline::core::ProcessorPtr;
2423
use databend_common_pipeline_transforms::MemorySettings;
2524
use databend_common_sql::Symbol;
2625
use databend_common_sql::executor::physical_plans::SortDesc;
27-
use databend_storages_common_cache::TempDirManager;
2826

2927
use crate::physical_plans::explain::PlanStatsInfo;
3028
use crate::physical_plans::format::PhysicalFormat;
@@ -38,8 +36,6 @@ use crate::pipelines::processors::transforms::SortStrategy;
3836
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
3937
use crate::pipelines::processors::transforms::WindowPartitionExchange;
4038
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
41-
use crate::sessions::TableContextQueryIdentity;
42-
use crate::spillers::SpillerDiskConfig;
4339

4440
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
4541
pub struct WindowPartition {
@@ -145,16 +141,6 @@ impl IPhysicalPlan for WindowPartition {
145141
)?;
146142
}
147143

148-
let temp_dir_manager = TempDirManager::instance();
149-
let disk_bytes_limit = GlobalConfig::instance()
150-
.spill
151-
.window_partition_spill_bytes_limit();
152-
let enable_dio = settings.get_enable_dio()?;
153-
let disk_spill = temp_dir_manager
154-
.get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id())
155-
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
156-
.transpose()?;
157-
158144
let window_spill_settings = MemorySettings::from_window_settings(&builder.ctx)?;
159145

160146
let processor_id = AtomicUsize::new(0);
@@ -171,7 +157,6 @@ impl IPhysicalPlan for WindowPartition {
171157
num_processors,
172158
num_partitions,
173159
window_spill_settings.clone(),
174-
disk_spill.clone(),
175160
strategy,
176161
)?,
177162
)))

0 commit comments

Comments
 (0)