Skip to content

Commit 8898408

Browse files
rampage644claude
andauthored
fix: MERGE INTO on partitioned Iceberg tables (projection, TIMESTAMPTZ, pruning, manifest rewrite) (#57)
* fix: v2 manifest-list field renames + TIMESTAMPTZ date_transform + identity partition column filter Three independent fixes needed to read and scan real v2 Iceberg tables written by current Apache Iceberg (>= 1.0) and partitioned by an identity column or a day/hour transform on TIMESTAMPTZ. 1. `iceberg-rust-spec/src/spec/manifest_list.rs` - the v2 manifest_list Avro schema uses `added_data_files_count` / `existing_data_files_count` / `deleted_data_files_count`, but the reader still used the older `added_files_count` / `existing_files_count` / `deleted_files_count` names. Any manifest list written by modern Apache Iceberg failed to deserialize with "field not found" before the reader even reached an entry. Declare each count with `#[serde(rename = "added_data_files_count", alias = "added_files_count")]` so both new and legacy field names resolve cleanly, and update the static reader Avro schema to emit the current names. New regression test `test_manifest_list_v2_apache_field_names` simulates an Apache Iceberg >= 1.0 writer and asserts the reader deserializes it. 2. `datafusion_iceberg/src/pruning_statistics.rs` - the internal `DateTransform` UDF used a hardcoded `OneOf(Exact([Utf8, Date32]), Exact([Utf8, Timestamp(us, None)]))` signature, so any timezone-aware timestamp fell through with a type-check error. Replace with `TypeSignature::UserDefined` plus a `coerce_types` impl that accepts any `Timestamp(*, *)` and normalizes to `Timestamp(Microsecond, None)` for the physical call. Partition transforms operate on i64 microseconds-since-epoch and are timezone- agnostic, so stripping the tz on input is safe. 3. `datafusion_iceberg/src/table.rs::datafusion_partition_columns` - skip partition fields whose transform is `Identity` and whose name equals the source column name. For those, Iceberg materializes the column both in the parquet file body and in the Hive-style directory encoding; DataFusion's parquet reader then trips on an off-by-one ("expected N cols but got N+1") because it tries to derive the same column from both places. A follow-up commit promotes this filter out of `datafusion_partition_columns` so the manifest pruner sees the same filtered list. * refactor(table_scan): promote identity-self-named partition filter to table_scan Move the identity-self-named partition drop out of `datafusion_partition_columns` and into `table_scan` itself, so the physical scan column set and the manifest pruner's `partition_column_names` set are computed from the same filtered list. Previously they diverged: `datafusion_partition_columns` filtered out identity-self-named fields but the pruner still built its column subset from the unfiltered `partition_fields`, which meant filters on identity-self-named columns were incorrectly routed through `PruneManifests` (and then failed the subset test on the reduced partition schema anyway). Introduces `file_partition_fields` (kept) and `drop_partition_indices` (dropped), constructed once at the top of `table_scan` from the unfiltered `partition_fields`. Both are then threaded through every downstream consumer: - `datafusion_partition_columns` is called with the kept list. - The manifest-level pruner's `partition_column_names` set is built from the kept list and a comment documents that identity-self-named predicates are intentionally excluded here because they are pruned by per-file statistics in `PruneDataFiles` instead. - `drop_partition_indices` is later consumed by `generate_partitioned_file` so callers that still need to see the unfiltered partition-field order can account for the gaps. Prerequisite for follow-up commits that add the projection remap, TIMESTAMPTZ transform acceptance, PruneDataFiles arrow_schema fix, and manifest nested-id resolution. * fix(datafusion_iceberg): remap caller projection to combined-schema space DataFusionTable::schema() returns [file_schema, __data_file_path?, __manifest_file_path?] but the physical FileScanConfig output is [file_schema, kept_partition_transform_cols..., __data_file_path?, __manifest_file_path?]. Any partition spec with a non-identity transform (day, hour, month, year, bucket, truncate) creates synthetic columns (e.g. ts_day for day(ts)) that sit between the user columns and the metadata columns in the physical schema but are absent from the provider schema. table_scan() was passing the caller's `projection` (indices into the provider schema) straight through to FileScanConfig::with_projection, which interprets indices against the combined schema. With enable_data_file_path_column=true this picked up `ts_day` at the slot where `__data_file_path` was expected and silently truncated `__manifest_file_path`, which in turn made any downstream ProjectionExec referencing those columns by name+index fail with: Internal error: Input field name <col>_<transform> does not match with the projection expression __data_file_path. Embucket's MERGE COW planner hits this on every partitioned target. Fix: compute combined_projection once from the caller's projection, remapping provider-schema indices for __data_file_path / __manifest_file_path to their actual positions in [file_schema, kept_partition_cols, __data_file_path?, __manifest_file_path?]. Use combined_projection throughout table_scan (no-delete path, equality-delete base, per-closure clones). Adds 7 regression tests (day, hour, month, year, bucket, truncate, renamed-identity) alongside the existing unpartitioned test_datafusion_table_insert_with_data_file_path. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(arrow/transform): accept TIMESTAMP_TZ for day/hour/month/year transforms `transform_arrow()` only matched `DataType::Timestamp(TimeUnit::Microsecond, None)` for the day/hour/month/year arms, so any `timestamptz` column fell through to the catchall and raised `Compute error: Failed to perform transform for datatype`. Embucket's MERGE write path on `events_hooli` — whose `collector_tstamp` is `TIMESTAMP_TZ` partitioned by `day(collector_tstamp)` — tripped this every time. Iceberg's day/hour/month/year transforms are defined on the absolute instant (microseconds since the Unix epoch), so the Arrow timezone metadata is irrelevant to the numeric result. Widen each arm to `Timestamp(Microsecond, _)`. For month and year the existing `date_part` call used a named-timezone path that requires `chrono-tz`; cast to `Timestamp(Microsecond, None)` first so we run on a naive variant that works without that feature flag. Adds 4 regression tests exercising all four transforms with a `TimestampMicrosecondArray::with_timezone("UTC")` input to lock the fix in. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(datafusion_iceberg): route full arrow_schema to PruneDataFiles The second-stage data-file pruner (PruneDataFiles) was constructed with `partition_schema` — a subset schema holding only the Hive-style partition columns. Its `min_values`/`max_values` implementation looks up each column referenced by the pruning predicate via `arrow_schema.field_with_name(..)` to fetch the datatype, so any filter on a column absent from `partition_schema` silently returned `None` and pruned nothing. Identity-self-named partition columns (where `pf.name() == pf.source_name()`) are intentionally dropped from `file_partition_fields` so the parquet reader doesn't duplicate them between the path encoding and the file body, which also drops them from `table_partition_cols` and therefore from `partition_schema`. The result: a filter like `event_name = 'ad_start'` against a table partitioned by `identity(event_name)` reached the second- stage pruner but found no schema hit, so every partition file of the target was scanned in full (`files_ranges_pruned_statistics=0`). This only surfaced now because Embucket/embucket#126 unblocked the filter reaching TableScan in the first place. Fix: pass the full `arrow_schema` to `PruneDataFiles::new`. It has every column the predicate might reference — identity-self-named partition columns, non-partition columns with per-file statistics, etc. Correctness is preserved because the first-stage `PruneManifests` path still prunes transformed partition columns (`collector_tstamp_day`, `id_bucket`, ...) via manifest-list partition bounds, and synthetic partition-transform columns simply return `None` from `PruneDataFiles` (no per-file stats exist for them), which is the same behavior they had before. Adds a regression test: `test_identity_self_named_partition_filter_prunes_files` creates a `identity(kind)` partitioned table, inserts one row per partition value to materialize 3 distinct parquet files, then scans with `kind = 'a'` and asserts the resulting plan lists exactly 1 parquet file instead of 3. Refs: Embucket/embucket#127 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(manifest): resolve nested column ids in DataFile statistics DataFile statistics maps (`lower_bounds`, `upper_bounds`, `column_sizes`, `value_counts`, `null_value_counts`, `nan_value_counts`) are keyed by global column id and Iceberg assigns those ids from the same pool at every depth — a struct field nested inside a list<struct> or inside a context/unstruct top-level column is just as valid a key as a top-level column. `AvroMap::into_value_map` was looking keys up via `StructType::get`, which only consults the *top-level* `lookup` table, so any nested id (e.g. Snowplow's `contexts_com_snowplowanalytics_*.*` fields reaching into the 400-700 range) silently failed with `ColumnNotInSchema`. That error was then `.unwrap()`'d inside `from_existing_with_filter`'s per-entry closure, which panicked the tokio worker and aborted the whole Lambda (`signal: aborted`) on any MERGE that touched a real Snowplow events table. Three fixes, smallest-to-largest: 1. `StructType::field_by_id(id)` — new recursive id lookup that walks nested `Struct`, `List`, and `Map` types. Independent from the existing top-level-only `get` so current callers of `get` are unaffected. 2. `AvroMap::into_value_map` now resolves ids via `field_by_id`. Unknown ids — entries pointing at fields that have been removed from the schema since the manifest was written — are now skipped rather than raised as `ColumnNotInSchema`. This matches Iceberg's schema-evolution semantics (old stats on removed fields are tolerated on read). 3. `iceberg-rust/src/table/manifest.rs::from_existing_with_filter`'s main rewrite loop is switched from `filter_map(...).unwrap()` to an explicit `for` loop that propagates per-entry errors via `?`. Any future deserialization edge case surfaces as a clean `Error` instead of a SIGABRT inside a tokio worker. Two new regression tests: - `types::tests::field_by_id_finds_nested_fields` — covers top-level, struct-of-struct, list<struct>, map<string, struct>, and unknown ids. - `manifest::tests::into_value_map_accepts_nested_field_ids` — builds an `AvroMap<ByteBuf>` with a nested-field key (479 inside a list<struct>), a top-level key, and an unknown key, and asserts all three paths (decode nested, decode top-level, silently skip unknown). Reproduced end-to-end: pre-fix, `MERGE INTO demo.atomic.events_hooli` aborts the Lambda after ~21s with `panicked at iceberg-rust/src/table/manifest.rs:549:18: ... Column 479 not in schema`. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent af05323 commit 8898408

7 files changed

Lines changed: 1081 additions & 71 deletions

File tree

datafusion_iceberg/src/pruning_statistics.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,14 @@ struct DateTransform {
357357

358358
impl DateTransform {
359359
fn new() -> Self {
360+
// Accept any second-argument type via `TypeSignature::UserDefined` and
361+
// normalize it in `coerce_types`. The underlying transform is
362+
// timezone-agnostic (it operates on the i64 microseconds-since-epoch),
363+
// so any `Timestamp(Microsecond, *)` is a valid input — we just need
364+
// to strip the timezone metadata so the physical invocation sees
365+
// `Timestamp(Microsecond, None)`.
360366
let signature = Signature {
361-
type_signature: TypeSignature::OneOf(vec![
362-
TypeSignature::Exact(vec![DataType::Utf8, DataType::Date32]),
363-
TypeSignature::Exact(vec![
364-
DataType::Utf8,
365-
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Microsecond, None),
366-
]),
367-
]),
367+
type_signature: TypeSignature::UserDefined,
368368
volatility: Volatility::Immutable,
369369
};
370370
Self { signature }
@@ -388,6 +388,38 @@ impl ScalarUDFImpl for DateTransform {
388388
Ok(DataType::Int32)
389389
}
390390

391+
fn coerce_types(
392+
&self,
393+
arg_types: &[DataType],
394+
) -> datafusion::error::Result<Vec<DataType>> {
395+
use datafusion::arrow::datatypes::TimeUnit;
396+
if arg_types.len() != 2 {
397+
return Err(DataFusionError::Plan(format!(
398+
"date_transform expects 2 arguments, got {}",
399+
arg_types.len()
400+
)));
401+
}
402+
if !matches!(arg_types[0], DataType::Utf8 | DataType::LargeUtf8) {
403+
return Err(DataFusionError::Plan(format!(
404+
"date_transform first argument must be Utf8, got {}",
405+
arg_types[0]
406+
)));
407+
}
408+
let coerced_second = match &arg_types[1] {
409+
DataType::Date32 => DataType::Date32,
410+
DataType::Timestamp(TimeUnit::Microsecond, _) => {
411+
DataType::Timestamp(TimeUnit::Microsecond, None)
412+
}
413+
DataType::Timestamp(unit, _) => DataType::Timestamp(*unit, None),
414+
other => {
415+
return Err(DataFusionError::Plan(format!(
416+
"date_transform second argument must be Date32 or Timestamp, got {other}"
417+
)))
418+
}
419+
};
420+
Ok(vec![DataType::Utf8, coerced_second])
421+
}
422+
391423
fn invoke_with_args(
392424
&self,
393425
args: ScalarFunctionArgs,

0 commit comments

Comments
 (0)