-
Notifications
You must be signed in to change notification settings - Fork 4
feat: distributed vector search via index segment selection #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e45e42e
9c75e1e
dd6dd79
bd430e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,9 +8,10 @@ | |
|
|
||
| use std::ffi::{CString, c_char}; | ||
|
|
||
| use lance::index::DatasetIndexExt; | ||
| use lance_core::Result; | ||
| use lance_index::IndexType; | ||
| use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; | ||
| use lance_index::{DatasetIndexExt, IndexType}; | ||
|
|
||
| use crate::dataset::LanceDataset; | ||
| use crate::error::{LanceErrorCode, ffi_try, set_last_error}; | ||
|
|
@@ -153,6 +154,113 @@ pub unsafe extern "C" fn lance_dataset_index_count(dataset: *const LanceDataset) | |
| } | ||
| } | ||
|
|
||
| /// Count the segments that make up a logical index. | ||
| /// | ||
| /// A logical index is a set of physical segments (one per distributed-build worker | ||
| /// or one per fragment range). Each segment has a stable UUID. Returns 0 if the | ||
| /// index does not exist (also sets `LANCE_ERR_NOT_FOUND`). | ||
| #[unsafe(no_mangle)] | ||
| pub unsafe extern "C" fn lance_dataset_index_segment_count( | ||
| dataset: *const LanceDataset, | ||
| index_name: *const c_char, | ||
| ) -> u64 { | ||
| if dataset.is_null() || index_name.is_null() { | ||
| set_last_error( | ||
| LanceErrorCode::InvalidArgument, | ||
| "dataset and index_name must not be NULL", | ||
| ); | ||
| return 0; | ||
| } | ||
| let ds = unsafe { &*dataset }; | ||
| let name = match unsafe { helpers::parse_c_string(index_name) } { | ||
| Ok(Some(s)) => s, | ||
| Ok(None) => { | ||
| set_last_error( | ||
| LanceErrorCode::InvalidArgument, | ||
| "index_name must not be empty", | ||
| ); | ||
| return 0; | ||
| } | ||
| Err(err) => { | ||
| crate::error::set_lance_error(&err); | ||
| return 0; | ||
| } | ||
| }; | ||
| let snap = ds.snapshot(); | ||
| match block_on(snap.load_indices()) { | ||
| Ok(indices) => { | ||
| let count = indices.iter().filter(|i| i.name == name).count(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| if count == 0 { | ||
| set_last_error( | ||
| LanceErrorCode::NotFound, | ||
| format!("index '{}' not found", name), | ||
| ); | ||
| return 0; | ||
| } | ||
| crate::error::clear_last_error(); | ||
| count as u64 | ||
| } | ||
| Err(err) => { | ||
| crate::error::set_lance_error(&err); | ||
| 0 | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Fill `out_uuids` with the UUIDs of the segments that make up a logical index. | ||
| /// | ||
| /// Each UUID is written as 16 raw bytes (RFC 4122 layout). The caller must | ||
| /// allocate at least `lance_dataset_index_segment_count() * 16` bytes. | ||
| /// | ||
| /// Returns 0 on success, -1 on error. | ||
| #[unsafe(no_mangle)] | ||
| pub unsafe extern "C" fn lance_dataset_index_segments( | ||
| dataset: *const LanceDataset, | ||
| index_name: *const c_char, | ||
| out_uuids: *mut u8, | ||
| ) -> i32 { | ||
| ffi_try!( | ||
| unsafe { dataset_index_segments_inner(dataset, index_name, out_uuids) }, | ||
| neg | ||
| ) | ||
| } | ||
|
|
||
| unsafe fn dataset_index_segments_inner( | ||
| dataset: *const LanceDataset, | ||
| index_name: *const c_char, | ||
| out_uuids: *mut u8, | ||
| ) -> Result<i32> { | ||
| if dataset.is_null() || index_name.is_null() || out_uuids.is_null() { | ||
| return Err(lance_core::Error::InvalidInput { | ||
| source: "dataset, index_name, and out_uuids must not be NULL".into(), | ||
| location: snafu::location!(), | ||
| }); | ||
| } | ||
| let ds = unsafe { &*dataset }; | ||
| let name = unsafe { helpers::parse_c_string(index_name)? }.ok_or_else(|| { | ||
| lance_core::Error::InvalidInput { | ||
| source: "index_name must not be empty".into(), | ||
| location: snafu::location!(), | ||
| } | ||
| })?; | ||
| let snap = ds.snapshot(); | ||
| let indices = block_on(snap.load_indices())?; | ||
| let segments: Vec<_> = indices.iter().filter(|i| i.name == name).collect(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| if segments.is_empty() { | ||
| return Err(lance_core::Error::IndexNotFound { | ||
| identity: format!("name='{}'", name), | ||
| location: snafu::location!(), | ||
| }); | ||
| } | ||
| for (i, seg) in segments.iter().enumerate() { | ||
| let bytes = seg.uuid.as_bytes(); | ||
| unsafe { | ||
| std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_uuids.add(i * 16), 16); | ||
| } | ||
| } | ||
| Ok(0) | ||
| } | ||
|
|
||
| /// Drop an index by name. | ||
| #[unsafe(no_mangle)] | ||
| pub unsafe extern "C" fn lance_dataset_drop_index( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ use lance::dataset::scanner::DatasetRecordBatchStream; | |
| use lance_core::Result; | ||
| use lance_io::ffi::to_ffi_arrow_array_stream; | ||
| use lance_io::stream::RecordBatchStream; | ||
| use uuid::Uuid; | ||
|
|
||
| use crate::async_dispatcher::{self, LanceCallback}; | ||
| use crate::batch::LanceBatch; | ||
|
|
@@ -46,6 +47,7 @@ pub struct LanceScanner { | |
| batch_size: Option<usize>, | ||
| with_row_id: bool, | ||
| fragment_ids: Option<Vec<u64>>, | ||
| index_segments: Option<Vec<Uuid>>, | ||
| nearest: Option<NearestQuery>, | ||
| nprobes: Option<u32>, | ||
| refine_factor: Option<u32>, | ||
|
|
@@ -95,6 +97,7 @@ impl LanceScanner { | |
| batch_size: None, | ||
| with_row_id: false, | ||
| fragment_ids: None, | ||
| index_segments: None, | ||
| nearest: None, | ||
| nprobes: None, | ||
| refine_factor: None, | ||
|
|
@@ -161,6 +164,9 @@ impl LanceScanner { | |
| if self.prefilter { | ||
| scanner.prefilter(true); | ||
| } | ||
| if let Some(segments) = &self.index_segments { | ||
| scanner.with_index_segments(segments.clone())?; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The if let Some(n) = &self.nearest {
scanner.nearest(&n.column, n.query.as_ref(), n.k as usize)?;
...
if let Some(segments) = &self.index_segments {
scanner.with_index_segments(segments.clone())?;
}
}If a caller invokes Recommended fix. Either:
Option 1 is more flexible (allows a builder to set segments before nearest); option 2 fails earlier and is closer to the rest of the file's style. |
||
| } | ||
| } | ||
| let stream = block_on(scanner.try_into_stream())?; | ||
| self.schema = Some(stream.schema()); | ||
|
|
@@ -207,6 +213,9 @@ impl LanceScanner { | |
| if self.prefilter { | ||
| scanner.prefilter(true); | ||
| } | ||
| if let Some(segments) = &self.index_segments { | ||
| scanner.with_index_segments(segments.clone())?; | ||
| } | ||
| } | ||
| Ok(scanner) | ||
| } | ||
|
|
@@ -727,6 +736,52 @@ pub unsafe extern "C" fn lance_scanner_set_prefilter( | |
| 0 | ||
| } | ||
|
|
||
| /// Restrict the next `nearest()` query to a specific subset of vector index segments. | ||
| /// | ||
| /// Each segment is a 16-byte UUID (RFC 4122 layout). Pass an array of `len` | ||
| /// 16-byte buffers concatenated end-to-end (so the total byte length is `len * 16`). | ||
| /// Used by distributed query engines (e.g. Velox) to fan k-NN out across workers, | ||
| /// each handling a slice of segments. The coordinator gets the segment list via | ||
| /// `lance_dataset_index_segments()`. | ||
| /// | ||
| /// Calling with `len == 0` clears the segment restriction. | ||
| /// | ||
| /// Returns 0 on success, -1 on error. | ||
| #[unsafe(no_mangle)] | ||
| pub unsafe extern "C" fn lance_scanner_set_index_segments( | ||
| scanner: *mut LanceScanner, | ||
| segment_uuids: *const u8, | ||
| len: usize, | ||
| ) -> i32 { | ||
| if scanner.is_null() { | ||
| set_last_error(LanceErrorCode::InvalidArgument, "scanner is NULL"); | ||
| return -1; | ||
| } | ||
| if segment_uuids.is_null() && len > 0 { | ||
| set_last_error( | ||
| LanceErrorCode::InvalidArgument, | ||
| "segment_uuids is NULL but len > 0", | ||
| ); | ||
| return -1; | ||
| } | ||
| let s = unsafe { &mut *scanner }; | ||
| if len == 0 { | ||
| s.index_segments = None; | ||
| } else { | ||
| let mut uuids = Vec::with_capacity(len); | ||
| for i in 0..len { | ||
| let mut bytes = [0u8; 16]; | ||
| unsafe { | ||
| std::ptr::copy_nonoverlapping(segment_uuids.add(i * 16), bytes.as_mut_ptr(), 16); | ||
| } | ||
| uuids.push(Uuid::from_bytes(bytes)); | ||
| } | ||
| s.index_segments = Some(uuids); | ||
| } | ||
| crate::error::clear_last_error(); | ||
| 0 | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Vector search (Phase 2): k-NN query setter | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The buffer is sized by the caller using a separate
lance_dataset_index_segment_count()call. The implementation reloads the snapshot independently in each call (snap.load_indices()is invoked twice), and the inner loop writescount * 16bytes without any capacity check.The C++ wrapper makes this two-call pattern explicit:
Between call #1 and call #2, a concurrent writer could commit a new segment for the same logical index — exactly the distributed-build use case mentioned in the follow-ups section of the PR description. The second snapshot would then return more segments than the first, and the inner loop at
src/index.rs:255–260would overrun the caller's buffer:There is no
SAFETY:comment justifying whyout_uuidsis large enough.Possible Fixes
Adopt the well-established "capacity in, count out" FFI pattern (commonly seen in raw C APIs that fill caller-provided buffers):
Reuse
LANCE_ERR_INVALID_ARGUMENT(or introduce a new sentinel — the codebase currently has 8:LANCE_ERR_INVALID_ARGUMENT,LANCE_ERR_IO,LANCE_ERR_NOT_FOUND,LANCE_ERR_DATASET_ALREADY_EXISTS,LANCE_ERR_INDEX,LANCE_ERR_INTERNAL,LANCE_ERR_NOT_SUPPORTED,LANCE_ERR_COMMIT_CONFLICT) whencapacity < segments.len() * 16. This also lets callers do single-shot retrieval with a guess and re-allocate if needed, removing the two-snapshot anti-pattern entirely.Lighter-weight alternative: have a single Rust call return the count and a heap-allocated buffer with the segments. The codebase already exposes
lance_free_stringforCString-style strings; an analogouslance_free_uuid_buffer(or genericlance_free_bytes) would be a small, well-scoped addition. This eliminates caller-side sizing altogether at the cost of an extra allocation.