Skip to content

Commit d82e768

Browse files
authored
fix: respect LANCE_DEFAULT_IO_BUFFER_SIZE if it has been set (#6636)
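When we migrated to the filtered read path, we ended up ignoring `LANCE_DEFAULT_IO_BUFFER_SIZE`. This is because the filtered read path only used a fixed I/O buffer size if the scan had `io_buffer_size` set explicitly; otherwise it fell back to `SchedulerConfig::max_bandwidth` without ever consulting the environment variable. This change adds an explicit check for the environment variable, so the order of precedence is: explicit scan option, then `LANCE_DEFAULT_IO_BUFFER_SIZE`, then `max_bandwidth`.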
1 parent e57b3df commit d82e768

4 files changed

Lines changed: 90 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/lance/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ tempfile.workspace = true
114114
test-log.workspace = true
115115
tracing-chrome = "0.7.1"
116116
rstest = { workspace = true }
117+
serial_test = { workspace = true }
117118
tracking-allocator = { version = "0.4", features = ["tracing-compat"] }
118119
paste = "1.0"
119120
# For S3 / DynamoDB tests

rust/lance/src/dataset/scanner.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,18 @@ pub static DEFAULT_IO_BUFFER_SIZE: LazyLock<u64> = LazyLock::new(|| {
167167
.unwrap_or(DEFAULT_IO_BUFFER_SIZE_VALUE)
168168
});
169169

170+
/// The user-set value of `LANCE_DEFAULT_IO_BUFFER_SIZE`, or `None` if the env var
171+
/// is unset or unparsable. Consult this from paths that have a sensible non-fixed
172+
/// default (e.g. `SchedulerConfig::max_bandwidth`) so the env var still takes
173+
/// precedence over that default. Re-reads the env var on each call so tests can
174+
/// mutate it.
175+
pub fn get_default_io_buffer_size_override() -> Option<u64> {
176+
parse_env_var(
177+
"LANCE_DEFAULT_IO_BUFFER_SIZE",
178+
&DEFAULT_IO_BUFFER_SIZE_VALUE.to_string(),
179+
)
180+
}
181+
170182
/// Defines an ordering for a single column
171183
///
172184
/// Floats are sorted using the IEEE 754 total ordering
@@ -10488,6 +10500,75 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\")
1048810500
);
1048910501
}
1049010502

10503+
fn find_filtered_read(plan: &dyn ExecutionPlan) -> Option<&FilteredReadExec> {
10504+
if let Some(f) = plan.as_any().downcast_ref::<FilteredReadExec>() {
10505+
return Some(f);
10506+
}
10507+
for child in plan.children() {
10508+
if let Some(f) = find_filtered_read(child.as_ref()) {
10509+
return Some(f);
10510+
}
10511+
}
10512+
None
10513+
}
10514+
10515+
#[tokio::test]
10516+
async fn test_io_buffer_size_explicit_propagated() {
10517+
// Sanity check: an explicit .io_buffer_size(N) call must reach the
10518+
// FilteredReadExec options unchanged, and the absence of one must leave
10519+
// io_buffer_size_bytes as None so FilteredReadExec can pick its own
10520+
// fallback (env var or max_bandwidth).
10521+
let data = lance_datagen::gen_batch()
10522+
.col("x", lance_datagen::array::step::<Int32Type>())
10523+
.into_reader_rows(RowCount::from(8), BatchCount::from(1));
10524+
let dataset = Dataset::write(data, "memory://test_io_buffer_explicit", None)
10525+
.await
10526+
.unwrap();
10527+
10528+
let plan = dataset.scan().create_plan().await.unwrap();
10529+
let filtered = find_filtered_read(plan.as_ref())
10530+
.expect("expected a FilteredReadExec in the scan plan");
10531+
assert_eq!(filtered.options().io_buffer_size_bytes, None);
10532+
10533+
let mut scanner = dataset.scan();
10534+
scanner.io_buffer_size(7777);
10535+
let plan = scanner.create_plan().await.unwrap();
10536+
let filtered = find_filtered_read(plan.as_ref())
10537+
.expect("expected a FilteredReadExec in the scan plan");
10538+
assert_eq!(filtered.options().io_buffer_size_bytes, Some(7777));
10539+
}
10540+
10541+
// The env var key scopes serial_test's lock so this test only blocks others
10542+
// that touch LANCE_DEFAULT_IO_BUFFER_SIZE — unrelated tests still run in
10543+
// parallel.
10544+
#[test]
10545+
#[serial_test::serial(LANCE_DEFAULT_IO_BUFFER_SIZE)]
10546+
fn test_default_io_buffer_size_override_env_var() {
10547+
// Force the sibling LazyLock to evaluate before we mutate the env var.
10548+
// It caches forever on first read, so another test concurrently reading
10549+
// *DEFAULT_IO_BUFFER_SIZE during our mutation window would otherwise
10550+
// cache one of our test values and poison the rest of the suite.
10551+
let _ = *DEFAULT_IO_BUFFER_SIZE;
10552+
10553+
// FilteredReadExec consults this when no explicit io_buffer_size was set
10554+
// on the scanner, so the LANCE_DEFAULT_IO_BUFFER_SIZE env var takes
10555+
// precedence over the max_bandwidth fallback.
10556+
unsafe {
10557+
std::env::set_var("LANCE_DEFAULT_IO_BUFFER_SIZE", "4096");
10558+
}
10559+
assert_eq!(get_default_io_buffer_size_override(), Some(4096));
10560+
10561+
unsafe {
10562+
std::env::set_var("LANCE_DEFAULT_IO_BUFFER_SIZE", "not_a_number");
10563+
}
10564+
assert_eq!(get_default_io_buffer_size_override(), None);
10565+
10566+
unsafe {
10567+
std::env::remove_var("LANCE_DEFAULT_IO_BUFFER_SIZE");
10568+
}
10569+
assert_eq!(get_default_io_buffer_size_override(), None);
10570+
}
10571+
1049110572
fn assert_values_in_range(array: &Int32Array, range: std::ops::Range<i32>, msg: &str) {
1049210573
assert!(!array.is_empty(), "Expected some results but got none");
1049310574
assert!(

rust/lance/src/io/exec/filtered_read.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use crate::dataset::fragment::{FileFragment, FragReadConfig};
5757
use crate::dataset::rowids::load_row_id_sequence;
5858
use crate::dataset::scanner::{
5959
BATCH_SIZE_FALLBACK, DEFAULT_FRAGMENT_READAHEAD, get_default_batch_size,
60+
get_default_io_buffer_size_override,
6061
};
6162

6263
use super::utils::IoMetrics;
@@ -412,7 +413,12 @@ impl FilteredReadStream {
412413
let output_schema = Arc::new(options.projection.to_arrow_schema());
413414

414415
let obj_store = dataset.object_store.clone();
415-
let scheduler_config = if let Some(io_buffer_size_bytes) = options.io_buffer_size_bytes {
416+
// Explicit options take precedence; otherwise fall back to the
417+
// LANCE_DEFAULT_IO_BUFFER_SIZE env var if set; otherwise max_bandwidth.
418+
let scheduler_config = if let Some(io_buffer_size_bytes) = options
419+
.io_buffer_size_bytes
420+
.or_else(get_default_io_buffer_size_override)
421+
{
416422
SchedulerConfig::new(io_buffer_size_bytes)
417423
} else {
418424
SchedulerConfig::max_bandwidth(obj_store.as_ref())

0 commit comments

Comments
 (0)