Skip to content

Commit 7b2f284

Browse files
fix: json scan performance on local files (#21478)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #21450 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> The into_stream() implementation of GetResult (from arrow-rs-objectstore) fetches every 8KiB chunk using a spawn_blocking() task, resulting in a lot of scheduling overhead. Fix this by reading the data directly from the async context, using a buffer size of 8KiBs. This avoids any context switch. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? ``` Validated that the initial reported overhead is now much smaller: Comparing json-test-on-main and test-json-improvement -------------------- Benchmark clickbench_2.json -------------------- ┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓ ┃ Query     ┃ json-test-on-main ┃ test-json-improvement ┃       Change ┃ ┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩ │ QQuery 0  │        2421.62 ms │            2521.19 ms │    no change │ │ QQuery 1  │        2584.29 ms │            2729.98 ms │ 1.06x slower │ │ QQuery 2  │        2662.11 ms │            2782.29 ms │    no change │ │ QQuery 3  │              FAIL │                  FAIL │ incomparable │ │ QQuery 4  │        2764.78 ms │            2896.46 ms │    no change │ │ QQuery 5  │        2676.46 ms │            2758.01 ms │    no change │ │ QQuery 6  │              FAIL │                  FAIL │ incomparable │ │ QQuery 7  │        2684.50 ms │            2752.37 ms │    no change │ │ QQuery 8  │        2781.21 ms │            2827.46 ms │    no change │ │ QQuery 9  │        3039.17 ms │            3165.29 ms │    no change │ │ QQuery 10 │        2791.32 ms │            2843.44 ms │    no change │ │ QQuery 11 │        2839.05 ms │            3011.84 ms │ 1.06x slower │ │ QQuery 12 │        2691.51 ms │            2839.97 ms │ 1.06x slower │ │ QQuery 13 │        2768.57 ms │            2860.68 ms │    no change │ │ QQuery 14 │        2712.50 ms │            2856.80 ms │ 1.05x slower │ │ QQuery 15 │        2807.64 ms │            2888.94 ms │    no change │ │ QQuery 16 │        2774.87 ms │            2875.44 ms │    no change │ │ QQuery 17 │        2797.28 ms │            2850.17 ms │    no change │ │ QQuery 18 │        3017.75 ms │            3111.64 ms │    no change │ │ QQuery 19 │        2801.30 ms │            2927.25 ms │    no change │ │ QQuery 20 │        2743.43 ms │            2862.10 ms │    no change │ │ QQuery 21 │        2811.41 ms │            2906.42 ms │    no change │ │ QQuery 22 │        2953.66 ms │            3038.23 ms │    no change │ │ QQuery 23 │              FAIL │                  FAIL │ incomparable │ │ QQuery 24 │        2862.27 ms │            2940.31 ms │    no change │ │ QQuery 25 │        2763.40 ms │            2848.55 ms │    no change │ │ QQuery 26 │        2840.39 ms │            2950.47 ms │    no change │ │ QQuery 27 │        2886.70 ms │            2921.28 ms │    no change │ │ QQuery 28 │        3145.39 ms │            3221.27 ms │    no change │ │ QQuery 29 │        2821.87 ms │            2869.85 ms │    no change │ │ QQuery 30 │        2953.55 ms │            2990.15 ms │    no change │ │ QQuery 31 │        2997.81 ms │            3049.28 ms │    no change │ │ QQuery 32 │        2969.14 ms │            3126.79 ms │ 1.05x slower │ │ QQuery 33 │        2764.80 ms │            2866.63 ms │    no change │ │ QQuery 34 │        2828.77 ms │            2848.54 ms │    no change │ │ QQuery 35 │        2812.55 ms │            2793.79 ms │    no change │ │ QQuery 36 │              FAIL │                  FAIL │ incomparable │ │ QQuery 37 │              FAIL │                  FAIL │ incomparable │ │ QQuery 38 │              FAIL │                  FAIL │ incomparable │ │ QQuery 39 │              FAIL │                  FAIL │ incomparable │ │ QQuery 40 │              FAIL │                  FAIL │ incomparable │ │ QQuery 41 │              FAIL │                  FAIL │ incomparable │ │ QQuery 42 │              FAIL │                  FAIL │ incomparable │ └───────────┴───────────────────┴───────────────────────┴──────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓ ┃ Benchmark Summary                    ┃            ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩ │ Total Time (json-test-on-main)       │ 92771.07ms │ │ Total Time (test-json-improvement)   │ 95732.89ms │ │ Average Time (json-test-on-main)     │  2811.24ms │ │ Average Time (test-json-improvement) │  2901.00ms │ │ Queries Faster                       │          0 │ │ Queries Slower                       │          5 │ │ Queries with No Change               │         28 │ │ Queries with Failure                 │         10 │ └──────────────────────────────────────┴────────────┘ ``` and with SIMULATE_LATENCY: ``` -------------------- Benchmark clickbench_2.json -------------------- ┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Query     ┃ json-test-on-main ┃ test-json-improvement ┃        Change ┃ ┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ QQuery 0  │        2795.68 ms │            2687.68 ms │     no change │ │ QQuery 1  │        2880.50 ms │            2768.30 ms │     no change │ │ QQuery 2  │        2960.75 ms │            2826.89 ms │     no change │ │ QQuery 3  │              FAIL │                  FAIL │  incomparable │ │ QQuery 4  │        3140.38 ms │            2963.15 ms │ +1.06x faster │ │ QQuery 5  │        2926.66 ms │            2830.43 ms │     no change │ │ QQuery 6  │              FAIL │                  FAIL │  incomparable │ │ QQuery 7  │        3026.29 ms │            2858.30 ms │ +1.06x faster │ │ QQuery 8  │        4302.35 ms │            2954.96 ms │ +1.46x faster │ │ QQuery 9  │        4439.83 ms │            3200.43 ms │ +1.39x faster │ │ QQuery 10 │        3028.32 ms │            2969.32 ms │     no change │ │ QQuery 11 │        3147.81 ms │            3040.74 ms │     no change │ │ QQuery 12 │        4169.45 ms │            2886.59 ms │ +1.44x faster │ │ QQuery 13 │        3839.01 ms │            2997.80 ms │ +1.28x faster │ │ QQuery 14 │        4086.30 ms │            2907.42 ms │ +1.41x faster │ │ QQuery 15 │        4308.07 ms │            3025.22 ms │ +1.42x faster │ │ QQuery 16 │        3084.89 ms │            2984.34 ms │     no change │ │ QQuery 17 │        4287.89 ms │            2984.27 ms │ +1.44x faster │ │ QQuery 18 │        3542.80 ms │            3144.98 ms │ +1.13x faster │ │ QQuery 19 │        4388.70 ms │            3014.37 ms │ +1.46x faster │ │ QQuery 20 │        3149.54 ms │            2986.73 ms │ +1.05x faster │ │ QQuery 21 │        3250.81 ms │            2906.60 ms │ +1.12x faster │ │ QQuery 22 │        3265.98 ms │            3122.25 ms │     no change │ │ QQuery 23 │              FAIL │                  FAIL │  incomparable │ │ QQuery 24 │        3066.52 ms │            2997.55 ms │     no change │ │ QQuery 25 │        4289.31 ms │            2884.22 ms │ +1.49x faster │ │ QQuery 26 │        4223.03 ms │            2933.16 ms │ +1.44x faster │ │ QQuery 27 │        3156.86 ms │            3001.17 ms │     no change │ │ QQuery 28 │        4831.42 ms │            3318.89 ms │ +1.46x faster │ │ QQuery 29 │        3252.45 ms │            4375.90 ms │  1.35x slower │ │ QQuery 30 │        4460.06 ms │            3153.77 ms │ +1.41x faster │ │ QQuery 31 │        4235.85 ms │            3171.58 ms │ +1.34x faster │ │ QQuery 32 │        3435.14 ms │            3202.64 ms │ +1.07x faster │ │ QQuery 33 │        3147.21 ms │            3031.54 ms │     no change │ │ QQuery 34 │        4378.41 ms │            3008.79 ms │ +1.46x faster │ │ QQuery 35 │        4224.36 ms │            2897.53 ms │ +1.46x faster │ │ QQuery 36 │              FAIL │                  FAIL │  incomparable │ │ QQuery 37 │              FAIL │                  FAIL │  incomparable │ │ QQuery 38 │              FAIL │                  FAIL │  incomparable │ │ QQuery 39 │              FAIL │                  FAIL │  incomparable │ │ QQuery 40 │              FAIL │                  FAIL │  incomparable │ │ QQuery 41 │              FAIL │                  FAIL │  incomparable │ │ QQuery 42 │              FAIL │                  FAIL │  incomparable │ └───────────┴───────────────────┴───────────────────────┴───────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓ ┃ Benchmark Summary                    ┃             ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │ Total Time (json-test-on-main)       │ 120722.63ms │ │ Total Time (test-json-improvement)   │ 100037.48ms │ │ Average Time (json-test-on-main)     │   3658.26ms │ │ Average Time (test-json-improvement) │   3031.44ms │ │ Queries Faster                       │          21 │ │ Queries Slower                       │           1 │ │ Queries with No Change               │          11 │ │ Queries with Failure                 │          10 │ └──────────────────────────────────────┴─────────────┘ ``` For the tests I've used a c7a.16xlarge ec2 instance, with a trimmed down version of hits.json to 51G (original has 217 GiB), with a warm cache (by running `cat hits_50.json > /dev/null`) ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent d0692b8 commit 7b2f284

File tree

1 file changed

+44
-2
lines changed

1 file changed

+44
-2
lines changed

datafusion/datasource-json/src/boundary_stream.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::task::{Context, Poll};
2828
use bytes::Bytes;
2929
use futures::stream::{BoxStream, Stream};
3030
use futures::{StreamExt, TryFutureExt};
31-
use object_store::{GetOptions, GetRange, ObjectStore};
31+
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
3232

3333
/// How far past `raw_end` the initial bounded fetch covers. If the terminating
3434
/// newline is not found within this window, `ScanningLastTerminator` issues
@@ -90,10 +90,52 @@ async fn get_stream(
9090
range: std::ops::Range<u64>,
9191
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
9292
let opts = GetOptions {
93-
range: Some(GetRange::Bounded(range)),
93+
range: Some(GetRange::Bounded(range.clone())),
9494
..Default::default()
9595
};
9696
let result = store.get_opts(&location, opts).await?;
97+
98+
#[cfg(not(target_arch = "wasm32"))]
99+
if let GetResultPayload::File(mut file, _path) = result.payload {
100+
use std::io::{Read, Seek, SeekFrom};
101+
const CHUNK_SIZE: u64 = 8 * 1024;
102+
103+
file.seek(SeekFrom::Start(range.start)).map_err(|e| {
104+
object_store::Error::Generic {
105+
store: "local",
106+
source: Box::new(e),
107+
}
108+
})?;
109+
110+
return Ok(futures::stream::try_unfold(
111+
(file, range.end - range.start),
112+
move |(mut file, remaining)| async move {
113+
if remaining == 0 {
114+
return Ok(None);
115+
}
116+
let to_read = remaining.min(CHUNK_SIZE);
117+
let cap = usize::try_from(to_read).map_err(|e| {
118+
object_store::Error::Generic {
119+
store: "local",
120+
source: Box::new(e),
121+
}
122+
})?;
123+
124+
let mut buf = Vec::with_capacity(cap);
125+
let read =
126+
(&mut file)
127+
.take(to_read)
128+
.read_to_end(&mut buf)
129+
.map_err(|e| object_store::Error::Generic {
130+
store: "local",
131+
source: Box::new(e),
132+
})?;
133+
Ok(Some((Bytes::from(buf), (file, remaining - read as u64))))
134+
},
135+
)
136+
.boxed());
137+
}
138+
97139
Ok(result.into_stream())
98140
}
99141

0 commit comments

Comments
 (0)