Skip to content

Commit d2c75b5

Browse files
committed
refactor(spill): unify window/cte spill to async_buffer path
Migrate WindowPartitionBufferV2 and MaterializedCTE from the BackpressureSpiller -> WriterCreator -> SpillWriter -> AnyFileWriter chain to the SpillsBufferPool -> SpillsDataWriter/SpillsDataReader path used by aggregate and join operators. Key changes: - Rewrite window_partition_buffer_v2 to use SpillsDataWriter directly - Rewrite materialized_cte to use SpillsBufferPool.writer() - Remove BackpressureSpiller, WriterCreator, SpillWriter, SpillReader - Remove row_group_encoder.rs (FileWriter, LocalWriter, AnyFileWriter, RowGroupEncoder, Properties, RangeFetchPlan, FileReader) - Remove V1 WindowPartitionBuffer (async Spiller path) - Drop local DMA support for window/cte spill - Add SpillsDataWriter::flush_row_group() for explicit row group control
1 parent db6b04e commit d2c75b5

15 files changed

Lines changed: 241 additions & 1538 deletions

src/query/service/src/physical_plans/physical_materialized_cte.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use crate::physical_plans::format::PhysicalFormat;
3232
use crate::pipelines::PipelineBuilder;
3333
use crate::pipelines::memory_settings::MemorySettingsExt;
3434
use crate::pipelines::processors::transforms::MaterializedCteSink;
35-
use crate::pipelines::processors::transforms::create_materialized_cte_spiller;
3635

3736
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
3837
pub struct MaterializedCTE {
@@ -121,15 +120,19 @@ impl IPhysicalPlan for MaterializedCTE {
121120
self.ref_count,
122121
self.channel_size,
123122
);
124-
let spiller =
125-
create_materialized_cte_spiller(builder.ctx.clone(), builder.settings.clone())?;
126123
let memory_settings =
127124
databend_common_pipeline_transforms::MemorySettings::from_materialized_cte_settings(
128125
&builder.ctx,
129126
&builder.settings,
130127
)?;
131128
builder.main_pipeline.add_sink(|input| {
132-
MaterializedCteSink::create(input, tx.clone(), spiller.clone(), memory_settings.clone())
129+
MaterializedCteSink::create(
130+
builder.ctx.clone(),
131+
input,
132+
tx.clone(),
133+
&builder.settings,
134+
memory_settings.clone(),
135+
)
133136
})
134137
}
135138
}

src/query/service/src/physical_plans/physical_recluster.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use databend_common_catalog::plan::DataSourceInfo;
2121
use databend_common_catalog::plan::DataSourcePlan;
2222
use databend_common_catalog::plan::ReclusterTask;
2323
use databend_common_catalog::table::Table;
24-
use databend_common_config::GlobalConfig;
2524
use databend_common_exception::ErrorCode;
2625
use databend_common_exception::Result;
2726
use databend_common_expression::DataSchemaRef;
@@ -45,7 +44,6 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
4544
use databend_common_storages_fuse::FuseTable;
4645
use databend_common_storages_fuse::operations::TransformSerializeBlock;
4746
use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
48-
use databend_storages_common_cache::TempDirManager;
4947
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
5048

5149
use crate::physical_plans::physical_plan::IPhysicalPlan;
@@ -58,9 +56,7 @@ use crate::pipelines::processors::transforms::CompactStrategy;
5856
use crate::pipelines::processors::transforms::HilbertPartitionExchange;
5957
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
6058
use crate::sessions::TableContextPartitionStats;
61-
use crate::sessions::TableContextQueryIdentity;
6259
use crate::sessions::TableContextSettings;
63-
use crate::spillers::SpillerDiskConfig;
6460

6561
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
6662
pub struct Recluster {
@@ -334,17 +330,6 @@ impl IPhysicalPlan for HilbertPartition {
334330
)?;
335331

336332
let settings = builder.settings.clone();
337-
let temp_dir_manager = TempDirManager::instance();
338-
339-
let disk_bytes_limit = GlobalConfig::instance()
340-
.spill
341-
.window_partition_spill_bytes_limit();
342-
343-
let enable_dio = settings.get_enable_dio()?;
344-
let disk_spill = temp_dir_manager
345-
.get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id())
346-
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
347-
.transpose()?;
348333

349334
let window_spill_settings = MemorySettings::from_window_settings(&builder.ctx)?;
350335
let processor_id = AtomicUsize::new(0);
@@ -366,7 +351,6 @@ impl IPhysicalPlan for HilbertPartition {
366351
num_processors,
367352
self.num_partitions,
368353
window_spill_settings.clone(),
369-
disk_spill.clone(),
370354
CompactStrategy::new(self.rows_per_block, max_bytes_per_block),
371355
)?,
372356
)))

src/query/service/src/physical_plans/physical_window.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::atomic;
1717
use std::sync::atomic::AtomicUsize;
1818

1919
use databend_common_catalog::plan::DataSourcePlan;
20-
use databend_common_config::GlobalConfig;
2120
use databend_common_exception::ErrorCode;
2221
use databend_common_exception::Result;
2322
use databend_common_expression::Constant;
@@ -48,7 +47,6 @@ use databend_common_sql::optimizer::ir::SExpr;
4847
use databend_common_sql::plans::WindowFuncFrame;
4948
use databend_common_sql::plans::WindowFuncFrameBound;
5049
use databend_common_sql::plans::WindowFuncType;
51-
use databend_storages_common_cache::TempDirManager;
5250

5351
use super::LagLeadDefault;
5452
use super::LagLeadFunctionDesc;
@@ -79,8 +77,6 @@ use crate::pipelines::processors::transforms::WindowFunctionInfo;
7977
use crate::pipelines::processors::transforms::WindowPartitionExchange;
8078
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
8179
use crate::pipelines::processors::transforms::WindowSortDesc;
82-
use crate::sessions::TableContextQueryIdentity;
83-
use crate::spillers::SpillerDiskConfig;
8480

8581
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
8682
pub struct Window {
@@ -473,16 +469,6 @@ fn apply_window_partition(
473469
)?;
474470
}
475471

476-
let temp_dir_manager = TempDirManager::instance();
477-
let disk_bytes_limit = GlobalConfig::instance()
478-
.spill
479-
.window_partition_spill_bytes_limit();
480-
let enable_dio = settings.get_enable_dio()?;
481-
let disk_spill = temp_dir_manager
482-
.get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id())
483-
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
484-
.transpose()?;
485-
486472
let window_spill_settings = MemorySettings::from_window_settings(&builder.ctx)?;
487473
let plan_schema = DataSchemaRefExt::create(input_schema.fields().clone());
488474
let processor_id = AtomicUsize::new(0);
@@ -498,7 +484,6 @@ fn apply_window_partition(
498484
num_processors,
499485
num_partitions,
500486
window_spill_settings.clone(),
501-
disk_spill.clone(),
502487
strategy,
503488
)?,
504489
)))

src/query/service/src/physical_plans/physical_window_partition.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@ use std::sync::atomic;
1717
use std::sync::atomic::AtomicUsize;
1818

1919
use databend_common_catalog::plan::DataSourcePlan;
20-
use databend_common_config::GlobalConfig;
2120
use databend_common_exception::Result;
2221
use databend_common_expression::SortColumnDescription;
2322
use databend_common_pipeline::core::ProcessorPtr;
2423
use databend_common_pipeline_transforms::MemorySettings;
2524
use databend_common_sql::Symbol;
2625
use databend_common_sql::executor::physical_plans::SortDesc;
27-
use databend_storages_common_cache::TempDirManager;
2826

2927
use crate::physical_plans::explain::PlanStatsInfo;
3028
use crate::physical_plans::format::PhysicalFormat;
@@ -38,8 +36,6 @@ use crate::pipelines::processors::transforms::SortStrategy;
3836
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
3937
use crate::pipelines::processors::transforms::WindowPartitionExchange;
4038
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
41-
use crate::sessions::TableContextQueryIdentity;
42-
use crate::spillers::SpillerDiskConfig;
4339

4440
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
4541
pub struct WindowPartition {
@@ -145,16 +141,6 @@ impl IPhysicalPlan for WindowPartition {
145141
)?;
146142
}
147143

148-
let temp_dir_manager = TempDirManager::instance();
149-
let disk_bytes_limit = GlobalConfig::instance()
150-
.spill
151-
.window_partition_spill_bytes_limit();
152-
let enable_dio = settings.get_enable_dio()?;
153-
let disk_spill = temp_dir_manager
154-
.get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id())
155-
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
156-
.transpose()?;
157-
158144
let window_spill_settings = MemorySettings::from_window_settings(&builder.ctx)?;
159145

160146
let processor_id = AtomicUsize::new(0);
@@ -171,7 +157,6 @@ impl IPhysicalPlan for WindowPartition {
171157
num_processors,
172158
num_partitions,
173159
window_spill_settings.clone(),
174-
disk_spill.clone(),
175160
strategy,
176161
)?,
177162
)))

src/query/service/src/pipelines/processors/transforms/materialized_cte.rs

Lines changed: 58 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@ use std::sync::Arc;
1818

1919
use async_channel::Receiver;
2020
use async_channel::Sender;
21+
use databend_base::uniq_id::GlobalUniq;
2122
use databend_common_base::base::Progress;
2223
use databend_common_base::base::ProgressValues;
2324
use databend_common_base::runtime::profile::Profile;
2425
use databend_common_base::runtime::profile::ProfileStatisticsName;
2526
use databend_common_catalog::table_context::TableContextProgress;
26-
use databend_common_config::GlobalConfig;
2727
use databend_common_exception::ErrorCode;
2828
use databend_common_exception::Result;
2929
use databend_common_expression::DataBlock;
30+
use databend_common_expression::DataSchemaRef;
3031
use databend_common_pipeline::core::Event;
3132
use databend_common_pipeline::core::InputPort;
3233
use databend_common_pipeline::core::OutputPort;
@@ -35,21 +36,16 @@ use databend_common_pipeline::core::ProcessorPtr;
3536
use databend_common_pipeline_transforms::MemorySettings;
3637
use databend_common_settings::Settings;
3738
use databend_common_storage::DataOperator;
38-
use databend_storages_common_cache::TempDirManager;
39+
use databend_common_storages_parquet::ReadSettings;
40+
use parquet::file::metadata::RowGroupMetaData;
3941

4042
use crate::sessions::QueryContext;
41-
use crate::sessions::TableContextQueryIdentity;
42-
use crate::spillers::BackpressureSpiller;
43-
use crate::spillers::SpillReader;
44-
use crate::spillers::SpillWriter;
45-
use crate::spillers::SpillerConfig;
46-
use crate::spillers::SpillerDiskConfig;
47-
use crate::spillers::SpillerType;
43+
use crate::spillers::SpillTarget;
4844
use crate::spillers::SpillsBufferPool;
4945

50-
pub type MaterializedCteSpiller = BackpressureSpiller;
51-
5246
const MATERIALIZED_CTE_SPILL_UNIT_SIZE: usize = 8 * 1024 * 1024;
47+
/// Maximum number of row groups per file.
48+
const MAX_ROW_GROUPS_PER_FILE: usize = 2 << 15;
5349

5450
#[derive(Clone)]
5551
pub enum MaterializedCtePayload {
@@ -59,60 +55,42 @@ pub enum MaterializedCtePayload {
5955

6056
#[derive(Clone)]
6157
pub struct MaterializedCteSpilledPayload {
62-
reader: SpillReader,
63-
row_group: usize,
58+
path: String,
59+
row_groups: Arc<Vec<RowGroupMetaData>>,
60+
schema: DataSchemaRef,
61+
ordinal: usize,
6462
}
6563

6664
impl MaterializedCteSpilledPayload {
67-
fn restore(mut self) -> Result<DataBlock> {
68-
let blocks = self.reader.restore(vec![self.row_group], usize::MAX)?;
69-
match blocks.len() {
70-
0 => Err(ErrorCode::Internal(
65+
fn restore(self) -> Result<DataBlock> {
66+
let selected = vec![self.row_groups[self.ordinal].clone()];
67+
let data_operator = DataOperator::instance();
68+
let target = SpillTarget::from_storage_params(data_operator.spill_params());
69+
let operator = data_operator.spill_operator();
70+
let buffer_pool = SpillsBufferPool::instance();
71+
let settings = ReadSettings::default();
72+
let mut reader = buffer_pool.reader(
73+
operator,
74+
self.path,
75+
self.schema,
76+
selected,
77+
target,
78+
settings,
79+
)?;
80+
match reader.read()? {
81+
Some(block) => Ok(block),
82+
None => Err(ErrorCode::Internal(
7183
"Failed to restore materialized cte spilled block",
7284
)),
73-
1 => Ok(blocks.into_iter().next().unwrap()),
74-
_ => DataBlock::concat(&blocks),
7585
}
7686
}
7787
}
7888

79-
pub fn create_materialized_cte_spiller(
80-
ctx: Arc<QueryContext>,
81-
settings: Arc<Settings>,
82-
) -> Result<MaterializedCteSpiller> {
83-
let temp_dir_manager = TempDirManager::instance();
84-
let disk_bytes_limit = GlobalConfig::instance()
85-
.spill
86-
.materialized_cte_spill_bytes_limit();
87-
let enable_dio = settings.get_enable_dio()?;
88-
let disk_spill = temp_dir_manager
89-
.get_disk_spill_dir(disk_bytes_limit, &ctx.get_id())
90-
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
91-
.transpose()?;
92-
93-
let config = SpillerConfig {
94-
spiller_type: SpillerType::MaterializedCTE,
95-
location_prefix: ctx.query_id_spill_prefix(),
96-
disk_spill,
97-
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
98-
writer_pool_bytes: settings
99-
.get_spill_writer_memory_pool_size_mb()?
100-
.saturating_mul(1024 * 1024),
101-
};
102-
let operator = DataOperator::instance().spill_operator();
103-
BackpressureSpiller::create(
104-
ctx,
105-
operator,
106-
config,
107-
SpillsBufferPool::instance(),
108-
MATERIALIZED_CTE_SPILL_UNIT_SIZE,
109-
)
110-
}
111-
11289
pub struct MaterializedCteSink {
11390
input: Arc<InputPort>,
11491
senders: Vec<Sender<MaterializedCtePayload>>,
115-
spiller: MaterializedCteSpiller,
92+
prefix: String,
93+
writer_pool_bytes: usize,
11694
memory_settings: MemorySettings,
11795
input_data: Option<DataBlock>,
11896
pending_payloads: VecDeque<MaterializedCtePayload>,
@@ -123,15 +101,21 @@ pub struct MaterializedCteSink {
123101

124102
impl MaterializedCteSink {
125103
pub fn create(
104+
ctx: Arc<QueryContext>,
126105
input: Arc<InputPort>,
127106
senders: Vec<Sender<MaterializedCtePayload>>,
128-
spiller: MaterializedCteSpiller,
107+
settings: &Settings,
129108
memory_settings: MemorySettings,
130109
) -> Result<ProcessorPtr> {
110+
let prefix = ctx.query_id_spill_prefix();
111+
let writer_pool_bytes = settings
112+
.get_spill_writer_memory_pool_size_mb()?
113+
.saturating_mul(1024 * 1024);
131114
Ok(ProcessorPtr::create(Box::new(Self {
132115
input,
133116
senders,
134-
spiller,
117+
prefix,
118+
writer_pool_bytes,
135119
memory_settings,
136120
input_data: None,
137121
pending_payloads: VecDeque::new(),
@@ -151,27 +135,29 @@ impl MaterializedCteSink {
151135
&self,
152136
data_blocks: Vec<DataBlock>,
153137
) -> Result<Vec<MaterializedCtePayload>> {
154-
let local_file_size = data_blocks
155-
.iter()
156-
.map(DataBlock::memory_size)
157-
.sum::<usize>()
158-
.max(self.spill_unit_size())
159-
.max(1);
138+
let data_operator = DataOperator::instance();
139+
let operator = data_operator.spill_operator();
140+
let buffer_pool = SpillsBufferPool::instance();
141+
let path = format!("{}/{}", self.prefix, GlobalUniq::unique());
142+
let mut writer = buffer_pool.writer(operator, path.clone(), self.writer_pool_bytes)?;
143+
160144
let schema = Arc::new(data_blocks[0].infer_schema());
161-
let mut writer_creator = self.spiller.new_writer_creator(schema)?;
162-
let mut writer = writer_creator.open(Some(local_file_size))?;
163-
let mut row_groups = Vec::with_capacity(data_blocks.len());
164-
for data_block in data_blocks {
165-
row_groups.push(writer.add_row_group(vec![data_block])?);
145+
let mut ordinals = Vec::with_capacity(data_blocks.len());
146+
for block in data_blocks {
147+
writer.write(block)?;
148+
ordinals.push(writer.flush_row_group()?);
166149
}
167-
let reader = writer.close()?;
150+
let (_bytes_written, row_groups) = writer.close()?;
151+
let row_groups = Arc::new(row_groups);
168152

169-
Ok(row_groups
153+
Ok(ordinals
170154
.into_iter()
171-
.map(|row_group| {
155+
.map(|ordinal| {
172156
MaterializedCtePayload::Spilled(MaterializedCteSpilledPayload {
173-
reader: reader.clone(),
174-
row_group,
157+
path: path.clone(),
158+
row_groups: row_groups.clone(),
159+
schema: schema.clone(),
160+
ordinal,
175161
})
176162
})
177163
.collect())
@@ -184,7 +170,7 @@ impl MaterializedCteSink {
184170
self.spilling_blocks.push(data_block);
185171

186172
if self.spilling_bytes >= self.spill_unit_size()
187-
|| self.spilling_blocks.len() >= SpillWriter::MAX_ORDINAL
173+
|| self.spilling_blocks.len() >= MAX_ROW_GROUPS_PER_FILE
188174
{
189175
self.flush_spilling_blocks()?;
190176
}

src/query/service/src/pipelines/processors/transforms/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ pub use hash_join::*;
4343
pub use materialized_cte::CTESource;
4444
pub use materialized_cte::MaterializedCtePayload;
4545
pub use materialized_cte::MaterializedCteSink;
46-
pub use materialized_cte::create_materialized_cte_spiller;
4746
pub use new_hash_join::Join;
4847
pub use new_hash_join::TransformHashJoin;
4948
pub use new_hash_join::*;

0 commit comments

Comments
 (0)