fix: json scan performance on local files #21478
Conversation
The `into_stream()` implementation of `GetResult` (from arrow-rs-object-store) fetches every 8 KiB chunk using a `spawn_blocking()` task, resulting in a lot of scheduling overhead. Fix this by reading the data directly from the async context, using a buffer size of 8 KiB. This avoids any context switches.
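The inline chunked-read loop that the fix describes can be sketched with the standard library alone. This is a simplified illustration, not DataFusion's actual code: the function name, the temp-file setup, and returning the chunks in a `Vec` (instead of yielding them from a stream) are all assumptions made for the demo.

```rust
use std::fs::File;
use std::io::{Read, Result, Seek, SeekFrom, Write};

const CHUNK_SIZE: u64 = 8 * 1024;

/// Read `[start, end)` from `file` in fixed-size chunks, directly on the
/// calling thread (no per-chunk thread hand-off). Illustrative sketch only.
fn read_range_chunked(mut file: File, start: u64, end: u64) -> Result<Vec<Vec<u8>>> {
    file.seek(SeekFrom::Start(start))?;
    let mut remaining = end - start;
    let mut chunks = Vec::new();
    while remaining > 0 {
        let to_read = remaining.min(CHUNK_SIZE);
        let mut buf = Vec::with_capacity(to_read as usize);
        // `take` caps the read at this chunk's byte budget; reading through
        // `&mut file` keeps ownership of the file for the next iteration.
        let read = (&mut file).take(to_read).read_to_end(&mut buf)?;
        if read == 0 {
            break; // EOF before the requested range was exhausted
        }
        remaining -= read as u64;
        chunks.push(buf);
    }
    Ok(chunks)
}

fn main() -> Result<()> {
    // Build a 20 KiB scratch file so the loop produces multiple chunks.
    let path = std::env::temp_dir().join("chunked_read_demo.bin");
    File::create(&path)?.write_all(&vec![0xABu8; 20 * 1024])?;
    let chunks = read_range_chunked(File::open(&path)?, 0, 20 * 1024)?;
    // 20 KiB splits into two full 8 KiB chunks plus a 4 KiB tail.
    assert_eq!(chunks.len(), 3);
    assert_eq!(chunks[0].len(), 8 * 1024);
    assert_eq!(chunks[2].len(), 4 * 1024);
    println!(
        "chunk sizes: {:?}",
        chunks.iter().map(|c| c.len()).collect::<Vec<_>>()
    );
    Ok(())
}
```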
@Weijun-H please take a look
```rust
};
let result = store.get_opts(&location, opts).await?;

#[cfg(not(target_arch = "wasm32"))]
```
Sounds like it may be improved upstream as well? Doing a spawn_blocking for each (small) read seems not great.
```rust
#[cfg(not(target_arch = "wasm32"))]
if let GetResultPayload::File(mut file, _path) = result.payload {
    use std::io::{Read, Seek, SeekFrom};
    const CHUNK_SIZE: u64 = 8 * 1024;
```
Is this ideal? Maybe a bit bigger gets higher throughput?
I've picked this value because that's the default for both:

- `BufReader` (which is used when reading the entire json file, as shown below):

```rust
GetResultPayload::File(file, _) => {
    let bytes = file_compression_type.convert_read(file)?;
    if newline_delimited {
        // NDJSON: use BufReader directly
        let reader = BufReader::new(bytes);
        let arrow_reader = ReaderBuilder::new(schema)
            .with_batch_size(batch_size)
            .build(reader)?;
        Ok(futures::stream::iter(arrow_reader)
            .map(|r| r.map_err(Into::into))
            .boxed())
    }
```

- object_store's `GetResult::into_stream`:

```rust
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
    match self.payload {
        #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
        GetResultPayload::File(file, path) => {
            const CHUNK_SIZE: usize = 8 * 1024;
            local::chunked_stream(file, path, self.range, CHUNK_SIZE)
        }
        GetResultPayload::Stream(s) => s,
    }
}
```

I think this value should be kept in sync with the first bullet point (where we read the entire json file).
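For reference, the 8 KiB default mentioned in the first bullet can be checked directly against the standard library. `DEFAULT_BUF_SIZE` is an internal std constant (8 * 1024 on mainstream platforms), but the public `BufReader::capacity` method exposes the value a default-constructed reader actually got:

```rust
use std::io::{BufReader, Cursor};

fn main() {
    // BufReader::new uses std's internal DEFAULT_BUF_SIZE (8 KiB on
    // common platforms) when no explicit capacity is given.
    let reader = BufReader::new(Cursor::new(vec![0u8; 64]));
    assert_eq!(reader.capacity(), 8 * 1024);
    println!("default BufReader capacity: {} bytes", reader.capacity());
}
```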
Weijun-H left a comment
LGTM, thanks @ariel-miculas
One suggestion before merge. Can we add a small test that exercises the LocalFileSystem / GetResultPayload::File path? The current tests seem to cover the stream-based path only, while this change is specifically for local files.
Nice use of the feature :)
alamb left a comment
Thank you for this @ariel-miculas @Dandandan and @Weijun-H
I agree with @Weijun-H 's suggestion for testing
Can you possibly make a PR to add the benchmark you are using? I would love to see if we can help optimize this more.
```rust
let read = (&mut file)
    .take(to_read)
    .read_to_end(&mut buf)
```
This is blocking IO on a tokio task, right?
```rust
    }
})?;

return Ok(futures::stream::try_unfold(
```
I feel like the upstream object store tries to do the same thing
https://github.com/apache/arrow-rs-object-store/blob/v0.13.2/src/lib.rs#L1636-L1701
Which then calls local stream: https://github.com/apache/arrow-rs-object-store/blob/main/src/local.rs#L926
The major difference is that in object_store the work is done on a spawn_blocking thread rather than inline.
I am a bit worried about doing blocking IO on the main thread here
I wonder if we could try the same approach as done upstream and run this on a different thread. But to Dandan's point, we could also:
- use a larger buffer for the read (e.g. 256k, then slice to 8k for the output)
- buffer some of the IO to minimize the overhead (e.g. use `StreamExt::buffered`)
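The "read big, slice small" idea from the first bullet can be sketched with std types. Everything here is illustrative: the buffer sizes are scaled down from the suggested 256k, and a real implementation would likely slice a `Bytes` buffer instead of copying into `Vec`s:

```rust
use std::io::{Cursor, Read, Result};

const READ_BUF: usize = 32 * 1024; // one large read from the source...
const OUT_CHUNK: usize = 8 * 1024; // ...sliced into smaller output chunks

/// Each loop iteration issues one large read, then yields it as 8 KiB
/// slices, so per-read overhead is paid once per READ_BUF rather than
/// once per output chunk. Illustrative sketch only.
fn read_and_slice(mut src: impl Read) -> Result<Vec<Vec<u8>>> {
    let mut out = Vec::new();
    let mut buf = vec![0u8; READ_BUF];
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            break; // EOF
        }
        for slice in buf[..n].chunks(OUT_CHUNK) {
            out.push(slice.to_vec());
        }
    }
    Ok(out)
}

fn main() -> Result<()> {
    let data = vec![1u8; 40 * 1024]; // 40 KiB of input
    let chunks = read_and_slice(Cursor::new(data))?;
    // 40 KiB = one 32 KiB read (4 slices) + one 8 KiB read (1 slice)
    assert_eq!(chunks.len(), 5);
    assert!(chunks.iter().all(|c| c.len() == OUT_CHUNK));
    println!("emitted {} chunks of {} bytes", chunks.len(), OUT_CHUNK);
    Ok(())
}
```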
Update: after some more review, it seems like the current GetResult::File path does blocking IO as well so this isn't a regression
🤷
Indeed, I implemented blocking IO on the `GetResult::File` path because that's how it worked previously. Other approaches I've considered:

- `block_in_place` - it panics with single-threaded tokio runtimes
- a single `spawn_blocking` thread with an mpsc channel - this could work but is not as straightforward; it requires more careful design, see "Remove waits from blocking threads reading spill files" #15654
- using tokio's `ReaderStream` - this would be similar to the existing `into_stream` approach, but the default buffer size (in tokio-1.50.0/src/io/blocking.rs) is 2 MiB (`DEFAULT_MAX_BUF_SIZE: usize = 2 * 1024 * 1024`); it still involves context-switching overhead and it requires importing additional libraries, such as `tokio-util` and the tokio `fs` feature:

```rust
#[cfg(not(target_arch = "wasm32"))]
if let GetResultPayload::File(file, _path) = result.payload {
    use std::io::SeekFrom;
    use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
    let mut tokio_file: tokio::fs::File = tokio::fs::File::from_std(file);
    tokio_file
        .seek(SeekFrom::Start(range.start))
        .await
        .map_err(|e| object_store::Error::Generic {
            store: "local",
            source: Box::new(e),
        })?;
    const BUF_SIZE: usize = 4 * 1024 * 1024; // 4 MiB
    let limited = tokio_file.take(range.end - range.start);
    return Ok(tokio_util::io::ReaderStream::with_capacity(
        BufReader::with_capacity(BUF_SIZE, limited),
        BUF_SIZE,
    )
    .map_err(|e| object_store::Error::Generic {
        store: "local",
        source: Box::new(e),
    })
    .boxed());
}
```

- keep using `into_stream` but increase the buffer size (from the original 8 KiB) - this would significantly reduce the number of context switches
I think there are better approaches, but since the original code was doing blocking IO in an async context (which it probably shouldn't do), it would be difficult to achieve the same performance with any of these other approaches.
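For reference, the "single blocking thread with an mpsc channel" option from the list above looks roughly like this when sketched with std threads instead of tokio's `spawn_blocking` (all names, sizes, and the in-memory source are illustrative assumptions):

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    const CHUNK_SIZE: usize = 8 * 1024;
    let source = vec![7u8; 20 * 1024]; // stands in for the local file

    // Bounded channel: backpressure keeps the reader thread from
    // racing arbitrarily far ahead of the consumer.
    let (tx, rx) = sync_channel::<Vec<u8>>(4);
    let reader = thread::spawn(move || {
        let mut src = Cursor::new(source);
        let mut buf = vec![0u8; CHUNK_SIZE];
        loop {
            let n = src.read(&mut buf).expect("read failed");
            if n == 0 {
                break; // EOF: dropping tx closes the channel
            }
            if tx.send(buf[..n].to_vec()).is_err() {
                break; // receiver went away
            }
        }
    });

    // The consumer (the async side, in the real design) just drains
    // ready chunks; all blocking IO happened on the dedicated thread.
    let total: usize = rx.iter().map(|chunk| chunk.len()).sum();
    reader.join().unwrap();
    assert_eq!(total, 20 * 1024);
    println!("received {total} bytes over the channel");
}
```

As the comment above notes, getting shutdown, error propagation, and backpressure right is where this design needs the extra care.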
Thanks again @ariel-miculas and everyone
Which issue does this PR close?
Rationale for this change
The `into_stream()` implementation of `GetResult` (from arrow-rs-object-store) fetches every 8 KiB chunk using a `spawn_blocking()` task, resulting in a lot of scheduling overhead.
Fix this by reading the data directly from the async context, using a buffer size of 8 KiB. This avoids any context switches.
What changes are included in this PR?
Are these changes tested?
and with SIMULATE_LATENCY:
For the tests I've used a c7a.16xlarge EC2 instance, with hits.json trimmed down to 51 GiB (the original is 217 GiB) and a warm cache (by running `cat hits_50.json > /dev/null`).
Are there any user-facing changes?
No