
perf: optimize object store requests when reading JSON#20823

Merged
alamb merged 12 commits into apache:main from ariel-miculas:improve-json-read
Apr 7, 2026

Conversation

@ariel-miculas
Contributor

@ariel-miculas ariel-miculas commented Mar 9, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

This is an alternative approach to #19687.

Instead of reading the entire range in the json FileOpener, implement an
AlignedBoundaryStream which scans the range for newlines as the FileStream
requests data from the stream, by wrapping the original stream returned by the
ObjectStore.

This eliminates the overhead of the two extra get_opts requests needed by
calculate_range and, more importantly, it allows for efficient read-ahead
implementations by the underlying ObjectStore. Previously this was inefficient
because the streams opened by calculate_range included one from (start - 1) to
file_size and another from (end - 1) to end_of_file, just to find the two
relevant newlines.
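The boundary-alignment idea described above can be sketched in miniature. This is an illustrative model with assumed semantics, not the PR's actual AlignedBoundaryStream: the stream for a non-first partition is assumed to open one byte before the nominal range and may run to EOF, the wrapper skips the partial first line (owned by the previous partition) and stops reading as soon as it sees the terminating newline at or past the nominal end.

```rust
// Illustrative sketch (assumed semantics, not the PR's actual code): keep only
// the newline-aligned lines owned by [start, end), stopping early instead of
// consuming the whole stream.
fn aligned_lines<I>(chunks: I, stream_start: usize, start: usize, end: usize) -> Vec<u8>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut out = Vec::new();
    let mut pos = stream_start; // absolute file offset of the next byte
    // A stream opened at `start - 1` begins mid-line: skip through the first
    // newline, which terminates a line owned by the previous partition.
    let mut skipping = stream_start < start;
    for chunk in chunks {
        for &b in &chunk {
            if skipping {
                skipping = b != b'\n';
            } else {
                out.push(b);
                // A newline at offset >= end - 1 means the next line starts at
                // or after `end` and belongs to the next partition: stop here
                // without reading the rest of the stream.
                if b == b'\n' && pos + 1 >= end {
                    return out;
                }
            }
            pos += 1;
        }
    }
    out // last partition: no terminator found, the stream ran to EOF
}

fn main() {
    let data = b"aa\nbbb\ncc\n".to_vec();
    // Two nominal partitions of the 10-byte file: [0, 5) and [5, 10).
    let p1 = aligned_lines(data.chunks(3).map(|c| c.to_vec()), 0, 0, 5);
    let p2 = aligned_lines(data[4..].chunks(3).map(|c| c.to_vec()), 4, 5, 10);
    assert_eq!(p1, b"aa\nbbb\n");
    assert_eq!(p2, b"cc\n");
    // Together the partitions reconstruct the file exactly once.
    assert_eq!([p1, p2].concat(), data);
}
```

Each partition owns exactly the lines that start inside its nominal byte range, so concatenating the partitions yields every line exactly once, with no standalone probe reads.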

What changes are included in this PR?

Added the AlignedBoundaryStream, which wraps a stream returned by the object
store and finds the delimiting newlines for a particular file range. Notably, it
doesn't do any standalone reads (unlike the calculate_range function), which
eliminates two calls to get_opts.

Are these changes tested?

Yes, added unit tests.

Are there any user-facing changes?

No

@ariel-miculas
Contributor Author

Adding @Weijun-H and @martin-g, since you discussed the previous PR.

@github-actions github-actions bot added the datasource (Changes to the datasource crate) label Mar 9, 2026
@ariel-miculas
Contributor Author

a bit ironic

Run ci/scripts/typos_check.sh
[typos_check.sh] `typos --config typos.toml`
error: `tpos` should be `typos`
    ╭▸ ./datafusion/datasource-json/src/boundary_stream.rs:279:41
    │
279 │ …                     if let Some(tpos) =
    ╰╴                                  ━━━━
error: `tpos` should be `typos`
    ╭▸ ./datafusion/datasource-json/src/boundary_stream.rs:283:74
    │
283 │ …                     return Poll::Ready(Some(Ok(chunk.slice(..tpos + 1))));
    ╰╴                                                               ━━━━

@ariel-miculas
Contributor Author

Who can help me with reviews? @martin-g @alamb

@alamb
Contributor

alamb commented Mar 19, 2026

Thanks for this PR @ariel-miculas

Do you have any benchmark results for this change? Even some example queries

@Weijun-H do you know of any benchmarks to run?

@ariel-miculas
Contributor Author

No, I'm having trouble coming up with a realistic benchmark.

The previous benchmark https://github.com/apache/datafusion/pull/19687/changes#diff-5358b38b6265d769b66b614f7ba88ed9320f7a9fce5197330b7c01c2a8a3ed3b incorrectly assumes that all the requested bytes (via get_opts) will be read, while you can actually request a 10GiB stream of bytes and read only 16KiB from it.

As a result, the benchmark of the previous PR for reducing the read amplification shows impressive improvements, but it hides the fact that it breaks the parallelization between data fetching and json decoding (by doing all the data fetching in the JsonOpener instead of allowing FileStream to do its magic).

So I'm not sure how to write a benchmark that can prove at the same time that:

  • I'm increasing performance (because there are no more read requests in the JsonOpener)
  • This solution is better than the original proposal (perf: reduce read amplification for partitioned JSON file scanning, #19687) because it doesn't break parallelization between fetching and decoding
  • This optimization is relevant for real-world object store implementations (where network latency matters, network speed matters, data computation can happen while waiting for bytes to be read, read-ahead is a relevant optimization etc.)
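The fetch/decode overlap at the heart of the second bullet can be illustrated with a generic producer/consumer pipeline. This is plain-std Rust invented for illustration, not DataFusion code: with equal simulated per-chunk fetch and decode costs, the pipelined version hides fetch latency behind decoding, while fetch-everything-then-decode pays both costs back to back.

```rust
// Generic illustration (not DataFusion code) of why breaking the fetch/decode
// pipeline hurts: sequential pays fetch + decode; pipelined overlaps them.
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

const FETCH: Duration = Duration::from_millis(20); // simulated per-chunk fetch latency
const DECODE: Duration = Duration::from_millis(20); // simulated per-chunk decode cost

fn run_sequential(chunks: u32) -> Duration {
    let t = Instant::now();
    // Fetch every chunk first, then decode every chunk.
    let fetched: Vec<u32> = (0..chunks).map(|i| { thread::sleep(FETCH); i }).collect();
    for _ in fetched { thread::sleep(DECODE); }
    t.elapsed()
}

fn run_pipelined(chunks: u32) -> Duration {
    let t = Instant::now();
    let (tx, rx) = mpsc::channel();
    // Producer fetches ahead while the consumer decodes.
    let producer = thread::spawn(move || {
        for i in 0..chunks { thread::sleep(FETCH); tx.send(i).unwrap(); }
    });
    for _ in rx { thread::sleep(DECODE); }
    producer.join().unwrap();
    t.elapsed()
}

fn main() {
    let sequential = run_sequential(5);
    let pipelined = run_pipelined(5);
    // With equal per-chunk costs, pipelining approaches half the wall time.
    println!("sequential: {sequential:?}, pipelined: {pipelined:?}");
    assert!(pipelined < sequential);
}
```

A benchmark that only counts bytes requested misses exactly this axis: both versions request the same bytes, but their wall-clock behavior differs substantially.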

@martin-g martin-g self-requested a review March 20, 2026 03:02
terminator: u8,
/// Effective end boundary. Set to `u64::MAX` when `end >= file_size`
/// (last partition), so `FetchingChunks` never transitions to
/// `ScanningLastTerminator` and simply streams to EOF.
Member

"... streams to EOF" is not clear to me. What do you mean?

Contributor Author

It means we pass all the chunks through to the JSON decoder (the caller that polls AlignedBoundaryStream), staying in the FetchingChunks phase until the inner stream is fully consumed. This only happens when raw_end >= file_size, i.e. for the last file range in a file, in which case there is nothing to scan for past raw_end looking for a terminator (nor any need to do so). We consume only the initial stream, but since that stream includes the end of the file, we pass all the remaining chunks through until end of file (EOF) is reached.

Member

Thanks!

How about changing it to this ?

... and simply streams until EOF is reached

I am not a native English speaker, and "verb to EOF" does not sound correct to me.

Contributor Author

You're right, "until" is the right word here.

ariel-miculas and others added 4 commits March 20, 2026 11:29
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
)
.await?;

// Last partition reads to EOF — no end-boundary scanning needed.
Member

// Last partition reads until EOF is reached — no end-boundary scanning needed.

let pos_after = this.abs_pos();

// When end == u64::MAX (last partition), this is always
// true and we stream straight through to EOF.
Member

// true and we stream straight through until EOF is reached.

async fn test_no_trailing_newline() {
// Last partition of a file that does not end with a newline.
// end >= file_size → this.end = u64::MAX, so Passthrough streams
// straight to EOF and yields the final incomplete line as-is.
Member

// straight until EOF is reached and yields the final incomplete line as-is.

@ariel-miculas
Contributor Author

thanks for reviewing, @martin-g !

@ariel-miculas
Contributor Author

@alamb can this be merged or is there something more you'd like me to do?

@alamb
Contributor

alamb commented Apr 4, 2026

Checking it out...

@alamb
Contributor

alamb commented Apr 4, 2026

No, I'm having troubles coming up with a realistic benchmark.

The previous benchmark https://github.com/apache/datafusion/pull/19687/changes#diff-5358b38b6265d769b66b614f7ba88ed9320f7a9fce5197330b7c01c2a8a3ed3b incorrectly assumes that all the requested bytes (via get_opts) will be read, while you can actually request a 10GiB stream of bytes and read only 16KiB from it.

As a result, the benchmark of the previous PR for reducing the read amplification shows impressive improvements, but it hides the fact that it breaks the parallelization between data fetching and json decoding (by doing all the data fetching in the JsonOpener instead of allowing FileStream to do its magic).

Yes, this is the kind of thing I am worried about -- that there is some code churn but it does not actually improve performance in practice.

  • I'm increasing performance (because there are no more read requests in the JsonOpener)
  • This optimization is relevant for real-world object store implementations (where network latency matters, network speed matters, data computation can happen while waiting for bytes to be read, read-ahead is a relevant optimization etc.)

One thing we could potentially use is this change from @Dandandan that can simulate high-latency stores

Another thing we could potentially do is to use the clickhouse benchmark dataset:
https://datasets.clickhouse.com/hits_compatible/hits.json.gz

And put it somewhere on an object store 🤔 to show the performance improvement

wget https://datasets.clickhouse.com/hits_compatible/hits.json.gz
gunzip hits.json.gz

@alamb alamb added the performance (Make DataFusion faster) label Apr 4, 2026
@alamb alamb changed the title from "perf: improve json read" to "perf: optimize object store requests when reading JSON" Apr 4, 2026
Contributor

@alamb alamb left a comment

Thank you @ariel-miculas and @martin-g -- I think this is a nicely written PR and a good approach to the problem.

In my opinion the only thing missing is some sort of test that ensures that there are fewer object store requests than before, and that future refactors don't accidentally change the pattern again.

I think you can find an example of such tests for the CSV source in https://github.com/apache/datafusion/blob/73fbd4807011ee102c9db67d40a511d0ece8c65a/datafusion/core/tests/datasource/object_store_access.rs#L27-L26

Would it be possible to add the equivalent tests for the json reader so that we are confident that we will catch any regressions / changes in object store access patterns?

I am also going to do some local testing with the clickbench json file and see if I can find a performance difference too

@Weijun-H
Member

Weijun-H commented Apr 4, 2026

Thank you @ariel-miculas and @martin-g -- I think this is a nicely written PR and a good approach to the problem.

In my opinion the only thing missing is some sort of test that ensures that there are fewer object store requests than before, and that future refactors don't accidentally change the pattern again.

I think you can find an example of such tests for the CSV source in 73fbd48/datafusion/core/tests/datasource/object_store_access.rs#L27-L26

Would it be possible to add the equivalent tests for the json reader so that we are confident that we will catch any regressions / changes in object store access patterns?

I am also going to do some local testing with the clickbench json file and see if I can find a performance difference too

I agree with @alamb that an object-store access regression test would make this much safer to merge.

A good minimal test might be a JSON equivalent of the CSV access-pattern test: repartition a single NDJSON file, run a simple query, and assert we only issue the expected ranged GETs for the partitions, without extra boundary-probing requests.
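That counting-store pattern can be sketched in miniature. This is a hypothetical model for illustration only: the real test would wrap an implementation of the `object_store` traits (as the CSV test does), whereas the `Store` trait, `InMemory`, and `Counting` types below are invented.

```rust
// Hypothetical miniature of the suggested regression test: wrap a store,
// record every ranged GET, and assert the access pattern stays at exactly
// one GET per partition.
use std::cell::RefCell;
use std::ops::Range;

trait Store {
    fn get_range(&self, r: Range<usize>) -> Vec<u8>;
}

struct InMemory(Vec<u8>);
impl Store for InMemory {
    fn get_range(&self, r: Range<usize>) -> Vec<u8> {
        self.0[r].to_vec()
    }
}

/// Wrapper that records each requested range before delegating.
struct Counting<S> {
    inner: S,
    gets: RefCell<Vec<Range<usize>>>,
}
impl<S: Store> Store for Counting<S> {
    fn get_range(&self, r: Range<usize>) -> Vec<u8> {
        self.gets.borrow_mut().push(r.clone());
        self.inner.get_range(r)
    }
}

fn main() {
    let store = Counting {
        inner: InMemory(b"line1\nline2\nline3\n".to_vec()),
        gets: RefCell::new(Vec::new()),
    };
    // Simulate three partitions, each issuing exactly one bounded GET that
    // runs from (start - 1) to EOF, matching the PR's request pattern.
    for r in [0..18, 5..18, 11..18] {
        let _ = store.get_range(r);
    }
    assert_eq!(store.gets.borrow().len(), 3);
    assert_eq!(store.gets.borrow()[0], 0..18);
}
```

The real test then snapshots the recorded request list, so any future refactor that reintroduces boundary-probing GETs fails the snapshot.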

Member

@Weijun-H Weijun-H left a comment

LGTM. Waiting on @alamb for another look

Contributor

@alamb alamb left a comment

Looks great to me -- thank you @ariel-miculas and @Weijun-H and @martin-g

);
}

#[tokio::test]
Contributor

these tests are probably not strictly necessary as they cover the same behavior as is tested in the parquet opener. However, I think having the additional coverage is a good idea in case we ever specialize the opening process more.

}

/// Test that a JSON file split into byte ranges via repartitioning produces
/// exactly one GET request per byte range — no extra requests for boundary seeking.
Contributor

👍

/// which issues exactly one bounded `get_opts` call, so there are 3 data GETs
/// plus 1 HEAD (to determine file size) = **4 total**.
///
/// This differs from the CSV reader, which needs multiple GETs per range.
Contributor

This would be a nice follow on PR (add CSV tests to document the current behavior). Maybe someone wants to do that

///
/// This test documents the current request pattern to catch regressions.
#[tokio::test]
async fn query_json_file_with_byte_range_partitions() {
Contributor

I ran this test locally without the code changes in this PR and it fails like this (as expected):

• === query_json_file_with_byte_range_partitions ===
      Finished `test` profile [unoptimized + debuginfo] target(s) in 0.15s
       Running tests/core_integration.rs (target/debug/deps/core_integration-13d29de1dea6b31c)

  running 1 test
  test datasource::object_store_access::query_json_file_with_byte_range_partitions ... FAILED

  failures:

  ---- datasource::object_store_access::query_json_file_with_byte_range_partitions stdout ----
  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Snapshot Summary ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  Snapshot: query_json_file_with_byte_range_partitions
  Source: datafusion/core/tests/datasource/object_store_access.rs:720
  ────────────────────────────────────────────────────────────────────────────────
  Expression: test.query("select * from json_range_table").await
  ────────────────────────────────────────────────────────────────────────────────
  -old snapshot
  +new results
  ────────────┬───────────────────────────────────────────────────────────────────
     10    10 │ | 0.00006 | 6e-12 | true |
     11    11 │ +---------+-------+------+
     12    12 │ ------- Object Store Request Summary -------
     13    13 │ RequestCountingObjectStore()
     14       │-Total Requests: 4
           14 │+Total Requests: 8
     15    15 │ - GET  (opts) path=json_range_table.json head=true
     16       │-- GET  (opts) path=json_range_table.json range=0-216
     17    16 │ - GET  (opts) path=json_range_table.json range=71-216
     18       │-- GET  (opts) path=json_range_table.json range=143-216
           17 │+- GET  (opts) path=json_range_table.json range=0-72
           18 │+- GET  (opts) path=json_range_table.json range=71-216
           19 │+- GET  (opts) path=json_range_table.json range=143-216
           20 │+- GET  (opts) path=json_range_table.json range=72-144
           21 │+- GET  (opts) path=json_range_table.json range=143-216
           22 │+- GET  (opts) path=json_range_table.json range=144-216
  ────────────┴───────────────────────────────────────────────────────────────────
  To update snapshots run `cargo insta review`
  Stopped on the first failure. Run `cargo insta test` to run all snapshots.

  thread 'datasource::object_store_access::query_json_file_with_byte_range_partitions' (24436221) panicked at /Users/andrewlamb/.cargo/registry/src/index.crates.io-
  1949cf8c6b5b557f/insta-1.47.2/src/runtime.rs:719:13:
  snapshot assertion for 'query_json_file_with_byte_range_partitions' failed in line 720
  note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


  failures:
      datasource::object_store_access::query_json_file_with_byte_range_partitions

  test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 934 filtered out; finished in 0.13s

  error: test failed, to rerun pass `-p datafusion --test core_integration`

Codex summarized this nicely:

  • PR expectation: 4 requests total
    • head=true
    • range=0-216
    • range=71-216
    • range=143-216
  • main actual: 8 requests total
    • head=true
    • range=71-216
    • range=0-72
    • range=71-216
    • range=143-216
    • range=72-144
    • range=143-216
    • range=144-216

@alamb alamb added this pull request to the merge queue Apr 7, 2026
Merged via the queue into apache:main with commit 8a48a87 Apr 7, 2026
31 checks passed
@ariel-miculas
Contributor Author

I ran some tests with ClickBench; reading from local files is worse:

[ec2-user@ip-172-31-0-185 datafusion]$ ./benchmarks/bench.sh compare json-test-on-main test-json-improvement
Comparing json-test-on-main and test-json-improvement
--------------------
Benchmark clickbench_2.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃ json-test-on-main ┃ test-json-improvement ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │        2938.54 ms │           36468.92 ms │ 12.41x slower │
│ QQuery 1  │        4189.48 ms │           36706.26 ms │  8.76x slower │
│ QQuery 2  │        3021.24 ms │           36695.04 ms │ 12.15x slower │
│ QQuery 3  │              FAIL │                  FAIL │  incomparable │
│ QQuery 4  │        3518.24 ms │           37016.08 ms │ 10.52x slower │
│ QQuery 5  │        3138.41 ms │           37131.63 ms │ 11.83x slower │
│ QQuery 6  │              FAIL │                  FAIL │  incomparable │
│ QQuery 7  │        4191.68 ms │           36874.60 ms │  8.80x slower │
│ QQuery 8  │        4405.33 ms │           37054.97 ms │  8.41x slower │
│ QQuery 9  │        3473.41 ms │           37308.28 ms │ 10.74x slower │
│ QQuery 10 │        4351.06 ms │           36934.39 ms │  8.49x slower │
│ QQuery 11 │        3306.45 ms │           37101.39 ms │ 11.22x slower │
│ QQuery 12 │        3226.21 ms │           37235.60 ms │ 11.54x slower │
│ QQuery 13 │        3970.11 ms │           37244.27 ms │  9.38x slower │
│ QQuery 14 │        3246.59 ms │           37085.69 ms │ 11.42x slower │
│ QQuery 15 │        4563.53 ms │           37182.89 ms │  8.15x slower │
│ QQuery 16 │        4506.85 ms │           37391.07 ms │  8.30x slower │
│ QQuery 17 │        4377.16 ms │           37381.49 ms │  8.54x slower │
│ QQuery 18 │        3555.18 ms │           37603.25 ms │ 10.58x slower │
│ QQuery 19 │        4568.01 ms │           36996.50 ms │  8.10x slower │
│ QQuery 20 │        3193.87 ms │           37069.19 ms │ 11.61x slower │
│ QQuery 21 │        4415.33 ms │           37185.73 ms │  8.42x slower │
│ QQuery 22 │        3312.73 ms │           37190.81 ms │ 11.23x slower │
│ QQuery 23 │              FAIL │                  FAIL │  incomparable │
│ QQuery 24 │        4382.53 ms │           37093.81 ms │  8.46x slower │
│ QQuery 25 │        4339.69 ms │           37121.90 ms │  8.55x slower │
│ QQuery 26 │        4425.42 ms │           37106.02 ms │  8.38x slower │
│ QQuery 27 │        4505.30 ms │           37059.04 ms │  8.23x slower │
│ QQuery 28 │        3582.82 ms │           37409.12 ms │ 10.44x slower │
│ QQuery 29 │        4440.96 ms │           36868.93 ms │  8.30x slower │
│ QQuery 30 │        4675.71 ms │           37081.23 ms │  7.93x slower │
│ QQuery 31 │        4276.55 ms │           37165.64 ms │  8.69x slower │
│ QQuery 32 │        3615.42 ms │           37662.39 ms │ 10.42x slower │
│ QQuery 33 │        4446.09 ms │           37558.30 ms │  8.45x slower │
│ QQuery 34 │        4521.66 ms │           37647.72 ms │  8.33x slower │
│ QQuery 35 │        4321.41 ms │           37225.06 ms │  8.61x slower │
│ QQuery 36 │              FAIL │                  FAIL │  incomparable │
│ QQuery 37 │              FAIL │                  FAIL │  incomparable │
│ QQuery 38 │              FAIL │                  FAIL │  incomparable │
│ QQuery 39 │              FAIL │                  FAIL │  incomparable │
│ QQuery 40 │              FAIL │                  FAIL │  incomparable │
│ QQuery 41 │              FAIL │                  FAIL │  incomparable │
│ QQuery 42 │              FAIL │                  FAIL │  incomparable │
└───────────┴───────────────────┴───────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃              ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ Total Time (json-test-on-main)       │  131002.95ms │
│ Total Time (test-json-improvement)   │ 1225857.24ms │
│ Average Time (json-test-on-main)     │    3969.79ms │
│ Average Time (test-json-improvement) │   37147.19ms │
│ Queries Faster                       │            0 │
│ Queries Slower                       │           33 │
│ Queries with No Change               │            0 │
│ Queries with Failure                 │           10 │
└──────────────────────────────────────┴──────────────┘

The issue is that the into_stream function of the object store's get_result reads data in 8 KiB chunks for local files, so we need to either replace it with custom code or use a completely separate path for local files, as was done previously.
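One possible shape for that separate local-file path is a plain seek-and-read with a single large buffer. This is a hedged sketch of the idea, not the eventual fix; `read_local_range` is a hypothetical helper, and a real implementation would live behind the opener's scheme dispatch.

```rust
// Hypothetical sketch: bypass the stream's 8 KiB chunking for local files by
// reading the requested byte range in one large buffered pass.
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};

fn read_local_range(path: &std::path::Path, start: u64, len: usize) -> std::io::Result<Vec<u8>> {
    let mut f = File::open(path)?;
    f.seek(SeekFrom::Start(start))?;
    let mut buf = vec![0u8; len];
    let mut filled = 0;
    // Loop because `read` may return fewer bytes than requested.
    while filled < len {
        let n = f.read(&mut buf[filled..])?;
        if n == 0 {
            break; // range extends past EOF: return what exists
        }
        filled += n;
    }
    buf.truncate(filled);
    Ok(buf)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("aligned_boundary_demo.json");
    File::create(&path)?.write_all(b"{\"a\":1}\n{\"a\":2}\n")?;
    // One read covers the second record's byte range; no per-chunk overhead.
    assert_eq!(read_local_range(&path, 8, 8)?, b"{\"a\":2}\n");
    std::fs::remove_file(&path)?;
    Ok(())
}
```

The newline-alignment logic could stay the same; only the source of bytes changes for `file://` paths.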

Dandandan pushed a commit to Dandandan/arrow-datafusion that referenced this pull request Apr 8, 2026

Labels

core (Core DataFusion crate), datasource (Changes to the datasource crate), performance (Make DataFusion faster)
