Skip to content

Commit eb6c472

Browse files
committed
refactor: Update SortMergeJoin to use async spill abstractions and remove open_sync
1 parent c5a8ee1 commit eb6c472

4 files changed

Lines changed: 441 additions & 157 deletions

File tree

datafusion/execution/src/disk_manager.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,115 @@ fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
473473
.collect()
474474
}
475475

476+
pub struct FileSpillWriter {
477+
file: std::fs::File,
478+
disk_manager: Arc<DiskManager>,
479+
current_file_disk_usage: Arc<AtomicU64>,
480+
}
481+
482+
impl SpillWriter for FileSpillWriter {
483+
fn write(&mut self, data: Bytes) -> Result<()> {
484+
use std::io::Write;
485+
486+
let len = data.len() as u64;
487+
if len == 0 {
488+
return Ok(());
489+
}
490+
491+
self.file
492+
.write_all(&data)
493+
.map_err(DataFusionError::IoError)?;
494+
self.current_file_disk_usage
495+
.fetch_add(len, Ordering::Relaxed);
496+
let new_global = self
497+
.disk_manager
498+
.used_disk_space
499+
.fetch_add(len, Ordering::Relaxed)
500+
+ len;
501+
502+
if new_global > self.disk_manager.max_temp_directory_size {
503+
return resources_err!(
504+
"The used disk space during the spilling process has exceeded the allowable limit of {}. \
505+
Please try increasing the config: `datafusion.runtime.max_temp_directory_size`.",
506+
human_readable_size(self.disk_manager.max_temp_directory_size as usize)
507+
);
508+
}
509+
510+
Ok(())
511+
}
512+
513+
fn flush(&mut self) -> Result<()> {
514+
use std::io::Write;
515+
self.file.flush().map_err(DataFusionError::IoError)
516+
}
517+
518+
fn finish(&mut self) -> Result<()> {
519+
// flush() already called by SpillWriteAdapter before finish()
520+
Ok(())
521+
}
522+
}
523+
524+
impl SpillFile for RefCountedTempFile {
525+
fn path(&self) -> Option<&Path> {
526+
Some(self.tempfile.path())
527+
}
528+
529+
fn size(&self) -> Option<u64> {
530+
Some(self.current_disk_usage())
531+
}
532+
#[cfg(not(target_arch = "wasm32"))]
533+
fn read_stream(
534+
&self,
535+
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<Bytes>> + Send>>>
536+
{
537+
let path = self.path().to_owned();
538+
539+
let stream =
540+
futures::stream::once(async move {
541+
tokio::fs::File::open(&path)
542+
.await
543+
.map_err(DataFusionError::IoError)
544+
})
545+
.flat_map(
546+
|open_result| -> std::pin::Pin<
547+
Box<dyn futures::Stream<Item = Result<Bytes>> + Send>,
548+
> {
549+
match open_result {
550+
Ok(file) => Box::pin(
551+
tokio_util::io::ReaderStream::new(file)
552+
.map(|r| r.map_err(DataFusionError::IoError)),
553+
),
554+
Err(e) => Box::pin(futures::stream::once(async move { Err(e) })),
555+
}
556+
},
557+
);
558+
559+
Ok(Box::pin(stream))
560+
}
561+
562+
#[cfg(target_arch = "wasm32")]
563+
fn read_stream(
564+
&self,
565+
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<Bytes>> + Send>>>
566+
{
567+
datafusion_common::exec_err!(
568+
"Default OS file spilling is not supported on WASM. Configure DiskManager with a Custom TempFileFactory."
569+
)
570+
}
571+
572+
fn open_writer(&self) -> Result<Box<dyn SpillWriter>> {
573+
let file = self
574+
.tempfile
575+
.as_file()
576+
.try_clone()
577+
.map_err(DataFusionError::IoError)?;
578+
Ok(Box::new(FileSpillWriter {
579+
file,
580+
disk_manager: Arc::clone(&self.disk_manager),
581+
current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
582+
}))
583+
}
584+
}
476585
#[cfg(test)]
477586
mod tests {
478587
use super::*;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use bytes::Bytes;
19+
use datafusion_common::Result;
20+
use futures::Stream;
21+
use std::path::Path;
22+
use std::pin::Pin;
23+
use std::sync::Arc;
24+
25+
/// Abstraction over a spill file backend.
26+
/// Implementations handle their own quota enforcement and blocking concerns.
27+
pub trait SpillFile: Send + Sync {
28+
/// Returns the OS path if this is a local file, None otherwise.
29+
fn path(&self) -> Option<&Path> {
30+
None
31+
}
32+
33+
/// Returns current size in bytes if cheaply available.
34+
fn size(&self) -> Option<u64>;
35+
36+
/// Returns file contents as an async stream of byte chunks.
37+
fn read_stream(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>>;
38+
39+
/// Opens a writer for appending data to this file.
40+
fn open_writer(&self) -> Result<Box<dyn SpillWriter>>;
41+
}
42+
43+
/// Writer for spill file backends.
44+
/// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter.
45+
pub trait SpillWriter: Send {
46+
fn write(&mut self, data: Bytes) -> Result<()>;
47+
fn flush(&mut self) -> Result<()>;
48+
/// Finalizes the write after all data has been flushed.
49+
///
50+
/// Implementations must not call `flush` internally.
51+
/// Intended for close/sync/commit operations.
52+
fn finish(&mut self) -> Result<()>;
53+
}
54+
55+
/// Factory for creating spill files.
56+
pub trait TempFileFactory:
57+
Send + Sync + std::panic::UnwindSafe + std::panic::RefUnwindSafe
58+
{
59+
fn create_temp_file(&self, description: &str) -> Result<Arc<dyn SpillFile>>;
60+
}

0 commit comments

Comments
 (0)