Refactor prefetcher code #818
Conversation
This seems like unnecessary coupling between the behaviours of the "cache" and "fetcher", just the kind of thing you were wishing to avoid. This way around, if the cache is None (which is recommended), you may end up waiting, no?
Codecov Report

```diff
@@            Coverage Diff             @@
##             main     #818      +/-   ##
==========================================
- Coverage   88.35%   88.30%    -0.06%
==========================================
  Files          15       15
  Lines        2989     3035       +46
==========================================
+ Hits         2641     2680       +39
- Misses        348      355        +7
```
Yes. Ideally, one shouldn't layer the existing caches on top of the prefetcher.

This is correct. While I agree that starting prefetching on the second read is generally more logical, we have to consider existing users who might enable this experimental flag while still using the current caches. We haven't transitioned the default cache setting to none yet, because removing buffering entirely could negatively impact performance for the large portion of our user base who do not use this experimental flag.
What happens if read-ahead runs out of cache? I think it will trigger the exact same false-positive signals again?
Yes, so basically it would.

I'm not concerned about handling the false positive here; I'm only concerned that prefetching shouldn't happen on the first read, because then it would destroy the throughput of users doing completely random reads. If the goal is to fix this false positive, then this variable
@martindurant can you review this?
```python
if not available:
    if self.is_producer_stopped() and self.queue.empty():
```

```python
is_producer_stopped = (
    not hasattr(self.orchestrator, "producer")
```

nit: how about setting the producer to `None`, and then doing a `self.orchestrator.producer is None` check here? It's more readable.

(or set it to None as a class attribute)
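A minimal sketch of the suggested pattern (the orchestrator class name and wiring here are assumed for illustration):

```python
class PrefetchOrchestrator:
    producer = None  # class-level default, so the attribute always exists

    def start(self):
        self.producer = object()  # stand-in for the real producer

# the check then reads naturally, with no hasattr() needed:
def is_producer_stopped(orchestrator):
    return orchestrator.producer is None
```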
@martindurant, can you take a look if you're interested? The benchmarks before and after this change are attached in the description.
The change should affect small reads primarily?
(sorry no, that comment is for #840)

Yes, it should impact small reads primarily.
> Notice how perfectly the prefetcher mirrors the application's changing behaviour across three distinct phases:
>
> 1. **Smooth Sequential Reading (0–10s):** The application starts by reading steady 16MB chunks. After three consistent reads, the prefetcher confirms the pattern and confidently ramps up its background fetching. It maintains a comfortable buffer ahead of the application so the user never waits on the network.
> 2. **Pausing for Random Seeks (10–20s):** The application suddenly stops reading in a straight line and starts jumping randomly around the file. Prefetching is actively harmful here. The engine instantly detects the broken streak and drops the background buffer to zero, ensuring no network bandwidth or memory is wasted downloading unneeded data.
> 3. **Adjusting to Massive Reads (20–30s):** The application resumes reading sequentially, but this time stepping up to massive 100MB chunks. The algorithm quickly catches on. It detects the new streak, calculates the new rolling average, and rebuilds the background buffer—this time scaling it up safely to handle the much larger data chunks.

Removing superlatives - I think it's clearer with neutral language. Suggested:

> Notice how the prefetcher mirrors the application's behaviour across three distinct phases:
>
> 1. **Sequential reading (0–10s):** The application starts by reading 16MB chunks. After three reads, the prefetcher ramps up its background fetching. It maintains a buffer ahead of the application so the user never waits on the network.
> 2. **Random seeks (10–20s):** The application starts jumping randomly around the file. Prefetching would be harmful here. The engine detects the broken streak and drops the background buffer to zero, ensuring no network bandwidth or memory is wasted downloading data.
> 3. **Larger sequential reads (20–30s):** The application resumes reading sequentially, but this time stepping up to 100MB chunks. The algorithm detects the new streak, calculates the new rolling average, and rebuilds the background buffer—this time scaling it up to handle the larger data chunks.
```python
bp._fetch(100, 150)
# Do 6 reads to push the streak well past the MIN_STREAKS threshold
for i in range(6):
    bp._fetch(i * 50, (i + 1) * 50)
```
It feels like the numbers here are somewhat arbitrary - or at least, they depend on the current set of defaults assumed by the prefetcher. Maybe they should be explicitly derived from those values?

I don't mind if not, but do add a comment, because they will need to be updated should the defaults change.
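For example, a sketch that derives the count from the threshold instead of hard-coding 6 (`MIN_STREAKS` here is a stand-in that would mirror or import the prefetcher's actual default; `bp` is the prefetcher instance from the test above):

```python
MIN_STREAKS = 3  # stand-in: keep in sync with (or import) the prefetcher default

# push the streak comfortably past the threshold, tracking the default
n_reads = MIN_STREAKS + 3
for i in range(n_reads):
    bp._fetch(i * 50, (i + 1) * 50)
```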
```python
assert bp._fetch(0, 100) == b"A" * 100
for i in range(2):
    bp._fetch(i * 100, (i + 1) * 100)
```
Before, the `read_tracker` was directly mutated. Why was that insufficient?
```python
for i in range(4):
    bp._fetch(i * 60, (i + 1) * 60)
```

```python
fsspec.asyn.sync(bp.loop, asyncio.sleep, 0.1)
```
The sleep may need to be bigger on CI machines - can there be some form of wait here?
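One option is a bounded polling wait instead of a fixed sleep (a sketch; the `condition` callable is hypothetical and would check whatever state the test needs):

```python
import time

def wait_for(condition, timeout=5.0, interval=0.05):
    """Poll `condition` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    raise TimeoutError("condition not met within timeout")
```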
```python
first_val = self._history[0]
return any(val != first_val for val in self._history)
```

Suggested:

```python
return len(set(self._history)) > 1
```

?

Whether this is faster probably depends on where the first non-equal value is and how long the list can be.
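A quick harness to check that tradeoff empirically (illustrative only; the real `_history` is presumably much shorter, so absolute numbers will differ):

```python
import timeit

history_early = [1, 2] + [1] * 10_000  # mismatch near the front: any() exits early
history_late = [1] * 10_000 + [2]      # mismatch at the end: both scan everything

def any_version(h):
    first = h[0]
    return any(v != first for v in h)

def set_version(h):
    return len(set(h)) > 1

for name, h in [("early", history_early), ("late", history_late)]:
    t_any = timeit.timeit(lambda: any_version(h), number=1_000)
    t_set = timeit.timeit(lambda: set_version(h), number=1_000)
    print(f"{name}: any={t_any:.3f}s  set={t_set:.3f}s")
```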
```python
# remains the network I/O. However, for massive reads (>= 64MB), the extra
# step of copying and assembling huge byte strings in memory severely slows
# down the operation.
VARIABLE_IO_THRESHOLD = 64 * 1024 * 1024
```
Does the best value of this depend on the network bandwidth? I bet that on slow connections we always prefer any amount of prefetching, and copy time is irrelevant.
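A rough back-of-envelope supports that intuition (all bandwidth figures below are assumptions, not measurements):

```python
CHUNK = 64 * 1024 * 1024  # the VARIABLE_IO_THRESHOLD above, in bytes
COPY_BW = 5e9             # assumed in-memory copy bandwidth, bytes/s

for net_bw in (10e6, 100e6, 1e9, 10e9):  # assumed network bandwidths, bytes/s
    ratio = (CHUNK / COPY_BW) / (CHUNK / net_bw)  # copy time / network time
    print(f"net {net_bw / 1e6:>6.0f} MB/s -> copy/network time ratio {ratio:.3f}")
```

Under these assumptions, on a 10 MB/s link the copy costs about 0.2% of the transfer time; it only becomes comparable at multi-GB/s speeds.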
```python
self.on_error = on_error
self.consumer = consumer
self.tracker = tracker
self.orchestrator = orchestrator
```
Reference cycle - prefer a weakref?
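For example, a sketch of the weakref approach (consumer class name from this PR; the exact wiring is assumed):

```python
import weakref

class PrefetchConsumer:
    def __init__(self, orchestrator):
        # hold the orchestrator weakly to avoid a consumer <-> orchestrator cycle
        self._orchestrator_ref = weakref.ref(orchestrator)

    @property
    def orchestrator(self):
        orch = self._orchestrator_ref()
        if orch is None:
            raise RuntimeError("orchestrator has been garbage-collected")
        return orch
```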
```python
    and avg_io_size > self._user_max_prefetch_size
)
```

```python
# Disable prefetching ahead if highly variable AND average > 64MB, or if it exceeds user max
```
> if highly variable

If anything other than exactly constant, no?
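With the `len(set(...)) > 1` check above, "variable" indeed means anything other than exactly constant:

```python
def is_variable(history):
    return len(set(history)) > 1

print(is_variable([100, 100, 100]))  # False: only an exactly-constant history passes
print(is_variable([100, 100, 101]))  # True: a single small deviation already counts
```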
```python
if isinstance(task, Exception):
    logger.error("Consumer retrieved an exception: %s", task)
    self.on_error(task)
    self.orchestrator._set_error(task)
```
`_set_error` doesn't need the leading `_` really - the only calling context is from this class, and it is meant to be called.
Summary of the changes:

**Refactor Prefetcher file:** Wired `PrefetchProducer` and `PrefetchConsumer` with direct object references. This streamlines the initialization process and makes the internal dependency graph much cleaner.

**Start prefetching on the 3rd read, instead of the second:** With `readahead_chunked` (which generates two underlying requests for a single user read), starting on the second read triggered false-positive signals, causing the engine to prefetch unnecessary extra data. Delaying the start until the third read corrects this behavior.

**Disable prefetching for variable reads with an average greater than 64MB:** When read chunk sizes are highly variable, the producer fetches data based on a rolling average, forcing the consumer to slice and stitch multiple partial chunks to match the exact user request (see the sketch below). For massive reads (>= 64MB), this copying and assembling of huge byte strings severely slows the operation, so prefetching ahead is disabled in that regime.
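An illustrative sketch of that slicing cost (toy sizes; the real threshold is 64MB):

```python
# if the rolling average is 50 bytes but the user asks for bytes [0, 130),
# the consumer must join three producer chunks and slice the result:
chunks = [b"a" * 50, b"b" * 50, b"c" * 50]  # producer output at the average size
data = b"".join(chunks)[:130]               # copy + slice to satisfy the request
assert len(data) == 130
# for reads >= 64MB this join/slice of huge byte strings dominates,
# which is what VARIABLE_IO_THRESHOLD guards against
```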
Numbers before and after the change: see the benchmark attachments in the PR description.