Skip to content

Commit 346381c

Browse files
sgrebnovphillipleblanc
authored andcommitted
fix: Align SqlTable::supports_filters_pushdown with scan_to_sql by using Unparser (#637)
1 parent df7dbc6 commit 346381c

4 files changed

Lines changed: 164 additions & 10 deletions

File tree

core/src/duckdb.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,33 @@ impl DuckDBTableProviderFactory {
356356

357357
Ok(pool)
358358
}
359+
360+
/// Drop the cached pool entry for `key` if any, returning the previously
361+
/// cached pool. Subsequent calls to `get_or_init_*` for the same key will
362+
/// build a fresh pool.
363+
///
364+
/// This is intended for callers that replace the underlying database file
365+
/// out-of-band (for example, after restoring it from a snapshot in object
366+
/// storage). Existing connections held by other clones of the previously
367+
/// returned pool keep operating against the file descriptor they opened;
368+
/// once they are dropped the OS releases the prior inode. New providers
369+
/// built after invalidation will open the file fresh and observe the
370+
/// replacement contents.
371+
pub async fn invalidate_instance(&self, key: &DbInstanceKey) -> Option<DuckDbConnectionPool> {
372+
self.instances.lock().await.remove(key)
373+
}
374+
375+
/// Drop the cached pool entry for the file at `path` if any.
376+
///
377+
/// Convenience wrapper over [`Self::invalidate_instance`] for the common
378+
/// file-mode case.
379+
pub async fn invalidate_file_instance(
380+
&self,
381+
path: impl Into<Arc<str>>,
382+
) -> Option<DuckDbConnectionPool> {
383+
self.invalidate_instance(&DbInstanceKey::file(path.into()))
384+
.await
385+
}
359386
}
360387

361388
type DynDuckDbConnectionPool = dyn DbConnectionPool<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBParameter>
@@ -779,6 +806,45 @@ pub(crate) mod tests {
779806
use std::collections::HashMap;
780807
use std::sync::Arc;
781808

809+
#[tokio::test]
810+
async fn invalidate_instance_drops_cached_pool() {
811+
let factory = DuckDBTableProviderFactory::new(duckdb::AccessMode::ReadWrite);
812+
let options = HashMap::new();
813+
814+
// First call populates the cache.
815+
let _pool1 = factory
816+
.get_or_init_memory_instance(&options)
817+
.await
818+
.expect("first init");
819+
assert_eq!(factory.instances.lock().await.len(), 1);
820+
821+
// Second call (without invalidation) returns the cached pool clone
822+
// without growing the registry.
823+
let _pool2 = factory
824+
.get_or_init_memory_instance(&options)
825+
.await
826+
.expect("cached init");
827+
assert_eq!(factory.instances.lock().await.len(), 1);
828+
829+
// Invalidate; entry is evicted and returned.
830+
let evicted = factory.invalidate_instance(&DbInstanceKey::memory()).await;
831+
assert!(evicted.is_some(), "invalidate returns evicted pool");
832+
assert_eq!(factory.instances.lock().await.len(), 0);
833+
834+
// Re-invalidating a missing key is a no-op.
835+
assert!(factory
836+
.invalidate_instance(&DbInstanceKey::file("never-cached".into()))
837+
.await
838+
.is_none());
839+
840+
// Next get_or_init repopulates the cache.
841+
let _pool3 = factory
842+
.get_or_init_memory_instance(&options)
843+
.await
844+
.expect("reinit after invalidate");
845+
assert_eq!(factory.instances.lock().await.len(), 1);
846+
}
847+
782848
#[tokio::test]
783849
async fn test_create_with_memory_limit() {
784850
let table_name = TableReference::bare("test_table");

core/src/sql/sql_provider_datafusion/expr.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -267,20 +267,22 @@ fn handle_cast(cast: &Cast, engine: Option<Engine>, expr: &Expr) -> Result<Strin
267267
}
268268
}
269269

270-
// Helper function to check if expression contains subquery
270+
// Helper function to check if expression contains subquery or outer reference
271271
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
272272

273-
#[allow(dead_code)]
274-
pub(super) fn expr_contains_subquery(expr: &Expr) -> datafusion::error::Result<bool> {
275-
let mut contains_subquery = false;
273+
pub(super) fn expr_contains_subquery_or_outer_ref(expr: &Expr) -> datafusion::error::Result<bool> {
274+
let mut found = false;
276275
expr.apply(|expr| match expr {
277-
Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists(_) => {
278-
contains_subquery = true;
276+
Expr::ScalarSubquery(_)
277+
| Expr::InSubquery(_)
278+
| Expr::Exists(_)
279+
| Expr::OuterReferenceColumn(_, _) => {
280+
found = true;
279281
Ok(TreeNodeRecursion::Stop)
280282
}
281283
_ => Ok(TreeNodeRecursion::Continue),
282284
})?;
283-
Ok(contains_subquery)
285+
Ok(found)
284286
}
285287

286288
#[cfg(test)]

core/src/sql/sql_provider_datafusion/mod.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,11 +293,27 @@ impl<T, P> TableProvider for SqlTable<T, P> {
293293
&self,
294294
filters: &[&Expr],
295295
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
296+
let dialect = self.dialect_arc();
297+
let unparser = Unparser::new(dialect.as_ref());
298+
296299
let filter_push_down: Vec<TableProviderFilterPushDown> = filters
297300
.iter()
298-
.map(|f| match expr::to_sql_with_engine(f, self.engine) {
299-
Ok(_) => TableProviderFilterPushDown::Exact,
300-
Err(_) => TableProviderFilterPushDown::Unsupported,
301+
.map(|f| {
302+
// Expressions containing subqueries or outer references must not be
303+
// pushed down. Subqueries are handled at the plan level and may
304+
// reference tables in other databases not accessible from this
305+
// connection. Outer references refer to columns from an enclosing
306+
// query that the table provider cannot resolve.
307+
if expr::expr_contains_subquery_or_outer_ref(f).unwrap_or(true) {
308+
return TableProviderFilterPushDown::Unsupported;
309+
}
310+
// Use the same Unparser that scan_to_sql() uses for actual SQL
311+
// generation, so the capability check matches what can really
312+
// be converted to SQL for the target dialect.
313+
match unparser.expr_to_sql(f) {
314+
Ok(_) => TableProviderFilterPushDown::Exact,
315+
Err(_) => TableProviderFilterPushDown::Unsupported,
316+
}
301317
})
302318
.collect();
303319

core/src/sqlite.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,30 @@ impl SqliteTableProviderFactory {
246246

247247
Ok(pool)
248248
}
249+
250+
/// Drop the cached pool entry for `key` if any, returning the previously
251+
/// cached pool. Subsequent calls to [`Self::get_or_init_instance`] for
252+
/// the same key will build a fresh pool.
253+
///
254+
/// This is intended for callers that replace the underlying database file
255+
/// out-of-band (for example, after restoring it from a snapshot in object
256+
/// storage). See [`crate::duckdb::DuckDBTableProviderFactory::invalidate_instance`]
257+
/// for the semantics around in-flight connections.
258+
pub async fn invalidate_instance(&self, key: &DbInstanceKey) -> Option<SqliteConnectionPool> {
259+
self.instances.lock().await.remove(key)
260+
}
261+
262+
/// Drop the cached pool entry for the file at `db_path` if any.
263+
///
264+
/// Convenience wrapper over [`Self::invalidate_instance`] for the common
265+
/// file-mode case.
266+
pub async fn invalidate_file_instance(
267+
&self,
268+
db_path: impl Into<Arc<str>>,
269+
) -> Option<SqliteConnectionPool> {
270+
self.invalidate_instance(&DbInstanceKey::file(db_path.into()))
271+
.await
272+
}
249273
}
250274

251275
impl Default for SqliteTableProviderFactory {
@@ -1511,6 +1535,52 @@ pub(crate) mod tests {
15111535

15121536
use super::*;
15131537

1538+
#[tokio::test]
1539+
async fn invalidate_instance_drops_cached_pool() {
1540+
let factory = SqliteTableProviderFactory::new();
1541+
let path: Arc<str> = Arc::from("/tmp/spice-tp-invalidate-test.db");
1542+
// Best-effort cleanup of leftover files from prior runs.
1543+
let _ = std::fs::remove_file(path.as_ref());
1544+
1545+
let _pool = factory
1546+
.get_or_init_instance(
1547+
Arc::clone(&path),
1548+
Mode::File,
1549+
std::time::Duration::from_millis(5_000),
1550+
)
1551+
.await
1552+
.expect("init file pool");
1553+
assert_eq!(factory.instances.lock().await.len(), 1);
1554+
1555+
let evicted = factory
1556+
.invalidate_instance(&DbInstanceKey::file(Arc::clone(&path)))
1557+
.await;
1558+
assert!(evicted.is_some(), "invalidate returns evicted pool");
1559+
assert_eq!(factory.instances.lock().await.len(), 0);
1560+
1561+
// Convenience wrapper: re-init then invalidate via path.
1562+
let _pool2 = factory
1563+
.get_or_init_instance(
1564+
Arc::clone(&path),
1565+
Mode::File,
1566+
std::time::Duration::from_millis(5_000),
1567+
)
1568+
.await
1569+
.expect("reinit file pool");
1570+
assert_eq!(factory.instances.lock().await.len(), 1);
1571+
let evicted2 = factory.invalidate_file_instance(Arc::clone(&path)).await;
1572+
assert!(evicted2.is_some());
1573+
assert_eq!(factory.instances.lock().await.len(), 0);
1574+
1575+
// Missing key is a no-op.
1576+
assert!(factory
1577+
.invalidate_instance(&DbInstanceKey::file("never-cached".into()))
1578+
.await
1579+
.is_none());
1580+
1581+
let _ = std::fs::remove_file(path.as_ref());
1582+
}
1583+
15141584
#[tokio::test]
15151585
async fn test_sqlite_table_creation_with_indexes() {
15161586
let schema = Arc::new(Schema::new(vec![

0 commit comments

Comments
 (0)