Skip to content

Commit b4ba0bc

Browse files
committed
fix: json scan performance on local files
The into_stream() implementation of GetResult (from arrow-rs-object-store) fetches every 8 KiB chunk using a separate spawn_blocking() task, resulting in a lot of scheduling overhead. Fix this by reading the data directly from the async context, using a buffer size of 8 KiB. This avoids a context switch for every chunk.
1 parent 5ba06ac commit b4ba0bc

1 file changed

Lines changed: 44 additions & 2 deletions

File tree

datafusion/datasource-json/src/boundary_stream.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::task::{Context, Poll};
2828
use bytes::Bytes;
2929
use futures::stream::{BoxStream, Stream};
3030
use futures::{StreamExt, TryFutureExt};
31-
use object_store::{GetOptions, GetRange, ObjectStore};
31+
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
3232

3333
/// How far past `raw_end` the initial bounded fetch covers. If the terminating
3434
/// newline is not found within this window, `ScanningLastTerminator` issues
@@ -90,10 +90,52 @@ async fn get_stream(
9090
range: std::ops::Range<u64>,
9191
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
9292
let opts = GetOptions {
93-
range: Some(GetRange::Bounded(range)),
93+
range: Some(GetRange::Bounded(range.clone())),
9494
..Default::default()
9595
};
9696
let result = store.get_opts(&location, opts).await?;
97+
98+
#[cfg(not(target_arch = "wasm32"))]
99+
if let GetResultPayload::File(mut file, _path) = result.payload {
100+
use std::io::{Read, Seek, SeekFrom};
101+
const CHUNK_SIZE: u64 = 8 * 1024;
102+
103+
file.seek(SeekFrom::Start(range.start)).map_err(|e| {
104+
object_store::Error::Generic {
105+
store: "local",
106+
source: Box::new(e),
107+
}
108+
})?;
109+
110+
return Ok(futures::stream::try_unfold(
111+
(file, range.end - range.start),
112+
move |(mut file, remaining)| async move {
113+
if remaining == 0 {
114+
return Ok(None);
115+
}
116+
let to_read = remaining.min(CHUNK_SIZE);
117+
let cap = usize::try_from(to_read).map_err(|e| {
118+
object_store::Error::Generic {
119+
store: "local",
120+
source: Box::new(e),
121+
}
122+
})?;
123+
124+
let mut buf = Vec::with_capacity(cap);
125+
let read =
126+
(&mut file)
127+
.take(to_read)
128+
.read_to_end(&mut buf)
129+
.map_err(|e| object_store::Error::Generic {
130+
store: "local",
131+
source: Box::new(e),
132+
})?;
133+
Ok(Some((Bytes::from(buf), (file, remaining - read as u64))))
134+
},
135+
)
136+
.boxed());
137+
}
138+
97139
Ok(result.into_stream())
98140
}
99141

0 commit comments

Comments
 (0)