Skip to content

Commit f3c4038

Browse files
committed
feat(sqlite): add streaming SqliteSidecarBuilder keyed by a column
open_or_build opens parquet files and synthesises monotonic 0..N keys. Add SqliteSidecarBuilder: an incremental begin/push_batch/finish builder that consumes RecordBatches and reads each row's key from a designated column, so a caller can build the sidecar from any batch source (e.g. a storage engine's native rowid) in a single bounded-memory pass without materialising an intermediate parquet file. Shares CREATE/INSERT DDL and row-param construction with the parquet path via new ddl()/row_to_params() helpers; build_table behaviour is unchanged. One transaction wraps the build; an abandoned builder rolls back on drop. Keys are stored as INTEGER PRIMARY KEY, preserving the B-tree point-lookup performance.
1 parent 00a6dbc commit f3c4038

3 files changed

Lines changed: 302 additions & 24 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub use udtf::VectorSearchVectorUDTF;
8585
#[cfg(feature = "parquet-provider")]
8686
pub use parquet_provider::ParquetLookupProvider;
8787
#[cfg(feature = "sqlite-provider")]
88-
pub use sqlite_provider::SqliteLookupProvider;
88+
pub use sqlite_provider::{SqliteLookupProvider, SqliteSidecarBuilder};
8989

9090
use std::sync::Arc;
9191

src/sqlite_provider.rs

Lines changed: 219 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,144 @@ impl SqliteLookupProvider {
272272
}
273273
}
274274

275+
// ── Streaming sidecar builder ───────────────────────────────────────────────
276+
277+
/// Incremental builder for a [`SqliteLookupProvider`] backed by a stream of
278+
/// [`RecordBatch`]es instead of parquet files.
279+
///
280+
/// [`SqliteLookupProvider::open_or_build`] opens parquet files itself and
281+
/// synthesises monotonic `0..N` keys. This builder instead takes batches one at
282+
/// a time and reads each row's key from a designated column (e.g. a storage
283+
/// engine's native `rowid`). That lets a caller drive a single pass over a
284+
/// source — fanning each batch out to both a USearch index and this sidecar —
285+
/// without first materialising an intermediate parquet file.
286+
///
287+
/// Memory is bounded: [`push_batch`](Self::push_batch) inserts a batch's rows
288+
/// and returns, accumulating nothing across batches. All inserts share one
289+
/// transaction (opened in [`begin`](Self::begin), committed in
290+
/// [`finish`](Self::finish)); dropping the builder before `finish` rolls the
291+
/// transaction back, so a half-built table is never persisted.
292+
///
293+
/// The first field of `schema` is the key column, created as
294+
/// `INTEGER PRIMARY KEY` (the rowid-alias B-tree), matching `open_or_build`.
295+
pub struct SqliteSidecarBuilder {
296+
conn: Connection,
297+
db_path: String,
298+
table_name: String,
299+
schema: SchemaRef,
300+
pool_size: usize,
301+
insert_sql: String,
302+
key_col_index: usize,
303+
value_col_indices: Vec<usize>,
304+
}
305+
306+
impl SqliteSidecarBuilder {
307+
/// Begin a build at `db_path`: open the connection, start the transaction,
308+
/// and create the table.
309+
///
310+
/// `schema` is the output SQLite schema — field 0 is the key column
311+
/// (`INTEGER PRIMARY KEY`), fields 1.. are the stored value columns.
312+
/// `key_col_index` and `value_col_indices` index into the *input* batches
313+
/// passed to [`push_batch`](Self::push_batch): `key_col_index` is the
314+
/// column holding the row key, and `value_col_indices` map input columns to
315+
/// schema fields 1.. in order.
316+
pub fn begin(
317+
db_path: &str,
318+
table_name: &str,
319+
pool_size: usize,
320+
schema: SchemaRef,
321+
key_col_index: usize,
322+
value_col_indices: Vec<usize>,
323+
) -> DFResult<Self> {
324+
if pool_size == 0 {
325+
return Err(DataFusionError::Execution(
326+
"pool_size must be at least 1".into(),
327+
));
328+
}
329+
if schema.fields().len() != value_col_indices.len() + 1 {
330+
return Err(DataFusionError::Execution(format!(
331+
"schema has {} fields but expected 1 key column + {} value columns",
332+
schema.fields().len(),
333+
value_col_indices.len()
334+
)));
335+
}
336+
let (create_sql, insert_sql) = ddl(table_name, &schema);
337+
let conn = open_conn(db_path)?;
338+
// Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the
339+
// transaction can span many push_batch calls without a self-referential
340+
// borrow. An uncommitted transaction is rolled back when `conn` is
341+
// dropped, so an abandoned build leaves no half-written table.
342+
conn.execute_batch("BEGIN;")
343+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
344+
conn.execute_batch(&create_sql)
345+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
346+
Ok(Self {
347+
conn,
348+
db_path: db_path.to_string(),
349+
table_name: table_name.to_string(),
350+
schema,
351+
pool_size,
352+
insert_sql,
353+
key_col_index,
354+
value_col_indices,
355+
})
356+
}
357+
358+
/// Insert every row of `batch`. The key is read from `key_col_index`; the
359+
/// value columns from `value_col_indices`. Rows are inserted as they are
360+
/// read — nothing is buffered, so peak memory is O(one batch).
361+
pub fn push_batch(&mut self, batch: &RecordBatch) -> DFResult<()> {
362+
let ncols = batch.num_columns();
363+
if self.key_col_index >= ncols {
364+
return Err(DataFusionError::Execution(format!(
365+
"key_col_index {} out of range for batch with {ncols} columns",
366+
self.key_col_index
367+
)));
368+
}
369+
let key_col = batch.column(self.key_col_index);
370+
let mut stmt = self
371+
.conn
372+
.prepare_cached(&self.insert_sql)
373+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
374+
for row in 0..batch.num_rows() {
375+
let key = extract_key(key_col, row)?;
376+
let params = row_to_params(key, batch, &self.value_col_indices, row);
377+
stmt.execute(rusqlite::params_from_iter(params.iter()))
378+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
379+
}
380+
Ok(())
381+
}
382+
383+
/// Commit the build, checkpoint the WAL, and open the read connection pool.
384+
pub fn finish(self) -> DFResult<SqliteLookupProvider> {
385+
self.conn
386+
.execute_batch("COMMIT;")
387+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
388+
// Flush WAL to the main db so the data survives process exit (matches
389+
// open_or_build).
390+
self.conn
391+
.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
392+
.map_err(|e| DataFusionError::Execution(format!("WAL checkpoint failed: {e}")))?;
393+
tracing::info!(
394+
"SQLite table '{}' built from stream and committed.",
395+
self.table_name
396+
);
397+
398+
let key_col = self.schema.field(0).name().clone();
399+
let mut conns = vec![self.conn];
400+
for _ in 1..self.pool_size {
401+
conns.push(open_conn(&self.db_path)?);
402+
}
403+
Ok(SqliteLookupProvider {
404+
schema: self.schema,
405+
table_name: self.table_name,
406+
key_col,
407+
pool: Arc::new(Mutex::new(conns)),
408+
sem: Arc::new(Semaphore::new(self.pool_size)),
409+
})
410+
}
411+
}
412+
275413
// ── PointLookupProvider ───────────────────────────────────────────────────────
276414

277415
#[async_trait]
@@ -682,14 +820,11 @@ fn build_scan_batch(schema: &SchemaRef, col_bufs: Vec<Vec<SqlValue>>) -> DFResul
682820

683821
// ── Build helpers ─────────────────────────────────────────────────────────────
684822

685-
fn build_table(
686-
conn: &Connection,
687-
table_name: &str,
688-
parquet_files: &[String],
689-
schema: &SchemaRef,
690-
parquet_col_indices: &[usize],
691-
) -> DFResult<()> {
692-
// The first field is the key column (INTEGER PRIMARY KEY).
823+
/// Build the `CREATE TABLE` and `INSERT` SQL for a sidecar table. The first
824+
/// schema field is the key column (`INTEGER PRIMARY KEY`, i.e. the rowid-alias
825+
/// B-tree); the rest are typed from their Arrow type. Shared by the parquet
826+
/// build path ([`build_table`]) and the streaming [`SqliteSidecarBuilder`].
827+
fn ddl(table_name: &str, schema: &SchemaRef) -> (String, String) {
693828
let key_col_name = schema.field(0).name();
694829
let col_defs = schema
695830
.fields()
@@ -698,12 +833,16 @@ fn build_table(
698833
if f.name() == key_col_name {
699834
format!("{} INTEGER PRIMARY KEY", quote_ident(f.name()))
700835
} else {
701-
let sql_type = arrow_type_to_sql(f.data_type());
702-
format!("{} {}", quote_ident(f.name()), sql_type)
836+
format!(
837+
"{} {}",
838+
quote_ident(f.name()),
839+
arrow_type_to_sql(f.data_type())
840+
)
703841
}
704842
})
705843
.collect::<Vec<_>>()
706844
.join(", ");
845+
let create_sql = format!("CREATE TABLE {} ({col_defs});", quote_ident(table_name));
707846

708847
let placeholders = schema
709848
.fields()
@@ -715,6 +854,73 @@ fn build_table(
715854
"INSERT INTO {} VALUES ({placeholders})",
716855
quote_ident(table_name)
717856
);
857+
(create_sql, insert_sql)
858+
}
859+
860+
/// Build the INSERT parameter row: the key (as SQLite INTEGER) first, then each
861+
/// value column in `value_col_indices` order.
862+
fn row_to_params(
863+
key: i64,
864+
batch: &RecordBatch,
865+
value_col_indices: &[usize],
866+
row: usize,
867+
) -> Vec<SqlValue> {
868+
let mut params: Vec<SqlValue> = Vec::with_capacity(value_col_indices.len() + 1);
869+
params.push(SqlValue::Integer(key));
870+
for &ci in value_col_indices {
871+
params.push(arrow_cell_to_sql(batch.column(ci), row));
872+
}
873+
params
874+
}
875+
876+
/// Read a non-null integer row key from `col` at `row`. Supports the 32/64-bit
877+
/// signed and unsigned integer types; the value is stored as SQLite INTEGER
878+
/// (i64), so a storage engine's native `rowid` (non-negative i64) round-trips
879+
/// through the `u64` lookup API unchanged.
880+
fn extract_key(col: &ArrayRef, row: usize) -> DFResult<i64> {
881+
if col.is_null(row) {
882+
return Err(DataFusionError::Execution(
883+
"key column has a null value; row keys must be non-null".into(),
884+
));
885+
}
886+
let key = match col.data_type() {
887+
DataType::Int64 => col
888+
.as_any()
889+
.downcast_ref::<Int64Array>()
890+
.unwrap()
891+
.value(row),
892+
DataType::UInt64 => col
893+
.as_any()
894+
.downcast_ref::<UInt64Array>()
895+
.unwrap()
896+
.value(row) as i64,
897+
DataType::Int32 => col
898+
.as_any()
899+
.downcast_ref::<Int32Array>()
900+
.unwrap()
901+
.value(row) as i64,
902+
DataType::UInt32 => col
903+
.as_any()
904+
.downcast_ref::<UInt32Array>()
905+
.unwrap()
906+
.value(row) as i64,
907+
other => {
908+
return Err(DataFusionError::Execution(format!(
909+
"unsupported key column type {other:?}; expected an integer type"
910+
)));
911+
}
912+
};
913+
Ok(key)
914+
}
915+
916+
fn build_table(
917+
conn: &Connection,
918+
table_name: &str,
919+
parquet_files: &[String],
920+
schema: &SchemaRef,
921+
parquet_col_indices: &[usize],
922+
) -> DFResult<()> {
923+
let (create_sql, insert_sql) = ddl(table_name, schema);
718924

719925
// CREATE TABLE and all INSERTs share one transaction so a mid-build crash
720926
// leaves no half-built table. If the table exists with zero rows on the
@@ -724,11 +930,8 @@ fn build_table(
724930
.unchecked_transaction()
725931
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
726932
{
727-
tx.execute_batch(&format!(
728-
"CREATE TABLE {} ({col_defs});",
729-
quote_ident(table_name)
730-
))
731-
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
933+
tx.execute_batch(&create_sql)
934+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
732935

733936
let mut stmt = tx
734937
.prepare(&insert_sql)
@@ -754,12 +957,7 @@ fn build_table(
754957
let key = global_row_idx;
755958
global_row_idx += 1;
756959

757-
let mut params: Vec<SqlValue> = Vec::with_capacity(schema.fields().len());
758-
params.push(SqlValue::Integer(key as i64));
759-
760-
for &ci in parquet_col_indices {
761-
params.push(arrow_cell_to_sql(batch.column(ci), row_i));
762-
}
960+
let params = row_to_params(key as i64, &batch, parquet_col_indices, row_i);
763961

764962
stmt.execute(rusqlite::params_from_iter(params.iter()))
765963
.map_err(|e| DataFusionError::Execution(e.to_string()))?;

tests/sqlite_provider_test.rs

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22

33
use std::sync::Arc;
44

5-
use arrow_array::{Array, RecordBatch, StringArray, UInt64Array};
5+
use arrow_array::{Array, Int64Array, RecordBatch, StringArray, UInt64Array};
66
use arrow_schema::{DataType, Field, Schema};
77
use datafusion::catalog::TableProvider;
88
use datafusion::prelude::SessionContext;
9-
use datafusion_vector_search_ext::{PointLookupProvider, SqliteLookupProvider};
9+
use datafusion_vector_search_ext::{
10+
PointLookupProvider, SqliteLookupProvider, SqliteSidecarBuilder,
11+
};
1012
use parquet::arrow::ArrowWriter;
1113
use tempfile::tempdir;
1214

@@ -81,6 +83,84 @@ async fn test_fetch_existing_keys() {
8183
assert!(!names.contains(&"bob".to_string()));
8284
}
8385

86+
#[tokio::test]
87+
async fn test_stream_builder_with_explicit_rowid_keys() {
88+
// The streaming builder reads each row's key from a column (e.g. a storage
89+
// engine's native rowid) instead of synthesising 0..N. Keys here are sparse
90+
// and non-monotonic across two batches to prove that works end to end.
91+
let dir = tempdir().unwrap();
92+
93+
let batch_schema = Arc::new(Schema::new(vec![
94+
Field::new("rowid", DataType::Int64, false),
95+
Field::new("name", DataType::Utf8, true),
96+
]));
97+
// Output schema: key column first, then the stored value column.
98+
let provider_schema = Arc::new(Schema::new(vec![
99+
Field::new("rowid", DataType::Int64, false),
100+
Field::new("name", DataType::Utf8, true),
101+
]));
102+
103+
let b1 = RecordBatch::try_new(
104+
batch_schema.clone(),
105+
vec![
106+
Arc::new(Int64Array::from(vec![100_i64, 250])),
107+
Arc::new(StringArray::from(vec![Some("alice"), Some("bob")])),
108+
],
109+
)
110+
.unwrap();
111+
let b2 = RecordBatch::try_new(
112+
batch_schema.clone(),
113+
vec![
114+
Arc::new(Int64Array::from(vec![999_i64])),
115+
Arc::new(StringArray::from(vec![Some("carol")])),
116+
],
117+
)
118+
.unwrap();
119+
120+
let db_path = dir.path().join("stream.db");
121+
let mut builder = SqliteSidecarBuilder::begin(
122+
db_path.to_str().unwrap(),
123+
"models",
124+
4,
125+
provider_schema,
126+
0, // key (rowid) is column 0 of the input batches
127+
vec![1], // input column 1 (name) → provider field 1
128+
)
129+
.unwrap();
130+
builder.push_batch(&b1).unwrap();
131+
builder.push_batch(&b2).unwrap();
132+
let provider = builder.finish().unwrap();
133+
134+
// Point-lookup by sparse rowids.
135+
let batches = provider
136+
.fetch_by_keys(&[100, 999], "rowid", None)
137+
.await
138+
.unwrap();
139+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
140+
assert_eq!(total_rows, 2);
141+
142+
let names: Vec<String> = batches
143+
.iter()
144+
.flat_map(|b| {
145+
b.column_by_name("name")
146+
.unwrap()
147+
.as_any()
148+
.downcast_ref::<StringArray>()
149+
.unwrap()
150+
.iter()
151+
.flatten()
152+
.map(|s| s.to_string())
153+
})
154+
.collect();
155+
assert!(names.contains(&"alice".to_string()));
156+
assert!(names.contains(&"carol".to_string()));
157+
assert!(!names.contains(&"bob".to_string())); // rowid 250 not requested
158+
159+
// A rowid that was never inserted returns nothing.
160+
let empty = provider.fetch_by_keys(&[42], "rowid", None).await.unwrap();
161+
assert_eq!(empty.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
162+
}
163+
84164
#[tokio::test]
85165
async fn test_projection() {
86166
let dir = tempdir().unwrap();

0 commit comments

Comments
 (0)