Skip to content

Commit 87e2b73

Browse files
authored
feat(sqlite): add streaming SqliteSidecarBuilder keyed by a column (#24)
* feat(sqlite): add streaming SqliteSidecarBuilder keyed by a column open_or_build opens parquet files and synthesises monotonic 0..N keys. Add SqliteSidecarBuilder: an incremental begin/push_batch/finish builder that consumes RecordBatches and reads each row's key from a designated column, so a caller can build the sidecar from any batch source (e.g. a storage engine's native rowid) in a single bounded-memory pass without materialising an intermediate parquet file. Shares CREATE/INSERT DDL and row-param construction with the parquet path via new ddl()/row_to_params() helpers; build_table behaviour is unchanged. One transaction wraps the build; an abandoned builder rolls back on drop. Keys are stored as INTEGER PRIMARY KEY, preserving the B-tree point-lookup performance. * test(sqlite): cover rollback, uint64 keys, and validation for stream builder * fix(sqlite): bounds-check value_col_indices in push_batch push_batch validated key_col_index but not value_col_indices; an out-of-range entry would panic in row_to_params on batch.column(ci) rather than returning a clean DataFusionError. Validate both, and add a test for the out-of-range value-index case. Addresses PR review nit.
1 parent 00a6dbc commit 87e2b73

3 files changed

Lines changed: 440 additions & 24 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub use udtf::VectorSearchVectorUDTF;
8585
#[cfg(feature = "parquet-provider")]
8686
pub use parquet_provider::ParquetLookupProvider;
8787
#[cfg(feature = "sqlite-provider")]
88-
pub use sqlite_provider::SqliteLookupProvider;
88+
pub use sqlite_provider::{SqliteLookupProvider, SqliteSidecarBuilder};
8989

9090
use std::sync::Arc;
9191

src/sqlite_provider.rs

Lines changed: 224 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,149 @@ impl SqliteLookupProvider {
272272
}
273273
}
274274

275+
// ── Streaming sidecar builder ───────────────────────────────────────────────
276+
277+
/// Incremental builder for a [`SqliteLookupProvider`] backed by a stream of
278+
/// [`RecordBatch`]es instead of parquet files.
279+
///
280+
/// [`SqliteLookupProvider::open_or_build`] opens parquet files itself and
281+
/// synthesises monotonic `0..N` keys. This builder instead takes batches one at
282+
/// a time and reads each row's key from a designated column (e.g. a storage
283+
/// engine's native `rowid`). That lets a caller drive a single pass over a
284+
/// source — fanning each batch out to both a USearch index and this sidecar —
285+
/// without first materialising an intermediate parquet file.
286+
///
287+
/// Memory is bounded: [`push_batch`](Self::push_batch) inserts a batch's rows
288+
/// and returns, accumulating nothing across batches. All inserts share one
289+
/// transaction (opened in [`begin`](Self::begin), committed in
290+
/// [`finish`](Self::finish)); dropping the builder before `finish` rolls the
291+
/// transaction back, so a half-built table is never persisted.
292+
///
293+
/// The first field of `schema` is the key column, created as
294+
/// `INTEGER PRIMARY KEY` (the rowid-alias B-tree), matching `open_or_build`.
295+
pub struct SqliteSidecarBuilder {
296+
conn: Connection,
297+
db_path: String,
298+
table_name: String,
299+
schema: SchemaRef,
300+
pool_size: usize,
301+
insert_sql: String,
302+
key_col_index: usize,
303+
value_col_indices: Vec<usize>,
304+
}
305+
306+
impl SqliteSidecarBuilder {
307+
/// Begin a build at `db_path`: open the connection, start the transaction,
308+
/// and create the table.
309+
///
310+
/// `schema` is the output SQLite schema — field 0 is the key column
311+
/// (`INTEGER PRIMARY KEY`), fields 1.. are the stored value columns.
312+
/// `key_col_index` and `value_col_indices` index into the *input* batches
313+
/// passed to [`push_batch`](Self::push_batch): `key_col_index` is the
314+
/// column holding the row key, and `value_col_indices` map input columns to
315+
/// schema fields 1.. in order.
316+
pub fn begin(
317+
db_path: &str,
318+
table_name: &str,
319+
pool_size: usize,
320+
schema: SchemaRef,
321+
key_col_index: usize,
322+
value_col_indices: Vec<usize>,
323+
) -> DFResult<Self> {
324+
if pool_size == 0 {
325+
return Err(DataFusionError::Execution(
326+
"pool_size must be at least 1".into(),
327+
));
328+
}
329+
if schema.fields().len() != value_col_indices.len() + 1 {
330+
return Err(DataFusionError::Execution(format!(
331+
"schema has {} fields but expected 1 key column + {} value columns",
332+
schema.fields().len(),
333+
value_col_indices.len()
334+
)));
335+
}
336+
let (create_sql, insert_sql) = ddl(table_name, &schema);
337+
let conn = open_conn(db_path)?;
338+
// Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the
339+
// transaction can span many push_batch calls without a self-referential
340+
// borrow. An uncommitted transaction is rolled back when `conn` is
341+
// dropped, so an abandoned build leaves no half-written table.
342+
conn.execute_batch("BEGIN;")
343+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
344+
conn.execute_batch(&create_sql)
345+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
346+
Ok(Self {
347+
conn,
348+
db_path: db_path.to_string(),
349+
table_name: table_name.to_string(),
350+
schema,
351+
pool_size,
352+
insert_sql,
353+
key_col_index,
354+
value_col_indices,
355+
})
356+
}
357+
358+
/// Insert every row of `batch`. The key is read from `key_col_index`; the
359+
/// value columns from `value_col_indices`. Rows are inserted as they are
360+
/// read — nothing is buffered, so peak memory is O(one batch).
361+
pub fn push_batch(&mut self, batch: &RecordBatch) -> DFResult<()> {
362+
let ncols = batch.num_columns();
363+
if self.key_col_index >= ncols {
364+
return Err(DataFusionError::Execution(format!(
365+
"key_col_index {} out of range for batch with {ncols} columns",
366+
self.key_col_index
367+
)));
368+
}
369+
if let Some(&bad) = self.value_col_indices.iter().find(|&&i| i >= ncols) {
370+
return Err(DataFusionError::Execution(format!(
371+
"value column index {bad} out of range for batch with {ncols} columns"
372+
)));
373+
}
374+
let key_col = batch.column(self.key_col_index);
375+
let mut stmt = self
376+
.conn
377+
.prepare_cached(&self.insert_sql)
378+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
379+
for row in 0..batch.num_rows() {
380+
let key = extract_key(key_col, row)?;
381+
let params = row_to_params(key, batch, &self.value_col_indices, row);
382+
stmt.execute(rusqlite::params_from_iter(params.iter()))
383+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
384+
}
385+
Ok(())
386+
}
387+
388+
/// Commit the build, checkpoint the WAL, and open the read connection pool.
389+
pub fn finish(self) -> DFResult<SqliteLookupProvider> {
390+
self.conn
391+
.execute_batch("COMMIT;")
392+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
393+
// Flush WAL to the main db so the data survives process exit (matches
394+
// open_or_build).
395+
self.conn
396+
.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
397+
.map_err(|e| DataFusionError::Execution(format!("WAL checkpoint failed: {e}")))?;
398+
tracing::info!(
399+
"SQLite table '{}' built from stream and committed.",
400+
self.table_name
401+
);
402+
403+
let key_col = self.schema.field(0).name().clone();
404+
let mut conns = vec![self.conn];
405+
for _ in 1..self.pool_size {
406+
conns.push(open_conn(&self.db_path)?);
407+
}
408+
Ok(SqliteLookupProvider {
409+
schema: self.schema,
410+
table_name: self.table_name,
411+
key_col,
412+
pool: Arc::new(Mutex::new(conns)),
413+
sem: Arc::new(Semaphore::new(self.pool_size)),
414+
})
415+
}
416+
}
417+
275418
// ── PointLookupProvider ───────────────────────────────────────────────────────
276419

277420
#[async_trait]
@@ -682,14 +825,11 @@ fn build_scan_batch(schema: &SchemaRef, col_bufs: Vec<Vec<SqlValue>>) -> DFResul
682825

683826
// ── Build helpers ─────────────────────────────────────────────────────────────
684827

685-
fn build_table(
686-
conn: &Connection,
687-
table_name: &str,
688-
parquet_files: &[String],
689-
schema: &SchemaRef,
690-
parquet_col_indices: &[usize],
691-
) -> DFResult<()> {
692-
// The first field is the key column (INTEGER PRIMARY KEY).
828+
/// Build the `CREATE TABLE` and `INSERT` SQL for a sidecar table. The first
829+
/// schema field is the key column (`INTEGER PRIMARY KEY`, i.e. the rowid-alias
830+
/// B-tree); the rest are typed from their Arrow type. Shared by the parquet
831+
/// build path ([`build_table`]) and the streaming [`SqliteSidecarBuilder`].
832+
fn ddl(table_name: &str, schema: &SchemaRef) -> (String, String) {
693833
let key_col_name = schema.field(0).name();
694834
let col_defs = schema
695835
.fields()
@@ -698,12 +838,16 @@ fn build_table(
698838
if f.name() == key_col_name {
699839
format!("{} INTEGER PRIMARY KEY", quote_ident(f.name()))
700840
} else {
701-
let sql_type = arrow_type_to_sql(f.data_type());
702-
format!("{} {}", quote_ident(f.name()), sql_type)
841+
format!(
842+
"{} {}",
843+
quote_ident(f.name()),
844+
arrow_type_to_sql(f.data_type())
845+
)
703846
}
704847
})
705848
.collect::<Vec<_>>()
706849
.join(", ");
850+
let create_sql = format!("CREATE TABLE {} ({col_defs});", quote_ident(table_name));
707851

708852
let placeholders = schema
709853
.fields()
@@ -715,6 +859,73 @@ fn build_table(
715859
"INSERT INTO {} VALUES ({placeholders})",
716860
quote_ident(table_name)
717861
);
862+
(create_sql, insert_sql)
863+
}
864+
865+
/// Build the INSERT parameter row: the key (as SQLite INTEGER) first, then each
866+
/// value column in `value_col_indices` order.
867+
fn row_to_params(
868+
key: i64,
869+
batch: &RecordBatch,
870+
value_col_indices: &[usize],
871+
row: usize,
872+
) -> Vec<SqlValue> {
873+
let mut params: Vec<SqlValue> = Vec::with_capacity(value_col_indices.len() + 1);
874+
params.push(SqlValue::Integer(key));
875+
for &ci in value_col_indices {
876+
params.push(arrow_cell_to_sql(batch.column(ci), row));
877+
}
878+
params
879+
}
880+
881+
/// Read a non-null integer row key from `col` at `row`. Supports the 32/64-bit
882+
/// signed and unsigned integer types; the value is stored as SQLite INTEGER
883+
/// (i64), so a storage engine's native `rowid` (non-negative i64) round-trips
884+
/// through the `u64` lookup API unchanged.
885+
fn extract_key(col: &ArrayRef, row: usize) -> DFResult<i64> {
886+
if col.is_null(row) {
887+
return Err(DataFusionError::Execution(
888+
"key column has a null value; row keys must be non-null".into(),
889+
));
890+
}
891+
let key = match col.data_type() {
892+
DataType::Int64 => col
893+
.as_any()
894+
.downcast_ref::<Int64Array>()
895+
.unwrap()
896+
.value(row),
897+
DataType::UInt64 => col
898+
.as_any()
899+
.downcast_ref::<UInt64Array>()
900+
.unwrap()
901+
.value(row) as i64,
902+
DataType::Int32 => col
903+
.as_any()
904+
.downcast_ref::<Int32Array>()
905+
.unwrap()
906+
.value(row) as i64,
907+
DataType::UInt32 => col
908+
.as_any()
909+
.downcast_ref::<UInt32Array>()
910+
.unwrap()
911+
.value(row) as i64,
912+
other => {
913+
return Err(DataFusionError::Execution(format!(
914+
"unsupported key column type {other:?}; expected an integer type"
915+
)));
916+
}
917+
};
918+
Ok(key)
919+
}
920+
921+
fn build_table(
922+
conn: &Connection,
923+
table_name: &str,
924+
parquet_files: &[String],
925+
schema: &SchemaRef,
926+
parquet_col_indices: &[usize],
927+
) -> DFResult<()> {
928+
let (create_sql, insert_sql) = ddl(table_name, schema);
718929

719930
// CREATE TABLE and all INSERTs share one transaction so a mid-build crash
720931
// leaves no half-built table. If the table exists with zero rows on the
@@ -724,11 +935,8 @@ fn build_table(
724935
.unchecked_transaction()
725936
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
726937
{
727-
tx.execute_batch(&format!(
728-
"CREATE TABLE {} ({col_defs});",
729-
quote_ident(table_name)
730-
))
731-
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
938+
tx.execute_batch(&create_sql)
939+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
732940

733941
let mut stmt = tx
734942
.prepare(&insert_sql)
@@ -754,12 +962,7 @@ fn build_table(
754962
let key = global_row_idx;
755963
global_row_idx += 1;
756964

757-
let mut params: Vec<SqlValue> = Vec::with_capacity(schema.fields().len());
758-
params.push(SqlValue::Integer(key as i64));
759-
760-
for &ci in parquet_col_indices {
761-
params.push(arrow_cell_to_sql(batch.column(ci), row_i));
762-
}
965+
let params = row_to_params(key as i64, &batch, parquet_col_indices, row_i);
763966

764967
stmt.execute(rusqlite::params_from_iter(params.iter()))
765968
.map_err(|e| DataFusionError::Execution(e.to_string()))?;

0 commit comments

Comments
 (0)