Skip to content

Commit 1bda0d9

Browse files
westonpaceclaude
andauthored
feat(io): bypass backpressure for io_buffer_size=0 and 2.0 indirect I/O (#6627)
## Summary - When `SchedulerConfig` is created with `io_buffer_size_bytes = 0`, the byte-based backpressure system is disabled entirely — all I/O proceeds without waiting for budget. Previously a zero buffer size would deadlock (every read exceeds a zero budget). - New env var `LANCE_BYPASS_INDIRECT_IO_BACKPRESSURE`: when set, indirect I/O in the **2.0 list scheduler** (`indirect_schedule_task` — items fetched after decoding offsets) bypasses the backpressure budget. This is surfaced via a new `EncodingsIo::with_bypass_backpressure()` method implemented by `LanceEncodingsIo`. Both the standard and lite schedulers are updated consistently. The IOPS concurrency limit is unaffected; only the byte-budget gate is bypassed. ## Changed files | File | Change | |------|--------| | `rust/lance-io/src/scheduler.rs` | `no_backpressure` on `IoQueueState`; `bypass_backpressure` on `IoTask`/`MutableBatch`; `FileScheduler::with_bypass_backpressure()` | | `rust/lance-io/src/scheduler/lite.rs` | `no_backpressure` on `SimpleBackpressureThrottle`; `force_acquire` on `BackpressureThrottle` trait; `bypass_backpressure` on lite `IoTask` | | `rust/lance-encoding/src/lib.rs` | `EncodingsIo::with_bypass_backpressure()` default method | | `rust/lance-file/src/io.rs` | `LanceEncodingsIo` override of `with_bypass_backpressure()` | | `rust/lance-encoding/src/previous/encodings/logical/list.rs` | `LANCE_BYPASS_INDIRECT_IO_BACKPRESSURE` env var + bypass io wrapping in indirect schedule | ## Test plan - [ ] `cargo test -p lance-io --lib` — 155 tests pass (io_uring tests excluded, require kernel support) - [ ] `cargo test -p lance-encoding --lib` — 369 tests pass - [ ] `cargo check --workspace --tests` — clean --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 2fb821c commit 1bda0d9

5 files changed

Lines changed: 453 additions & 39 deletions

File tree

rust/lance-encoding/src/lib.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

4-
use std::ops::Range;
4+
use std::{ops::Range, sync::Arc};
55

66
use bytes::Bytes;
77
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
@@ -75,6 +75,17 @@ pub trait EncodingsIo: std::fmt::Debug + Send + Sync {
7575
.map_ok(|mut v| v.pop().unwrap())
7676
.boxed()
7777
}
78+
79+
/// Returns a version of this I/O service that bypasses backpressure for all requests.
80+
///
81+
/// This is intended for indirect I/O (e.g. fetching items after decoding offsets) where
82+
/// blocking on backpressure could cause deadlocks or excessive latency.
83+
///
84+
/// Returns `None` if this implementation does not support bypass (e.g. in-memory or test
85+
/// schedulers), in which case the caller should fall back to using self.
86+
fn with_bypass_backpressure(&self) -> Option<Arc<dyn EncodingsIo>> {
87+
None
88+
}
7889
}
7990

8091
/// An implementation of EncodingsIo that serves data from an in-memory buffer

rust/lance-encoding/src/previous/encodings/logical/list.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

4-
use std::{collections::VecDeque, ops::Range, sync::Arc};
4+
use std::{
5+
collections::VecDeque,
6+
ops::Range,
7+
sync::{Arc, OnceLock},
8+
};
59

610
use arrow_array::{
711
Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
@@ -12,7 +16,7 @@ use arrow_array::{
1216
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
1317
use arrow_schema::{DataType, Field, Fields};
1418
use futures::{FutureExt, future::BoxFuture};
15-
use lance_core::{Error, Result, cache::LanceCache};
19+
use lance_core::{Error, Result, cache::LanceCache, utils::parse::str_is_truthy};
1620
use log::trace;
1721
use tokio::task::JoinHandle;
1822

@@ -35,6 +39,20 @@ use crate::{
3539
utils::accumulation::AccumulationQueue,
3640
};
3741

42+
/// When set, indirect I/O in the 2.0 list scheduler bypasses the backpressure system.
43+
///
44+
/// This can be a blunt instrument to avoid deadlocks in 2.0 scenarios
45+
/// Set LANCE_BYPASS_INDIRECT_IO_BACKPRESSURE=1 to enable.
46+
static BYPASS_INDIRECT_IO_BACKPRESSURE: OnceLock<bool> = OnceLock::new();
47+
48+
fn bypass_indirect_io_backpressure() -> bool {
49+
*BYPASS_INDIRECT_IO_BACKPRESSURE.get_or_init(|| {
50+
std::env::var("LANCE_BYPASS_INDIRECT_IO_BACKPRESSURE")
51+
.map(|val| str_is_truthy(&val))
52+
.unwrap_or(false)
53+
})
54+
}
55+
3856
// Scheduling lists is tricky. Imagine the following scenario:
3957
//
4058
// * There are 2000 offsets per offsets page
@@ -454,7 +472,12 @@ impl SchedulingJob for ListFieldSchedulingJob<'_> {
454472

455473
let items_scheduler = self.scheduler.items_scheduler.clone();
456474
let items_type = self.scheduler.items_field.data_type().clone();
457-
let io = context.io().clone();
475+
let base_io = context.io().clone();
476+
let io = if bypass_indirect_io_backpressure() {
477+
base_io.with_bypass_backpressure().unwrap_or(base_io)
478+
} else {
479+
base_io
480+
};
458481
let cache = context.cache().clone();
459482

460483
// Immediately spawn the indirect scheduling

rust/lance-file/src/io.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

4+
use std::sync::Arc;
5+
46
use futures::{FutureExt, future::BoxFuture};
57
use lance_encoding::EncodingsIo;
68
use lance_io::scheduler::FileScheduler;
@@ -29,6 +31,13 @@ impl LanceEncodingsIo {
2931
}
3032

3133
impl EncodingsIo for LanceEncodingsIo {
34+
fn with_bypass_backpressure(&self) -> Option<Arc<dyn EncodingsIo>> {
35+
Some(Arc::new(Self {
36+
scheduler: self.scheduler.with_bypass_backpressure(),
37+
read_chunk_size: self.read_chunk_size,
38+
}))
39+
}
40+
3241
fn submit_request(
3342
&self,
3443
ranges: Vec<std::ops::Range<u64>>,

0 commit comments

Comments
 (0)