Merged
46 changes: 44 additions & 2 deletions datafusion/datasource-json/src/boundary_stream.rs
```diff
@@ -28,7 +28,7 @@ use std::task::{Context, Poll};
 use bytes::Bytes;
 use futures::stream::{BoxStream, Stream};
 use futures::{StreamExt, TryFutureExt};
-use object_store::{GetOptions, GetRange, ObjectStore};
+use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

 /// How far past `raw_end` the initial bounded fetch covers. If the terminating
 /// newline is not found within this window, `ScanningLastTerminator` issues
```
```diff
@@ -90,10 +90,52 @@ async fn get_stream(
     range: std::ops::Range<u64>,
 ) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
     let opts = GetOptions {
-        range: Some(GetRange::Bounded(range)),
+        range: Some(GetRange::Bounded(range.clone())),
         ..Default::default()
     };
     let result = store.get_opts(&location, opts).await?;
+
+    #[cfg(not(target_arch = "wasm32"))]
```
**Contributor:**

Sounds like it may be improved upstream as well? Doing a `spawn_blocking` for each (small) read seems not great.


```diff
+    if let GetResultPayload::File(mut file, _path) = result.payload {
+        use std::io::{Read, Seek, SeekFrom};
+        const CHUNK_SIZE: u64 = 8 * 1024;
```
**Contributor:**

Is this ideal? Maybe a bit bigger gets higher throughput?

**Contributor Author:**

I've picked this value because it's the default for both:

- `BufReader`, which is used when reading the entire JSON file:

  ```rust
  GetResultPayload::File(file, _) => {
      let bytes = file_compression_type.convert_read(file)?;

      if newline_delimited {
          // NDJSON: use BufReader directly
          let reader = BufReader::new(bytes);
          let arrow_reader = ReaderBuilder::new(schema)
              .with_batch_size(batch_size)
              .build(reader)?;

          Ok(futures::stream::iter(arrow_reader)
              .map(|r| r.map_err(Into::into))
              .boxed())
      }
  ```

- `object_store`'s `GetResult::into_stream`:

  ```rust
  pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
      match self.payload {
          #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
          GetResultPayload::File(file, path) => {
              const CHUNK_SIZE: usize = 8 * 1024;
              local::chunked_stream(file, path, self.range, CHUNK_SIZE)
          }
          GetResultPayload::Stream(s) => s,
      }
  }
  ```

I think this value should be kept in sync with the first bullet point (where we read the entire JSON file).
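The "default for `BufReader`" claim above can be checked directly: the standard library's `BufReader::new` currently allocates an 8 KiB buffer (`DEFAULT_BUF_SIZE`), which is where the `8 * 1024` figure comes from. A minimal std-only check (the exact default is an implementation detail of the current std and could change):

```rust
use std::io::BufReader;

fn main() {
    // BufReader::new uses std's DEFAULT_BUF_SIZE, currently 8 KiB.
    // std::io::empty() is just a convenient zero-byte reader to wrap.
    let reader = BufReader::new(std::io::empty());
    println!("default BufReader capacity: {} bytes", reader.capacity());
}
```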


```diff
+        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
+            object_store::Error::Generic {
+                store: "local",
+                source: Box::new(e),
+            }
+        })?;
+
+        return Ok(futures::stream::try_unfold(
```
**Contributor:**

I feel like the upstream object store tries to do the same thing:

https://github.com/apache/arrow-rs-object-store/blob/v0.13.2/src/lib.rs#L1636-L1701

which then calls the local chunked stream: https://github.com/apache/arrow-rs-object-store/blob/main/src/local.rs#L926

The major difference is that in `object_store` the work is done on a `spawn_blocking` thread rather than inline. I am a bit worried about doing blocking IO on the main thread here.

I wonder if we could try the same approach as done upstream and run this on a different thread, but to Dandan's point:

1. Use a larger buffer for the read (e.g. 256k, then slice to 8k for the output)
2. Buffer some of the IO to minimize the overhead (e.g. use `StreamExt::buffered`)
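The "read big, emit small" idea in point 1 can be sketched with std alone. This is a hypothetical helper, not the PR's code: it pulls up to `big` bytes from the reader in one go, then hands the data out in `small`-sized pieces (the real code would slice a `Bytes` to avoid the copies):

```rust
use std::io::{Cursor, Read};

/// Fill a single large buffer from `src`, then split it into `small`-sized
/// output chunks. Hypothetical sketch of the "256k read, 8k output" idea.
fn read_then_slice<R: Read>(
    mut src: R,
    big: usize,
    small: usize,
) -> std::io::Result<Vec<Vec<u8>>> {
    let mut buf = vec![0u8; big];
    let mut filled = 0;
    // Loop because read() is allowed to return short counts.
    while filled < big {
        let n = src.read(&mut buf[filled..])?;
        if n == 0 {
            break; // EOF
        }
        filled += n;
    }
    buf.truncate(filled);
    Ok(buf.chunks(small).map(|c| c.to_vec()).collect())
}

fn main() -> std::io::Result<()> {
    let data = vec![7u8; 20 * 1024]; // 20 KiB of input
    let chunks = read_then_slice(Cursor::new(data), 256 * 1024, 8 * 1024)?;
    let sizes: Vec<usize> = chunks.iter().map(|c| c.len()).collect();
    // 20 KiB splits into 8 KiB + 8 KiB + 4 KiB
    println!("chunk sizes: {sizes:?}");
    Ok(())
}
```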

**Contributor:**

Update: after some more review, it seems like the current `GetResult::File` path does blocking IO as well, so this isn't a regression.

🤷

**Contributor Author:**

Indeed, I implemented blocking IO on the `GetResult::File` path because that's how it worked previously. Other approaches I've considered:

- `block_in_place`: it panics with single-threaded tokio runtimes.
- A single `spawn_blocking` thread with an mpsc channel: this could work, but it is not as straightforward and requires more careful design; see "Remove waits from blocking threads reading spill files" #15654.
- Tokio's `ReaderStream`: this would be similar to the existing `into_stream` approach, but the default buffer size (in `tokio-1.50.0/src/io/blocking.rs`) is 2 MiB (`DEFAULT_MAX_BUF_SIZE: usize = 2 * 1024 * 1024`); it still involves context-switching overhead, and it requires additional dependencies, such as `tokio-util` and tokio's `fs` feature:

  ```rust
  #[cfg(not(target_arch = "wasm32"))]
  if let GetResultPayload::File(file, _path) = result.payload {
      use std::io::SeekFrom;
      use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
      let mut tokio_file: tokio::fs::File = tokio::fs::File::from_std(file);
      tokio_file
          .seek(SeekFrom::Start(range.start))
          .await
          .map_err(|e| object_store::Error::Generic {
              store: "local",
              source: Box::new(e),
          })?;
      const BUF_SIZE: usize = 4 * 1024 * 1024; // 4 MiB
      let limited = tokio_file.take(range.end - range.start);
      return Ok(tokio_util::io::ReaderStream::with_capacity(
          BufReader::with_capacity(BUF_SIZE, limited),
          BUF_SIZE,
      )
      .map_err(|e| object_store::Error::Generic {
          store: "local",
          source: Box::new(e),
      })
      .boxed());
  }
  ```

- Keep using `into_stream` but increase the buffer size (from the original 8 KiB): this would significantly reduce the number of context switches.

I think there are better approaches, but since the original code was doing blocking IO in an async context (which it probably shouldn't have), it would be difficult to match its performance with any of them.
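The "dedicated thread plus mpsc channel" option discussed above can be sketched with std primitives alone (the real design would use `tokio::task::spawn_blocking` and an async channel; the function and names here are hypothetical): one reader thread does all the blocking IO and pushes fixed-size chunks through a bounded channel, so the consumer never blocks on a `read` syscall itself.

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc;
use std::thread;

/// Spawn one dedicated reader thread that sends `chunk`-sized buffers over a
/// bounded channel. Std-only sketch of the spawn_blocking + mpsc option.
fn chunked_reader_thread<R: Read + Send + 'static>(
    mut src: R,
    chunk: usize,
) -> mpsc::Receiver<std::io::Result<Vec<u8>>> {
    // A small bound keeps memory use limited and applies backpressure.
    let (tx, rx) = mpsc::sync_channel(4);
    thread::spawn(move || loop {
        let mut buf = vec![0u8; chunk];
        match src.read(&mut buf) {
            Ok(0) => break, // EOF: dropping tx ends the receiver's iteration
            Ok(n) => {
                buf.truncate(n);
                if tx.send(Ok(buf)).is_err() {
                    break; // consumer went away
                }
            }
            Err(e) => {
                let _ = tx.send(Err(e));
                break;
            }
        }
    });
    rx
}

fn main() {
    let rx = chunked_reader_thread(Cursor::new(vec![1u8; 10_000]), 4096);
    let total: usize = rx.into_iter().map(|r| r.unwrap().len()).sum();
    println!("received {total} bytes"); // all 10_000 input bytes arrive
}
```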

```diff
+            (file, range.end - range.start),
+            move |(mut file, remaining)| async move {
+                if remaining == 0 {
+                    return Ok(None);
+                }
+                let to_read = remaining.min(CHUNK_SIZE);
+                let cap = usize::try_from(to_read).map_err(|e| {
+                    object_store::Error::Generic {
+                        store: "local",
+                        source: Box::new(e),
+                    }
+                })?;
+
+                let mut buf = Vec::with_capacity(cap);
+                let read = (&mut file)
+                    .take(to_read)
+                    .read_to_end(&mut buf)
```
**Contributor:**

This is blocking IO on a tokio task, right?

```diff
+                    .map_err(|e| object_store::Error::Generic {
+                        store: "local",
+                        source: Box::new(e),
+                    })?;
+                Ok(Some((Bytes::from(buf), (file, remaining - read as u64))))
+            },
+        )
+        .boxed());
+    }
+
     Ok(result.into_stream())
 }
```
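The chunking math in the `try_unfold` body above (cap each read at `CHUNK_SIZE`, decrement `remaining` by the bytes actually read) can be exercised standalone with a `Cursor` in place of the file; this is a std-only sketch, not the PR's async code:

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

fn main() -> std::io::Result<()> {
    const CHUNK_SIZE: u64 = 8 * 1024;
    let mut file = Cursor::new(vec![b'x'; 20_000]);
    file.seek(SeekFrom::Start(0))?;

    let mut remaining: u64 = 20_000;
    let mut sizes = Vec::new();
    while remaining > 0 {
        let to_read = remaining.min(CHUNK_SIZE);
        let mut buf = Vec::with_capacity(to_read as usize);
        // take() caps the read; read_to_end fills buf until that cap or EOF,
        // exactly as in the try_unfold body above.
        let read = (&mut file).take(to_read).read_to_end(&mut buf)?;
        sizes.push(read);
        remaining -= read as u64;
    }
    // 20_000 bytes split as 8192 + 8192 + 3616
    println!("chunk sizes: {sizes:?}");
    Ok(())
}
```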
