Refactor prefetcher code #818

Open
googlyrahman wants to merge 3 commits into fsspec:main from ankitaluthra1:prefetchx

Conversation

@googlyrahman
Collaborator

@googlyrahman googlyrahman commented Apr 20, 2026

Summary of the changes:

Refactor Prefetcher file:

  • Replaced the passing of individual lambda functions to the PrefetchProducer and PrefetchConsumer with direct object references. This streamlines the initialization process and makes the internal dependency graph much cleaner.

Start prefetching on the third read instead of the second

  • Previously, prefetching started on the second read. However, when used in conjunction with caches like readahead_chunked (which generates two underlying requests for a single user read), this triggered false positive signals, causing the engine to prefetch unnecessary extra data. Delaying the start until the third read corrects this behavior.
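The rule above can be sketched as a simple streak counter. This is an illustrative model only, not the gcsfs implementation; the constant name follows the `MIN_STREAKS_FOR_PREFETCHING` variable mentioned later in this thread, and its value here is assumed:

```python
# Illustrative sketch of the "start on the third read" rule; not gcsfs code.
MIN_STREAKS_FOR_PREFETCHING = 3  # assumed value; previously effectively 2


class StreakTracker:
    """Counts consecutive sequential reads and resets on any seek."""

    def __init__(self):
        self.streak = 0
        self.next_offset = 0

    def record_read(self, start, end):
        # A read is "sequential" if it begins where the previous one ended.
        self.streak = self.streak + 1 if start == self.next_offset else 1
        self.next_offset = end

    def should_prefetch(self):
        return self.streak >= MIN_STREAKS_FOR_PREFETCHING
```

Because readahead_chunked issues two underlying requests per user read, a threshold of two could fire after a single user read; requiring three avoids that false positive.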

Disable prefetching for variable reads with an average size greater than 64MB

When read chunk sizes are highly variable, the producer fetches data based on a rolling average, forcing the consumer to slice and stitch multiple partial chunks to match the exact user request.

  • For average chunk sizes <= 64MB, this memory assembly is fast, the bottleneck remains network I/O, and prefetching is highly beneficial.
  • For average chunk sizes > 64MB, memory allocation and byte concatenation shift the bottleneck to the CPU, which slows down the entire operation.
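The gating rule in the two bullets above can be sketched as follows. `VARIABLE_IO_THRESHOLD` is the constant name used in the patch; the helper function itself is hypothetical:

```python
VARIABLE_IO_THRESHOLD = 64 * 1024 * 1024  # 64MB, as named in the patch
MB = 1024 * 1024


def prefetch_allowed(chunk_sizes):
    """Hypothetical helper: prefetching stays enabled unless reads are
    variable AND the rolling average exceeds the 64MB threshold."""
    if not chunk_sizes:
        return True
    avg = sum(chunk_sizes) / len(chunk_sizes)
    is_variable = len(set(chunk_sizes)) > 1
    return not (is_variable and avg > VARIABLE_IO_THRESHOLD)


assert prefetch_allowed([16 * MB, 32 * MB])      # variable but small: allowed
assert prefetch_allowed([128 * MB, 128 * MB])    # large but constant: allowed
assert not prefetch_allowed([66 * MB, 74 * MB])  # variable, avg > 64MB: off
```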
Numbers Before Change

| Scenario | Concurrency (GB/s) | Prefetcher+Concurrency (GB/s) | Diff |
| --- | --- | --- | --- |
| Sequential fixed 64KB | 0.03 | 0.37 | +1065.2% |
| Sequential fixed 1MB | 0.17 | 1.13 | +558.7% |
| Sequential fixed 16MB | 1.08 | 2.04 | +89.7% |
| Sequential fixed 150MB | 2.09 | 2.32 | +11.2% |
| Sequential fixed 256MB | 2.32 | 2.30 | -1.0% |
| Sequential fixed 1GB | 2.43 | 2.41 | -0.7% |
| Sequential fixed 4MB (Below MIN_CHUNK) | 0.36 | 1.44 | +293.5% |
| Sequential fixed 6MB (Above MIN_CHUNK) | 0.66 | 1.59 | +140.0% |
| Sparse Read: Read 1MB, Skip 10MB forward | 0.20 | 0.19 | -3.6% |
| Sparse Read: Read 16MB, Skip 100MB forward | 1.22 | 1.20 | -1.5% |
| Sequential variable (1KB - 1MB) [Micro Jitter] | 0.14 | 1.18 | +717.4% |
| Sequential variable (1MB - 32MB) [Medium Jitter] | 1.07 | 1.94 | +80.7% |
| Sequential variable (64KB - 100MB) [Good Jitter] | 1.74 | 1.63 | -6.6% |
| Sequential variable (64KB - 1GB) [Large Jitter] | 2.42 | 1.16 | -51.9% |
| Sequential variable (54MB - 62MB) [Jitter Below 64MB] | 1.67 | 1.41 | -15.9% |
| Sequential variable (66MB - 74MB) [Jitter Above 64MB] | 1.73 | 1.34 | -22.7% |
| Sequential variable (90MB - 98MB) [Deep into CPU limits] | 1.98 | 1.56 | -21.2% |
| Step Up/Down (10MB to 100MB to 10MB) | 1.25 | 1.74 | +39.5% |
| Step Up/Down (10MB to 150MB to 120MB) | 1.57 | 1.56 | -0.4% |
| Step Up/Down Extreme Drop (1MB to 500MB to 1MB) | 0.77 | 1.33 | +73.1% |
| Pure Seek fixed (1MB) | 0.17 | 0.19 | +11.6% |
| Pure Seek fixed (16MB) | 1.12 | 1.07 | -5.0% |
| Pure Seek variable (64KB - 100MB) | 1.71 | 1.79 | +4.7% |
| 30% Seq / 70% Seek (fixed 16MB) | 1.09 | 0.76 | -30.6% |
| 50% Seq / 50% Seek (fixed 16MB) | 1.12 | 0.70 | -37.2% |
| 70% Seq / 30% Seek (fixed 16MB) | 1.14 | 0.77 | -33.0% |
| 90% Seq / 10% Seek (fixed 16MB) | 1.06 | 1.09 | +2.8% |
| 50% Seq/Seek variable (64KB - 100MB) | 1.58 | 0.96 | -39.2% |
Numbers After Change

| Scenario | Concurrency (GB/s) | Prefetcher+Concurrency (GB/s) | Diff |
| --- | --- | --- | --- |
| Sequential fixed 64KB | 0.03 | 0.37 | +1002.9% |
| Sequential fixed 1MB | 0.20 | 1.28 | +524.2% |
| Sequential fixed 16MB | 1.06 | 2.15 | +102.9% |
| Sequential fixed 150MB | 2.09 | 2.11 | +1.3% |
| Sequential fixed 256MB | 2.22 | 2.31 | +3.9% |
| Sequential fixed 1GB | 2.41 | 2.41 | +0.4% |
| Sequential fixed 4MB (Below MIN_CHUNK) | 0.40 | 1.20 | +197.9% |
| Sequential fixed 6MB (Above MIN_CHUNK) | 0.56 | 1.46 | +160.4% |
| Sparse Read: Read 1MB, Skip 10MB forward | 0.21 | 0.15 | -26.2% |
| Sparse Read: Read 16MB, Skip 100MB forward | 1.14 | 1.17 | +3.3% |
| Sequential variable (1KB - 1MB) [Micro Jitter] | 0.14 | 1.17 | +727.9% |
| Sequential variable (1MB - 32MB) [Medium Jitter] | 1.13 | 1.91 | +69.8% |
| Sequential variable (64KB - 100MB) [Good Jitter] | 1.71 | 1.57 | -8.1% |
| Sequential variable (64KB - 1GB) [Large Jitter] | 2.32 | 2.38 | +2.6% |
| Sequential variable (54MB - 62MB) [Jitter Below 64MB] | 1.70 | 1.49 | -11.8% |
| Sequential variable (66MB - 74MB) [Jitter Above 64MB] | 1.69 | 1.85 | +9.1% |
| Sequential variable (90MB - 98MB) [Deep into CPU limits] | 1.91 | 1.83 | -3.9% |
| Step Up/Down (10MB to 100MB to 10MB) | 1.25 | 1.61 | +29.2% |
| Step Up/Down (10MB to 150MB to 120MB) | 1.70 | 1.37 | -19.4% |
| Step Up/Down Extreme Drop (1MB to 500MB to 1MB) | 0.90 | 1.69 | +87.0% |
| Pure Seek fixed (1MB) | 0.19 | 0.21 | +8.5% |
| Pure Seek fixed (16MB) | 1.05 | 1.04 | -0.4% |
| Pure Seek variable (64KB - 100MB) | 1.55 | 1.71 | +10.4% |
| 30% Seq / 70% Seek (fixed 16MB) | 1.09 | 1.02 | -6.4% |
| 50% Seq / 50% Seek (fixed 16MB) | 1.08 | 0.94 | -13.4% |
| 70% Seq / 30% Seek (fixed 16MB) | 1.09 | 0.87 | -19.7% |
| 90% Seq / 10% Seek (fixed 16MB) | 1.12 | 1.09 | -2.4% |
| 50% Seq/Seek variable (64KB - 100MB) | 1.68 | 1.44 | -14.2% |

@martindurant
Member

> when used in conjunction with caches like readahead_chunked

This seems like unnecessary coupling between the behaviours of the "cache" and "fetcher", just the kind of thing you were wishing to avoid. This way around, if the cache is None (which is recommended), you may end up waiting, no?

@codecov

codecov Bot commented Apr 20, 2026

Codecov Report

❌ Patch coverage is 92.64706% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.30%. Comparing base (eaf6e33) to head (bf79c7b).
⚠️ Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
gcsfs/prefetcher.py 92.64% 5 Missing ⚠️
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main     #818      +/-   ##
==========================================
- Coverage   88.35%   88.30%   -0.06%
==========================================
  Files          15       15
  Lines        2989     3035      +46
==========================================
+ Hits         2641     2680      +39
- Misses        348      355       +7
```

@googlyrahman
Collaborator Author

> This seems like unnecessary coupling between the behaviours of the "cache" and "fetcher", just the kind of thing you were wishing to avoid.

Yes, ideally one shouldn't layer the existing caches on top of the prefetcher. However, as we discussed, customers might still enable this experimental flag to test it with the default cache (currently readahead_chunked for zonal) during a trial. This change should support that use case.

> if the cache is None (which is recommended), you may end up waiting, no?

This is correct: if cache_type="none", we do not start prefetching on the first sequential read; we start prefetching on the second sequential read.


While I agree that starting prefetching on the second read is generally more logical, we have to consider existing users who might enable this experimental flag while still using the current caches. We haven't transitioned the default cache setting to "none" yet because removing buffering entirely could negatively impact performance for a large portion of our user base who do not use this experimental flag.

@zhixiangli
Collaborator

zhixiangli commented Apr 21, 2026

What happens if read-ahead runs out of cache? I think it will trigger the exact same false-positive signals again?

@googlyrahman
Collaborator Author

googlyrahman commented Apr 21, 2026

> What happens if read-ahead runs out of cache? I think it will trigger the exact same false-positive signals again?

Yes, so basically with cache_type="none" the prefetching would start from the actual third read, and with cache_type="readahead_chunked" the prefetching would start from the actual second read.

I'm not concerned about handling the false positive here; my only concern is that prefetching shouldn't happen on the first read, because that would destroy the throughput of users doing completely random reads.

If the goal is to fix this false positive, then the MIN_STREAKS_FOR_PREFETCHING variable would need to be adjusted per cache_type, which would introduce some unnecessary code in core.py that I don't think is desirable at the moment. core.py is already very big, and we wish to simplify it in the future; readahead_chunked is also going to be deprecated (there are other caches with behaviour similar to readahead_chunked, such as background block, but they are rarely used, as per my past conversation with Martin).

@googlyrahman googlyrahman marked this pull request as ready for review April 21, 2026 05:17
@googlyrahman
Collaborator Author

@martindurant can you review this?

Comment thread gcsfs/prefetcher.py Outdated
Comment thread gcsfs/prefetcher.py
```python
if not available:
if self.is_producer_stopped() and self.queue.empty():
is_producer_stopped = (
not hasattr(self.orchestrator, "producer")
```
Collaborator

nit: how about setting the producer to None, so we can do a `self.orchestrator.producer is None` check here? It's more readable.

Member

(or set it to None as a class attribute)
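A minimal sketch of the suggestion in the two comments above (initialize `producer` as a class attribute set to None, then test identity instead of `hasattr`); the class bodies here are placeholders, not the actual gcsfs code:

```python
class Orchestrator:
    producer = None  # class attribute: always present, None until started

    def start(self):
        self.producer = object()  # stands in for the real producer task


def is_producer_stopped(orchestrator):
    # Readable replacement for `not hasattr(orchestrator, "producer")`
    return orchestrator.producer is None
```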

@googlyrahman
Collaborator Author

@martindurant, can you take a look if you're interested? The benchmarks before and after this change are attached in the description.

@martindurant
Member

The change should affect the small reads primarily?

@martindurant
Member

(sorry no, that comment is for #840)

@googlyrahman
Collaborator Author

> The change should affect the small reads primarily?
> (sorry no, that comment is for #840)

Yes, it should impact small reads primarily.

Comment on lines +202 to +206
Notice how perfectly the prefetcher mirrors the application's changing behaviour across three distinct phases:

1. **Smooth Sequential Reading (0–10s):** The application starts by reading steady 16MB chunks. After three consistent reads, the prefetcher confirms the pattern and confidently ramps up its background fetching. It maintains a comfortable buffer ahead of the application so the user never waits on the network.
2. **Pausing for Random Seeks (10–20s):** The application suddenly stops reading in a straight line and starts jumping randomly around the file. Prefetching is actively harmful here. The engine instantly detects the broken streak and drops the background buffer to zero, ensuring no network bandwidth or memory is wasted downloading unneeded data.
3. **Adjusting to Massive Reads (20–30s):** The application resumes reading sequentially, but this time stepping up to massive 100MB chunks. The algorithm quickly catches on. It detects the new streak, calculates the new rolling average, and rebuilds the background buffer—this time scaling it up safely to handle the much larger data chunks.
Member

Removing superlatives - I think it's clearer with neutral language.

Suggested change

```diff
-Notice how perfectly the prefetcher mirrors the application's changing behaviour across three distinct phases:
-1. **Smooth Sequential Reading (0–10s):** The application starts by reading steady 16MB chunks. After three consistent reads, the prefetcher confirms the pattern and confidently ramps up its background fetching. It maintains a comfortable buffer ahead of the application so the user never waits on the network.
-2. **Pausing for Random Seeks (10–20s):** The application suddenly stops reading in a straight line and starts jumping randomly around the file. Prefetching is actively harmful here. The engine instantly detects the broken streak and drops the background buffer to zero, ensuring no network bandwidth or memory is wasted downloading unneeded data.
-3. **Adjusting to Massive Reads (20–30s):** The application resumes reading sequentially, but this time stepping up to massive 100MB chunks. The algorithm quickly catches on. It detects the new streak, calculates the new rolling average, and rebuilds the background buffer—this time scaling it up safely to handle the much larger data chunks.
+Notice how the prefetcher mirrors the application's behaviour across three distinct phases:
+1. **Sequential reading (0–10s):** The application starts by reading 16MB chunks. After three reads, the prefetcher ramps up its background fetching. It maintains a buffer ahead of the application so the user never waits on the network.
+2. **Random seeks (10–20s):** The application starts jumping randomly around the file. Prefetching would be harmful here. The engine detects the broken streak and drops the background buffer to zero, ensuring no network bandwidth or memory is wasted downloading data.
+3. **Larger sequential reads (20–30s):** The application resumes reading sequentially, but this time stepping up to 100MB chunks. The algorithm detects the new streak, calculates the new rolling average, and rebuilds the background buffer—this time scaling it up to handle the larger data chunks.
```

```python
bp._fetch(100, 150)
# Do 6 reads to push the streak well past the MIN_STREAKS threshold
for i in range(6):
    bp._fetch(i * 50, (i + 1) * 50)
```
Member

It feels like the numbers here are somewhat arbitrary - or at least, they depend on the current set of defaults assumed by the prefetcher. Maybe they should be explicitly derived from those values?
I don't mind if not, but do add a comment, because they will need to be updated should the defaults change.
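One way to act on this suggestion: derive the loop count in the test from the prefetcher's default rather than hard-coding 6, so the test stays valid if the default changes. A sketch, with the MIN_STREAKS_FOR_PREFETCHING value assumed here (a real test would import the actual constant):

```python
MIN_STREAKS_FOR_PREFETCHING = 3  # assumed default for illustration

# Read comfortably past the threshold instead of hard-coding 6 reads
n_reads = MIN_STREAKS_FOR_PREFETCHING + 3
offsets = [(i * 50, (i + 1) * 50) for i in range(n_reads)]
```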


```python
assert bp._fetch(0, 100) == b"A" * 100
for i in range(2):
    bp._fetch(i * 100, (i + 1) * 100)
```
Member

Before, the read_tracker was directly mutated. Why was that insufficient?

```python
for i in range(4):
    bp._fetch(i * 60, (i + 1) * 60)

fsspec.asyn.sync(bp.loop, asyncio.sleep, 0.1)
```
Member

The sleep may need to be bigger in CI machines - can there be some form of wait here?
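One generic way to provide "some form of wait": poll a condition with a generous timeout instead of a fixed sleep, so slow CI machines don't cause flakes. This helper is a sketch, not gcsfs code:

```python
import time


def wait_for(predicate, timeout=5.0, interval=0.01):
    """Poll `predicate` until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return bool(predicate())
```

The test could then wait on, say, the queue becoming non-empty rather than sleeping a fixed 0.1s.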

Comment thread gcsfs/prefetcher.py
Comment on lines +88 to +89
```python
first_val = self._history[0]
return any(val != first_val for val in self._history)
```
Member

Suggested change

```diff
-first_val = self._history[0]
-return any(val != first_val for val in self._history)
+return len(set(self._history)) > 1
```

?

Whether this is faster probably depends on where the first non-equal value is and how long the list can be.
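The two variants return the same result for any non-empty history; the difference is that the `any` form short-circuits at the first differing value, while `set` always scans (and hashes) the whole list. A quick equivalence check, with both variants rewritten as standalone functions:

```python
def is_variable_any(history):
    first_val = history[0]
    return any(val != first_val for val in history)


def is_variable_set(history):
    return len(set(history)) > 1


for history in ([1, 1, 1], [1, 2, 1], [5], [3, 3, 4, 3]):
    assert is_variable_any(history) == is_variable_set(history)
```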

Comment thread gcsfs/prefetcher.py
```python
# remains the network I/O. However, for massive reads (>= 64MB), the extra
# step of copying and assembling huge byte strings in memory severely slows
# down the operation.
VARIABLE_IO_THRESHOLD = 64 * 1024 * 1024
```
Member

Does the best value of this depend on the network bandwidth? I bet on slow connections, we always prefer any amount of prefetching and copy time is irrelevant.

Comment thread gcsfs/prefetcher.py
```python
self.on_error = on_error
self.consumer = consumer
self.tracker = tracker
self.orchestrator = orchestrator
```
Member

Reference cycle - prefer a weakref?
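A sketch of the weakref approach: the consumer holds the orchestrator through `weakref.ref`, so consumer -> orchestrator -> consumer no longer forms a strong reference cycle. Class names follow the snippet above; the bodies are illustrative, not gcsfs code:

```python
import gc
import weakref


class Consumer:
    def __init__(self, orchestrator):
        self._orchestrator_ref = weakref.ref(orchestrator)

    @property
    def orchestrator(self):
        # Returns None once the orchestrator has been garbage collected
        return self._orchestrator_ref()


class Orchestrator:
    def __init__(self):
        self.consumer = Consumer(self)
```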

Comment thread gcsfs/prefetcher.py
```python
and avg_io_size > self._user_max_prefetch_size
)

# Disable prefetching ahead if highly variable AND average > 64MB, or if it exceeds user max
```
Member

> if highly variable

If anything other than exactly constant, no?


Comment thread gcsfs/prefetcher.py
```python
if isinstance(task, Exception):
    logger.error("Consumer retrieved an exception: %s", task)
    self.on_error(task)
    self.orchestrator._set_error(task)
```
Member

_set_error doesn't need the leading _ really - the only calling context is from this class, and it is meant to be called.
