Skip to content

Commit b9f7afd

Browse files
committed
refactor(spill): extract MemoryPool from SpillsBufferPool and make pool size per-session configurable (#19805)
* refactor(spill): extract MemoryPool from SpillsBufferPool and make pool size per-session configurable
  - Extract the MemoryPool struct from SpillsBufferPool, with chunk-based allocation and blocking wait.
  - Add the session-level setting 'spill_writer_memory_pool_size_mb' (default 20MB) to control the per-writer pool size.
  - Remove the global 'buffer_pool_memory' from SpillConfig, making memory management more flexible.
  - MemoryPool allocates lazily up to max_chunks, then blocks waiting for a buffer to be released.
* z
* z
1 parent f53b370 commit b9f7afd

21 files changed

Lines changed: 270 additions & 143 deletions

File tree

src/query/config/src/config.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3503,10 +3503,6 @@ pub struct SpillConfig {
35033503
#[clap(long, value_name = "PERCENT", default_value = "0")]
35043504
pub result_set_spilling_disk_quota_ratio: u64,
35053505

3506-
/// Total memory for the spill buffer pool in bytes.
3507-
#[clap(long, value_name = "VALUE", default_value = "209715200")]
3508-
pub spill_buffer_pool_memory: u64,
3509-
35103506
/// Number of worker tasks in the spill buffer pool.
35113507
#[clap(long, value_name = "VALUE", default_value = "2")]
35123508
pub spill_buffer_pool_workers: usize,
@@ -3623,7 +3619,6 @@ mod config_converters {
36233619
window_partition_spilling_disk_quota_ratio: spill
36243620
.window_partition_spilling_disk_quota_ratio,
36253621
result_set_spilling_disk_quota_ratio: spill.result_set_spilling_disk_quota_ratio,
3626-
buffer_pool_memory: spill.spill_buffer_pool_memory,
36273622
buffer_pool_workers: spill.spill_buffer_pool_workers,
36283623
})
36293624
}
@@ -3647,7 +3642,6 @@ mod config_converters {
36473642
window_partition_spilling_disk_quota_ratio: value
36483643
.window_partition_spilling_disk_quota_ratio,
36493644
result_set_spilling_disk_quota_ratio: value.result_set_spilling_disk_quota_ratio,
3650-
spill_buffer_pool_memory: value.buffer_pool_memory,
36513645
spill_buffer_pool_workers: value.buffer_pool_workers,
36523646
}
36533647
}

src/query/config/src/inner.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,6 @@ pub struct SpillConfig {
434434
/// result-set spilling may use for one query.
435435
pub result_set_spilling_disk_quota_ratio: u64,
436436

437-
/// Total memory for the spill buffer pool in bytes.
438-
pub buffer_pool_memory: u64,
439-
440437
/// Number of worker tasks in the spill buffer pool.
441438
pub buffer_pool_workers: usize,
442439
}
@@ -509,7 +506,6 @@ impl SpillConfig {
509506
window_partition_spilling_disk_quota_ratio: 60,
510507
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
511508
result_set_spilling_disk_quota_ratio: 0,
512-
buffer_pool_memory: 200 * 1024 * 1024,
513509
buffer_pool_workers: 2,
514510
}
515511
}
@@ -527,7 +523,6 @@ impl Default for SpillConfig {
527523
window_partition_spilling_disk_quota_ratio: 60,
528524
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
529525
result_set_spilling_disk_quota_ratio: 0,
530-
buffer_pool_memory: 200 * 1024 * 1024,
531526
buffer_pool_workers: 2,
532527
}
533528
}

src/query/config/src/mask.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ impl SpillConfig {
218218
sort_spilling_disk_quota_ratio,
219219
window_partition_spilling_disk_quota_ratio,
220220
result_set_spilling_disk_quota_ratio,
221-
spill_buffer_pool_memory,
222221
spill_buffer_pool_workers,
223222
} = *self;
224223

@@ -230,7 +229,6 @@ impl SpillConfig {
230229
sort_spilling_disk_quota_ratio,
231230
window_partition_spilling_disk_quota_ratio,
232231
result_set_spilling_disk_quota_ratio,
233-
spill_buffer_pool_memory,
234232
spill_buffer_pool_workers,
235233
}
236234
}
@@ -388,7 +386,6 @@ mod tests {
388386
sort_spilling_disk_quota_ratio: 60,
389387
window_partition_spilling_disk_quota_ratio: 30,
390388
result_set_spilling_disk_quota_ratio: 0,
391-
spill_buffer_pool_memory: 209715200,
392389
spill_buffer_pool_workers: 2,
393390
storage: Some(StorageConfig {
394391
typ: "s3".to_string(),

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ impl SortPipelineBuilder {
138138
location_prefix,
139139
disk_spill,
140140
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
141+
writer_pool_bytes: settings
142+
.get_spill_writer_memory_pool_size_mb()?
143+
.saturating_mul(1024 * 1024),
141144
};
142145
let op = DataOperator::instance().spill_operator();
143146
SortSpillerImpl::new(self.ctx.clone(), op, config)?
@@ -224,6 +227,9 @@ impl SortPipelineBuilder {
224227
location_prefix,
225228
disk_spill: None,
226229
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
230+
writer_pool_bytes: settings
231+
.get_spill_writer_memory_pool_size_mb()?
232+
.saturating_mul(1024 * 1024),
227233
};
228234
let op = DataOperator::instance().spill_operator();
229235
SortSpillerImpl::new(self.ctx.clone(), op, config)?

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,21 @@ use crate::spillers::SpillTarget;
4545
use crate::spillers::SpillsBufferPool;
4646
use crate::spillers::SpillsDataWriter;
4747

48+
const BYTES_PER_MIB: usize = 1024 * 1024;
49+
4850
struct PayloadWriter {
4951
path: String,
5052
writer: SpillsDataWriter,
5153
}
5254

5355
impl PayloadWriter {
54-
fn try_create(prefix: &str) -> Result<Self> {
56+
fn try_create(prefix: &str, writer_pool_bytes: usize) -> Result<Self> {
5557
let data_operator = DataOperator::instance();
5658
let operator = data_operator.spill_operator();
5759
let buffer_pool = SpillsBufferPool::instance();
5860
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
59-
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
61+
let spills_data_writer =
62+
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
6063

6164
Ok(PayloadWriter {
6265
path: file_path,
@@ -109,16 +112,23 @@ impl WriteStats {
109112
struct AggregatePayloadWriters {
110113
spill_prefix: String,
111114
partition_count: usize,
115+
writer_pool_bytes: usize,
112116
writers: Vec<Option<PayloadWriter>>,
113117
write_stats: WriteStats,
114118
ctx: Arc<QueryContext>,
115119
}
116120

117121
impl AggregatePayloadWriters {
118-
pub fn create(prefix: &str, partition_count: usize, ctx: Arc<QueryContext>) -> Self {
122+
pub fn create(
123+
prefix: &str,
124+
partition_count: usize,
125+
writer_pool_bytes: usize,
126+
ctx: Arc<QueryContext>,
127+
) -> Self {
119128
AggregatePayloadWriters {
120129
spill_prefix: prefix.to_string(),
121130
partition_count,
131+
writer_pool_bytes,
122132
writers: Self::empty_writers(partition_count),
123133
write_stats: WriteStats::default(),
124134
ctx,
@@ -133,7 +143,10 @@ impl AggregatePayloadWriters {
133143

134144
fn ensure_writer(&mut self, bucket: usize) -> Result<&mut PayloadWriter> {
135145
if self.writers[bucket].is_none() {
136-
self.writers[bucket] = Some(PayloadWriter::try_create(&self.spill_prefix)?);
146+
self.writers[bucket] = Some(PayloadWriter::try_create(
147+
&self.spill_prefix,
148+
self.writer_pool_bytes,
149+
)?);
137150
}
138151

139152
Ok(self.writers[bucket].as_mut().unwrap())
@@ -335,8 +348,13 @@ impl<P: PartitionStream> NewAggregateSpiller<P> {
335348
let table_ctx: Arc<dyn TableContext> = ctx.clone();
336349
let read_setting = ReadSettings::from_settings(&table_ctx.get_settings())?;
337350
let spill_prefix = ctx.query_id_spill_prefix();
351+
let writer_pool_bytes = ctx
352+
.get_settings()
353+
.get_spill_writer_memory_pool_size_mb()?
354+
.saturating_mul(BYTES_PER_MIB);
338355

339-
let payload_writers = AggregatePayloadWriters::create(&spill_prefix, partition_count, ctx);
356+
let payload_writers =
357+
AggregatePayloadWriters::create(&spill_prefix, partition_count, writer_pool_bytes, ctx);
340358

341359
Ok(Self {
342360
memory_settings,

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ impl TransformAggregateSpillWriter {
6868
location_prefix,
6969
disk_spill: None,
7070
use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
71+
writer_pool_bytes: ctx
72+
.get_settings()
73+
.get_spill_writer_memory_pool_size_mb()?
74+
.saturating_mul(1024 * 1024),
7175
};
7276

7377
let spiller = Spiller::create(ctx.clone(), operator, config.clone())?;

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ impl TransformExchangeAggregateSerializer {
9797
location_prefix,
9898
disk_spill: None,
9999
use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
100+
writer_pool_bytes: ctx
101+
.get_settings()
102+
.get_spill_writer_memory_pool_size_mb()?
103+
.saturating_mul(1024 * 1024),
100104
};
101105

102106
let spiller = Spiller::create(ctx.clone(), operator, config.clone())?;

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ impl HashJoinSpiller {
8181
location_prefix,
8282
disk_spill: None,
8383
use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
84+
writer_pool_bytes: ctx
85+
.get_settings()
86+
.get_spill_writer_memory_pool_size_mb()?
87+
.saturating_mul(1024 * 1024),
8488
};
8589
let operator = DataOperator::instance().spill_operator();
8690
let spiller = Spiller::create(ctx.clone(), operator, spill_config)?;

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ use crate::spillers::SpillsBufferPool;
4646
use crate::spillers::SpillsDataReader;
4747
use crate::spillers::SpillsDataWriter;
4848

49+
const BYTES_PER_MIB: usize = 1024 * 1024;
50+
4951
pub struct GraceHashJoin<T: GraceMemoryJoin> {
5052
pub(crate) desc: Arc<HashJoinDesc>,
5153
pub(crate) hash_method_kind: HashMethodKind,
5254
pub(crate) function_context: FunctionContext,
5355

5456
pub(crate) location_prefix: String,
57+
pub(crate) writer_pool_bytes: usize,
5558
pub(crate) shift_bits: usize,
5659

5760
pub(crate) stage: RestoreStage,
@@ -93,7 +96,8 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
9396

9497
let mut ready_partitions = Vec::with_capacity(self.partitions.len());
9598
for id in 0..self.partitions.len() {
96-
let mut partition = GraceJoinPartition::create(&self.location_prefix)?;
99+
let mut partition =
100+
GraceJoinPartition::create(&self.location_prefix, self.writer_pool_bytes)?;
97101

98102
std::mem::swap(&mut self.partitions[id], &mut partition);
99103
ready_partitions.push(partition);
@@ -206,18 +210,25 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
206210
let rows = settings.get_max_block_size()? as usize;
207211
let bytes = settings.get_max_block_bytes()? as usize;
208212
let location_prefix = ctx.query_id_spill_prefix();
213+
let writer_pool_bytes = settings
214+
.get_spill_writer_memory_pool_size_mb()?
215+
.saturating_mul(BYTES_PER_MIB);
209216

210217
let mut partitions = Vec::with_capacity(16);
211218

212219
for _ in 0..16 {
213-
partitions.push(GraceJoinPartition::create(&location_prefix)?);
220+
partitions.push(GraceJoinPartition::create(
221+
&location_prefix,
222+
writer_pool_bytes,
223+
)?);
214224
}
215225

216226
Ok(GraceHashJoin {
217227
desc,
218228
state,
219229
shift_bits,
220230
location_prefix,
231+
writer_pool_bytes,
221232
hash_method_kind,
222233
memory_hash_join,
223234
function_context: function_ctx,
@@ -416,13 +427,14 @@ pub struct GraceJoinPartition {
416427
}
417428

418429
impl GraceJoinPartition {
419-
pub fn create(prefix: &str) -> Result<GraceJoinPartition> {
430+
pub fn create(prefix: &str, writer_pool_bytes: usize) -> Result<GraceJoinPartition> {
420431
let data_operator = DataOperator::instance();
421432

422433
let operator = data_operator.spill_operator();
423434
let buffer_pool = SpillsBufferPool::instance();
424435
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
425-
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
436+
let spills_data_writer =
437+
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
426438

427439
Ok(GraceJoinPartition {
428440
path: file_path,

src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
193193
location_prefix,
194194
disk_spill,
195195
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
196+
writer_pool_bytes: settings
197+
.get_spill_writer_memory_pool_size_mb()?
198+
.saturating_mul(1024 * 1024),
196199
};
197200

198201
// Create spillers for window operator.

0 commit comments

Comments
 (0)