fix: json scan performance on local files #21478
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,7 @@ use std::task::{Context, Poll}; | |
| use bytes::Bytes; | ||
| use futures::stream::{BoxStream, Stream}; | ||
| use futures::{StreamExt, TryFutureExt}; | ||
| use object_store::{GetOptions, GetRange, ObjectStore}; | ||
| use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; | ||
|
|
||
| /// How far past `raw_end` the initial bounded fetch covers. If the terminating | ||
| /// newline is not found within this window, `ScanningLastTerminator` issues | ||
|
|
@@ -90,10 +90,52 @@ async fn get_stream( | |
| range: std::ops::Range<u64>, | ||
| ) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> { | ||
| let opts = GetOptions { | ||
| range: Some(GetRange::Bounded(range)), | ||
| range: Some(GetRange::Bounded(range.clone())), | ||
| ..Default::default() | ||
| }; | ||
| let result = store.get_opts(&location, opts).await?; | ||
|
|
||
| #[cfg(not(target_arch = "wasm32"))] | ||
| if let GetResultPayload::File(mut file, _path) = result.payload { | ||
| use std::io::{Read, Seek, SeekFrom}; | ||
| const CHUNK_SIZE: u64 = 8 * 1024; | ||
|
Contributor
Is this ideal? Maybe a bit bigger gets higher throughput?
Contributor
Author
I've picked this value because that's the default for both
I think this value should be kept in sync with the first bullet point (where we read the entire JSON file). |
||
|
|
||
| file.seek(SeekFrom::Start(range.start)).map_err(|e| { | ||
| object_store::Error::Generic { | ||
| store: "local", | ||
| source: Box::new(e), | ||
| } | ||
| })?; | ||
|
|
||
| return Ok(futures::stream::try_unfold( | ||
|
Contributor
I feel like the upstream object store tries to do the same thing: https://github.com/apache/arrow-rs-object-store/blob/v0.13.2/src/lib.rs#L1636-L1701
That in turn calls the local stream: https://github.com/apache/arrow-rs-object-store/blob/main/src/local.rs#L926
The major difference is that in object_store the work is done on a
I am a bit worried about doing blocking IO on the main thread here. I wonder if we could try the same approach as upstream and run this on a different thread, but to Dandan's point:
Contributor
Update: after some more review, it seems like the current GetResult::File path does blocking IO as well, so this isn't a regression 🤷
Contributor
Author
Indeed, I implemented blocking IO on the GetResult::File path because that's how it worked previously. Other approaches I've considered:
I think there are better approaches, but since the original code was doing blocking IO in an async context (which it probably shouldn't do), it would be difficult to achieve the same performance with any of these other approaches. |
||
| (file, range.end - range.start), | ||
| move |(mut file, remaining)| async move { | ||
| if remaining == 0 { | ||
| return Ok(None); | ||
| } | ||
| let to_read = remaining.min(CHUNK_SIZE); | ||
| let cap = usize::try_from(to_read).map_err(|e| { | ||
| object_store::Error::Generic { | ||
| store: "local", | ||
| source: Box::new(e), | ||
| } | ||
| })?; | ||
|
|
||
| let mut buf = Vec::with_capacity(cap); | ||
| let read = | ||
| (&mut file) | ||
| .take(to_read) | ||
| .read_to_end(&mut buf) | ||
|
Contributor
This is blocking IO on a tokio task, right? |
||
| .map_err(|e| object_store::Error::Generic { | ||
| store: "local", | ||
| source: Box::new(e), | ||
| })?; | ||
| Ok(Some((Bytes::from(buf), (file, remaining - read as u64)))) | ||
| }, | ||
| ) | ||
| .boxed()); | ||
| } | ||
|
|
||
| Ok(result.into_stream()) | ||
| } | ||
|
|
||
|
|
||
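The chunked range read in the diff above can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: `read_range_chunked` is a hypothetical helper, it collects chunks into a `Vec` instead of yielding them through `futures::stream::try_unfold`, and it uses plain `io::Error` rather than `object_store::Error`. It also adds one guard the diff does not show, stopping when a read returns zero bytes, on the assumption that the requested range could extend past the end of the source.

```rust
use std::io::{self, Read, Seek, SeekFrom};
use std::ops::Range;

/// Same value as `CHUNK_SIZE` in the diff: 8 KiB.
const CHUNK_SIZE: u64 = 8 * 1024;

/// Hypothetical helper (not part of the PR): reads `range` from `src` in
/// chunks of at most `CHUNK_SIZE` bytes and returns the chunks in order.
fn read_range_chunked<R: Read + Seek>(
    src: &mut R,
    range: Range<u64>,
) -> io::Result<Vec<Vec<u8>>> {
    // Position the reader at the start of the requested range.
    src.seek(SeekFrom::Start(range.start))?;
    let mut remaining = range.end.saturating_sub(range.start);
    let mut chunks = Vec::new();

    while remaining > 0 {
        let to_read = remaining.min(CHUNK_SIZE);
        let mut buf = Vec::with_capacity(to_read as usize);
        // `take` caps this read at `to_read` bytes; `read_to_end` keeps
        // reading through short reads until the cap or EOF is reached.
        let read = src.by_ref().take(to_read).read_to_end(&mut buf)? as u64;
        if read == 0 {
            // Assumed guard: the source ended before `range.end`, so stop
            // instead of producing empty chunks forever.
            break;
        }
        remaining -= read;
        chunks.push(buf);
    }
    Ok(chunks)
}
```

Every read here is synchronous, which is the property the review comments discuss: called from an async task, each `read_to_end` blocks the executor thread for the duration of the read.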
Sounds like it may be improved upstream as well? Doing a spawn_blocking for each (small) read seems not great.
opened apache/arrow-rs-object-store#693
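One alternative to a `spawn_blocking` call per small read, raised in the comments above, is to move the whole read loop onto a single dedicated thread and hand chunks back through a bounded channel. The sketch below is an assumption-laden illustration, not what this PR or object_store does: `spawn_chunk_reader` is a hypothetical helper, and `std::thread` plus `std::sync::mpsc` stand in for an async runtime's blocking pool and async channel.

```rust
use std::io::{self, Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper (not from the PR or object_store): performs every
/// blocking read for `range` on one background thread and sends each chunk
/// through a bounded channel holding at most `depth` pending chunks.
fn spawn_chunk_reader<R>(
    mut src: R,
    range: Range<u64>,
    chunk_size: u64,
    depth: usize,
) -> Receiver<io::Result<Vec<u8>>>
where
    R: Read + Seek + Send + 'static,
{
    let (tx, rx) = sync_channel(depth);
    thread::spawn(move || {
        if let Err(e) = src.seek(SeekFrom::Start(range.start)) {
            let _ = tx.send(Err(e));
            return;
        }
        let mut remaining = range.end.saturating_sub(range.start);
        while remaining > 0 {
            let to_read = remaining.min(chunk_size);
            let mut buf = Vec::new();
            match src.by_ref().take(to_read).read_to_end(&mut buf) {
                // EOF before `range.end`: nothing more to send.
                Ok(0) => break,
                Ok(n) => {
                    remaining -= n as u64;
                    // `send` blocks while the channel is full, which gives
                    // backpressure; it fails once the receiver is dropped.
                    if tx.send(Ok(buf)).is_err() {
                        break;
                    }
                }
                Err(e) => {
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });
    rx
}
```

The trade-off is one thread hop per stream instead of one per chunk, at the cost of a channel send per chunk. Whether that matches the throughput of the inline blocking reads in this PR is not something the sketch establishes.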