Skip to content

Commit b895d57

Browse files
authored
feat: add ANNIvfPartitionExecProto (#6612)
1 parent 073e8e7 commit b895d57

2 files changed

Lines changed: 93 additions & 2 deletions

File tree

protos/ann.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,10 @@ message ANNIvfSubIndexExecProto {
3939
lance.datafusion.TableIdentifier table = 2;
4040
repeated lance.table.IndexMetadata indices = 3;
4141
}
42+
43+
// Serializable form of ANNIvfPartitionExec — the IVF centroid routing node.
// Produced by `ann_ivf_partition_exec_to_proto` and consumed by
// `ann_ivf_partition_exec_from_proto` in rust/lance/src/io/exec/ann_proto.rs.
44+
message ANNIvfPartitionExecProto {
45+
// The vector query. The Rust deserializer returns an error when this is unset.
VectorQueryProto query = 1;
46+
// Identifier derived from the node's dataset; passed to dataset resolution
// on deserialization.
lance.datafusion.TableIdentifier table = 2;
47+
// Index UUIDs held by the node. The Rust deserializer rejects an empty list.
repeated string index_uuids = 3;
48+
}

rust/lance/src/io/exec/ann_proto.rs

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

4-
//! Protobuf serialization for [`ANNIvfSubIndexExec`].
4+
//! Protobuf serialization for [`ANNIvfPartitionExec`] and [`ANNIvfSubIndexExec`].
55
//!
66
//! Proto message definitions live in `crate::pb` (compiled from `ann.proto`).
77
//! Conversion functions live here because they need access to `ANNIvfSubIndexExec`
@@ -24,7 +24,7 @@ use lance_table::format::pb as table_pb;
2424
use crate::Dataset;
2525
use crate::pb;
2626

27-
use super::knn::ANNIvfSubIndexExec;
27+
use super::knn::{ANNIvfPartitionExec, ANNIvfSubIndexExec};
2828
use super::table_identifier::{resolve_dataset, table_identifier_from_dataset};
2929
use super::utils::PreFilterSource;
3030

@@ -130,6 +130,45 @@ pub fn query_from_proto(proto: pb::VectorQueryProto) -> Result<Query> {
130130
})
131131
}
132132

133+
// =============================================================================
134+
// ANNIvfPartitionExec <-> Proto
135+
// =============================================================================
136+
137+
/// Convert an [`ANNIvfPartitionExec`] to proto for serialization.
138+
pub async fn ann_ivf_partition_exec_to_proto(
139+
exec: &ANNIvfPartitionExec,
140+
) -> Result<pb::AnnIvfPartitionExecProto> {
141+
let table = table_identifier_from_dataset(&exec.dataset).await?;
142+
let query = query_to_proto(&exec.query)?;
143+
144+
Ok(pb::AnnIvfPartitionExecProto {
145+
query: Some(query),
146+
table: Some(table),
147+
index_uuids: exec.index_uuids.clone(),
148+
})
149+
}
150+
151+
/// Reconstruct an [`ANNIvfPartitionExec`] from proto.
152+
pub async fn ann_ivf_partition_exec_from_proto(
153+
proto: pb::AnnIvfPartitionExecProto,
154+
dataset: Option<Arc<Dataset>>,
155+
) -> Result<ANNIvfPartitionExec> {
156+
let dataset = resolve_dataset(dataset, proto.table.as_ref()).await?;
157+
158+
let query_proto = proto.query.ok_or_else(|| {
159+
Error::invalid_input_source("Missing VectorQueryProto in ANNIvfPartitionExecProto".into())
160+
})?;
161+
let query = query_from_proto(query_proto)?;
162+
163+
if proto.index_uuids.is_empty() {
164+
return Err(Error::invalid_input_source(
165+
"ANNIvfPartitionExecProto contains no index UUIDs".into(),
166+
));
167+
}
168+
169+
ANNIvfPartitionExec::try_new(dataset, proto.index_uuids, query)
170+
}
171+
133172
// =============================================================================
134173
// ANNIvfSubIndexExec <-> Proto
135174
// =============================================================================
@@ -330,6 +369,51 @@ mod tests {
330369
(Arc::new(ds), dir)
331370
}
332371

372+
#[tokio::test]
373+
async fn test_ann_ivf_partition_proto_roundtrip() {
374+
let (dataset, _dir) = make_indexed_dataset().await;
375+
376+
let indices = dataset.load_indices_by_name("vector_idx").await.unwrap();
377+
assert!(!indices.is_empty());
378+
379+
let key: ArrayRef = Arc::new(Float32Array::from(vec![0.1f32; 128]));
380+
let query = Query {
381+
column: "vector".to_string(),
382+
key,
383+
k: 10,
384+
lower_bound: None,
385+
upper_bound: None,
386+
minimum_nprobes: 2,
387+
maximum_nprobes: Some(4),
388+
ef: None,
389+
refine_factor: Some(2),
390+
metric_type: Some(DistanceType::L2),
391+
use_index: true,
392+
dist_q_c: 0.0,
393+
};
394+
395+
let exec = ANNIvfPartitionExec::try_new(
396+
dataset.clone(),
397+
indices.iter().map(|idx| idx.uuid.to_string()).collect(),
398+
query,
399+
)
400+
.unwrap();
401+
402+
let proto = ann_ivf_partition_exec_to_proto(&exec).await.unwrap();
403+
assert_eq!(proto.index_uuids.len(), indices.len());
404+
405+
let back = ann_ivf_partition_exec_from_proto(proto, Some(dataset.clone()))
406+
.await
407+
.unwrap();
408+
409+
assert_eq!(back.query.column, "vector");
410+
assert_eq!(back.query.k, 10);
411+
assert_eq!(back.query.minimum_nprobes, 2);
412+
assert_eq!(back.query.refine_factor, Some(2));
413+
assert_eq!(back.index_uuids.len(), indices.len());
414+
assert_eq!(back.dataset.uri(), dataset.uri());
415+
}
416+
333417
#[tokio::test]
334418
async fn test_ann_ivf_sub_index_proto_roundtrip() {
335419
let (dataset, _dir) = make_indexed_dataset().await;

0 commit comments

Comments
 (0)