
Commit 9df651b

perf: submit I/O requests eagerly in FullZipScheduler (#6513)
## Summary

Refactor `FullZipScheduler::create_page_load_task` to accept a pre-submitted I/O future instead of deferring I/O submission until the async task executes. This allows the I/O requests to be submitted immediately during scheduling, enabling the object store layer to batch and parallelize them.

Closes #6504.

## I/O Model Change

### Before: Lazy I/O submission (serialized)

Previously, `create_page_load_task` received a `FullZipReadSource::Remote(io)` along with byte ranges and a priority. The actual `io.submit_request()` call happened **inside** the async block, meaning the I/O request was not submitted until the future was first polled. When decoding multiple pages (e.g. across many fragments), this created a sequential I/O pattern:

```
Page 1: [schedule] -> [poll] -> [submit I/O] -> [wait response] -> [decode]
Page 2: [schedule] -> [poll] -> [submit I/O] -> [wait response] -> [decode]
Page 3: [schedule] -> [poll] -> ...
```

Each page's I/O request could only be submitted after the previous task started executing. The I/O scheduler had no visibility into upcoming requests, preventing it from batching or parallelizing them effectively.

### After: Eager I/O submission (pipelined)

Now, `io.submit_request()` is called **before** constructing the `PageLoadTask`, and the resulting future is passed into `create_page_load_task`. All I/O requests for all pages are submitted upfront during the scheduling phase:

```
[schedule all pages] --> submit I/O page 1 -+
                     --> submit I/O page 2 -+
                     --> submit I/O page 3 -+  (all in-flight concurrently)
                     --> submit I/O page N -+
                                            |
[poll] -> [await page 1 response] -> [decode]
[poll] -> [await page 2 response] -> [decode]
[poll] -> [await page 3 response] -> [decode]
```

The object store layer can now see all pending requests at once and optimize I/O through batching, connection multiplexing, and parallel fetches. The async tasks only await the already-in-flight I/O futures.
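The scheduling difference can be illustrated with a minimal std-only sketch, using OS threads as a stand-in for in-flight async I/O futures. `submit_request` here is a hypothetical simulation of a slow fetch, not the Lance API:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for an I/O request: each "fetch" takes ~50 ms and
// runs on its own thread, playing the role of an already-in-flight future.
fn submit_request(page: u8) -> thread::JoinHandle<Vec<u8>> {
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        vec![page; 4] // fake page bytes
    })
}

fn main() {
    let n = 4u8;

    // Lazy pattern (before): submit each request only when its task runs,
    // so the requests are serialized end to end.
    let t0 = Instant::now();
    for page in 0..n {
        let _data = submit_request(page).join().unwrap();
    }
    let lazy = t0.elapsed();

    // Eager pattern (after): submit every request upfront during scheduling,
    // then await them; all requests are in flight concurrently.
    let t0 = Instant::now();
    let in_flight: Vec<_> = (0..n).map(submit_request).collect();
    for handle in in_flight {
        let _data = handle.join().unwrap();
    }
    let eager = t0.elapsed();

    println!("lazy: {:?}, eager: {:?}", lazy, eager);
    assert!(eager < lazy); // roughly 50 ms vs. 200 ms on an idle machine
}
```

With four pages, the lazy loop pays the full latency four times while the eager version pays it roughly once, which mirrors the 6.6x speedup reported below for a 40-fragment scan.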
## Changes

- `rust/lance-encoding/src/encodings/logical/primitive.rs`:
  - Changed the `create_page_load_task` signature to accept a `BoxFuture<'static, Result<Vec<Bytes>>>` instead of `FullZipReadSource` + byte ranges + priority
  - Moved the `io.submit_request()` calls to happen eagerly at both call sites (`schedule_ranges_with_rep_index` and the non-rep-index path), before the page load task is constructed

## Performance

Tested with a multi-fragment dataset containing fixed-width columns (768-dim float32 vectors, 40 fragments, 50 rows/fragment):

| Benchmark | Before (p50) | After (p50) | Speedup |
|---|---|---|---|
| Fixed-width column scan | 3453 ms | 523 ms | **6.6x** |

The improvement comes entirely from I/O pipelining; the decoding logic itself is unchanged. The effect is most pronounced with many fragments or pages, where the serialized I/O submission was the dominant bottleneck.
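The signature change above can be mirrored in a small synchronous sketch, with a `JoinHandle` standing in for `BoxFuture`; the names `submit_request` and `create_page_load_task` are borrowed from the patch, but the bodies are invented for illustration:

```rust
use std::ops::Range;
use std::thread::{self, JoinHandle};

// Hypothetical submit: spawns the "fetch" immediately, so the request is
// in flight before any task is constructed (the eager pattern).
fn submit_request(byte_ranges: Vec<Range<u64>>, _priority: u64) -> JoinHandle<Vec<Vec<u8>>> {
    thread::spawn(move || {
        byte_ranges
            .into_iter()
            .map(|r| vec![0u8; (r.end - r.start) as usize]) // one fake buffer per range
            .collect()
    })
}

// After the refactor: the task receives a pre-submitted request and only
// awaits it, instead of owning the source, ranges, and priority itself.
fn create_page_load_task(
    io_handle: JoinHandle<Vec<Vec<u8>>>,
    num_rows: u64,
) -> impl FnOnce() -> (u64, usize) {
    move || {
        let buffers = io_handle.join().unwrap();
        (num_rows, buffers.len()) // stand-in for decoder construction
    }
}

fn main() {
    // Submission happens eagerly, during "scheduling":
    let io = submit_request(vec![0..16, 16..64], 0);
    let task = create_page_load_task(io, 100);
    let (rows, nbufs) = task();
    assert_eq!((rows, nbufs), (100, 2));
    println!("decoded {} rows from {} buffers", rows, nbufs);
}
```

The key design point is that the task's inputs shrink from "everything needed to submit a request" to just the pending result, so the scheduler decides when submission happens.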
1 parent 97a89a6 commit 9df651b

1 file changed

Lines changed: 11 additions & 15 deletions


rust/lance-encoding/src/encodings/logical/primitive.rs

```diff
@@ -2165,15 +2165,17 @@ impl FullZipScheduler {
     }

     fn create_page_load_task(
-        read_source: FullZipReadSource,
-        byte_ranges: Vec<Range<u64>>,
-        priority: u64,
+        io_future: BoxFuture<'static, Result<Vec<Bytes>>>,
         num_rows: u64,
         details: Arc<FullZipDecodeDetails>,
         bits_per_offset: u8,
     ) -> PageLoadTask {
         let load_task = async move {
-            let data = read_source.fetch(&byte_ranges, priority).await?;
+            let buffers = io_future.await?;
+            let data = buffers
+                .into_iter()
+                .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
+                .collect::<VecDeque<_>>();
             Self::create_decoder(details, data, num_rows, bits_per_offset)
         }
         .boxed();
@@ -2333,14 +2335,9 @@ impl FullZipScheduler {
                 rep_index.bytes_per_value,
                 data_buf_position,
             );
-            let page_load_task = Self::create_page_load_task(
-                FullZipReadSource::Remote(io.clone()),
-                byte_ranges,
-                priority,
-                num_rows,
-                details,
-                bits_per_offset,
-            );
+            let io_future = io.submit_request(byte_ranges, priority);
+            let page_load_task =
+                Self::create_page_load_task(io_future, num_rows, details, bits_per_offset);
             return Ok(vec![page_load_task]);
         }

@@ -2403,10 +2400,9 @@ impl FullZipScheduler {
             })
             .collect::<Vec<_>>();

+        let io_future = io.submit_request(byte_ranges, self.priority);
         let page_load_task = Self::create_page_load_task(
-            FullZipReadSource::Remote(io.clone()),
-            byte_ranges,
-            self.priority,
+            io_future,
             num_rows,
             self.details.clone(),
             self.bits_per_offset,
```