Skip to content

Commit 9de723a

Browse files
committed
refactor(sqlite-provider): switch from pack_key to monotonic integer keys
SqliteLookupProvider treats keys as opaque INTEGER PRIMARY KEY values and USearch treats them as opaque u64 labels, so the bit-packed encoding via pack_key was unnecessary complexity. Replace it with a simple monotonic counter (0, 1, 2, ...), which removes the DatasetLayout dependency from open_or_build/build_table and lifts the artificial 16-bit capacity limits on file_idx/rg_idx.
1 parent dba4786 commit 9de723a

2 files changed

Lines changed: 13 additions & 42 deletions

File tree

src/sqlite_provider.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use datafusion::physical_plan::{
3535
use rusqlite::{Connection, types::Value as SqlValue};
3636
use tokio::sync::Semaphore;
3737

38-
use crate::keys::{DatasetLayout, pack_key};
3938
use crate::lookup::PointLookupProvider;
4039

4140
// ── Provider ──────────────────────────────────────────────────────────────────
@@ -106,15 +105,13 @@ impl SqliteLookupProvider {
106105
/// parquet files on first run. Opens a pool of `pool_size` read
107106
/// connections (WAL allows N concurrent readers).
108107
///
109-
/// `local_parquet_files`, `layout`, `schema`, and `parquet_col_indices`
108+
/// `local_parquet_files`, `schema`, and `parquet_col_indices`
110109
/// are only used if the table does not yet exist.
111-
#[allow(clippy::too_many_arguments)]
112110
pub fn open_or_build(
113111
db_path: &str,
114112
table_name: &str,
115113
pool_size: usize,
116114
local_parquet_files: &[String],
117-
layout: &DatasetLayout,
118115
schema: SchemaRef,
119116
parquet_col_indices: &[usize],
120117
) -> DFResult<Self> {
@@ -156,7 +153,6 @@ impl SqliteLookupProvider {
156153
&conn,
157154
table_name,
158155
local_parquet_files,
159-
layout,
160156
&schema,
161157
parquet_col_indices,
162158
)?;
@@ -581,7 +577,6 @@ fn build_table(
581577
conn: &Connection,
582578
table_name: &str,
583579
parquet_files: &[String],
584-
layout: &DatasetLayout,
585580
schema: &SchemaRef,
586581
parquet_col_indices: &[usize],
587582
) -> DFResult<()> {
@@ -628,7 +623,9 @@ fn build_table(
628623
.prepare(&insert_sql)
629624
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
630625

631-
for (file_idx, file_path) in parquet_files.iter().enumerate() {
626+
let mut global_row_idx: u64 = 0;
627+
628+
for file_path in parquet_files {
632629
let f = std::fs::File::open(file_path)
633630
.map_err(|e| DataFusionError::Execution(format!("open {file_path}: {e}")))?;
634631
let builder = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(f)
@@ -637,20 +634,17 @@ fn build_table(
637634
.with_batch_size(2048)
638635
.build()
639636
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
640-
let mut file_row: u64 = 0;
641637

642638
for batch_result in reader {
643639
let batch = batch_result.map_err(|e| DataFusionError::Execution(e.to_string()))?;
644640
let n = batch.num_rows();
645641

646642
for row_i in 0..n {
647-
let r = file_row + row_i as u64;
648-
let rg = layout.rg_cum_rows[file_idx].partition_point(|&s| s <= r) - 1;
649-
let lo = (r - layout.rg_cum_rows[file_idx][rg]) as usize;
650-
let packed_key = pack_key(file_idx, rg, lo);
643+
let key = global_row_idx;
644+
global_row_idx += 1;
651645

652646
let mut params: Vec<SqlValue> = Vec::with_capacity(schema.fields().len());
653-
params.push(SqlValue::Integer(packed_key as i64));
647+
params.push(SqlValue::Integer(key as i64));
654648

655649
for &ci in parquet_col_indices {
656650
params.push(arrow_cell_to_sql(batch.column(ci), row_i));
@@ -659,7 +653,6 @@ fn build_table(
659653
stmt.execute(rusqlite::params_from_iter(params.iter()))
660654
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
661655
}
662-
file_row += n as u64;
663656
}
664657
}
665658
}

tests/sqlite_provider_test.rs

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ use arrow_array::{Array, RecordBatch, StringArray, UInt64Array};
66
use arrow_schema::{DataType, Field, Schema};
77
use datafusion::catalog::TableProvider;
88
use datafusion::prelude::SessionContext;
9-
use datafusion_vector_search_ext::{
10-
DatasetLayout, PointLookupProvider, SqliteLookupProvider, pack_key,
11-
};
9+
use datafusion_vector_search_ext::{PointLookupProvider, SqliteLookupProvider};
1210
use parquet::arrow::ArrowWriter;
1311
use tempfile::tempdir;
1412

@@ -38,13 +36,6 @@ fn make_provider(dir: &tempfile::TempDir) -> SqliteLookupProvider {
3836
writer.write(&batch).unwrap();
3937
writer.close().unwrap();
4038

41-
// Build a minimal DatasetLayout for 1 file with 1 row group of 3 rows.
42-
let layout = DatasetLayout {
43-
file_keys: vec!["parquet/test.parquet".to_string()],
44-
file_cum_rows: vec![0, 3],
45-
rg_cum_rows: vec![vec![0, 3]],
46-
};
47-
4839
let db_path = dir.path().join("test.db");
4940
let parquet_files = vec![parquet_path.to_str().unwrap().to_string()];
5041

@@ -53,7 +44,6 @@ fn make_provider(dir: &tempfile::TempDir) -> SqliteLookupProvider {
5344
"models",
5445
4,
5546
&parquet_files,
56-
&layout,
5747
provider_schema,
5848
&[0], // parquet col 0 (name) → provider col 1
5949
)
@@ -65,10 +55,8 @@ async fn test_fetch_existing_keys() {
6555
let dir = tempdir().unwrap();
6656
let provider = make_provider(&dir);
6757

68-
let key0 = pack_key(0, 0, 0);
69-
let key2 = pack_key(0, 0, 2);
7058
let batches = provider
71-
.fetch_by_keys(&[key0, key2], "row_idx", None)
59+
.fetch_by_keys(&[0, 2], "row_idx", None)
7260
.await
7361
.unwrap();
7462

@@ -98,10 +86,9 @@ async fn test_projection() {
9886
let dir = tempdir().unwrap();
9987
let provider = make_provider(&dir);
10088

101-
let key1 = pack_key(0, 0, 1);
10289
// Project only row_idx (index 0).
10390
let batches = provider
104-
.fetch_by_keys(&[key1], "row_idx", Some(&[0]))
91+
.fetch_by_keys(&[1], "row_idx", Some(&[0]))
10592
.await
10693
.unwrap();
10794

@@ -114,17 +101,16 @@ async fn test_projection() {
114101
.as_any()
115102
.downcast_ref::<UInt64Array>()
116103
.unwrap();
117-
assert_eq!(row_idx_col.value(0), key1);
104+
assert_eq!(row_idx_col.value(0), 1);
118105
}
119106

120107
#[tokio::test]
121108
async fn test_missing_keys_return_empty() {
122109
let dir = tempdir().unwrap();
123110
let provider = make_provider(&dir);
124111

125-
let missing = pack_key(0, 0, 99); // offset 99 doesn't exist
126112
let batches = provider
127-
.fetch_by_keys(&[missing], "row_idx", None)
113+
.fetch_by_keys(&[99], "row_idx", None)
128114
.await
129115
.unwrap();
130116
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
@@ -203,28 +189,20 @@ async fn test_table_name_with_spaces() {
203189
writer.write(&batch).unwrap();
204190
writer.close().unwrap();
205191

206-
let layout = DatasetLayout {
207-
file_keys: vec!["parquet/test.parquet".to_string()],
208-
file_cum_rows: vec![0, 1],
209-
rg_cum_rows: vec![vec![0, 1]],
210-
};
211-
212192
let db_path = dir.path().join("test.db");
213193
// Table name with spaces — previously this would have produced a SQL syntax error.
214194
let provider = SqliteLookupProvider::open_or_build(
215195
db_path.to_str().unwrap(),
216196
"my models",
217197
2,
218198
&[parquet_path.to_str().unwrap().to_string()],
219-
&layout,
220199
provider_schema,
221200
&[0],
222201
)
223202
.unwrap();
224203

225-
let key0 = pack_key(0, 0, 0);
226204
let batches = provider
227-
.fetch_by_keys(&[key0], "row_idx", None)
205+
.fetch_by_keys(&[0], "row_idx", None)
228206
.await
229207
.unwrap();
230208
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 1);

0 commit comments

Comments
 (0)