Skip to content

Commit 0bf0df1

Browse files
committed
feat: add StatsCache for per-pass memoization in StatisticsRegistry
Adds `StatsCache` and `compute_cached` to `StatisticsRegistry`. `compute` now delegates to `compute_cached` with a fresh cache, so existing callers are unaffected. Callers that invoke the registry multiple times within a single optimization pass (e.g., comparing both sides of a join) can share a `StatsCache` across calls to avoid redundant subtree walks. The cache is keyed by plan node pointer address and scoped to the caller, following Apache Calcite's `RelMetadataQuery` pattern.
1 parent 051d557 commit 0bf0df1

1 file changed

Lines changed: 101 additions & 20 deletions

File tree

  • datafusion/physical-plan/src/operator_statistics

datafusion/physical-plan/src/operator_statistics/mod.rs

Lines changed: 101 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,11 @@ impl StatisticsProvider for DefaultStatisticsProvider {
281281
/// Registry that chains [`StatisticsProvider`] implementations.
282282
///
283283
/// The registry is a stateless provider chain: it holds no mutable state
284-
/// and is cheaply `Clone`able / `Send` / `Sync`.
284+
/// and is cheaply `Clone`able / `Send` / `Sync`. For memoized computation
285+
/// across multiple `compute` calls within a single optimization pass, use
286+
/// [`compute_cached`](Self::compute_cached) with a caller-owned
287+
/// [`StatsCache`]. This design follows Apache Calcite's `RelMetadataQuery`
288+
/// pattern, where the cache is per-query and scoped to the caller.
285289
#[derive(Clone)]
286290
pub struct StatisticsRegistry {
287291
providers: Vec<Arc<dyn StatisticsProvider>>,
@@ -299,6 +303,39 @@ impl Default for StatisticsRegistry {
299303
}
300304
}
301305

306+
/// Per-pass memoization cache for [`StatisticsRegistry::compute_cached`].
307+
///
308+
/// Keyed by plan node pointer address. Create one per optimization pass
309+
/// and drop it when the pass completes to limit stale entries.
310+
///
311+
/// # Limitations
312+
///
313+
/// During `transform_up`, replaced plan nodes are deallocated and the
314+
/// allocator may reuse their addresses for new nodes within the same pass.
315+
/// A cache hit on a reused address would return stale statistics.
316+
///
317+
/// TODO: use a monotonic plan-node ID as cache key instead of pointer
318+
/// address to eliminate aliasing risk entirely.
319+
///
320+
/// # Example
321+
///
322+
/// ```ignore
323+
/// let registry = StatisticsRegistry::default_with_builtin_providers();
324+
/// let mut cache = StatsCache::new();
325+
/// let left_stats = registry.compute_cached(left, &mut cache)?;
326+
/// let right_stats = registry.compute_cached(right, &mut cache)?;
327+
/// // cache is dropped at end of scope
328+
/// ```
329+
#[derive(Debug, Default)]
330+
pub struct StatsCache(HashMap<usize, ExtendedStatistics>);
331+
332+
impl StatsCache {
333+
/// Create a new empty cache.
334+
pub fn new() -> Self {
335+
Self(HashMap::new())
336+
}
337+
}
338+
302339
impl StatisticsRegistry {
303340
/// Create a new empty registry.
304341
///
@@ -352,43 +389,87 @@ impl StatisticsRegistry {
352389

353390
/// Compute extended statistics for a plan through the provider chain.
354391
///
355-
/// Performs a bottom-up tree walk: child statistics are computed recursively
356-
/// and passed to providers, mirroring how `partition_statistics` composes
357-
/// operators. Once [#20184](https://github.com/apache/datafusion/issues/20184)
358-
/// lands, the registry can feed enriched base stats directly into
359-
/// `partition_statistics(child_stats)`, removing the need for a separate walk.
360-
///
361-
/// If no providers are registered, falls back to the plan's built-in
362-
/// `partition_statistics(None)` with no overhead.
392+
/// Convenience method that creates a fresh [`StatsCache`] per call.
393+
/// For repeated calls within the same optimization pass (e.g., comparing
394+
/// left/right sides of a join), use [`compute_cached`](Self::compute_cached)
395+
/// with a shared cache to avoid redundant subtree walks.
363396
pub fn compute(&self, plan: &dyn ExecutionPlan) -> Result<ExtendedStatistics> {
397+
let mut cache = StatsCache::new();
398+
self.compute_cached(plan, &mut cache)
399+
}
400+
401+
/// Compute extended statistics with memoization via a caller-owned cache.
402+
///
403+
/// Results are keyed by plan pointer address. Repeated calls for the
404+
/// same plan node (or shared subtrees) return cached results without
405+
/// re-walking the tree.
406+
///
407+
/// The cache should be scoped to a single optimization pass: create it
408+
/// before the pass, share it across all `compute_cached` calls within
409+
/// that pass, and drop it when the pass completes.
410+
///
411+
/// If no providers are registered, immediately falls back to the plan's
412+
/// built-in `partition_statistics(None)` with no overhead (no caching).
413+
///
414+
/// When providers are registered:
415+
/// - For leaf nodes, providers are called with empty child stats
416+
/// - For non-leaf nodes, child stats are recursively computed first
417+
/// and passed to providers
418+
/// - If no provider claims the node, falls back to `partition_statistics(None)`
419+
///
420+
/// # Note on #20184
421+
///
422+
/// This method performs its own bottom-up tree walk, separate from the
423+
/// walk that optimizer rules do via `transform_up`. Once
424+
/// [#20184](https://github.com/apache/datafusion/issues/20184) lands,
425+
/// the registry can feed enriched base stats into `partition_statistics(child_stats)`,
426+
/// removing redundancy for the base-stats path. The separate walk is still needed
427+
/// for extension propagation as long as `partition_statistics` returns `Arc<Statistics>`
428+
/// rather than a type that carries the extension map.
429+
pub fn compute_cached(
430+
&self,
431+
plan: &dyn ExecutionPlan,
432+
cache: &mut StatsCache,
433+
) -> Result<ExtendedStatistics> {
364434
// Fast path: no providers registered, skip the walk entirely
365435
if self.providers.is_empty() {
366436
let base = plan.partition_statistics(None)?;
367437
return Ok(ExtendedStatistics::new_arc(base));
368438
}
369439

440+
// Check memoization cache
441+
let key = plan as *const dyn ExecutionPlan as *const () as usize;
442+
if let Some(cached) = cache.0.get(&key) {
443+
return Ok(cached.clone());
444+
}
445+
370446
let children = plan.children();
371447

372-
// For leaf nodes, try providers with empty child stats.
373-
// For non-leaf nodes, recursively compute enhanced child stats first.
448+
// For leaf nodes, try providers with empty child stats
449+
// For non-leaf nodes, recursively compute enhanced child stats first
374450
let child_stats: Vec<ExtendedStatistics> = if children.is_empty() {
375451
Vec::new()
376452
} else {
377453
children
378454
.iter()
379-
.map(|child| self.compute(child.as_ref()))
455+
.map(|child| self.compute_cached(child.as_ref(), cache))
380456
.collect::<Result<Vec<_>>>()?
381457
};
382458

383-
for provider in &self.providers {
384-
match provider.compute_statistics(plan, &child_stats)? {
385-
StatisticsResult::Computed(stats) => return Ok(stats),
386-
StatisticsResult::Delegate => continue,
459+
let result = 'chain: {
460+
for provider in &self.providers {
461+
match provider.compute_statistics(plan, &child_stats)? {
462+
StatisticsResult::Computed(stats) => break 'chain stats,
463+
StatisticsResult::Delegate => continue,
464+
}
387465
}
388-
}
389-
// Fallback: use plan's built-in stats
390-
let base = plan.partition_statistics(None)?;
391-
Ok(ExtendedStatistics::new_arc(base))
466+
// Fallback: use plan's built-in stats
467+
let base = plan.partition_statistics(None)?;
468+
ExtendedStatistics::new_arc(base)
469+
};
470+
471+
cache.0.insert(key, result.clone());
472+
Ok(result)
392473
}
393474

394475
/// Compute statistics and return only the base Statistics (no extensions).

0 commit comments

Comments
 (0)