Skip to content

Commit cac01cd

Browse files
rampage644claude
andcommitted
fix(datafusion_iceberg): route full arrow_schema to PruneDataFiles
The second-stage data-file pruner (PruneDataFiles) was constructed with `partition_schema` — a subset schema holding only the Hive-style partition columns. Its `min_values`/`max_values` implementation looks up each column referenced by the pruning predicate via `arrow_schema.field_with_name(..)` to fetch the datatype, so any filter on a column absent from `partition_schema` silently returned `None` and pruned nothing. Identity-self-named partition columns (where `pf.name() == pf.source_name()`) are intentionally dropped from `file_partition_fields` so the parquet reader doesn't duplicate them between the path encoding and the file body, which also drops them from `table_partition_cols` and therefore from `partition_schema`. The result: a filter like `event_name = 'ad_start'` against a table partitioned by `identity(event_name)` reached the second- stage pruner but found no schema hit, so every partition file of the target was scanned in full (`files_ranges_pruned_statistics=0`). This only surfaced now because Embucket/embucket#126 unblocked the filter reaching TableScan in the first place. Fix: pass the full `arrow_schema` to `PruneDataFiles::new`. It has every column the predicate might reference — identity-self-named partition columns, non-partition columns with per-file statistics, etc. Correctness is preserved because the first-stage `PruneManifests` path still prunes transformed partition columns (`collector_tstamp_day`, `id_bucket`, ...) via manifest-list partition bounds, and synthetic partition-transform columns simply return `None` from `PruneDataFiles` (no per-file stats exist for them), which is the same behavior they had before. Adds a regression test: `test_identity_self_named_partition_filter_prunes_files` creates a `identity(kind)` partitioned table, inserts one row per partition value to materialize 3 distinct parquet files, then scans with `kind = 'a'` and asserts the resulting plan lists exactly 1 parquet file instead of 3. Refs: Embucket/embucket#127 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8d242d5 commit cac01cd

1 file changed

Lines changed: 122 additions & 2 deletions

File tree

datafusion_iceberg/src/table.rs

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,10 +597,19 @@ async fn table_scan(
597597

598598
let pruning_predicate =
599599
PruningPredicate::try_new(physical_predicate, arrow_schema.clone())?;
600-
// After the first pruning stage the data_files are pruned again based on the pruning statistics in the manifest files.
600+
// After the first pruning stage the data_files are pruned again based
601+
// on the pruning statistics in the manifest files. `PruneDataFiles`
602+
// looks up each column referenced by the predicate in its
603+
// `arrow_schema` field to fetch the datatype; passing the narrow
604+
// `partition_schema` here would hide every non-partition-key column
605+
// (including identity-self-named partition columns that have been
606+
// dropped from `table_partition_cols` because they are materialized
607+
// in the parquet file body), so any filter on such a column would
608+
// silently prune nothing. Passing the full `arrow_schema` lets the
609+
// manifest-level pruner reach any column with per-file statistics.
601610
let files_to_prune = pruning_predicate.prune(&PruneDataFiles::new(
602611
&schema,
603-
&partition_schema,
612+
&arrow_schema,
604613
&data_files,
605614
))?;
606615

@@ -3003,6 +3012,117 @@ mod tests {
30033012
.await;
30043013
}
30053014

3015+
#[tokio::test]
3016+
pub async fn test_identity_self_named_partition_filter_prunes_files() {
3017+
// Regression for `partition_schema` being passed to `PruneDataFiles` at
3018+
// `table.rs:601`: identity-self-named partition columns (where the
3019+
// partition field's `name()` equals its `source_name()`) are dropped
3020+
// from `file_partition_fields` upstream, so `partition_schema` doesn't
3021+
// contain them. When a filter references such a column, the second-
3022+
// stage `PruneDataFiles` pruner fails its arrow-schema lookup in
3023+
// `min_values` / `max_values` and returns `None`, so no file gets
3024+
// pruned. The fix is to pass the full `arrow_schema` — which contains
3025+
// every column in the table — to `PruneDataFiles::new`.
3026+
//
3027+
// Reproducer: partition by `identity(kind)` on a string column named
3028+
// `kind`, insert rows for 3 distinct `kind` values (one parquet file
3029+
// per partition), then scan with a filter `kind = 'a'`. The resulting
3030+
// plan's file_groups should contain exactly ONE parquet file, not 3.
3031+
3032+
use datafusion::physical_plan::displayable;
3033+
use datafusion::prelude::{col, lit};
3034+
use datafusion::catalog::TableProvider;
3035+
3036+
let object_store = ObjectStoreBuilder::memory();
3037+
let catalog: Arc<dyn Catalog> = Arc::new(
3038+
SqlCatalog::new("sqlite://", "identity_prune_probe", object_store)
3039+
.await
3040+
.unwrap(),
3041+
);
3042+
3043+
let schema = Schema::builder()
3044+
.with_struct_field(StructField {
3045+
id: 1,
3046+
name: "id".to_string(),
3047+
required: true,
3048+
field_type: Type::Primitive(PrimitiveType::Long),
3049+
doc: None,
3050+
})
3051+
.with_struct_field(StructField {
3052+
id: 2,
3053+
name: "kind".to_string(),
3054+
required: true,
3055+
field_type: Type::Primitive(PrimitiveType::String),
3056+
doc: None,
3057+
})
3058+
.build()
3059+
.unwrap();
3060+
3061+
// Identity-self-named: partition field "kind" on source column "kind".
3062+
let partition_spec = PartitionSpec::builder()
3063+
.with_partition_field(PartitionField::new(2, 1000, "kind", Transform::Identity))
3064+
.build()
3065+
.expect("Failed to build partition spec");
3066+
3067+
let table = Table::builder()
3068+
.with_name("identity_prune_probe")
3069+
.with_location("/test/identity_prune_probe")
3070+
.with_schema(schema)
3071+
.with_partition_spec(partition_spec)
3072+
.build(&["test".to_owned()], catalog)
3073+
.await
3074+
.expect("Failed to create partitioned table");
3075+
3076+
let table = Arc::new(DataFusionTable::from(table));
3077+
3078+
let ctx = SessionContext::new();
3079+
ctx.register_table("identity_prune_probe", table.clone())
3080+
.unwrap();
3081+
3082+
// Three rows with three distinct kind values → three partition files.
3083+
ctx.sql(
3084+
"INSERT INTO identity_prune_probe (id, kind) VALUES
3085+
(1, 'a'),
3086+
(2, 'b'),
3087+
(3, 'c');",
3088+
)
3089+
.await
3090+
.expect("Failed to create query plan for insert")
3091+
.collect()
3092+
.await
3093+
.expect("Failed to insert values into partitioned table");
3094+
3095+
// Sanity: three partition files exist unfiltered.
3096+
let state = ctx.state();
3097+
let unfiltered_plan = table
3098+
.scan(&state, None, &[], None)
3099+
.await
3100+
.expect("unfiltered scan should succeed");
3101+
let unfiltered_display = displayable(unfiltered_plan.as_ref())
3102+
.indent(false)
3103+
.to_string();
3104+
let unfiltered_parquet_count = unfiltered_display.matches(".parquet").count();
3105+
assert_eq!(
3106+
unfiltered_parquet_count, 3,
3107+
"precondition: unfiltered scan should list all 3 partition files, got {unfiltered_parquet_count}:\n{unfiltered_display}"
3108+
);
3109+
3110+
// Now scan with a filter that matches exactly one partition.
3111+
let filter = col("kind").eq(lit("a"));
3112+
let filtered_plan = table
3113+
.scan(&state, None, &[filter], None)
3114+
.await
3115+
.expect("filtered scan should succeed");
3116+
let filtered_display = displayable(filtered_plan.as_ref())
3117+
.indent(false)
3118+
.to_string();
3119+
let filtered_parquet_count = filtered_display.matches(".parquet").count();
3120+
assert_eq!(
3121+
filtered_parquet_count, 1,
3122+
"expected pruning filter `kind = 'a'` to reduce scan to exactly 1 parquet file, got {filtered_parquet_count}:\n{filtered_display}"
3123+
);
3124+
}
3125+
30063126
#[test]
30073127
fn test_fake_object_store_url() {
30083128
assert_eq!(

0 commit comments

Comments
 (0)