Skip to content

Commit 1a1094d

Browse files
westonpace and claude authored
feat: object store decides scheduler type (#6373)
## Summary

- Add `ObjectStore::prefers_lite_scheduler()` that returns `true` for `file+uring://` stores, so the lite scheduler is used automatically without needing the env var
- Change `SchedulerConfig::use_lite_scheduler` from `bool` to `Option<bool>` — `Some(true/false)` overrides, `None` defers to the object store's preference
- `LANCE_USE_LITE_SCHEDULER` env var still works as an override when explicitly set

## Test plan

- [x] `cargo check -p lance-io --tests --benches` compiles cleanly
- [x] `cargo test -p lance-io` — all 148 tests pass
- [x] `cargo clippy -p lance-io --tests --benches -- -D warnings` — no warnings

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 21d830a commit 1a1094d

3 files changed

Lines changed: 98 additions & 26 deletions

File tree

rust/lance-io/benches/scheduler.rs

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -22,14 +22,14 @@ use pprof::criterion::{Output, PProfProfiler};
2222
struct FullReadParams {
2323
io_parallelism: u32,
2424
page_size: u64,
25-
use_lite_scheduler: bool,
25+
use_lite_scheduler: Option<bool>,
2626
}
2727

2828
impl Display for FullReadParams {
2929
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3030
write!(
3131
f,
32-
"full_read,parallel={},read_size={},use_lite_scheduler={}",
32+
"full_read,parallel={},read_size={},use_lite_scheduler={:?}",
3333
self.io_parallelism, self.page_size, self.use_lite_scheduler
3434
)
3535
}
@@ -74,7 +74,7 @@ fn bench_full_read(c: &mut Criterion) {
7474
let runtime = Runtime::new().unwrap();
7575
let (obj_store, tmp_file) = runtime.block_on(create_data(DATA_SIZE));
7676

77-
for use_lite_scheduler in [false, true] {
77+
for use_lite_scheduler in [Some(false), Some(true)] {
7878
for io_parallelism in [1, 16] {
7979
for page_size in [4096, 1024 * 1024] {
8080
let params = FullReadParams {
@@ -101,10 +101,10 @@ fn bench_full_read(c: &mut Criterion) {
101101
unsafe {
102102
std::env::set_var("IO_THREADS", io_parallelism.to_string());
103103
}
104-
let mut config = SchedulerConfig::default_for_testing();
105-
if use_lite_scheduler {
106-
config = config.with_lite_scheduler();
107-
}
104+
let config = SchedulerConfig {
105+
use_lite_scheduler,
106+
..SchedulerConfig::default_for_testing()
107+
};
108108
runtime.block_on(async {
109109
let scheduler = ScanScheduler::new(obj_store, config);
110110
let file_scheduler = scheduler
@@ -142,15 +142,15 @@ struct RandomReadParams {
142142
io_parallelism: u32,
143143
item_size: u32,
144144
indices: Arc<Vec<u32>>,
145-
use_lite_scheduler: bool,
145+
use_lite_scheduler: Option<bool>,
146146
noisy_runtime: bool,
147147
}
148148

149149
impl Display for RandomReadParams {
150150
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151151
write!(
152152
f,
153-
"random_read,parallel={},item_size={},use_lite_scheduler={},noisy={}",
153+
"random_read,parallel={},item_size={},use_lite_scheduler={:?},noisy={}",
154154
self.io_parallelism, self.item_size, self.use_lite_scheduler, self.noisy_runtime
155155
)
156156
}
@@ -187,7 +187,7 @@ fn bench_random_read(c: &mut Criterion) {
187187
));
188188

189189
for noisy_runtime in [false, true] {
190-
for use_lite_scheduler in [false, true] {
190+
for use_lite_scheduler in [Some(false), Some(true)] {
191191
for io_parallelism in [1, 16] {
192192
for item_size in [4096, 32 * 1024] {
193193
let runtime = Runtime::new().unwrap();
@@ -240,10 +240,10 @@ fn bench_random_read(c: &mut Criterion) {
240240
}
241241
}
242242

243-
let mut config = SchedulerConfig::default_for_testing();
244-
if use_lite_scheduler {
245-
config = config.with_lite_scheduler();
246-
}
243+
let config = SchedulerConfig {
244+
use_lite_scheduler,
245+
..SchedulerConfig::default_for_testing()
246+
};
247247
let scheduler = ScanScheduler::new(obj_store, config);
248248
let file_scheduler = scheduler
249249
.open_file(&tmp_file, &CachedFileSize::unknown())

rust/lance-io/src/object_store.rs

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -534,11 +534,19 @@ impl ObjectStore {
534534

535535
/// Returns true if the object store pointed to a local file system.
536536
pub fn is_local(&self) -> bool {
537-
self.scheme == "file"
537+
self.scheme == "file" || self.scheme == "file+uring"
538538
}
539539

540540
pub fn is_cloud(&self) -> bool {
541-
self.scheme != "file" && self.scheme != "memory"
541+
!self.is_local() && self.scheme != "memory"
542+
}
543+
544+
/// Whether this object store prefers the lite scheduler.
545+
///
546+
/// The lite scheduler is designed for backends like io_uring where
547+
/// tasks should only be polled when the consumer polls them.
548+
pub fn prefers_lite_scheduler(&self) -> bool {
549+
self.scheme == "file+uring"
542550
}
543551

544552
pub fn scheme(&self) -> &str {

rust/lance-io/src/scheduler.rs

Lines changed: 74 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ use std::sync::{Arc, Mutex};
1515
use std::time::Instant;
1616
use tokio::sync::Notify;
1717

18+
use lance_core::utils::parse::str_is_truthy;
1819
use lance_core::{Error, Result};
1920

2021
use crate::object_store::ObjectStore;
@@ -505,23 +506,29 @@ pub struct SchedulerConfig {
505506
/// This controls back pressure. If data is not processed quickly enough then this
506507
/// buffer will fill up and the I/O loop will pause until the buffer is drained.
507508
pub io_buffer_size_bytes: u64,
508-
/// Whether to use the new lite scheduler
509-
pub use_lite_scheduler: bool,
509+
/// Whether to use the lite scheduler.
510+
///
511+
/// - `Some(true)` forces the lite scheduler (e.g. from env var or programmatic).
512+
/// - `Some(false)` forces the standard scheduler.
513+
/// - `None` defers to the object store's preference (see [`ObjectStore::prefers_lite_scheduler`]).
514+
pub use_lite_scheduler: Option<bool>,
510515
}
511516

512517
impl SchedulerConfig {
513518
pub fn new(io_buffer_size_bytes: u64) -> Self {
514519
Self {
515520
io_buffer_size_bytes,
516-
use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER").is_ok(),
521+
use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER")
522+
.ok()
523+
.map(|v| str_is_truthy(v.trim())),
517524
}
518525
}
519526

520527
/// Big enough for unit testing
521528
pub fn default_for_testing() -> Self {
522529
Self {
523530
io_buffer_size_bytes: 256 * 1024 * 1024,
524-
use_lite_scheduler: false,
531+
use_lite_scheduler: None,
525532
}
526533
}
527534

@@ -533,7 +540,7 @@ impl SchedulerConfig {
533540

534541
pub fn with_lite_scheduler(self) -> Self {
535542
Self {
536-
use_lite_scheduler: true,
543+
use_lite_scheduler: Some(true),
537544
..self
538545
}
539546
}
@@ -548,7 +555,10 @@ impl ScanScheduler {
548555
/// * config - configuration settings for the scheduler
549556
pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
550557
let io_capacity = object_store.io_parallelism();
551-
let io_queue = if config.use_lite_scheduler {
558+
let use_lite = config
559+
.use_lite_scheduler
560+
.unwrap_or_else(|| object_store.prefers_lite_scheduler());
561+
let io_queue = if use_lite {
552562
let io_queue = Arc::new(lite::IoQueue::new(
553563
io_capacity as u64,
554564
config.io_buffer_size_bytes,
@@ -743,6 +753,11 @@ impl ScanScheduler {
743753
pub fn stats(&self) -> ScanStats {
744754
ScanStats::new(self.stats.as_ref())
745755
}
756+
757+
#[cfg(test)]
758+
fn uses_lite_scheduler(&self) -> bool {
759+
matches!(self.io_queue, IoQueueType::Lite(_))
760+
}
746761
}
747762

748763
impl Drop for ScanScheduler {
@@ -1120,7 +1135,7 @@ mod tests {
11201135

11211136
let config = SchedulerConfig {
11221137
io_buffer_size_bytes: 1024 * 1024,
1123-
use_lite_scheduler: false,
1138+
use_lite_scheduler: None,
11241139
};
11251140

11261141
let scan_scheduler = ScanScheduler::new(obj_store, config);
@@ -1211,7 +1226,7 @@ mod tests {
12111226

12121227
let config = SchedulerConfig {
12131228
io_buffer_size_bytes: 10,
1214-
use_lite_scheduler: false,
1229+
use_lite_scheduler: None,
12151230
};
12161231

12171232
let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
@@ -1286,7 +1301,7 @@ mod tests {
12861301
// Ensure deadlock prevention timeout can be disabled
12871302
let config = SchedulerConfig {
12881303
io_buffer_size_bytes: 10,
1289-
use_lite_scheduler: false,
1304+
use_lite_scheduler: None,
12901305
};
12911306

12921307
let scan_scheduler = ScanScheduler::new(obj_store, config);
@@ -1374,6 +1389,55 @@ mod tests {
13741389
assert_eq!(fut3.await.unwrap()[0].len(), 100);
13751390
}
13761391

1392+
#[tokio::test]
1393+
async fn test_object_store_selects_scheduler() {
1394+
// A memory:// store should use the standard scheduler when config is None
1395+
let memory_store = Arc::new(ObjectStore::memory());
1396+
assert!(!memory_store.prefers_lite_scheduler());
1397+
let config = SchedulerConfig {
1398+
io_buffer_size_bytes: 256 * 1024 * 1024,
1399+
use_lite_scheduler: None,
1400+
};
1401+
let scheduler = ScanScheduler::new(memory_store.clone(), config);
1402+
assert!(!scheduler.uses_lite_scheduler());
1403+
1404+
// A file+uring:// store should use the lite scheduler when config is None
1405+
let uring_store = Arc::new(ObjectStore::new(
1406+
Arc::new(InMemory::new()),
1407+
Url::parse("file+uring:///tmp").unwrap(),
1408+
None,
1409+
None,
1410+
false,
1411+
false,
1412+
8,
1413+
DEFAULT_DOWNLOAD_RETRY_COUNT,
1414+
None,
1415+
));
1416+
assert!(uring_store.prefers_lite_scheduler());
1417+
let config = SchedulerConfig {
1418+
io_buffer_size_bytes: 256 * 1024 * 1024,
1419+
use_lite_scheduler: None,
1420+
};
1421+
let scheduler = ScanScheduler::new(uring_store.clone(), config);
1422+
assert!(scheduler.uses_lite_scheduler());
1423+
1424+
// Explicit Some(false) overrides a file+uring:// store's preference
1425+
let config = SchedulerConfig {
1426+
io_buffer_size_bytes: 256 * 1024 * 1024,
1427+
use_lite_scheduler: Some(false),
1428+
};
1429+
let scheduler = ScanScheduler::new(uring_store, config);
1430+
assert!(!scheduler.uses_lite_scheduler());
1431+
1432+
// Explicit Some(true) overrides a memory:// store's preference
1433+
let config = SchedulerConfig {
1434+
io_buffer_size_bytes: 256 * 1024 * 1024,
1435+
use_lite_scheduler: Some(true),
1436+
};
1437+
let scheduler = ScanScheduler::new(memory_store, config);
1438+
assert!(scheduler.uses_lite_scheduler());
1439+
}
1440+
13771441
#[test_log::test(tokio::test(flavor = "multi_thread"))]
13781442
async fn stress_backpressure() {
13791443
// This test ensures that the backpressure mechanism works correctly with
@@ -1389,7 +1453,7 @@ mod tests {
13891453
// Only one request will be allowed in
13901454
let config = SchedulerConfig {
13911455
io_buffer_size_bytes: 1,
1392-
use_lite_scheduler: false,
1456+
use_lite_scheduler: None,
13931457
};
13941458
let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
13951459
let file_scheduler = scan_scheduler

0 commit comments

Comments
 (0)