Skip to content

Commit b1cf2d0

Browse files
tsafinclaude
andcommitted
Address PR review comments: correctness and safety fixes
- to_fs_path: don't prepend '/' if path already starts with '/' to avoid double-slash paths like '//tmp/lineitem.lance' - try_sysfs_queue_depth: use /sys/dev/block/MAJOR:MINOR symlink instead of scanning /sys/block/ by major number only; correctly handles partitions (e.g. sda1 vs sdb1 sharing major=8) by resolving the symlink and walking up to the parent disk device that owns queue/nr_requests - spawn_writer_thread: move raw_fd extraction inside the closure, after 'file' is established, making the lifetime relationship explicit - put_part: SeqCst → Relaxed for offset fetch_add; each part writes to its own non-overlapping range so no cross-thread happens-before is needed - lance_writer.cpp: check return value of lance_writer_enable_io_uring and throw on failure, consistent with lance_writer_set_write_params handling - lib.rs: gate io_uring module on target_os = "linux" in addition to the feature flag; the module uses Unix-only APIs (AsRawFd, MetadataExt, sysfs) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent f33f4d0 commit b1cf2d0

3 files changed

Lines changed: 56 additions & 32 deletions

File tree

src/writers/lance_writer.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,10 @@ void LanceWriter::initialize_lance_dataset(
253253
// Enable io_uring write path if requested (--lance-io-uring flag)
254254
#ifdef TPCH_LANCE_IO_URING
255255
if (use_io_uring_) {
256-
lance_writer_enable_io_uring(reinterpret_cast<::LanceWriter*>(rust_writer_), 1);
256+
int result = lance_writer_enable_io_uring(reinterpret_cast<::LanceWriter*>(rust_writer_), 1);
257+
if (result != 0) {
258+
throw std::runtime_error("Failed to enable io_uring for Lance writer");
259+
}
257260
}
258261
#endif
259262

third_party/lance-ffi/src/io_uring_store.rs

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -83,27 +83,39 @@ fn try_sysfs_queue_depth(path: &std::path::Path) -> Option<u32> {
8383
.find(|p| p.exists())
8484
.unwrap_or_else(|| std::path::Path::new("/"));
8585

86-
let dev = std::fs::metadata(probe).ok()?.dev();
87-
// Extract major number. Standard Linux encoding (works for major < 4096).
88-
let target_major = ((dev >> 8) & 0xfff) as u32;
89-
90-
for entry in std::fs::read_dir("/sys/block").ok()?.flatten() {
91-
let dev_file = entry.path().join("dev");
92-
if let Ok(s) = std::fs::read_to_string(&dev_file) {
93-
let block_major: u32 = s.trim().split(':').next()?.parse().ok()?;
94-
if block_major == target_major {
95-
let nr_req_path = entry.path().join("queue/nr_requests");
96-
let nr_req: u32 =
97-
std::fs::read_to_string(nr_req_path).ok()?.trim().parse().ok()?;
98-
let qd = (nr_req / 2).clamp(8, 128);
99-
eprintln!(
100-
"Lance FFI: io_uring calibration: {:?} nr_requests={} → QD={}",
101-
entry.file_name(),
102-
nr_req,
103-
qd
104-
);
105-
return Some(qd);
106-
}
86+
let raw_dev = std::fs::metadata(probe).ok()?.dev();
87+
// Standard Linux dev_t encoding (works for major < 4096, minor < 1 048 576).
88+
let major = (raw_dev >> 8) & 0xfff;
89+
let minor = (raw_dev & 0xff) | ((raw_dev >> 12) & 0xfff00);
90+
91+
// /sys/dev/block/MAJOR:MINOR is a symlink to the device's sysfs entry.
92+
// This correctly handles partitions (e.g. sda1 → block/sda/sda1) without
93+
// false-matching a different disk that shares the same major number.
94+
let dev_link = std::path::PathBuf::from(format!("/sys/dev/block/{}:{}", major, minor));
95+
let sysfs_path = std::fs::canonicalize(&dev_link).ok()?;
96+
97+
// Walk up from the resolved sysfs path until we find queue/nr_requests.
98+
// For a partition (e.g. sda1) this walks up to the parent disk (sda) which
99+
// owns the queue settings; for a whole-disk device it matches immediately.
100+
let mut cur = sysfs_path.as_path();
101+
loop {
102+
let nr_req_file = cur.join("queue/nr_requests");
103+
if nr_req_file.exists() {
104+
let nr_req: u32 =
105+
std::fs::read_to_string(&nr_req_file).ok()?.trim().parse().ok()?;
106+
let dev_name = cur.file_name()?.to_string_lossy().into_owned();
107+
let qd = (nr_req / 2).clamp(8, 128);
108+
eprintln!(
109+
"Lance FFI: io_uring calibration: {:?} nr_requests={} → QD={}",
110+
dev_name,
111+
nr_req,
112+
qd
113+
);
114+
return Some(qd);
115+
}
116+
cur = cur.parent()?;
117+
if cur == std::path::Path::new("/") {
118+
break;
107119
}
108120
}
109121
None
@@ -232,7 +244,12 @@ fn collect_payload(payload: PutPayload) -> Vec<u8> {
232244
// ─── Convert object_store::Path → PathBuf ────────────────────────────────────
233245

234246
fn to_fs_path(path: &Path) -> PathBuf {
235-
PathBuf::from(format!("/{path}"))
247+
let s = path.to_string();
248+
if s.starts_with('/') {
249+
PathBuf::from(s)
250+
} else {
251+
PathBuf::from(format!("/{s}"))
252+
}
236253
}
237254

238255
// ─── Worker thread ────────────────────────────────────────────────────────────
@@ -269,10 +286,10 @@ fn spawn_writer_thread(
269286
// Channel depth = 2×qd: enough headroom for Lance's concurrent-part window.
270287
let cap = (qd as usize).max(32);
271288
let (tx, mut rx) = tokio::sync::mpsc::channel::<WriteJob>(cap);
272-
let raw_fd = file.as_raw_fd();
273289

274290
let handle = std::thread::spawn(move || {
275-
let _file = file; // keep fd alive so raw_fd stays valid
291+
let _file = file; // keep fd alive for the entire thread lifetime
292+
let raw_fd = _file.as_raw_fd(); // extract after move so lifetime is clear
276293
let mut ring = make_ring(qd)?;
277294

278295
// `blocking_recv()` parks the worker thread until a job arrives.
@@ -318,7 +335,9 @@ impl MultipartUpload for IoUringMultipartUpload {
318335
let bytes = collect_payload(data);
319336
let len = bytes.len() as u64;
320337
// Pre-claim write offset so concurrent parts get distinct ranges.
321-
let write_offset = self.offset.fetch_add(len, Ordering::SeqCst);
338+
// Relaxed is sufficient: each part writes to its own non-overlapping
339+
// range, so no happens-before relationship is needed between parts.
340+
let write_offset = self.offset.fetch_add(len, Ordering::Relaxed);
322341

323342
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
324343
let tx = self.tx.clone();
@@ -414,7 +433,7 @@ impl ObjectStore for IoUringStore {
414433
&self,
415434
location: &Path,
416435
payload: PutPayload,
417-
opts: PutOptions,
436+
_opts: PutOptions, // attributes/tags not applicable for local io_uring writes
418437
) -> OSResult<PutResult> {
419438
let bytes = collect_payload(payload);
420439
let path = to_fs_path(location);
@@ -450,7 +469,7 @@ impl ObjectStore for IoUringStore {
450469
async fn put_multipart_opts(
451470
&self,
452471
location: &Path,
453-
opts: PutMultipartOptions,
472+
opts: PutMultipartOptions, // passed through to inner store on fallback
454473
) -> OSResult<Box<dyn MultipartUpload>> {
455474
match IoUringMultipartUpload::create(to_fs_path(location)) {
456475
Ok(upload) => Ok(Box::new(upload)),

third_party/lance-ffi/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ use tokio::runtime::Runtime;
1818
use lance::dataset::{WriteParams, WriteMode, CommitBuilder};
1919
use libc;
2020

21-
// io_uring write path — compiled in when feature is enabled (default on Linux)
21+
// io_uring write path — compiled in when feature is enabled on Linux.
2222
// Activated at runtime via lance_writer_enable_io_uring().
23-
#[cfg(feature = "io-uring")]
23+
// The module uses Unix-only APIs (AsRawFd, MetadataExt, /sys/dev/block/…)
24+
// so it must be guarded by both the feature flag and target_os.
25+
#[cfg(all(feature = "io-uring", target_os = "linux"))]
2426
mod io_uring_store;
2527

2628
fn apply_compression_metadata(schema: &Schema) -> Schema {
@@ -282,8 +284,8 @@ fn build_write_params_from(config: WriteParamsConfig, mode: WriteMode) -> WriteP
282284
params.skip_auto_cleanup = config.skip_auto_cleanup;
283285

284286
// Inject io_uring write path when requested at runtime (--io-uring CLI flag).
285-
// The feature must be compiled in (default on Linux) for this to have any effect.
286-
#[cfg(feature = "io-uring")]
287+
// Only available on Linux (io_uring is a Linux-specific syscall).
288+
#[cfg(all(feature = "io-uring", target_os = "linux"))]
287289
if config.use_io_uring {
288290
let mut store_params = lance_io::object_store::ObjectStoreParams::default();
289291
store_params.object_store_wrapper = Some(Arc::new(io_uring_store::IoUringWrapper));

0 commit comments

Comments
 (0)