Commit 4c2af81

adriangb and claude committed
catalog: query-aware statistics requests via ScanArgs / ScanResult
Adds an opt-in handshake that lets callers ask a `TableProvider` for specific stats by name and receive only what the provider can answer cheaply, instead of the all-or-nothing dense `Statistics` we have today.

## What's new

* `datafusion-common::stats::StatisticsRequest`: enum of stat kinds that mirror `Statistics` / `ColumnStatistics` (Min, Max, NullCount, DistinctCount, Sum, ByteSize, RowCount, TotalByteSize). `Hash + Eq` so it can key a `HashMap`.
* `datafusion-common::stats::StatisticsValue`: `Scalar(Precision<...>) | Distribution(Arc<dyn Any>) | Sketch(Arc<dyn Any>) | Absent`. Whether a value is exact or estimated travels in the `Precision` wrapper, not the variant.
* `ScanArgs::with_statistics_requests` / `statistics_requests()`: the caller's question.
* `ScanResult::with_statistics` / `statistics()` / `into_parts()`: the provider's answer, paired 1:1 with the requests slice.
* `PartitionedFile::satisfied_stats`: sparse, `Arc<HashMap<StatisticsRequest, StatisticsValue>>` for per-file answers. Memory scales with what was asked, not with table width. Providers that store stats out-of-band (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs) can populate this directly without rebuilding a full dense `Statistics`.
* `FilePruner` learns to consume the sparse map. Internally, `file_stats_pruning` is now `Box<dyn PruningStatistics + Send + Sync>` so we can dispatch between the existing `PrunableStatistics` (dense) and a new `SparseFilePruningStats` adapter (sparse). The sparse adapter looks up each `StatisticsRequest` directly in the map and materializes single-row arrays only for the columns the pruning predicate touches; no densify-then-throw-away.
* `ListingTable::scan_with_args` populates `ScanResult.statistics` from the merged dense `Statistics` it already computed when `args.statistics_requests()` is set and `collect_statistics=true`. When `collect_statistics=false` it returns `Absent` for everything (the contract is "answer what's free"). `DistinctCount`/`Sum`/`ByteSize` are likewise `Absent` for parquet, since those aren't in thrift footers; layered helpers (or richer providers) can fill the gaps.

## Backwards compat

All additions are opt-in:

* `ScanArgs` / `ScanResult` gain new fields with `Default`-friendly initializers; existing callers that don't use the new builders see no change.
* `FilePruner`'s field-type change is internal (private field).
* The only minor source-level break is a new pub field on `PartitionedFile` (`satisfied_stats`). Callers using `PartitionedFile::new` / `From<ObjectMeta>` / the existing builders are unaffected. Direct struct literals (uncommon, none in-tree) need to add `satisfied_stats: None` (or use the new `with_satisfied_stats` builder).

## Tests

* `datafusion-common::stats::tests::statistics_request_is_hashable_keyable`: round-trip a `StatisticsRequest` through a `HashMap`.
* `datafusion-pruning::file_pruner::tests`: three tests demonstrating end-to-end pruning against a sparse-only `PartitionedFile` (`x > 100` prunes a `[10, 20]` file, `x > 15` doesn't, no stats at all → no pruner).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9a29e33 commit 4c2af81

4 files changed

Lines changed: 408 additions & 7 deletions

datafusion/catalog/src/table.rs

Lines changed: 27 additions & 0 deletions
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use datafusion_common::{Constraints, Statistics, not_impl_err};
 use datafusion_common::{Result, internal_err};
 use datafusion_expr::Expr;
+use datafusion_expr::statistics::StatisticsRequest;

 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{
@@ -406,6 +407,7 @@ pub struct ScanArgs<'a> {
     filters: Option<&'a [Expr]>,
     projection: Option<&'a [usize]>,
     limit: Option<usize>,
+    statistics_requests: &'a [StatisticsRequest],
 }

 impl<'a> ScanArgs<'a> {
@@ -467,6 +469,31 @@ impl<'a> ScanArgs<'a> {
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
+
+    /// Set a list of statistics the caller would like the provider to
+    /// answer if it can do so cheaply.
+    ///
+    /// Typical sources a provider may answer from:
+    /// * Parquet thrift footers (Min/Max/NullCount/RowCount, exact)
+    /// * An external metadata catalog (Delta/Iceberg/Hudi manifests,
+    /// Hive-Metastore-style stats columns)
+    /// * Cached / materialized stats columns
+    ///
+    /// The provider returns its answers on
+    /// [`ScanResult::statistics`] paired 1:1 with `requests`. Anything
+    /// not answerable should come back as [`StatisticsValue::Absent`];
+    /// the caller decides what to do with the gaps. The contract is
+    /// "answer what's free, leave the rest as `Absent`": providers MUST
+    /// NOT do expensive scans purely to satisfy these requests.
+    pub fn with_statistics_requests(mut self, requests: &'a [StatisticsRequest]) -> Self {
+        self.statistics_requests = requests;
+        self
+    }
+
+    /// Get the statistics requests. Empty if none were set.
+    pub fn statistics_requests(&self) -> &'a [StatisticsRequest] {
+        self.statistics_requests
+    }
 }

 /// Result of a table scan operation from [`TableProvider::scan_with_args`].
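
The "answer what's free" contract documented on `with_statistics_requests` can be sketched without DataFusion. The types below are simplified stand-ins, not the real definitions: `String` replaces `Column`, `i64` replaces `ScalarValue`, and `FooterStats` and `answer_requests` are hypothetical names introduced only for this illustration.

```rust
// Simplified stand-ins for the types in this commit. The real
// `StatisticsRequest` keys on `Column` and the real `StatisticsValue`
// wraps `Precision<ScalarValue>`; plain `String` and `i64` are used
// here so the sketch compiles on its own.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StatisticsRequest {
    Min(String),
    Max(String),
    NullCount(String),
    DistinctCount(String),
    RowCount,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Precision<T> {
    Exact(T),
    Inexact(T),
}

#[derive(Debug, Clone, PartialEq)]
pub enum StatisticsValue {
    Scalar(Precision<i64>),
    Absent,
}

/// Hypothetical footer-style metadata for one column that a provider
/// already holds in memory (no I/O needed to read it).
pub struct FooterStats {
    pub column: String,
    pub min: i64,
    pub max: i64,
    pub null_count: i64,
    pub row_count: i64,
}

/// Answer each request from metadata already on hand. The result has
/// exactly one entry per request, in request order; anything the
/// footer cannot answer comes back as `Absent`.
pub fn answer_requests(
    footer: &FooterStats,
    requests: &[StatisticsRequest],
) -> Vec<StatisticsValue> {
    let exact = |v: i64| StatisticsValue::Scalar(Precision::Exact(v));
    requests
        .iter()
        .map(|req| match req {
            StatisticsRequest::Min(c) if *c == footer.column => exact(footer.min),
            StatisticsRequest::Max(c) if *c == footer.column => exact(footer.max),
            StatisticsRequest::NullCount(c) if *c == footer.column => {
                exact(footer.null_count)
            }
            StatisticsRequest::RowCount => exact(footer.row_count),
            // Unknown column, or a stat the footer does not carry
            // (e.g. DistinctCount): do not scan, just decline.
            _ => StatisticsValue::Absent,
        })
        .collect()
}
```

Because the response is positional, a caller can zip it back against its own request slice; the provider never needs to echo the requests.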

datafusion/datasource/src/mod.rs

Lines changed: 33 additions & 0 deletions
@@ -58,6 +58,7 @@ use chrono::TimeZone;
 use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
 use datafusion_common::{ScalarValue, Statistics};
+use datafusion_expr::statistics::SatisfiedStatistics;
 use datafusion_physical_expr::LexOrdering;
 use futures::{Stream, StreamExt};
 use object_store::{GetOptions, GetRange, ObjectStore};
@@ -138,6 +139,18 @@ pub struct PartitionedFile {
     /// When set via [`Self::with_statistics`], partition column statistics are automatically
     /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
     pub statistics: Option<Arc<Statistics>>,
+    /// Sparse, request-keyed stats answered by the provider for this file.
+    ///
+    /// Only entries for non-`Absent` answers are present, so memory scales
+    /// with the *count of stats actually requested* rather than the table's
+    /// column count. Used in tandem with, not in place of,
+    /// [`Self::statistics`]: existing consumers that read the dense
+    /// `Statistics` keep working; new consumers (e.g. `FilePruner` in
+    /// `datafusion-pruning`) prefer this sparse map when it's
+    /// populated. Providers that store stats out-of-band (Delta/Iceberg/Hudi
+    /// manifests, Hive Metastore, custom catalogs) can populate this
+    /// directly without rebuilding a full dense `Statistics`.
+    pub satisfied_stats: Option<Arc<SatisfiedStatistics>>,
     /// The known lexicographical ordering of the rows in this file, if any.
     ///
     /// This describes how the data within the file is sorted with respect to one or more
@@ -168,6 +181,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -181,6 +195,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -200,6 +215,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: Some(FileRange { start, end }),
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -328,6 +344,21 @@ impl PartitionedFile {
         self.ordering = ordering;
         self
     }
+
+    /// Attach a sparse map of provider-answered statistics for this file.
+    ///
+    /// Used by table providers that store per-file stats out-of-band
+    /// (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs)
+    /// and want to surface them without reconstructing a full dense
+    /// [`Statistics`]. Consumers (e.g. `FilePruner`) prefer this map
+    /// when [`Self::statistics`] is not set.
+    pub fn with_satisfied_stats(
+        mut self,
+        satisfied_stats: Arc<SatisfiedStatistics>,
+    ) -> Self {
+        self.satisfied_stats = Some(satisfied_stats);
+        self
+    }
 }

 impl From<ObjectMeta> for PartitionedFile {
@@ -337,6 +368,7 @@ impl From<ObjectMeta> for PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -534,6 +566,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGro
                             byte_size: Precision::Absent,
                         }],
                     })),
+                    satisfied_stats: None,
                     ordering: None,
                     extensions: None,
                     metadata_size_hint: None,
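
As a rough illustration of how a consumer might prune against the sparse per-file map, the sketch below mirrors the three cases named in the commit message (`x > 100` prunes a `[10, 20]` file, `x > 15` does not, no stats means no pruning). Everything here is a simplified stand-in: `PartitionedFile` carries only a path and the map, `can_prune_greater_than` and `example_file` are hypothetical helpers, and pruning only on `Exact` values is a conservative assumption of this sketch, not something the diff above confirms about `FilePruner`.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in types: `String` replaces `Column`, `i64` replaces `ScalarValue`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StatisticsRequest {
    Min(String),
    Max(String),
}

#[derive(Debug, Clone, PartialEq)]
pub enum Precision<T> {
    Exact(T),
    Inexact(T),
}

#[derive(Debug, Clone, PartialEq)]
pub enum StatisticsValue {
    Scalar(Precision<i64>),
    Absent,
}

pub type SatisfiedStatistics = HashMap<StatisticsRequest, StatisticsValue>;

/// Stand-in for `PartitionedFile` with only the fields this sketch needs.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    pub path: String,
    pub satisfied_stats: Option<Arc<SatisfiedStatistics>>,
}

impl PartitionedFile {
    pub fn new(path: &str) -> Self {
        Self { path: path.to_string(), satisfied_stats: None }
    }

    /// Same shape as the builder added in this commit.
    pub fn with_satisfied_stats(mut self, stats: Arc<SatisfiedStatistics>) -> Self {
        self.satisfied_stats = Some(stats);
        self
    }
}

/// Hypothetical helper: true when the file provably holds no row with
/// `column > threshold`, judged only from an exact `Max` entry.
pub fn can_prune_greater_than(file: &PartitionedFile, column: &str, threshold: i64) -> bool {
    let Some(stats) = file.satisfied_stats.as_deref() else {
        return false; // no sparse stats at all: the file must be kept
    };
    match stats.get(&StatisticsRequest::Max(column.to_string())) {
        Some(StatisticsValue::Scalar(Precision::Exact(max))) => *max <= threshold,
        // Missing, inexact, or absent answers never justify pruning here.
        _ => false,
    }
}

/// A file whose column `x` spans `[10, 20]`, described sparsely.
pub fn example_file() -> PartitionedFile {
    let mut stats = SatisfiedStatistics::new();
    stats.insert(
        StatisticsRequest::Min("x".to_string()),
        StatisticsValue::Scalar(Precision::Exact(10)),
    );
    stats.insert(
        StatisticsRequest::Max("x".to_string()),
        StatisticsValue::Scalar(Precision::Exact(20)),
    );
    PartitionedFile::new("part-0.parquet").with_satisfied_stats(Arc::new(stats))
}
```

Only two map entries exist for this file regardless of how many columns the table has, which is the memory property the field's doc comment describes.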

datafusion/expr-common/src/statistics.rs

Lines changed: 118 additions & 0 deletions
@@ -1632,3 +1632,121 @@ mod tests {
         all_ops.into_iter().collect()
     }
 }
+
+// ---------------------------------------------------------------------------
+// Query-aware statistics request / response.
+//
+// A small extension to the existing `Statistics` model: instead of
+// "give me everything you have for every column", a caller can ask for
+// a specific list of stats by name. Providers that have something
+// cheap to offer (parquet thrift footers, an external catalog, cached
+// metadata) answer the entries they can; everything else comes back
+// `Absent`. Callers (optimizer rules, layered helpers, etc.) decide
+// what to do with the gaps.
+//
+// See `TableProvider::scan_with_args`
+// (`ScanArgs::with_statistics_requests` / `ScanResult::statistics`)
+// for the table-level handshake, and `PartitionedFile::satisfied_stats`
+// for the per-file one.
+// ---------------------------------------------------------------------------
+
+use datafusion_common::Column;
+use datafusion_common::stats::Precision;
+
+/// What stat does the caller want?
+///
+/// Each variant maps onto a field of
+/// [`datafusion_common::Statistics`] / [`datafusion_common::ColumnStatistics`]
+/// so providers that already populate one can answer the other
+/// trivially. The companion [`StatisticsValue`] is paired 1:1 with
+/// the request in the response. Whether a value is exact or estimated
+/// is encoded in the returned [`Precision`] wrapper, not in the
+/// request kind itself: `DistinctCount` covers both an exact distinct
+/// count from a metadata catalog and an HLL-style estimate from a
+/// sampled scan.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum StatisticsRequest {
+    /// Smallest non-null value of `column`.
+    Min(Column),
+    /// Largest non-null value of `column`.
+    Max(Column),
+    /// Number of NULLs in `column`.
+    NullCount(Column),
+    /// Number of distinct values in `column` (exact or estimated).
+    DistinctCount(Column),
+    /// Sum of values in `column` (numerics, widened per
+    /// `ColumnStatistics::sum_value`).
+    Sum(Column),
+    /// Encoded/output byte size of `column`.
+    ByteSize(Column),
+    /// Number of rows in the container (table / file).
+    RowCount,
+    /// Total byte size of the container's output.
+    TotalByteSize,
+}
+
+/// Response value paired 1:1 with an inbound [`StatisticsRequest`].
+///
+/// Variants are intentionally schema-agnostic: a provider answering
+/// `Min(c)` returns `Scalar(Precision::Exact(ScalarValue::...))` with
+/// the column's natural type; `RowCount` / `NullCount` / `DistinctCount`
+/// return `Scalar(Precision::*(ScalarValue::UInt64(...)))`. The
+/// `Distribution` variant carries a richly-typed [`Distribution`] for
+/// providers that have one to surface.
+#[derive(Debug, Clone)]
+pub enum StatisticsValue {
+    /// A single scalar value.
+    Scalar(Precision<ScalarValue>),
+    /// A typed probability distribution. Boxed so the enum stays small:
+    /// `Distribution` is significantly larger than the other variants.
+    Distribution(Box<Distribution>),
+    /// Provider can't (or won't) answer this request. The caller
+    /// decides whether to fall back to another mechanism.
+    Absent,
+}
+
+impl StatisticsValue {
+    /// Convenience: an `Exact` scalar response.
+    pub fn exact(value: ScalarValue) -> Self {
+        Self::Scalar(Precision::Exact(value))
+    }
+    /// Convenience: an `Inexact` scalar response.
+    pub fn inexact(value: ScalarValue) -> Self {
+        Self::Scalar(Precision::Inexact(value))
+    }
+}
+
+/// Sparse map of stats answers, keyed by request. Used as the storage for
+/// per-file answers (see `PartitionedFile::satisfied_stats`) and as the
+/// internal representation for adapters that consume request-driven
+/// stats. Only entries the provider actually answered are present, so
+/// memory scales with what was asked rather than with table width.
+pub type SatisfiedStatistics =
+    std::collections::HashMap<StatisticsRequest, StatisticsValue>;
+
+#[cfg(test)]
+mod stats_request_tests {
+    use super::*;
+    use std::collections::HashMap;
+
+    #[test]
+    fn statistics_request_is_hashable_keyable() {
+        // Sanity: two equal `StatisticsRequest`s hash equal and round-trip
+        // through a HashMap, so they can be used as keys (e.g. for the
+        // sparse `PartitionedFile::satisfied_stats` map).
+        let r1 = StatisticsRequest::Min(Column::new_unqualified("c"));
+        let r2 = StatisticsRequest::Min(Column::new_unqualified("c"));
+        assert_eq!(r1, r2);
+        let mut map: HashMap<StatisticsRequest, StatisticsValue> = HashMap::new();
+        map.insert(
+            r1.clone(),
+            StatisticsValue::exact(ScalarValue::Int64(Some(7))),
+        );
+        match map.get(&r2) {
+            Some(StatisticsValue::Scalar(Precision::Exact(ScalarValue::Int64(
+                Some(7),
+            )))) => {}
+            other => panic!("unexpected lookup: {other:?}"),
+        }
+    }
+}
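
One way to connect the 1:1 response slice to the sparse `SatisfiedStatistics` map is to zip requests with answers and drop the `Absent` entries. The sketch below assumes simplified stand-in types (`String` for `Column`, `i64` for `ScalarValue`, no `Distribution` variant) and a hypothetical helper named `to_sparse`; the commit itself does not show such a function.

```rust
use std::collections::HashMap;

// Stand-in types so the sketch compiles without DataFusion.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StatisticsRequest {
    Min(String),
    DistinctCount(String),
    RowCount,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Precision<T> {
    Exact(T),
    Inexact(T),
}

#[derive(Debug, Clone, PartialEq)]
pub enum StatisticsValue {
    Scalar(Precision<i64>),
    Absent,
}

impl StatisticsValue {
    /// Mirrors the `exact` convenience constructor from the diff.
    pub fn exact(value: i64) -> Self {
        Self::Scalar(Precision::Exact(value))
    }

    /// Mirrors the `inexact` convenience constructor from the diff.
    pub fn inexact(value: i64) -> Self {
        Self::Scalar(Precision::Inexact(value))
    }
}

pub type SatisfiedStatistics = HashMap<StatisticsRequest, StatisticsValue>;

/// Hypothetical helper: fold a positional response into a sparse map,
/// keeping only the requests that were actually answered.
pub fn to_sparse(
    requests: &[StatisticsRequest],
    answers: Vec<StatisticsValue>,
) -> SatisfiedStatistics {
    assert_eq!(
        requests.len(),
        answers.len(),
        "answers must pair 1:1 with requests"
    );
    requests
        .iter()
        .cloned()
        .zip(answers)
        // `Absent` carries no information, so it is not stored.
        .filter(|(_, value)| !matches!(value, StatisticsValue::Absent))
        .collect()
}
```

A lookup miss in the resulting map then means the same thing as an `Absent` answer, and exactness stays visible on each stored value through its `Precision` wrapper.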
