perf: optimize object store requests when reading JSON #20823
alamb merged 12 commits into apache:main from
Conversation
a bit ironic
This is an alternative approach to apache#19687. Instead of reading the entire range in the json FileOpener, implement an AlignedBoundaryStream which scans the range for newlines as the FileStream requests data from the stream, by wrapping the original stream returned by the ObjectStore. This eliminates the overhead of the two extra get_opts requests needed by calculate_range and, more importantly, it allows for efficient read-ahead implementations by the underlying ObjectStore. Previously this was inefficient because the streams opened by calculate_range included a stream from (start - 1) to file_size and another one from (end - 1) to end_of_file, just to find the two relevant newlines.
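The alignment rule can be sketched outside the stream machinery. The following is an illustrative reimplementation, not the PR's code: a partition skips everything up to and including the first newline at or after `start - 1`, and reads through the first newline at or after `end - 1` (or to EOF for the last partition), so contiguous raw ranges yield disjoint, complete lines.

```rust
/// Illustrative sketch (not the PR's implementation): align a raw byte
/// range [start, end) to newline boundaries the way each JSON partition must.
fn aligned_range(data: &[u8], start: usize, end: usize) -> (usize, usize) {
    // First newline at or after `from`, if any.
    let find_nl =
        |from: usize| data[from..].iter().position(|&b| b == b'\n').map(|i| from + i);
    // Skip up to and including the first newline at or after start - 1.
    let begin = if start == 0 {
        0
    } else {
        find_nl(start - 1).map_or(data.len(), |nl| nl + 1)
    };
    // Last partition: stream to EOF; otherwise stop just past the first
    // newline at or after end - 1.
    let stop = if end >= data.len() {
        data.len()
    } else {
        find_nl(end - 1).map_or(data.len(), |nl| nl + 1)
    };
    (begin, stop.max(begin))
}

fn main() {
    let data = b"{\"a\":1}\n{\"a\":22}\n{\"a\":333}\n";
    let mid = data.len() / 2; // raw split ignores line boundaries
    let (b1, e1) = aligned_range(data, 0, mid);
    let (b2, e2) = aligned_range(data, mid, data.len());
    // The two aligned ranges tile the file exactly: no gap, no overlap.
    assert_eq!((b1, b2, e2), (0, e1, data.len()));
    println!(
        "{:?} | {:?}",
        std::str::from_utf8(&data[b1..e1]).unwrap(),
        std::str::from_utf8(&data[b2..e2]).unwrap()
    );
}
```

Splitting a file at an arbitrary byte offset and aligning both halves this way tiles the file exactly, which is why each partition can decode complete JSON lines without consulting its neighbors.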
e3b5355 to f5b3811 Compare
Thanks for this PR @ariel-miculas! Do you have any benchmark results for this change? Even some example queries. @Weijun-H do you know of any benchmarks to run?
No, I'm having trouble coming up with a realistic benchmark. The previous benchmark https://github.com/apache/datafusion/pull/19687/changes#diff-5358b38b6265d769b66b614f7ba88ed9320f7a9fce5197330b7c01c2a8a3ed3b incorrectly assumes that all the requested bytes (via get_opts) will be read, while you can actually request a 10 GiB stream of bytes and read only 16 KiB from it. As a result, the benchmark in the previous PR for reducing the read amplification shows impressive improvements, but it hides the fact that it breaks the parallelization between data fetching and JSON decoding (by doing all the data fetching in the JsonOpener instead of allowing FileStream to do its magic). So I'm not sure how to write a benchmark that can prove at the same time that:
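The pitfall described above, counting requested bytes rather than bytes actually pulled, can be shown with a tiny counting producer; `CountingChunks` is a hypothetical stand-in for a lazily consumed object store stream:

```rust
/// Hypothetical stand-in for a lazily consumed object store stream:
/// chunks are only materialized when the consumer asks for them.
struct CountingChunks {
    chunks_served: usize,
    total_chunks: usize,
}

impl Iterator for CountingChunks {
    type Item = Vec<u8>;
    fn next(&mut self) -> Option<Self::Item> {
        if self.chunks_served == self.total_chunks {
            return None;
        }
        self.chunks_served += 1;
        Some(vec![0u8; 16 * 1024]) // 16 KiB per chunk
    }
}

fn main() {
    // "Request" a huge range (~10 GiB worth of 16 KiB chunks)...
    let total_chunks = 10 * 1024 * 1024 * 1024 / (16 * 1024);
    let mut stream = CountingChunks { chunks_served: 0, total_chunks };
    // ...but read only the first chunk: only 16 KiB was ever produced,
    // so a benchmark that charges the full requested range is misleading.
    let first = stream.next().unwrap();
    assert_eq!(first.len(), 16 * 1024);
    assert_eq!(stream.chunks_served, 1);
    println!("served {} chunk(s)", stream.chunks_served);
}
```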
```rust
terminator: u8,
/// Effective end boundary. Set to `u64::MAX` when `end >= file_size`
/// (last partition), so `FetchingChunks` never transitions to
/// `ScanningLastTerminator` and simply streams to EOF.
```
"... streams to EOF" is not clear to me. What do you mean?
It means we pass all the chunks through to the JSON decoder (the caller that polls AlignedBoundaryStream), staying in the FetchingChunks phase until we consume the entire inner stream. This only happens when raw_end >= file_size, i.e. for the last range in a file, in which case there is nothing to scan for past raw_end: no terminator exists beyond it, and none is needed. So we consume only the initial stream, but since that stream includes the end of the file, we pass all the remaining chunks through until end of file (EOF) is reached.
Thanks!
How about changing it to this?
... and simply streams until EOF is reached
I am not a native English speaker and "verb to EOF" does not sound correct to me.
You're right, "until" is the right word here.
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
```rust
    )
    .await?;

// Last partition reads to EOF — no end-boundary scanning needed.
```

Suggested change:

```rust
// Last partition reads until EOF is reached — no end-boundary scanning needed.
```
```rust
let pos_after = this.abs_pos();

// When end == u64::MAX (last partition), this is always
// true and we stream straight through to EOF.
```

Suggested change:

```rust
// true and we stream straight through until EOF is reached.
```
```rust
async fn test_no_trailing_newline() {
    // Last partition of a file that does not end with a newline.
    // end >= file_size → this.end = u64::MAX, so Passthrough streams
    // straight to EOF and yields the final incomplete line as-is.
```

Suggested change:

```rust
    // straight until EOF is reached and yields the final incomplete line as-is.
```
|
Thanks for reviewing, @martin-g!
@alamb can this be merged or is there something more you'd like me to do?
Checking it out...
Yes, this is the kind of thing I am worried about -- that there is some code churn but it does not
One thing we could potentially use is this change from @Dandandan that can simulate high latency stores. Another thing we could do is use the ClickHouse benchmark dataset, put it somewhere on object store 🤔, and show the performance improvement:

```shell
wget https://datasets.clickhouse.com/hits_compatible/hits.json.gz
gunzip hits.json.gz
```
alamb left a comment
Thank you @ariel-miculas and @martin-g -- I think this is a nicely written PR and a good approach to the problem.
In my opinion the only thing missing is some sort of test that ensures that there are fewer object store requests than before, and that future refactors don't accidentally change the pattern again.
I think you can find an example of such tests for the CSV source in https://github.com/apache/datafusion/blob/73fbd4807011ee102c9db67d40a511d0ece8c65a/datafusion/core/tests/datasource/object_store_access.rs#L27-L26
Would it be possible to add the equivalent tests for the json reader so that we are confident that we will catch any regressions / changes in object store access patterns?
I am also going to do some local testing with the clickbench json file and see if I can find a performance difference too
I agree with @alamb that an object-store access regression test would make this much safer to merge. A good minimal test might be a JSON equivalent of the CSV access-pattern test: repartition a single NDJSON file, run a simple query, and assert we only issue the expected ranged GETs for the partitions, without extra boundary-probing requests.
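Such a test hinges on a store wrapper that counts requests. Stripped of the object_store crate, the idea is roughly the following; `ByteRangeStore`, `InMemory`, and `Counting` are hypothetical names invented for this sketch, while the real test wraps an actual ObjectStore implementation:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical minimal stand-in for a ranged-GET interface, invented
// for illustration only.
trait ByteRangeStore {
    fn get_range(&self, start: usize, end: usize) -> Vec<u8>;
}

struct InMemory(Vec<u8>);

impl ByteRangeStore for InMemory {
    fn get_range(&self, start: usize, end: usize) -> Vec<u8> {
        self.0[start..end].to_vec()
    }
}

/// Wrapper that forwards requests while counting them, so a test can
/// assert on the exact number of GETs a query issued.
struct Counting<S> {
    inner: S,
    gets: AtomicUsize,
}

impl<S: ByteRangeStore> ByteRangeStore for Counting<S> {
    fn get_range(&self, start: usize, end: usize) -> Vec<u8> {
        self.gets.fetch_add(1, Ordering::Relaxed);
        self.inner.get_range(start, end)
    }
}

fn main() {
    let store = Counting {
        inner: InMemory(b"{}\n{}\n{}\n".to_vec()),
        gets: AtomicUsize::new(0),
    };
    // Two partitions, one ranged GET each: the counter catches any extras.
    let _ = store.get_range(0, 3);
    let _ = store.get_range(3, 9);
    assert_eq!(store.gets.load(Ordering::Relaxed), 2);
}
```

The DataFusion test goes further by snapshotting the full request log (method, path, range) rather than just the count, which also catches changes in the access pattern.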
alamb left a comment
Looks great to me -- thank you @ariel-miculas and @Weijun-H and @martin-g
```rust
    );
}

#[tokio::test]
```
these tests are probably not strictly necessary as they cover the same behavior as is tested in the parquet opener. However, I think having the additional coverage is a good idea in case we ever specialize the opening process more.
```rust
}

/// Test that a JSON file split into byte ranges via repartitioning produces
/// exactly one GET request per byte range — no extra requests for boundary seeking.
/// which issues exactly one bounded `get_opts` call, so there are 3 data GETs
/// plus 1 HEAD (to determine file size) = **4 total**.
///
/// This differs from the CSV reader, which needs multiple GETs per range.
```
This would be a nice follow-on PR (add CSV tests to document the current behavior). Maybe someone wants to do that.
```rust
///
/// This test documents the current request pattern to catch regressions.
#[tokio::test]
async fn query_json_file_with_byte_range_partitions()
```
I ran this test locally without the code changes in this PR and it fails like this (as expected):
```text
=== query_json_file_with_byte_range_partitions ===
Finished `test` profile [unoptimized + debuginfo] target(s) in 0.15s
Running tests/core_integration.rs (target/debug/deps/core_integration-13d29de1dea6b31c)
running 1 test
test datasource::object_store_access::query_json_file_with_byte_range_partitions ... FAILED
failures:
---- datasource::object_store_access::query_json_file_with_byte_range_partitions stdout ----
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Snapshot Summary ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Snapshot: query_json_file_with_byte_range_partitions
Source: datafusion/core/tests/datasource/object_store_access.rs:720
────────────────────────────────────────────────────────────────────────────────
Expression: test.query("select * from json_range_table").await
────────────────────────────────────────────────────────────────────────────────
-old snapshot
+new results
────────────┬───────────────────────────────────────────────────────────────────
10 10 │ | 0.00006 | 6e-12 | true |
11 11 │ +---------+-------+------+
12 12 │ ------- Object Store Request Summary -------
13 13 │ RequestCountingObjectStore()
14 │-Total Requests: 4
14 │+Total Requests: 8
15 15 │ - GET (opts) path=json_range_table.json head=true
16 │-- GET (opts) path=json_range_table.json range=0-216
17 16 │ - GET (opts) path=json_range_table.json range=71-216
18 │-- GET (opts) path=json_range_table.json range=143-216
17 │+- GET (opts) path=json_range_table.json range=0-72
18 │+- GET (opts) path=json_range_table.json range=71-216
19 │+- GET (opts) path=json_range_table.json range=143-216
20 │+- GET (opts) path=json_range_table.json range=72-144
21 │+- GET (opts) path=json_range_table.json range=143-216
22 │+- GET (opts) path=json_range_table.json range=144-216
────────────┴───────────────────────────────────────────────────────────────────
To update snapshots run `cargo insta review`
Stopped on the first failure. Run `cargo insta test` to run all snapshots.
thread 'datasource::object_store_access::query_json_file_with_byte_range_partitions' (24436221) panicked at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/insta-1.47.2/src/runtime.rs:719:13:
snapshot assertion for 'query_json_file_with_byte_range_partitions' failed in line 720
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
    datasource::object_store_access::query_json_file_with_byte_range_partitions
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 934 filtered out; finished in 0.13s
error: test failed, to rerun pass `-p datafusion --test core_integration`
```
Codex summarized this nicely:
- PR expectation: 4 requests total
  - head=true
  - range=0-216
  - range=71-216
  - range=143-216
- main actual: 8 requests total
  - head=true
  - range=71-216
  - range=0-72
  - range=71-216
  - range=143-216
  - range=72-144
  - range=143-216
  - range=144-216
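The PR-expectation side of that summary follows a simple pattern: one GET per partition, starting one byte before the raw partition start and extending to EOF, consumed lazily. A small sketch (the helper name is made up) reproduces those ranges:

```rust
/// Sketch: the ranged GETs expected under the new one-request-per-partition
/// pattern, for a file split into equal raw byte ranges.
fn expected_get_ranges(file_size: u64, partitions: u64) -> Vec<String> {
    let step = file_size / partitions;
    (0..partitions)
        .map(|i| {
            // Start one byte early so the stream can see the newline that
            // terminates the previous partition's last line; bytes past this
            // partition's closing newline are never pulled from the stream.
            let lo = (i * step).saturating_sub(1);
            format!("{lo}-{file_size}")
        })
        .collect()
}

fn main() {
    // File size 216 split into 3 partitions, as in the snapshot output.
    assert_eq!(expected_get_ranges(216, 3), ["0-216", "71-216", "143-216"]);
    println!("{:?}", expected_get_ranges(216, 3));
}
```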
I ran some tests with clickbench; reading from local files is worse. The issue is the
Which issue does this PR close?
Rationale for this change
This is an alternative approach to apache#19687.
Instead of reading the entire range in the json FileOpener, implement an
AlignedBoundaryStream which scans the range for newlines as the FileStream
requests data from the stream, by wrapping the original stream returned by the
ObjectStore.
This eliminates the overhead of the two extra get_opts requests needed by
calculate_range and, more importantly, it allows for efficient read-ahead
implementations by the underlying ObjectStore. Previously this was inefficient
because the streams opened by calculate_range included a stream from
`(start - 1)` to file_size and another one from `(end - 1)` to end_of_file, just to find the two relevant newlines.
What changes are included in this PR?
Added the AlignedBoundaryStream which wraps a stream returned by the object
store and finds the delimiting newlines for a particular file range. Notably it doesn't
do any standalone reads (unlike the calculate_range function), eliminating two calls
to get_opts.
Are these changes tested?
Yes, added unit tests.
Are there any user-facing changes?
No

Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>