Skip to content

fix: json scan performance on local files#21478

Merged
alamb merged 1 commit into apache:main from
ariel-miculas:fix-local-json-performance
Apr 15, 2026
Merged

fix: json scan performance on local files#21478
alamb merged 1 commit into apache:main from
ariel-miculas:fix-local-json-performance

Conversation

@ariel-miculas
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

The into_stream() implementation of GetResult (from arrow-rs-objectstore) fetches every 8KiB chunk using a spawn_blocking() task, resulting in a lot of scheduling overhead.

Fix this by reading the data directly from the async context, using a buffer size of 8 KiB. This avoids any context switch.

What changes are included in this PR?

Are these changes tested?

Validated that the initial reported overhead is now much smaller:
Comparing json-test-on-main and test-json-improvement
--------------------
Benchmark clickbench_2.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query     ┃ json-test-on-main ┃ test-json-improvement ┃       Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0  │        2421.62 ms │            2521.19 ms │    no change │
│ QQuery 1  │        2584.29 ms │            2729.98 ms │ 1.06x slower │
│ QQuery 2  │        2662.11 ms │            2782.29 ms │    no change │
│ QQuery 3  │              FAIL │                  FAIL │ incomparable │
│ QQuery 4  │        2764.78 ms │            2896.46 ms │    no change │
│ QQuery 5  │        2676.46 ms │            2758.01 ms │    no change │
│ QQuery 6  │              FAIL │                  FAIL │ incomparable │
│ QQuery 7  │        2684.50 ms │            2752.37 ms │    no change │
│ QQuery 8  │        2781.21 ms │            2827.46 ms │    no change │
│ QQuery 9  │        3039.17 ms │            3165.29 ms │    no change │
│ QQuery 10 │        2791.32 ms │            2843.44 ms │    no change │
│ QQuery 11 │        2839.05 ms │            3011.84 ms │ 1.06x slower │
│ QQuery 12 │        2691.51 ms │            2839.97 ms │ 1.06x slower │
│ QQuery 13 │        2768.57 ms │            2860.68 ms │    no change │
│ QQuery 14 │        2712.50 ms │            2856.80 ms │ 1.05x slower │
│ QQuery 15 │        2807.64 ms │            2888.94 ms │    no change │
│ QQuery 16 │        2774.87 ms │            2875.44 ms │    no change │
│ QQuery 17 │        2797.28 ms │            2850.17 ms │    no change │
│ QQuery 18 │        3017.75 ms │            3111.64 ms │    no change │
│ QQuery 19 │        2801.30 ms │            2927.25 ms │    no change │
│ QQuery 20 │        2743.43 ms │            2862.10 ms │    no change │
│ QQuery 21 │        2811.41 ms │            2906.42 ms │    no change │
│ QQuery 22 │        2953.66 ms │            3038.23 ms │    no change │
│ QQuery 23 │              FAIL │                  FAIL │ incomparable │
│ QQuery 24 │        2862.27 ms │            2940.31 ms │    no change │
│ QQuery 25 │        2763.40 ms │            2848.55 ms │    no change │
│ QQuery 26 │        2840.39 ms │            2950.47 ms │    no change │
│ QQuery 27 │        2886.70 ms │            2921.28 ms │    no change │
│ QQuery 28 │        3145.39 ms │            3221.27 ms │    no change │
│ QQuery 29 │        2821.87 ms │            2869.85 ms │    no change │
│ QQuery 30 │        2953.55 ms │            2990.15 ms │    no change │
│ QQuery 31 │        2997.81 ms │            3049.28 ms │    no change │
│ QQuery 32 │        2969.14 ms │            3126.79 ms │ 1.05x slower │
│ QQuery 33 │        2764.80 ms │            2866.63 ms │    no change │
│ QQuery 34 │        2828.77 ms │            2848.54 ms │    no change │
│ QQuery 35 │        2812.55 ms │            2793.79 ms │    no change │
│ QQuery 36 │              FAIL │                  FAIL │ incomparable │
│ QQuery 37 │              FAIL │                  FAIL │ incomparable │
│ QQuery 38 │              FAIL │                  FAIL │ incomparable │
│ QQuery 39 │              FAIL │                  FAIL │ incomparable │
│ QQuery 40 │              FAIL │                  FAIL │ incomparable │
│ QQuery 41 │              FAIL │                  FAIL │ incomparable │
│ QQuery 42 │              FAIL │                  FAIL │ incomparable │
└───────────┴───────────────────┴───────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (json-test-on-main)       │ 92771.07ms │
│ Total Time (test-json-improvement)   │ 95732.89ms │
│ Average Time (json-test-on-main)     │  2811.24ms │
│ Average Time (test-json-improvement) │  2901.00ms │
│ Queries Faster                       │          0 │
│ Queries Slower                       │          5 │
│ Queries with No Change               │         28 │
│ Queries with Failure                 │         10 │
└──────────────────────────────────────┴────────────┘

and with SIMULATE_LATENCY:

--------------------
Benchmark clickbench_2.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃ json-test-on-main ┃ test-json-improvement ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │        2795.68 ms │            2687.68 ms │     no change │
│ QQuery 1  │        2880.50 ms │            2768.30 ms │     no change │
│ QQuery 2  │        2960.75 ms │            2826.89 ms │     no change │
│ QQuery 3  │              FAIL │                  FAIL │  incomparable │
│ QQuery 4  │        3140.38 ms │            2963.15 ms │ +1.06x faster │
│ QQuery 5  │        2926.66 ms │            2830.43 ms │     no change │
│ QQuery 6  │              FAIL │                  FAIL │  incomparable │
│ QQuery 7  │        3026.29 ms │            2858.30 ms │ +1.06x faster │
│ QQuery 8  │        4302.35 ms │            2954.96 ms │ +1.46x faster │
│ QQuery 9  │        4439.83 ms │            3200.43 ms │ +1.39x faster │
│ QQuery 10 │        3028.32 ms │            2969.32 ms │     no change │
│ QQuery 11 │        3147.81 ms │            3040.74 ms │     no change │
│ QQuery 12 │        4169.45 ms │            2886.59 ms │ +1.44x faster │
│ QQuery 13 │        3839.01 ms │            2997.80 ms │ +1.28x faster │
│ QQuery 14 │        4086.30 ms │            2907.42 ms │ +1.41x faster │
│ QQuery 15 │        4308.07 ms │            3025.22 ms │ +1.42x faster │
│ QQuery 16 │        3084.89 ms │            2984.34 ms │     no change │
│ QQuery 17 │        4287.89 ms │            2984.27 ms │ +1.44x faster │
│ QQuery 18 │        3542.80 ms │            3144.98 ms │ +1.13x faster │
│ QQuery 19 │        4388.70 ms │            3014.37 ms │ +1.46x faster │
│ QQuery 20 │        3149.54 ms │            2986.73 ms │ +1.05x faster │
│ QQuery 21 │        3250.81 ms │            2906.60 ms │ +1.12x faster │
│ QQuery 22 │        3265.98 ms │            3122.25 ms │     no change │
│ QQuery 23 │              FAIL │                  FAIL │  incomparable │
│ QQuery 24 │        3066.52 ms │            2997.55 ms │     no change │
│ QQuery 25 │        4289.31 ms │            2884.22 ms │ +1.49x faster │
│ QQuery 26 │        4223.03 ms │            2933.16 ms │ +1.44x faster │
│ QQuery 27 │        3156.86 ms │            3001.17 ms │     no change │
│ QQuery 28 │        4831.42 ms │            3318.89 ms │ +1.46x faster │
│ QQuery 29 │        3252.45 ms │            4375.90 ms │  1.35x slower │
│ QQuery 30 │        4460.06 ms │            3153.77 ms │ +1.41x faster │
│ QQuery 31 │        4235.85 ms │            3171.58 ms │ +1.34x faster │
│ QQuery 32 │        3435.14 ms │            3202.64 ms │ +1.07x faster │
│ QQuery 33 │        3147.21 ms │            3031.54 ms │     no change │
│ QQuery 34 │        4378.41 ms │            3008.79 ms │ +1.46x faster │
│ QQuery 35 │        4224.36 ms │            2897.53 ms │ +1.46x faster │
│ QQuery 36 │              FAIL │                  FAIL │  incomparable │
│ QQuery 37 │              FAIL │                  FAIL │  incomparable │
│ QQuery 38 │              FAIL │                  FAIL │  incomparable │
│ QQuery 39 │              FAIL │                  FAIL │  incomparable │
│ QQuery 40 │              FAIL │                  FAIL │  incomparable │
│ QQuery 41 │              FAIL │                  FAIL │  incomparable │
│ QQuery 42 │              FAIL │                  FAIL │  incomparable │
└───────────┴───────────────────┴───────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (json-test-on-main)       │ 120722.63ms │
│ Total Time (test-json-improvement)   │ 100037.48ms │
│ Average Time (json-test-on-main)     │   3658.26ms │
│ Average Time (test-json-improvement) │   3031.44ms │
│ Queries Faster                       │          21 │
│ Queries Slower                       │           1 │
│ Queries with No Change               │          11 │
│ Queries with Failure                 │          10 │
└──────────────────────────────────────┴─────────────┘

For the tests I've used a c7a.16xlarge EC2 instance, with a version of hits.json trimmed down to 51 GiB (the original is 217 GiB), and a warm cache (obtained by running cat hits_50.json > /dev/null).

Are there any user-facing changes?

No

The into_stream() implementation of GetResult (from
arrow-rs-objectstore) fetches every 8KiB chunk using a spawn_blocking()
task, resulting in a lot of scheduling overhead.

Fix this by reading the data directly from the async context, using a
buffer size of 8 KiB. This avoids any context switch.
@github-actions github-actions bot added the datasource Changes to the datasource crate label Apr 8, 2026
@ariel-miculas
Copy link
Copy Markdown
Contributor Author

@Weijun-H please take a look

};
let result = store.get_opts(&location, opts).await?;

#[cfg(not(target_arch = "wasm32"))]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like it may be improved upstream as well? Doing a spawn_blocking for each (small) read seems not great.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[cfg(not(target_arch = "wasm32"))]
if let GetResultPayload::File(mut file, _path) = result.payload {
use std::io::{Read, Seek, SeekFrom};
const CHUNK_SIZE: u64 = 8 * 1024;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ideal? Maybe a bit bigger gets higher throughput?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've picked this value because that's the default for both

  • BufReader (which is used when reading the entire json file as shown below)
                GetResultPayload::File(file, _) => {
                    let bytes = file_compression_type.convert_read(file)?;

                    if newline_delimited {
                        // NDJSON: use BufReader directly
                        let reader = BufReader::new(bytes);
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    }
  • object_store's GetResult.into_stream:
    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
        match self.payload {
            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
            GetResultPayload::File(file, path) => {
                const CHUNK_SIZE: usize = 8 * 1024;
                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
            }
            GetResultPayload::Stream(s) => s,
        }
    }

I think this value should be kept in sync with the first bullet point (where we read the entire json file).

flarion-weijun
flarion-weijun approved these changes Apr 9, 2026
Copy link
Copy Markdown
Member

@Weijun-H Weijun-H left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @ariel-miculas

One suggestion before merge. Can we add a small test that exercises the LocalFileSystem / GetResultPayload::File path? The current tests seem to cover the stream-based path only, while this change is specifically for local files.

@Dandandan
Copy link
Copy Markdown
Contributor

and with SIMULATE_LATENCY:

Nice use of the feature :)

Copy link
Copy Markdown
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this @ariel-miculas @Dandandan and @Weijun-H

I agree with @Weijun-H 's suggestion for testing

Can you possibly make a PR to add the benchmark you are using? I would love to see if we can help optimize this further.

let read =
(&mut file)
.take(to_read)
.read_to_end(&mut buf)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is blocking IO on a tokio task right?

}
})?;

return Ok(futures::stream::try_unfold(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the upstream object store tries to do the same thing

https://github.com/apache/arrow-rs-object-store/blob/v0.13.2/src/lib.rs#L1636-L1701

Which then calls local stream: https://github.com/apache/arrow-rs-object-store/blob/main/src/local.rs#L926

The major difference is that in object_store the work is done on a spawn_blocking thread rather than inline.

I am a bit worried about doing blocking IO on the main thread here

I wonder if we could try the same approach as done upstream and run this on a different thread, but to @Dandandan's point:

  1. Use a larger buffer for the read (e.g. 256k, then slice to 8k for the output)
  2. buffer some of the IO to minimize the overhead (e.g. use StreamExt::buffered)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: after some more review, it seems like the current GetResult::File path does blocking IO as well so this isn't a regression

🤷

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I implemented blocking IO on the GetResult::File path because that's how it worked previously. Other approaches I've considered:

  • block_in_place - it panics with single-threaded tokio runtimes
  • a single spawn_blocking thread with an mpsc channel - this could work but not as straightforward, it requires more careful design, see Remove waits from blocking threads reading spill files. #15654
  • using tokio's ReaderStream - this would be similar to the existing into_stream approach, but the default buffer size (in tokio-1.50.0/src/io/blocking.rs) is 2 MiB (DEFAULT_MAX_BUF_SIZE: usize = 2 * 1024 * 1024); it still involves context-switching overhead, and it requires importing additional libraries, such as tokio-util and the tokio fs feature
    #[cfg(not(target_arch = "wasm32"))]
    if let GetResultPayload::File(file, _path) = result.payload {
        use std::io::SeekFrom;
        use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
        let mut tokio_file: tokio::fs::File = tokio::fs::File::from_std(file);
        tokio_file
            .seek(SeekFrom::Start(range.start))
            .await
            .map_err(|e| object_store::Error::Generic {
                store: "local",
                source: Box::new(e),
            })?;
        const BUF_SIZE: usize = 4 * 1024 * 1024; // 4 MiB
        let limited = tokio_file.take(range.end - range.start);
        return Ok(tokio_util::io::ReaderStream::with_capacity(
            BufReader::with_capacity(BUF_SIZE, limited),
            BUF_SIZE,
        )
        .map_err(|e| object_store::Error::Generic {
            store: "local",
            source: Box::new(e),
        })
        .boxed());
    }
  • keep using into_stream but increase the buffer size (from the original 8KiB) - this would significantly reduce the number of context switches

I think there are better approaches, but since the original code was doing blocking IO in an async context (which it probably shouldn't do), it would be difficult to achieve the same performance with any of these other approaches.

@alamb
Copy link
Copy Markdown
Contributor

alamb commented Apr 15, 2026

Thanks again @ariel-miculas and everyone

Merged via the queue into apache:main with commit 7b2f284 Apr 15, 2026
31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

datasource Changes to the datasource crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Regression in json performance for local files

5 participants