614 changes: 317 additions & 297 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions Cargo.toml
@@ -18,11 +18,11 @@ rust-version = "1.91.0"
crate-type = ["cdylib", "staticlib", "rlib"]

[dependencies]
-lance = "3.0.1"
-lance-core = "3.0.1"
-lance-index = "3.0.1"
-lance-io = "3.0.1"
-lance-linalg = "3.0.1"
+lance = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
+lance-core = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
+lance-index = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
+lance-io = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
+lance-linalg = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
arrow = { version = "57.0.0", features = ["prettyprint", "ffi"] }
arrow-array = "57.0.0"
arrow-schema = "57.0.0"
@@ -32,12 +32,13 @@ futures = "0.3"
log = "0.4"
pin-project = "1.0"
snafu = "0.9"
+uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
-lance = "3.0.1"
-lance-datagen = "3.0.1"
-lance-file = "3.0.1"
-lance-table = "3.0.1"
+lance = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
+lance-datagen = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
+lance-file = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
+lance-table = { git = "https://github.com/lance-format/lance.git", rev = "d630106d" }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
arrow-array = "57.0.0"
arrow-schema = "57.0.0"
47 changes: 47 additions & 0 deletions include/lance.h
@@ -431,6 +431,33 @@ uint64_t lance_dataset_index_count(const LanceDataset* dataset);
*/
const char* lance_dataset_index_list_json(const LanceDataset* dataset);

/* ─── Distributed vector search: index segment enumeration ─── */

/**
* Count the segments that make up a logical vector index.
*
* A logical index is a set of physical segments (one per distributed-build
* worker, or one per fragment range). Each segment has a stable UUID. Returns
* 0 if the index does not exist (also sets `LANCE_ERR_NOT_FOUND`) or on error.
*/
uint64_t lance_dataset_index_segment_count(
const LanceDataset* dataset,
const char* index_name
);

/**
* Fill `out_uuids` with the UUIDs of the segments that make up a logical index.
* Each UUID is written as 16 raw bytes (RFC 4122 layout). The caller must
* allocate at least `lance_dataset_index_segment_count() * 16` bytes.
*
* Returns 0 on success, -1 on error.
*/
int32_t lance_dataset_index_segments(
const LanceDataset* dataset,
const char* index_name,
uint8_t* out_uuids
);
Review comment by @LuciferYang (Contributor), Apr 27, 2026:

The buffer is sized by the caller using a separate lance_dataset_index_segment_count() call. The implementation reloads the snapshot independently in each call (snap.load_indices() is invoked twice), and the inner loop writes count * 16 bytes without any capacity check.

The C++ wrapper makes this two-call pattern explicit:

uint64_t count = index_segment_count(index_name);  // call #1: snapshot load
std::vector<std::array<uint8_t, 16>> out(count);
... lance_dataset_index_segments(...)              // call #2: snapshot load

Between call #1 and call #2, a concurrent writer could commit a new segment for the same logical index — exactly the distributed-build use case mentioned in the follow-ups section of the PR description. The second snapshot would then return more segments than the first, and the inner loop at src/index.rs:255–260 would overrun the caller's buffer:

for (i, seg) in segments.iter().enumerate() {
    let bytes = seg.uuid.as_bytes();
    unsafe {
        std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_uuids.add(i * 16), 16);
    }
}

There is no SAFETY: comment justifying why out_uuids is large enough.


Possible Fixes

Adopt the well-established "capacity in, count out" FFI pattern (commonly seen in raw C APIs that fill caller-provided buffers):

int32_t lance_dataset_index_segments(
    const LanceDataset* dataset,
    const char* index_name,
    uint8_t* out_uuids,
    size_t capacity,        /* bytes available in out_uuids */
    uint64_t* out_count     /* how many UUIDs were actually written */
);

When capacity < segments.len() * 16, return LANCE_ERR_INVALID_ARGUMENT, or introduce a new sentinel (the codebase currently has 8: LANCE_ERR_INVALID_ARGUMENT, LANCE_ERR_IO, LANCE_ERR_NOT_FOUND, LANCE_ERR_DATASET_ALREADY_EXISTS, LANCE_ERR_INDEX, LANCE_ERR_INTERNAL, LANCE_ERR_NOT_SUPPORTED, LANCE_ERR_COMMIT_CONFLICT). Ideally the required count is still reported through out_count in that case, so the caller knows how much to re-allocate. This lets callers do single-shot retrieval with a guess and re-allocate if needed, removing the two-snapshot anti-pattern entirely.
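
To make the "capacity in, count out" idea concrete, here is a minimal, self-contained Rust sketch of the copy step. It is not the PR's code: `fill_segment_uuids`, `OK`, and `ERR_INVALID_ARGUMENT` are hypothetical names, segments are modelled as plain `[u8; 16]` arrays rather than Lance index metadata, and it assumes `out_count` reports the required count even when the buffer is too small.

```rust
/// Hypothetical status codes standing in for the crate's real error sentinels.
pub const OK: i32 = 0;
pub const ERR_INVALID_ARGUMENT: i32 = -1;

/// Copy `segments` (16-byte UUIDs) into a caller-provided buffer.
///
/// `capacity` is the number of bytes available at `out_uuids`. `out_count`
/// receives the number of segments in this snapshot on every path that gets
/// past argument validation, so a caller with a short buffer can resize.
///
/// # Safety
/// `out_uuids` must be valid for writes of `capacity` bytes (it may be NULL
/// only when `capacity` is 0), and `out_count` must be valid for writes.
pub unsafe fn fill_segment_uuids(
    segments: &[[u8; 16]],
    out_uuids: *mut u8,
    capacity: usize,
    out_count: *mut u64,
) -> i32 {
    if out_count.is_null() || (out_uuids.is_null() && capacity > 0) {
        return ERR_INVALID_ARGUMENT;
    }
    // Report the required count before the capacity check.
    unsafe { *out_count = segments.len() as u64 };

    let needed = match segments.len().checked_mul(16) {
        Some(n) => n,
        None => return ERR_INVALID_ARGUMENT,
    };
    if capacity < needed {
        // Buffer too small: nothing is written to `out_uuids`.
        return ERR_INVALID_ARGUMENT;
    }
    for (i, uuid) in segments.iter().enumerate() {
        // SAFETY: i * 16 + 16 <= needed <= capacity, and the caller guarantees
        // that `out_uuids` is valid for `capacity` bytes.
        unsafe { std::ptr::copy_nonoverlapping(uuid.as_ptr(), out_uuids.add(i * 16), 16) };
    }
    OK
}
```

Because the count and the copy come from the same `segments` slice, a concurrent commit between two calls can at worst make the caller retry with a larger buffer; it cannot cause a write past `capacity`.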

Lighter-weight alternative: have a single Rust call return the count and a heap-allocated buffer with the segments. The codebase already exposes lance_free_string for CString-style strings; an analogous lance_free_uuid_buffer (or generic lance_free_bytes) would be a small, well-scoped addition. This eliminates caller-side sizing altogether at the cost of an extra allocation.
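
The owned-buffer alternative could look roughly like the sketch below. The names `UuidBuffer`, `uuid_buffer_from`, and `free_uuid_buffer` are hypothetical (the comment only suggests `lance_free_uuid_buffer` or `lance_free_bytes` as possible names), and segments are again modelled as plain `[u8; 16]` arrays.

```rust
/// Hypothetical owned result: `len` UUIDs stored back-to-back at `data`.
#[repr(C)]
pub struct UuidBuffer {
    pub data: *mut u8,
    /// Number of UUIDs (not bytes) stored at `data`.
    pub len: usize,
}

/// Copy the UUIDs into a single heap allocation owned by the returned struct.
pub fn uuid_buffer_from(segments: &[[u8; 16]]) -> UuidBuffer {
    // Flatten into one contiguous byte buffer of exactly `len * 16` bytes.
    let flat: Box<[u8]> = segments.concat().into_boxed_slice();
    UuidBuffer {
        data: Box::into_raw(flat) as *mut u8,
        len: segments.len(),
    }
}

/// Release a buffer produced by `uuid_buffer_from`.
///
/// # Safety
/// `buf` must come from `uuid_buffer_from`, must be unmodified, and must not
/// be freed more than once.
pub unsafe fn free_uuid_buffer(buf: UuidBuffer) {
    if buf.data.is_null() {
        return;
    }
    let bytes = buf.len * 16;
    // SAFETY: rebuilds the Box<[u8]> created in `uuid_buffer_from`, using the
    // same pointer and the same byte length.
    unsafe { drop(Box::from_raw(std::ptr::slice_from_raw_parts_mut(buf.data, bytes))) };
}
```

The count and the bytes are produced together from one snapshot, so the caller never sizes anything; the trade-off is that every caller must remember to pair the call with the matching free function.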


/* ─── Vector search (Phase 2) ─── */

/**
@@ -460,6 +487,26 @@ int32_t lance_scanner_set_metric(LanceScanner* scanner, LanceMetricType metric);
int32_t lance_scanner_set_use_index(LanceScanner* scanner, bool enable);
int32_t lance_scanner_set_prefilter(LanceScanner* scanner, bool enable);

/**
* Restrict the next k-NN query to a specific subset of vector index segments.
*
* Used by distributed query engines (e.g. Velox) to fan a single k-NN query
* out across workers, each handling a slice of segments. The coordinator gets
* the segment list via `lance_dataset_index_segments()`.
*
* @param segment_uuids Pointer to `len` 16-byte UUIDs concatenated end-to-end
* (total byte length = `len * 16`). Each UUID identifies
* one physical segment of a logical index.
* @param len Number of UUIDs. Pass 0 (and segment_uuids may be NULL)
* to clear any previously-set segment restriction.
* @return 0 on success, -1 on error.
*/
int32_t lance_scanner_set_index_segments(
LanceScanner* scanner,
const uint8_t* segment_uuids,
size_t len
);

#ifdef __cplusplus
} /* extern "C" */
#endif
40 changes: 40 additions & 0 deletions include/lance.hpp
@@ -17,6 +17,8 @@

#include "lance.h"

#include <array>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
@@ -259,6 +261,29 @@ class Dataset {
return out;
}

/// Number of segments that make up a logical vector index.
/// Throws lance::Error with code NotFound if the index does not exist.
uint64_t index_segment_count(const std::string& index_name) const {
uint64_t n = lance_dataset_index_segment_count(handle_.get(), index_name.c_str());
if (n == 0 && lance_last_error_code() != LANCE_OK) check_error();
return n;
}

/// UUIDs of the physical segments that make up a logical vector index.
/// Each UUID is a 16-byte array (RFC 4122 layout). Used by distributed
/// query engines to fan k-NN out across workers — see
/// `Scanner::index_segments`.
std::vector<std::array<uint8_t, 16>> index_segments(const std::string& index_name) const {
uint64_t count = index_segment_count(index_name);
std::vector<std::array<uint8_t, 16>> out(count);
if (count == 0) return out;
// out is contiguous: 16 bytes per element.
if (lance_dataset_index_segments(handle_.get(), index_name.c_str(),
reinterpret_cast<uint8_t*>(out.data())) != 0)
check_error();
return out;
}

/// Access the underlying C handle (does not transfer ownership).
const LanceDataset* c_handle() const { return handle_.get(); }

@@ -314,6 +339,21 @@ class Scanner {
return fragment_ids(ids.data(), ids.size());
}

/// Restrict the next k-NN query to a subset of vector index segments.
/// Pass `len` 16-byte UUIDs concatenated as a single byte buffer
/// (total bytes = `len * 16`). Pass len=0 (and any pointer) to clear.
Scanner& index_segments(const uint8_t* uuids, size_t len) {
if (lance_scanner_set_index_segments(handle_.get(), uuids, len) != 0)
check_error();
return *this;
}

/// Restrict the next k-NN query to a subset of vector index segments
/// (typed vector overload).
Scanner& index_segments(const std::vector<std::array<uint8_t, 16>>& uuids) {
return index_segments(reinterpret_cast<const uint8_t*>(uuids.data()), uuids.size());
}

/// Materialize the scan as an ArrowArrayStream (blocking).
void to_arrow_stream(ArrowArrayStream* out) {
if (lance_scanner_to_arrow_stream(handle_.get(), out) != 0)
110 changes: 109 additions & 1 deletion src/index.rs
@@ -8,9 +8,10 @@

use std::ffi::{CString, c_char};

+use lance::index::DatasetIndexExt;
 use lance_core::Result;
+use lance_index::IndexType;
 use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
-use lance_index::{DatasetIndexExt, IndexType};

use crate::dataset::LanceDataset;
use crate::error::{LanceErrorCode, ffi_try, set_last_error};
@@ -153,6 +154,113 @@ pub unsafe extern "C" fn lance_dataset_index_count(dataset: *const LanceDataset)
}
}

/// Count the segments that make up a logical index.
///
/// A logical index is a set of physical segments (one per distributed-build worker
/// or one per fragment range). Each segment has a stable UUID. Returns 0 if the
/// index does not exist (also sets `LANCE_ERR_NOT_FOUND`).
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_dataset_index_segment_count(
dataset: *const LanceDataset,
index_name: *const c_char,
) -> u64 {
if dataset.is_null() || index_name.is_null() {
set_last_error(
LanceErrorCode::InvalidArgument,
"dataset and index_name must not be NULL",
);
return 0;
}
let ds = unsafe { &*dataset };
let name = match unsafe { helpers::parse_c_string(index_name) } {
Ok(Some(s)) => s,
Ok(None) => {
set_last_error(
LanceErrorCode::InvalidArgument,
"index_name must not be empty",
);
return 0;
}
Err(err) => {
crate::error::set_lance_error(&err);
return 0;
}
};
let snap = ds.snapshot();
match block_on(snap.load_indices()) {
Ok(indices) => {
let count = indices.iter().filter(|i| i.name == name).count();
Review comment (Contributor):

lance_dataset_index_count excludes system indexes (!is_system_index). The new lance_dataset_index_segment_count does not. If a system index ever shares a name with a user-visible index, the count silently includes it and lance_dataset_index_segments emits its UUIDs, which a worker may then attempt to query.
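
One way to keep the count and the enumeration consistent is to route both through a single filter. The sketch below is illustrative only: `IndexMeta`, its `is_system` flag, and `user_segments` are hypothetical stand-ins, and `is_system_index` here is a local placeholder for whatever predicate `lance_dataset_index_count` already uses.

```rust
/// Hypothetical stand-in for one index metadata entry.
pub struct IndexMeta {
    pub name: String,
    pub uuid: [u8; 16],
    /// Placeholder flag; the real predicate may inspect other fields.
    pub is_system: bool,
}

/// Placeholder for the system-index predicate used elsewhere in the crate.
pub fn is_system_index(meta: &IndexMeta) -> bool {
    meta.is_system
}

/// Segments of the user-visible index called `name`, skipping system indexes.
/// Both the count function and the UUID-listing function could call this so
/// they can never disagree about which segments belong to the index.
pub fn user_segments<'a>(indices: &'a [IndexMeta], name: &str) -> Vec<&'a IndexMeta> {
    indices
        .iter()
        .filter(|i| i.name == name && !is_system_index(i))
        .collect()
}
```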

if count == 0 {
set_last_error(
LanceErrorCode::NotFound,
format!("index '{}' not found", name),
);
return 0;
}
crate::error::clear_last_error();
count as u64
}
Err(err) => {
crate::error::set_lance_error(&err);
0
}
}
}

/// Fill `out_uuids` with the UUIDs of the segments that make up a logical index.
///
/// Each UUID is written as 16 raw bytes (RFC 4122 layout). The caller must
/// allocate at least `lance_dataset_index_segment_count() * 16` bytes.
///
/// Returns 0 on success, -1 on error.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_dataset_index_segments(
dataset: *const LanceDataset,
index_name: *const c_char,
out_uuids: *mut u8,
) -> i32 {
ffi_try!(
unsafe { dataset_index_segments_inner(dataset, index_name, out_uuids) },
neg
)
}

unsafe fn dataset_index_segments_inner(
dataset: *const LanceDataset,
index_name: *const c_char,
out_uuids: *mut u8,
) -> Result<i32> {
if dataset.is_null() || index_name.is_null() || out_uuids.is_null() {
return Err(lance_core::Error::InvalidInput {
source: "dataset, index_name, and out_uuids must not be NULL".into(),
location: snafu::location!(),
});
}
let ds = unsafe { &*dataset };
let name = unsafe { helpers::parse_c_string(index_name)? }.ok_or_else(|| {
lance_core::Error::InvalidInput {
source: "index_name must not be empty".into(),
location: snafu::location!(),
}
})?;
let snap = ds.snapshot();
let indices = block_on(snap.load_indices())?;
let segments: Vec<_> = indices.iter().filter(|i| i.name == name).collect();
Review comment (Contributor):

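Same issue as in lance_dataset_index_segment_count above: this filter also matches system indexes, so the !is_system_index exclusion used by lance_dataset_index_count is missing here too.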

if segments.is_empty() {
return Err(lance_core::Error::IndexNotFound {
identity: format!("name='{}'", name),
location: snafu::location!(),
});
}
for (i, seg) in segments.iter().enumerate() {
let bytes = seg.uuid.as_bytes();
unsafe {
std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_uuids.add(i * 16), 16);
}
}
Ok(0)
}

/// Drop an index by name.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_dataset_drop_index(
55 changes: 55 additions & 0 deletions src/scanner.rs
@@ -17,6 +17,7 @@ use lance::dataset::scanner::DatasetRecordBatchStream;
use lance_core::Result;
use lance_io::ffi::to_ffi_arrow_array_stream;
use lance_io::stream::RecordBatchStream;
use uuid::Uuid;

use crate::async_dispatcher::{self, LanceCallback};
use crate::batch::LanceBatch;
@@ -46,6 +47,7 @@ pub struct LanceScanner {
batch_size: Option<usize>,
with_row_id: bool,
fragment_ids: Option<Vec<u64>>,
index_segments: Option<Vec<Uuid>>,
nearest: Option<NearestQuery>,
nprobes: Option<u32>,
refine_factor: Option<u32>,
@@ -95,6 +97,7 @@ impl LanceScanner {
batch_size: None,
with_row_id: false,
fragment_ids: None,
index_segments: None,
nearest: None,
nprobes: None,
refine_factor: None,
@@ -161,6 +164,9 @@ impl LanceScanner {
if self.prefilter {
scanner.prefilter(true);
}
if let Some(segments) = &self.index_segments {
scanner.with_index_segments(segments.clone())?;
Review comment (Contributor):

The with_index_segments(...) call is placed inside if let Some(n) = &self.nearest { ... }:

if let Some(n) = &self.nearest {
    scanner.nearest(&n.column, n.query.as_ref(), n.k as usize)?;
    ...
    if let Some(segments) = &self.index_segments {
        scanner.with_index_segments(segments.clone())?;
    }
}

If a caller invokes lance_scanner_set_index_segments(...) but never calls lance_scanner_nearest(...), the segment restriction is silently ignored: no error, no warning. A distributed-query worker would then scan segments it was never assigned, which is a correctness footgun.

Recommended fix. Either:

  1. Validate at materialize time — return an error if index_segments.is_some() && nearest.is_none() with a message such as "index_segments requires nearest() to be configured".
  2. Validate at setter time — in lance_scanner_set_index_segments, reject if s.nearest.is_none() and document the ordering requirement (consistent with the project's existing fail-fast guards such as the if k == 0 { ... } check inside scanner_nearest_inner).

Option 1 is more flexible (allows a builder to set segments before nearest); option 2 fails earlier and is closer to the rest of the file's style.
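
As a rough illustration of option 1, the check could be a small guard that runs before the scanner is built. This is a sketch under assumptions, not the crate's code: `ScanConfig` and `validate` are hypothetical, `nearest` is reduced to a column name, and the error is a plain `String` rather than `lance_core::Error`.

```rust
/// Hypothetical, trimmed-down view of the scanner's builder state.
#[derive(Default)]
pub struct ScanConfig {
    /// Stand-in for the real nearest-neighbour query (column name only).
    pub nearest: Option<String>,
    /// Segment UUIDs the query should be restricted to, if any.
    pub index_segments: Option<Vec<[u8; 16]>>,
}

/// Materialize-time guard: a segment restriction without a k-NN query would
/// otherwise be dropped silently, so reject it explicitly.
pub fn validate(cfg: &ScanConfig) -> Result<(), String> {
    if cfg.index_segments.is_some() && cfg.nearest.is_none() {
        return Err("index_segments requires nearest() to be configured".to_string());
    }
    Ok(())
}
```

Checking at materialize time keeps the setters order-independent: segments may be set before or after the k-NN query, and only the final combination is validated.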

}
}
let stream = block_on(scanner.try_into_stream())?;
self.schema = Some(stream.schema());
@@ -207,6 +213,9 @@ impl LanceScanner {
if self.prefilter {
scanner.prefilter(true);
}
if let Some(segments) = &self.index_segments {
scanner.with_index_segments(segments.clone())?;
}
}
Ok(scanner)
}
@@ -727,6 +736,52 @@ pub unsafe extern "C" fn lance_scanner_set_prefilter(
0
}

/// Restrict the next `nearest()` query to a specific subset of vector index segments.
///
/// Each segment is a 16-byte UUID (RFC 4122 layout). Pass an array of `len`
/// 16-byte buffers concatenated end-to-end (so the total byte length is `len * 16`).
/// Used by distributed query engines (e.g. Velox) to fan k-NN out across workers,
/// each handling a slice of segments. The coordinator gets the segment list via
/// `lance_dataset_index_segments()`.
///
/// Calling with `len == 0` clears the segment restriction.
///
/// Returns 0 on success, -1 on error.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_scanner_set_index_segments(
scanner: *mut LanceScanner,
segment_uuids: *const u8,
len: usize,
) -> i32 {
if scanner.is_null() {
set_last_error(LanceErrorCode::InvalidArgument, "scanner is NULL");
return -1;
}
if segment_uuids.is_null() && len > 0 {
set_last_error(
LanceErrorCode::InvalidArgument,
"segment_uuids is NULL but len > 0",
);
return -1;
}
let s = unsafe { &mut *scanner };
if len == 0 {
s.index_segments = None;
} else {
let mut uuids = Vec::with_capacity(len);
for i in 0..len {
let mut bytes = [0u8; 16];
unsafe {
std::ptr::copy_nonoverlapping(segment_uuids.add(i * 16), bytes.as_mut_ptr(), 16);
}
uuids.push(Uuid::from_bytes(bytes));
}
s.index_segments = Some(uuids);
}
crate::error::clear_last_error();
0
}

// ---------------------------------------------------------------------------
// Vector search (Phase 2): k-NN query setter
// ---------------------------------------------------------------------------