Skip to content

Commit 9036e2d

Browse files
committed
Sample-based sort spill reservation to mitigate merge OOM
DataFusion 52.1.0 has a TOCTOU race in ExternalSorter where merge reservations are freed and re-created empty, letting other partitions steal the memory (apache/datafusion#20642). Until the upstream fix lands, compute a data-aware sort_spill_reservation_bytes by sampling actual Arrow row sizes from the input, estimating spill file count, and reserving enough for the merge phase.
1 parent e88a7ec commit 9036e2d

3 files changed

Lines changed: 223 additions & 57 deletions

File tree

src/commands/transform.rs

Lines changed: 88 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ use crate::{
55
ColumnDictionaryConfig, ColumnEncodingConfig, DEFAULT_BLOOM_FILTER_FPP, DataFormat,
66
DictionaryMode, ListOutputsFormat, ParquetCompression, ParquetEncoding, ParquetStatistics,
77
ParquetWriterVersion, PartitionStrategy, SortSpec, TransformCommand, default_thread_budget,
8-
io_strategies::{OutputFileInfo, output_strategy::SinkFactory, path_template::PathTemplate},
8+
io_strategies::{
9+
OutputFileInfo,
10+
input_strategy::InputStrategy,
11+
output_strategy::SinkFactory,
12+
path_template::PathTemplate,
13+
},
914
operations::{query::QueryOperation, sort::SortOperation},
1015
pipeline::Pipeline,
1116
sinks::{
@@ -18,6 +23,7 @@ use crate::{
1823
arrow::ArrowDataSource, data_source::DataSource, parquet::ParquetDataSource,
1924
vortex::VortexDataSource,
2025
},
26+
utils::memory::{estimate_sort_spill_reservation, sample_avg_row_bytes},
2127
};
2228
use anyhow::{Result, anyhow};
2329
use arrow::datatypes::SchemaRef;
@@ -178,70 +184,84 @@ pub async fn run(args: TransformCommand) -> Result<()> {
178184
(from_many, true)
179185
};
180186

181-
let setup_result: Result<()> = {
182-
if !should_glob && input_paths.len() == 1 {
183-
let input_path = &input_paths[0];
184-
let detected_input_format = detect_format(input_path, input_format)?;
185-
186-
let source: Box<dyn DataSource> = match detected_input_format {
187-
DataFormat::Arrow => Box::new(ArrowDataSource::new(input_path.clone())),
188-
DataFormat::Parquet => Box::new(ParquetDataSource::new(input_path.clone())),
189-
DataFormat::Vortex => Box::new(VortexDataSource::new(input_path.clone())),
190-
};
191-
192-
pipeline = pipeline.with_input_strategy_with_single_source(source);
193-
Ok(())
194-
} else {
195-
let mut expanded_paths = Vec::new();
196-
197-
for pattern in &input_paths {
198-
for entry in glob(pattern)
199-
.map_err(|e| anyhow!("Error expanding glob pattern {}: {}", pattern, e))?
200-
{
201-
expanded_paths.push(
202-
entry
203-
.map_err(|e| anyhow!("Error decoding file path: {}", e))?
204-
.to_string_lossy()
205-
.to_string(),
206-
);
207-
}
187+
// resolve input paths (glob-expand if needed), build sources, and create InputStrategy
188+
let resolved_paths: Vec<String>;
189+
let input_strategy = if !should_glob && input_paths.len() == 1 {
190+
let input_path = &input_paths[0];
191+
let source = make_source(input_path, input_format)?;
192+
resolved_paths = vec![input_path.clone()];
193+
InputStrategy::Single(source)
194+
} else {
195+
let mut expanded_paths = Vec::new();
196+
197+
for pattern in &input_paths {
198+
for entry in glob(pattern)
199+
.map_err(|e| anyhow!("Error expanding glob pattern {}: {}", pattern, e))?
200+
{
201+
expanded_paths.push(
202+
entry
203+
.map_err(|e| anyhow!("Error decoding file path: {}", e))?
204+
.to_string_lossy()
205+
.to_string(),
206+
);
208207
}
208+
}
209209

210-
expanded_paths.sort();
211-
expanded_paths.dedup();
210+
expanded_paths.sort();
211+
expanded_paths.dedup();
212212

213-
if expanded_paths.is_empty() {
214-
anyhow::bail!("No input files found matching patterns: {:?}", input_paths);
215-
}
213+
if expanded_paths.is_empty() {
214+
anyhow::bail!("No input files found matching patterns: {:?}", input_paths);
215+
}
216216

217-
let mut sources: Vec<Box<dyn DataSource>> = Vec::new();
218-
let mut schema: Option<SchemaRef> = None;
219-
for input_path in expanded_paths {
220-
let detected_input_format = detect_format(&input_path, input_format)?;
221-
let source: Box<dyn DataSource> = match detected_input_format {
222-
DataFormat::Arrow => Box::new(ArrowDataSource::new(input_path.clone())),
223-
DataFormat::Parquet => Box::new(ParquetDataSource::new(input_path.clone())),
224-
DataFormat::Vortex => Box::new(VortexDataSource::new(input_path.clone())),
225-
};
226-
if let Some(ref schema) = schema {
227-
let source_schema = source.schema()?;
228-
if *schema != source_schema {
229-
anyhow::bail!(
230-
"Schema mismatch for input file {} (does not match other file(s))",
231-
&input_path
232-
);
233-
}
234-
} else {
235-
schema = Some(source.schema()?);
217+
let mut sources: Vec<Box<dyn DataSource>> = Vec::new();
218+
let mut schema: Option<SchemaRef> = None;
219+
for input_path in &expanded_paths {
220+
let source = make_source(input_path, input_format)?;
221+
if let Some(ref schema) = schema {
222+
let source_schema = source.schema()?;
223+
if *schema != source_schema {
224+
anyhow::bail!(
225+
"Schema mismatch for input file {} (does not match other file(s))",
226+
input_path
227+
);
236228
}
237-
sources.push(source);
229+
} else {
230+
schema = Some(source.schema()?);
238231
}
239-
pipeline = pipeline.with_input_strategy_with_multiple_sources(sources);
240-
Ok(())
232+
sources.push(source);
241233
}
234+
resolved_paths = expanded_paths;
235+
InputStrategy::Multiple(sources)
242236
};
243237

244-
setup_result?;
238+
// sample rows to estimate sort spill reservation before handing strategy to pipeline
239+
if has_sort {
240+
let avg_row_bytes = sample_avg_row_bytes(&input_strategy, 100_000).await?;
241+
242+
if avg_row_bytes > 0 {
243+
let total_input_bytes: u64 = resolved_paths
244+
.iter()
245+
.filter_map(|p| std::fs::metadata(p).ok())
246+
.map(|m| m.len())
247+
.sum();
248+
249+
let memory_limit = effective_memory_limit.unwrap_or(total_budget * 60 / 100);
250+
let partitions = effective_target_partitions.unwrap_or(three_quarter_cpus);
251+
let memory_per_partition = memory_limit / partitions.max(1);
252+
253+
let reservation = estimate_sort_spill_reservation(
254+
avg_row_bytes,
255+
total_input_bytes,
256+
memory_per_partition,
257+
8192, // DataFusion default batch size
258+
);
259+
260+
pipeline = pipeline.with_sort_spill_reservation_bytes(Some(reservation));
261+
}
262+
}
263+
264+
pipeline = pipeline.with_input_strategy(input_strategy);
245265

246266
let list_outputs_format = list_outputs;
247267

@@ -483,6 +503,18 @@ fn to_title_case(s: &str) -> String {
483503
.join(" ")
484504
}
485505

506+
fn make_source(
507+
path: &str,
508+
input_format: Option<DataFormat>,
509+
) -> Result<Box<dyn DataSource>> {
510+
let format = detect_format(path, input_format)?;
511+
Ok(match format {
512+
DataFormat::Arrow => Box::new(ArrowDataSource::new(path.to_string())),
513+
DataFormat::Parquet => Box::new(ParquetDataSource::new(path.to_string())),
514+
DataFormat::Vortex => Box::new(VortexDataSource::new(path.to_string())),
515+
})
516+
}
517+
486518
fn detect_format(path: &str, explicit_format: Option<DataFormat>) -> Result<DataFormat> {
487519
if let Some(format) = explicit_format {
488520
return Ok(format);

src/pipeline/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub struct PipelineConfig {
2828
pub target_partitions: Option<usize>,
2929
pub spill_path: Option<Utf8PathBuf>,
3030
pub spill_compression: SpillCompression,
31+
pub sort_spill_reservation_bytes: Option<usize>,
3132
}
3233

3334
#[derive(Default)]
@@ -45,6 +46,11 @@ impl Pipeline {
4546
Self::default()
4647
}
4748

49+
pub fn with_input_strategy(mut self, input_strategy: InputStrategy) -> Self {
50+
self.input_strategy = Some(input_strategy);
51+
self
52+
}
53+
4854
pub fn with_input_strategy_with_single_source(mut self, source: Box<dyn DataSource>) -> Self {
4955
self.input_strategy = Some(InputStrategy::Single(source));
5056

@@ -163,6 +169,14 @@ impl Pipeline {
163169
self
164170
}
165171

172+
pub fn with_sort_spill_reservation_bytes(
173+
mut self,
174+
sort_spill_reservation_bytes: Option<usize>,
175+
) -> Self {
176+
self.config.sort_spill_reservation_bytes = sort_spill_reservation_bytes;
177+
self
178+
}
179+
166180
pub async fn execute(&mut self) -> Result<Vec<OutputFileInfo>> {
167181
let mut ctx = self.build_session_context()?;
168182
self.execute_with_session_context(&mut ctx).await
@@ -214,6 +228,10 @@ impl Pipeline {
214228
cfg.options_mut().sql_parser.dialect = self.config.query_dialect.into();
215229
cfg.options_mut().execution.spill_compression = self.config.spill_compression.into();
216230

231+
if let Some(reservation) = self.config.sort_spill_reservation_bytes {
232+
cfg.options_mut().execution.sort_spill_reservation_bytes = reservation;
233+
}
234+
217235
if let Some(target_partitions) = self.config.target_partitions {
218236
cfg = cfg.with_target_partitions(target_partitions);
219237
}

src/utils/memory.rs

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
//! Container-aware memory detection.
1+
//! Container-aware memory detection and sort memory estimation.
22
//!
33
//! Uses cgroup limits when running in a container (Linux), falls back to system
44
//! memory on macOS or when not containerized.
55
66
use arrow::datatypes::{DataType, Schema};
7+
use futures::StreamExt;
78
use sysinfo::System;
89

10+
use crate::io_strategies::input_strategy::InputStrategy;
11+
912
/// Returns total memory in bytes, respecting container cgroup limits.
1013
///
1114
/// In containerized environments (Docker, Kubernetes), this returns the cgroup memory
@@ -165,6 +168,67 @@ fn parse_memory_stat_net_used(content: &str) -> Option<u64> {
165168
Some(total.saturating_sub(slab_reclaimable))
166169
}
167170

171+
/// Sample actual rows from an `InputStrategy` to measure average in-memory Arrow row size.
172+
///
173+
/// Creates a throwaway `SessionContext` and streams batches until `target_rows` rows are
174+
/// read (or EOF). Returns the average bytes per row based on
175+
/// `RecordBatch::get_array_memory_size()`, which reflects the real in-memory footprint
176+
/// including variable-width columns like strings and lists.
177+
pub async fn sample_avg_row_bytes(
178+
input_strategy: &InputStrategy,
179+
target_rows: usize,
180+
) -> anyhow::Result<usize> {
181+
let mut ctx = datafusion::prelude::SessionContext::new();
182+
let mut stream = input_strategy.as_stream(&mut ctx).await?;
183+
184+
let mut total_bytes: usize = 0;
185+
let mut total_rows: usize = 0;
186+
while let Some(batch) = stream.next().await {
187+
let batch = batch?;
188+
total_bytes += batch.get_array_memory_size();
189+
total_rows += batch.num_rows();
190+
if total_rows >= target_rows {
191+
break;
192+
}
193+
}
194+
195+
if total_rows == 0 {
196+
return Ok(0);
197+
}
198+
Ok(total_bytes / total_rows)
199+
}
200+
201+
const MIN_SORT_SPILL_RESERVATION: usize = 10 * 1024 * 1024; // 10MB (DataFusion default)
202+
203+
/// Estimate `sort_spill_reservation_bytes` from sampled row size and input characteristics.
204+
///
205+
/// The merge phase holds one batch per spill file in memory simultaneously.
206+
/// We estimate spill file count from total input size vs per-partition memory budget,
207+
/// then multiply by `batch_size * avg_row_bytes` to get the reservation.
208+
///
209+
/// Clamped to [10MB, memory_per_partition / 2] so there's always room for the sorter
210+
/// to accumulate batches before spilling.
211+
pub fn estimate_sort_spill_reservation(
212+
avg_row_bytes: usize,
213+
total_input_bytes: u64,
214+
memory_per_partition: usize,
215+
batch_size: usize,
216+
) -> usize {
217+
if avg_row_bytes == 0 || memory_per_partition == 0 {
218+
return MIN_SORT_SPILL_RESERVATION;
219+
}
220+
221+
let estimated_spill_files = (total_input_bytes as usize)
222+
.checked_div(memory_per_partition)
223+
.unwrap_or(1)
224+
.max(1);
225+
let merge_batch_bytes = batch_size.saturating_mul(avg_row_bytes);
226+
let reservation = estimated_spill_files.saturating_mul(merge_batch_bytes);
227+
let max_reservation = memory_per_partition / 2;
228+
229+
reservation.clamp(MIN_SORT_SPILL_RESERVATION, max_reservation)
230+
}
231+
168232
/// Returns the exact byte size for fixed-width Arrow types, or `None` for variable-width types.
169233
#[allow(clippy::cast_sign_loss)]
170234
pub fn estimate_fixed_type_bytes(dt: &DataType) -> Option<usize> {
@@ -384,4 +448,56 @@ kernel 500";
384448
let schema = Schema::empty();
385449
assert_eq!(estimate_row_bytes(&schema), 0);
386450
}
451+
452+
#[test]
453+
fn test_sort_spill_reservation_hits_floor() {
454+
// small input that would compute below 10MB
455+
let reservation = estimate_sort_spill_reservation(
456+
100, // 100 bytes/row
457+
1_000_000, // 1MB input
458+
100_000_000, // 100MB per partition
459+
8192, // batch size
460+
);
461+
assert_eq!(reservation, 10 * 1024 * 1024); // 10MB floor
462+
}
463+
464+
#[test]
465+
fn test_sort_spill_reservation_scales_up() {
466+
// large input: 10GB input, 500MB per partition = ~20 spill files
467+
// 20 * 8192 * 200 = ~32MB
468+
let reservation = estimate_sort_spill_reservation(
469+
200, // 200 bytes/row
470+
10_000_000_000, // 10GB input
471+
500_000_000, // 500MB per partition
472+
8192, // batch size
473+
);
474+
assert_eq!(reservation, 20 * 8192 * 200);
475+
assert!(reservation > 10 * 1024 * 1024); // above floor
476+
assert!(reservation < 250_000_000); // below ceiling (250MB)
477+
}
478+
479+
#[test]
480+
fn test_sort_spill_reservation_hits_ceiling() {
481+
// huge input with wide rows: would exceed half of per-partition budget
482+
let memory_per_partition = 100_000_000; // 100MB
483+
let reservation = estimate_sort_spill_reservation(
484+
2000, // 2KB/row (wide rows)
485+
100_000_000_000, // 100GB input
486+
memory_per_partition,
487+
8192,
488+
);
489+
assert_eq!(reservation, memory_per_partition / 2); // ceiling
490+
}
491+
492+
#[test]
493+
fn test_sort_spill_reservation_zero_row_bytes() {
494+
let reservation = estimate_sort_spill_reservation(0, 1_000_000, 100_000_000, 8192);
495+
assert_eq!(reservation, 10 * 1024 * 1024); // falls back to floor
496+
}
497+
498+
#[test]
499+
fn test_sort_spill_reservation_zero_memory_per_partition() {
500+
let reservation = estimate_sort_spill_reservation(100, 1_000_000, 0, 8192);
501+
assert_eq!(reservation, 10 * 1024 * 1024); // falls back to floor
502+
}
387503
}

0 commit comments

Comments
 (0)