Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub use udtf::VectorSearchVectorUDTF;
#[cfg(feature = "parquet-provider")]
pub use parquet_provider::ParquetLookupProvider;
#[cfg(feature = "sqlite-provider")]
pub use sqlite_provider::SqliteLookupProvider;
pub use sqlite_provider::{SqliteLookupProvider, SqliteSidecarBuilder};

use std::sync::Arc;

Expand Down
245 changes: 224 additions & 21 deletions src/sqlite_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,149 @@ impl SqliteLookupProvider {
}
}

// ── Streaming sidecar builder ───────────────────────────────────────────────

/// Incremental builder for a [`SqliteLookupProvider`] backed by a stream of
/// [`RecordBatch`]es instead of parquet files.
///
/// [`SqliteLookupProvider::open_or_build`] opens parquet files itself and
/// synthesises monotonic `0..N` keys. This builder instead takes batches one at
/// a time and reads each row's key from a designated column (e.g. a storage
/// engine's native `rowid`). That lets a caller drive a single pass over a
/// source — fanning each batch out to both a USearch index and this sidecar —
/// without first materialising an intermediate parquet file.
///
/// Memory is bounded: [`push_batch`](Self::push_batch) inserts a batch's rows
/// and returns, accumulating nothing across batches. All inserts share one
/// transaction (opened in [`begin`](Self::begin), committed in
/// [`finish`](Self::finish)); dropping the builder before `finish` rolls the
/// transaction back, so a half-built table is never persisted.
///
/// The first field of `schema` is the key column, created as
/// `INTEGER PRIMARY KEY` (the rowid-alias B-tree), matching `open_or_build`.
pub struct SqliteSidecarBuilder {
conn: Connection,
db_path: String,
table_name: String,
schema: SchemaRef,
pool_size: usize,
insert_sql: String,
key_col_index: usize,
value_col_indices: Vec<usize>,
}

impl SqliteSidecarBuilder {
/// Begin a build at `db_path`: open the connection, start the transaction,
/// and create the table.
///
/// `schema` is the output SQLite schema — field 0 is the key column
/// (`INTEGER PRIMARY KEY`), fields 1.. are the stored value columns.
/// `key_col_index` and `value_col_indices` index into the *input* batches
/// passed to [`push_batch`](Self::push_batch): `key_col_index` is the
/// column holding the row key, and `value_col_indices` map input columns to
/// schema fields 1.. in order.
pub fn begin(
db_path: &str,
table_name: &str,
pool_size: usize,
schema: SchemaRef,
key_col_index: usize,
value_col_indices: Vec<usize>,
) -> DFResult<Self> {
if pool_size == 0 {
return Err(DataFusionError::Execution(
"pool_size must be at least 1".into(),
));
}
if schema.fields().len() != value_col_indices.len() + 1 {
return Err(DataFusionError::Execution(format!(
"schema has {} fields but expected 1 key column + {} value columns",
schema.fields().len(),
value_col_indices.len()
)));
}
let (create_sql, insert_sql) = ddl(table_name, &schema);
let conn = open_conn(db_path)?;
// Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the
// transaction can span many push_batch calls without a self-referential
// borrow. An uncommitted transaction is rolled back when `conn` is
// dropped, so an abandoned build leaves no half-written table.
conn.execute_batch("BEGIN;")
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
conn.execute_batch(&create_sql)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
Ok(Self {
conn,
db_path: db_path.to_string(),
table_name: table_name.to_string(),
schema,
pool_size,
insert_sql,
key_col_index,
value_col_indices,
})
}

/// Insert every row of `batch`. The key is read from `key_col_index`; the
/// value columns from `value_col_indices`. Rows are inserted as they are
/// read — nothing is buffered, so peak memory is O(one batch).
pub fn push_batch(&mut self, batch: &RecordBatch) -> DFResult<()> {
let ncols = batch.num_columns();
if self.key_col_index >= ncols {
return Err(DataFusionError::Execution(format!(
"key_col_index {} out of range for batch with {ncols} columns",
self.key_col_index
)));
}
Comment on lines +362 to +368
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: key_col_index is bounds-checked here but value_col_indices is not — if any entry is >= batch.num_columns(), row_to_params will panic on batch.column(ci) instead of returning a DataFusionError. Worth validating both for consistency so a bad caller index always surfaces as a clean error. (not blocking)

if let Some(&bad) = self.value_col_indices.iter().find(|&&i| i >= ncols) {
return Err(DataFusionError::Execution(format!(
"value column index {bad} out of range for batch with {ncols} columns"
)));
}
let key_col = batch.column(self.key_col_index);
let mut stmt = self
.conn
.prepare_cached(&self.insert_sql)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
for row in 0..batch.num_rows() {
let key = extract_key(key_col, row)?;
let params = row_to_params(key, batch, &self.value_col_indices, row);
stmt.execute(rusqlite::params_from_iter(params.iter()))
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
}
Ok(())
}

/// Commit the build, checkpoint the WAL, and open the read connection pool.
pub fn finish(self) -> DFResult<SqliteLookupProvider> {
self.conn
.execute_batch("COMMIT;")
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
// Flush WAL to the main db so the data survives process exit (matches
// open_or_build).
self.conn
.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
.map_err(|e| DataFusionError::Execution(format!("WAL checkpoint failed: {e}")))?;
tracing::info!(
"SQLite table '{}' built from stream and committed.",
self.table_name
);

let key_col = self.schema.field(0).name().clone();
let mut conns = vec![self.conn];
for _ in 1..self.pool_size {
conns.push(open_conn(&self.db_path)?);
}
Ok(SqliteLookupProvider {
schema: self.schema,
table_name: self.table_name,
key_col,
pool: Arc::new(Mutex::new(conns)),
sem: Arc::new(Semaphore::new(self.pool_size)),
})
}
}

// ── PointLookupProvider ───────────────────────────────────────────────────────

#[async_trait]
Expand Down Expand Up @@ -682,14 +825,11 @@ fn build_scan_batch(schema: &SchemaRef, col_bufs: Vec<Vec<SqlValue>>) -> DFResul

// ── Build helpers ─────────────────────────────────────────────────────────────

fn build_table(
conn: &Connection,
table_name: &str,
parquet_files: &[String],
schema: &SchemaRef,
parquet_col_indices: &[usize],
) -> DFResult<()> {
// The first field is the key column (INTEGER PRIMARY KEY).
/// Build the `CREATE TABLE` and `INSERT` SQL for a sidecar table. The first
/// schema field is the key column (`INTEGER PRIMARY KEY`, i.e. the rowid-alias
/// B-tree); the rest are typed from their Arrow type. Shared by the parquet
/// build path ([`build_table`]) and the streaming [`SqliteSidecarBuilder`].
fn ddl(table_name: &str, schema: &SchemaRef) -> (String, String) {
let key_col_name = schema.field(0).name();
let col_defs = schema
.fields()
Expand All @@ -698,12 +838,16 @@ fn build_table(
if f.name() == key_col_name {
format!("{} INTEGER PRIMARY KEY", quote_ident(f.name()))
} else {
let sql_type = arrow_type_to_sql(f.data_type());
format!("{} {}", quote_ident(f.name()), sql_type)
format!(
"{} {}",
quote_ident(f.name()),
arrow_type_to_sql(f.data_type())
)
}
})
.collect::<Vec<_>>()
.join(", ");
let create_sql = format!("CREATE TABLE {} ({col_defs});", quote_ident(table_name));

let placeholders = schema
.fields()
Expand All @@ -715,6 +859,73 @@ fn build_table(
"INSERT INTO {} VALUES ({placeholders})",
quote_ident(table_name)
);
(create_sql, insert_sql)
}

/// Build the INSERT parameter row: the key (as SQLite INTEGER) first, then each
/// value column in `value_col_indices` order.
fn row_to_params(
key: i64,
batch: &RecordBatch,
value_col_indices: &[usize],
row: usize,
) -> Vec<SqlValue> {
let mut params: Vec<SqlValue> = Vec::with_capacity(value_col_indices.len() + 1);
params.push(SqlValue::Integer(key));
for &ci in value_col_indices {
params.push(arrow_cell_to_sql(batch.column(ci), row));
}
params
}

/// Read a non-null integer row key from `col` at `row`. Supports the 32/64-bit
/// signed and unsigned integer types; the value is stored as SQLite INTEGER
/// (i64), so a storage engine's native `rowid` (non-negative i64) round-trips
/// through the `u64` lookup API unchanged.
fn extract_key(col: &ArrayRef, row: usize) -> DFResult<i64> {
if col.is_null(row) {
return Err(DataFusionError::Execution(
"key column has a null value; row keys must be non-null".into(),
));
}
let key = match col.data_type() {
DataType::Int64 => col
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(row),
DataType::UInt64 => col
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.value(row) as i64,
DataType::Int32 => col
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.value(row) as i64,
DataType::UInt32 => col
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.value(row) as i64,
other => {
return Err(DataFusionError::Execution(format!(
"unsupported key column type {other:?}; expected an integer type"
)));
}
};
Ok(key)
}

fn build_table(
conn: &Connection,
table_name: &str,
parquet_files: &[String],
schema: &SchemaRef,
parquet_col_indices: &[usize],
) -> DFResult<()> {
let (create_sql, insert_sql) = ddl(table_name, schema);

// CREATE TABLE and all INSERTs share one transaction so a mid-build crash
// leaves no half-built table. If the table exists with zero rows on the
Expand All @@ -724,11 +935,8 @@ fn build_table(
.unchecked_transaction()
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
{
tx.execute_batch(&format!(
"CREATE TABLE {} ({col_defs});",
quote_ident(table_name)
))
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
tx.execute_batch(&create_sql)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;

let mut stmt = tx
.prepare(&insert_sql)
Expand All @@ -754,12 +962,7 @@ fn build_table(
let key = global_row_idx;
global_row_idx += 1;

let mut params: Vec<SqlValue> = Vec::with_capacity(schema.fields().len());
params.push(SqlValue::Integer(key as i64));

for &ci in parquet_col_indices {
params.push(arrow_cell_to_sql(batch.column(ci), row_i));
}
let params = row_to_params(key as i64, &batch, parquet_col_indices, row_i);

stmt.execute(rusqlite::params_from_iter(params.iter()))
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
Expand Down
Loading
Loading