Skip to content
266 changes: 262 additions & 4 deletions src/sqlite_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::Result as DFResult;
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use rusqlite::{Connection, types::Value as SqlValue};
use tokio::sync::Semaphore;

Expand Down Expand Up @@ -311,10 +317,262 @@ impl TableProvider for SqliteLookupProvider {
_filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::NotImplemented(
"SqliteLookupProvider does not support full table scans; use fetch_by_keys".into(),
))
Ok(Arc::new(SqliteFullScanExec::new(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _projection parameter is ignored — the scan always fetches every column regardless of what DataFusion requests. For tables with many wide columns this wastes I/O and memory.\n\nConsider building a projected schema here (same pattern used in fetch_by_keys) and passing it to SqliteFullScanExec:\n\nrust\nlet schema = match projection {\n None => self.schema.clone(),\n Some(idxs) => Arc::new(arrow_schema::Schema::new(\n idxs.iter().map(|&i| self.schema.field(i).clone()).collect::<Vec<_>>(),\n )),\n};\nOk(Arc::new(SqliteFullScanExec::new(\n self.pool.clone(),\n self.sem.clone(),\n self.table_name.clone(),\n schema,\n)))\n

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally left unprojected. The only caller (plan_extension in planner.rs) always passes projection: None because the adaptive filter path needs all columns available to evaluate arbitrary WHERE predicates — projecting here risks dropping columns the filter references. Adding projection support would require the caller to compute the union of filter-referenced columns and key column, which adds complexity for no current benefit.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: thread _projection through to SqliteFullScanExec and use it to build a narrower SELECT column list. Right now every full scan fetches all columns from SQLite regardless of what the query needs. DataFusion will add a ProjectionExec on top so results are correct, but wide tables pay unnecessary deserialization cost.

self.pool.clone(),
self.sem.clone(),
self.table_name.clone(),
self.schema.clone(),
)))
Comment on lines +320 to +325

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: projection hint is ignored — always scans all columns.

SqliteFullScanExec selects every column from SQLite regardless of the _projection argument. For wide tables where a query touches only a few columns, this wastes significant I/O per batch. The existing fetch_by_keys already handles projection correctly via out_schema.

SqliteFullScanExec should accept a projection: Option<Vec<usize>> field. The SQL col_list and the PlanProperties schema should both reflect only the projected columns, matching the pattern in execute_query_sync.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate of the earlier thread — see reply at #discussion_r2945452082. The sole caller (plan_extension) always passes projection: None because the adaptive filter path requires all columns to evaluate arbitrary WHERE predicates. Adding projection support risks correctness (dropping filter-referenced columns) for no current benefit.

}
}

// ── Full-scan execution plan ──────────────────────────────────────────────────

/// Batch size used when streaming rows from SQLite during a full table scan.
/// Larger values reduce round-trip overhead; smaller values reduce peak memory.
const SCAN_BATCH_SIZE: usize = 1024;

/// Physical execution plan that streams all rows from a SQLite table in
/// [`SCAN_BATCH_SIZE`]-row batches. Used by the adaptive filtered path in
/// `USearchExec` to evaluate WHERE-clause predicates without loading the
/// entire table into memory at once.
#[derive(Debug)]
struct SqliteFullScanExec {
pool: Arc<Mutex<Vec<Connection>>>,
sem: Arc<Semaphore>,
table_name: String,
schema: SchemaRef,
properties: PlanProperties,
}

impl SqliteFullScanExec {
fn new(
pool: Arc<Mutex<Vec<Connection>>>,
sem: Arc<Semaphore>,
table_name: String,
schema: SchemaRef,
) -> Self {
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Self {
pool,
sem,
table_name,
schema,
properties,
}
}
}

impl DisplayAs for SqliteFullScanExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SqliteFullScanExec: table={}", self.table_name)
}
}

impl ExecutionPlan for SqliteFullScanExec {
fn name(&self) -> &str {
"SqliteFullScanExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(self)
} else {
Err(DataFusionError::Internal(
"SqliteFullScanExec is a leaf node and takes no children".into(),
))
}
}

fn execute(
&self,
_partition: usize,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: validate partition before spawning. UnknownPartitioning(1) declares a single partition, so any other value is a caller bug — better to surface it than silently spawn a duplicate scan.

Suggested change
_partition: usize,
partition: usize,
        if partition != 0 {
            return Err(DataFusionError::Internal(format!(
                "SqliteFullScanExec only has 1 partition, got partition {partition}"
            )));
        }

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataFusion never calls execute() with partition > 0 when UnknownPartitioning(1) is declared — this is enforced by the framework. No other ExecutionPlan in this codebase (including USearchExec) adds this guard. Adding it here would be inconsistent defensive code for a scenario that can't happen.

_ctx: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let pool = self.pool.clone();
let sem = Arc::clone(&self.sem);
let table_name = self.table_name.clone();
let schema = self.schema.clone();

// Bounded channel: backpressure limits how many batches are buffered
// ahead of the consumer, keeping peak memory to O(batch_size × 2).
let (tx, rx) = tokio::sync::mpsc::channel::<DFResult<RecordBatch>>(2);

let schema_task = schema.clone();
tokio::spawn(async move {
// Acquire a semaphore permit so the scan counts against the
// same concurrency limit as fetch_by_keys.
let _permit = match sem.acquire_owned().await {
Ok(p) => p,
Err(e) => {
let _ = tx
.send(Err(DataFusionError::Execution(e.to_string())))
.await;
return;
}
};

let conn = match pool.lock() {
Ok(mut g) => g.pop().ok_or_else(|| {
DataFusionError::Execution("SqliteFullScanExec: connection pool empty".into())
}),
Err(e) => Err(DataFusionError::Execution(format!(
"connection pool mutex poisoned: {e}"
))),
};

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swallowing the poison error and returning None means a poisoned mutex is reported as "connection pool empty", hiding the real cause. fetch_by_keys handles this correctly — propagate the error here too:

Suggested change
};
let conn = match pool.lock() {
Ok(mut g) => g.pop().ok_or_else(|| {
DataFusionError::Execution("SqliteFullScanExec: connection pool empty".into())
}),
Err(e) => Err(DataFusionError::Execution(format!(
"connection pool mutex poisoned: {e}"
))),
};
let conn = match conn {
Ok(c) => c,

let conn = match conn {
Ok(c) => c,
Err(e) => {
let _ = tx.send(Err(e)).await;
return;
}
};

let pool_c = pool.clone();
let tx_c = tx.clone();
if let Err(e) = tokio::task::spawn_blocking(move || {
let guard = ConnGuard::new(pool_c, conn);
let conn = guard.conn.as_ref().unwrap();

let col_list = schema_task
.fields()
.iter()
.map(|f| quote_ident(f.name()))
.collect::<Vec<_>>()
.join(", ");
// No ORDER BY — the adaptive filter doesn't require ordering.
let sql = format!("SELECT {col_list} FROM {}", quote_ident(&table_name));

let mut stmt = match conn.prepare(&sql) {
Ok(s) => s,
Err(e) => {
let _ = tx_c.blocking_send(Err(DataFusionError::Execution(e.to_string())));
return;
}
};

let mut rows = match stmt.query([]) {
Ok(r) => r,
Err(e) => {
let _ = tx_c.blocking_send(Err(DataFusionError::Execution(e.to_string())));
return;
}
};

let n_cols = schema_task.fields().len();
let mut col_bufs: Vec<Vec<SqlValue>> = (0..n_cols)
.map(|_| Vec::with_capacity(SCAN_BATCH_SIZE))
.collect();
let mut rows_in_batch = 0usize;

loop {
match rows.next() {
Ok(Some(row)) => {
let mut row_ok = true;
for (ci, buf) in col_bufs.iter_mut().enumerate() {
match row.get::<_, SqlValue>(ci) {
Ok(v) => buf.push(v),
Err(e) => {
let _ = tx_c.blocking_send(Err(
DataFusionError::Execution(e.to_string()),
));
row_ok = false;
break;
}
}
}
if !row_ok {
// Error already sent on the channel — skip the
// final flush entirely to avoid sending Ok after Err.
return;
}
Comment on lines +486 to +504

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a column read fails mid-row (say on column ci), columns 0..ci in col_bufs have one extra value that columns ci..n don't. The code correctly sends the error and breaks, but rows_in_batch still reflects the count of fully completed rows. If rows_in_batch > 0, the final flush at line 535 will call build_scan_batch with buffers of unequal length, causing RecordBatch::try_new to fail and a second error to be sent on the channel — masking the original one.

Fix: clear col_bufs (or just skip the flush) on the error path before breaking:

if !row_ok {
    // Discard partial row data so the final flush doesn't see
    // mismatched column buffer lengths.
    for buf in col_bufs.iter_mut() {
        buf.truncate(rows_in_batch);
    }
    break;
}

rows_in_batch += 1;
if rows_in_batch >= SCAN_BATCH_SIZE {
let drained: Vec<Vec<SqlValue>> = col_bufs
.iter_mut()
.map(|b| {
std::mem::replace(b, Vec::with_capacity(SCAN_BATCH_SIZE))
})
.collect();
rows_in_batch = 0;
match build_scan_batch(&schema_task, drained) {
Ok(batch) => {
if tx_c.blocking_send(Ok(batch)).is_err() {
return; // consumer dropped
}
}
Err(e) => {
let _ = tx_c.blocking_send(Err(e));
return;
}
}
}
}
Ok(None) => break,
Err(e) => {
let _ =
tx_c.blocking_send(Err(DataFusionError::Execution(e.to_string())));
return;
}
}
}

// Flush the last partial batch.
if rows_in_batch > 0 {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a column-read error the loop breaks with rows_in_batch > 0, so this flush sends an Ok(batch) down the channel after an Err(...) was already sent. DataFusion consumers stop on the first error so this is harmless in practice, but it's confusing. Consider adding a had_error: bool flag and skipping the flush when it's set.

match build_scan_batch(&schema_task, col_bufs) {
Ok(batch) => {
let _ = tx_c.blocking_send(Ok(batch));
}
Err(e) => {
let _ = tx_c.blocking_send(Err(e));
}
}
}
})
.await
{
let _ = tx
.send(Err(DataFusionError::Execution(format!(
"scan task panicked: {e}"
))))
.await;
}
});

// Convert the channel receiver into a RecordBatch stream.
let stream = futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
});
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}

/// Build a [`RecordBatch`] from column buffers of [`SqlValue`]s.
fn build_scan_batch(schema: &SchemaRef, col_bufs: Vec<Vec<SqlValue>>) -> DFResult<RecordBatch> {
let arrays: Vec<ArrayRef> = schema
.fields()
.iter()
.zip(col_bufs)
.map(|(field, values)| sql_values_to_arrow(field.data_type(), values))
.collect::<DFResult<_>>()?;
RecordBatch::try_new(schema.clone(), arrays)
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
}

// ── Build helpers ─────────────────────────────────────────────────────────────
Expand Down
44 changes: 32 additions & 12 deletions tests/sqlite_provider_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray, UInt64Array};
use arrow_array::{Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion::catalog::TableProvider;
use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -140,23 +140,43 @@ async fn test_empty_key_slice() {
assert!(batches.is_empty());
}

/// Regression test for the silent-empty-scan bug:
/// scan() used to return an empty MemTable, producing zero rows with no error.
/// It must now return NotImplemented so callers get a clear failure.
/// scan() returns a streaming ExecutionPlan that yields all rows in batches.
#[tokio::test]
async fn test_scan_returns_not_implemented() {
async fn test_scan_streams_all_rows() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing 3-row test covers only the final partial-batch flush. The more complex path — where a full 1024-row batch is emitted mid-scan and buffers are drained — is never exercised. Consider adding a second test that inserts SCAN_BATCH_SIZE + 1 rows (or at minimum > SCAN_BATCH_SIZE) and asserts the total row count, to cover the batch-boundary drain logic.

use datafusion::execution::TaskContext;
use futures::StreamExt;

let dir = tempdir().unwrap();
let provider = make_provider(&dir);

let ctx = SessionContext::new();
let state = ctx.state();
let result = provider.scan(&state, None, &[], None).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("not support full table scans"),
"expected NotImplemented error, got: {err}"
);
let plan = provider.scan(&state, None, &[], None).await.unwrap();

let task_ctx = Arc::new(TaskContext::default());
let mut stream = plan.execute(0, task_ctx).unwrap();

let mut total_rows = 0usize;
let mut all_names: Vec<String> = Vec::new();
while let Some(batch) = stream.next().await {
let batch = batch.unwrap();
total_rows += batch.num_rows();

let names_col = batch
.column_by_name("name")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..names_col.len() {
all_names.push(names_col.value(i).to_string());
}
}

assert_eq!(total_rows, 3);
assert!(all_names.contains(&"alice".to_string()));
assert!(all_names.contains(&"bob".to_string()));
assert!(all_names.contains(&"carol".to_string()));
}

/// Regression test for the SQL injection fix via quote_ident:
Expand Down
Loading