Skip to content

Commit db234e4

Browse files
committed
fix: dynamic_partition_overwrite now builds per-spec delete predicates after partition spec evolution
Fixes #3148. When a table has undergone partition spec evolution, its snapshot may contain manifests written under different partition_spec_ids. Previously, dynamic_partition_overwrite built the delete predicate using only the current spec, causing the manifest evaluator to incorrectly skip manifests from older specs — leaving stale data files silently behind. The fix builds the delete predicate per historical spec present in the snapshot, projecting the new data files' partition values into each spec's coordinate space before evaluating. Regression tests added, covering:
- Mixed-spec snapshot (manifests from both spec-0 and spec-1)
- Overwrite of a partition that only exists in spec-0 manifests (silent data duplication case)
1 parent 8691d94 commit db234e4

File tree

2 files changed

+330
-5
lines changed

2 files changed

+330
-5
lines changed

pyiceberg/table/__init__.py

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -542,12 +542,70 @@ def dynamic_partition_overwrite(
542542
)
543543
)
544544

545-
partitions_to_overwrite = {data_file.partition for data_file in data_files}
546-
delete_filter = self._build_partition_predicate(
547-
partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
548-
)
549-
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
545+
# partitions_to_overwrite = {data_file.partition for data_file in data_files}
546+
# delete_filter = self._build_partition_predicate(
547+
# partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
548+
# )
549+
# self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
550+
551+
# Build the partition predicate per-spec to handle tables that have
552+
# undergone partition spec evolution. Manifests in the snapshot may be
553+
# written under different (older) specs. We need to project the overwrite
554+
# partitions into each historical spec's coordinate space so that the
555+
# manifest evaluator correctly identifies which old manifests to delete.
556+
# See: https://github.com/apache/iceberg-python/issues/3148
557+
current_spec = self.table_metadata.spec()
558+
current_schema = self.table_metadata.schema()
550559

560+
# Collect the source column names (e.g. "category") that are being
561+
# overwritten — these are stable across spec evolution (only field IDs matter).
562+
overwrite_source_ids = {field.source_id for field in current_spec.fields}
563+
564+
delete_filter: BooleanExpression = AlwaysFalse()
565+
566+
# For each historical spec in the snapshot, build a predicate using
567+
# only the fields that spec knows about, matched against the
568+
# corresponding positions in the new data files' partition records.
569+
snapshot = self.table_metadata.snapshot_by_name(branch or MAIN_BRANCH)
570+
if snapshot is not None:
571+
spec_ids_in_snapshot = {m.partition_spec_id for m in snapshot.manifests(io=self._table.io)}
572+
else:
573+
spec_ids_in_snapshot = {current_spec.spec_id}
574+
575+
for spec_id in spec_ids_in_snapshot:
576+
historical_spec = self.table_metadata.specs()[spec_id]
577+
# Find which fields this historical spec shares with the current spec
578+
shared_source_ids = {f.source_id for f in historical_spec.fields} & overwrite_source_ids
579+
if not shared_source_ids:
580+
# No overlap — this spec's manifests cannot contain our partitions
581+
continue
582+
583+
# Project the new data files' partitions into this historical spec's space:
584+
# for each new data file, build a partition record using only the
585+
# fields this historical spec knows about.
586+
historical_partitions: set[Record] = set()
587+
for data_file in data_files:
588+
# data_file.partition is under current_spec — extract shared field values
589+
record_values = []
590+
for field in historical_spec.fields:
591+
if field.source_id in overwrite_source_ids:
592+
# find position of this source_id in current spec
593+
current_pos = next(i for i, f in enumerate(current_spec.fields) if f.source_id == field.source_id)
594+
record_values.append(data_file.partition[current_pos])
595+
else:
596+
record_values.append(None)
597+
historical_partitions.add(Record(*record_values))
598+
599+
delete_filter = Or(
600+
delete_filter,
601+
self._build_partition_predicate(
602+
partition_records=historical_partitions,
603+
spec=historical_spec,
604+
schema=current_schema,
605+
),
606+
)
607+
608+
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
551609
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
552610
append_files.commit_uuid = append_snapshot_commit_uuid
553611
for data_file in data_files:
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
"""
2+
Regression / investigation test for manifest pruning correctness under partition spec evolution.
3+
4+
Context
5+
-------
6+
PR #3011 (merged Feb 20 2026) added manifest pruning to _OverwriteFiles and _DeleteFiles
7+
in pyiceberg/table/update/snapshot.py. The pruning builds a partition predicate from the
8+
*current* partition spec and evaluates it against every manifest in the snapshot via a
9+
KeyDefaultDict of per-spec evaluators.
10+
11+
The question this test file investigates:
12+
When a table has been through partition spec evolution, its snapshot may contain manifests
13+
written under *different* partition_spec_ids. Does the manifest evaluator correctly resolve
14+
each manifest's own spec before deciding whether to include or skip it?
15+
16+
If the answer is "no", the overwrite will silently skip manifests from the old spec, leaving
17+
stale data files that should have been deleted -- a silent correctness bug.
18+
19+
How to run
20+
----------
21+
pytest tests/integration/test_manifest_pruning_spec_evolution.py -v
22+
"""
23+
24+
import tempfile
25+
from typing import Any
26+
27+
import pyarrow as pa
28+
29+
from pyiceberg.catalog import Catalog, load_catalog
30+
from pyiceberg.partitioning import PartitionField, PartitionSpec
31+
from pyiceberg.schema import Schema
32+
from pyiceberg.transforms import IdentityTransform
33+
from pyiceberg.types import LongType, NestedField, StringType
34+
35+
# ---------------------------------------------------------------------------
36+
# Helpers
37+
# ---------------------------------------------------------------------------
38+
39+
# Shared table schema for all tests in this file: two nullable string
# columns that serve as partition sources ("category", "region") and a
# long payload column ("value").
SCHEMA = Schema(
    NestedField(field_id=1, name="category", field_type=StringType(), required=False),
    NestedField(field_id=2, name="region", field_type=StringType(), required=False),
    NestedField(field_id=3, name="value", field_type=LongType(), required=False),
)

# Spec 0: the table's initial spec, partitioned by identity(category) only.
SPEC_V0 = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category"),
)
47+
48+
49+
def make_catalog(warehouse: str) -> Catalog:
    """Spin up a local SQLite-backed catalog -- no services needed."""
    # Keep all connection properties in one place so the test setup is
    # easy to scan; load_catalog receives them as keyword arguments.
    catalog_props = {
        "type": "sql",
        "uri": f"sqlite:///{warehouse}/catalog.db",
        "warehouse": f"file://{warehouse}",
    }
    return load_catalog("test", **catalog_props)
57+
58+
59+
def arrow_table(rows: list[dict[str, Any]]) -> pa.Table:
    """Build a pyarrow table matching SCHEMA from a list of row dicts."""
    # Pin the Arrow schema explicitly so all-None columns (e.g. "region"
    # before spec evolution) keep their declared types.
    arrow_schema = pa.schema(
        [
            pa.field("category", pa.string()),
            pa.field("region", pa.string()),
            pa.field("value", pa.int64()),
        ]
    )
    return pa.Table.from_pylist(rows, schema=arrow_schema)
70+
71+
72+
# ---------------------------------------------------------------------------
73+
# Test 1: Mixed spec snapshot -- overwrite partition present in both specs
74+
# ---------------------------------------------------------------------------
75+
76+
77+
def test_overwrite_after_partition_spec_evolution_correctness() -> None:
    """
    dynamic_partition_overwrite must replace ALL data files of the target
    partition, including files written under an earlier partition spec.

    Scenario:
      * spec 0 partitions by identity(category); write A(1,2,3) and B(10,11)
      * evolve to spec 1 by adding identity(region); write A(100,101) and B(200)
      * dynamic-partition-overwrite category=A with values (999, 888)

    Expected outcome: A rows are exactly [888, 999], B rows [10, 11, 200]
    remain untouched, 5 rows total. Pre-fix behavior left the stale spec-0
    A rows (1, 2, 3) behind because their manifests were skipped -> 8 rows.
    """
    with tempfile.TemporaryDirectory() as warehouse:
        catalog = make_catalog(warehouse)
        catalog.create_namespace("default")

        # Step 1: create the table under spec 0 (category only).
        table = catalog.create_table(
            "default.test_spec_evolution_overwrite",
            schema=SCHEMA,
            partition_spec=SPEC_V0,
        )

        # Step 2: spec-0 data -- three A rows and two B rows.
        spec0_rows = [
            {"category": "A", "region": None, "value": 1},
            {"category": "A", "region": None, "value": 2},
            {"category": "A", "region": None, "value": 3},
            {"category": "B", "region": None, "value": 10},
            {"category": "B", "region": None, "value": 11},
        ]
        table.append(arrow_table(spec0_rows))
        assert table.scan().to_arrow().num_rows == 5

        # Step 3: evolve the spec by appending identity(region).
        with table.update_spec() as update:
            update.add_field(
                source_column_name="region",
                transform=IdentityTransform(),
                partition_field_name="region",
            )
        table = catalog.load_table("default.test_spec_evolution_overwrite")
        assert table.spec().spec_id == 1, f"Expected spec_id=1, got {table.spec().spec_id}"

        # Step 4: spec-1 data -- two more A rows and one B row.
        spec1_rows = [
            {"category": "A", "region": "us", "value": 100},
            {"category": "A", "region": "eu", "value": 101},
            {"category": "B", "region": "us", "value": 200},
        ]
        table.append(arrow_table(spec1_rows))
        assert table.scan().to_arrow().num_rows == 8

        # Precondition sanity check: the snapshot really holds manifests
        # written under more than one spec, otherwise the test proves nothing.
        current_snapshot = table.current_snapshot()
        assert current_snapshot is not None
        spec_ids_in_snapshot = {m.partition_spec_id for m in current_snapshot.manifests(table.io)}
        assert len(spec_ids_in_snapshot) > 1, f"Test setup failed: expected manifests from >1 spec, got {spec_ids_in_snapshot}"

        # Step 5: overwrite only category=A.
        overwrite_rows = [
            {"category": "A", "region": "us", "value": 999},
            {"category": "A", "region": "eu", "value": 888},
        ]
        table.dynamic_partition_overwrite(arrow_table(overwrite_rows))

        table = catalog.load_table("default.test_spec_evolution_overwrite")
        result = table.scan().to_arrow().to_pydict()

        categories = result["category"]
        values = result["value"]
        pairs = list(zip(categories, values, strict=True))
        a_values = [v for c, v in pairs if c == "A"]
        b_values = [v for c, v in pairs if c == "B"]

        # Total rows: 2 new A + 3 untouched B = 5.
        assert len(a_values) + len(b_values) == 5, (
            f"Row count mismatch: expected 5, got {len(a_values) + len(b_values)}.\n"
            f"A values: {sorted(a_values)} -- stale values would be any of [1, 2, 3, 100, 101]\n"
            f"B values: {sorted(b_values)}"
        )

        # A must contain only the freshly written rows.
        stale = [v for v in a_values if v in (1, 2, 3, 100, 101)]
        assert not stale, (
            f"Stale A rows found (should have been deleted): {stale}\n"
            f"spec-0 manifests were incorrectly skipped during manifest pruning."
        )
        assert sorted(a_values) == [888, 999], f"Expected A=[888,999], got {sorted(a_values)}"

        # B rows completely untouched.
        assert sorted(b_values) == [10, 11, 200], f"Expected B=[10,11,200], got {sorted(b_values)}"
187+
188+
189+
# ---------------------------------------------------------------------------
190+
# Test 2: Overwrite partition that ONLY exists in spec-0 manifests
191+
# This is the most dangerous case -- silent data duplication, no exception raised
192+
# ---------------------------------------------------------------------------
193+
194+
195+
def test_overwrite_partition_only_in_old_spec() -> None:
    """
    Sharpest form of the bug: the overwrite target (category=B) has data
    ONLY under spec-0. After spec evolution to spec-1, overwriting B should
    delete the old spec-0 B files and write new ones.

    Bug (pre-fix): the manifest evaluator, built against spec-1's predicate,
    finds zero matching manifests for B (because B only exists in spec-0
    manifests) -> UserWarning "did not match any records" -> old B rows survive
    -> silent data duplication: [999, 10, 11] instead of [999].
    """
    with tempfile.TemporaryDirectory() as warehouse:
        catalog = make_catalog(warehouse)
        catalog.create_namespace("default")

        table = catalog.create_table(
            "default.test_old_spec_only_overwrite",
            schema=SCHEMA,
            partition_spec=SPEC_V0,
        )

        # Spec-0 data: category=B only.
        spec0_rows = [
            {"category": "B", "region": None, "value": 10},
            {"category": "B", "region": None, "value": 11},
        ]
        table.append(arrow_table(spec0_rows))

        # Evolve the spec by appending identity(region).
        with table.update_spec() as update:
            update.add_field(
                source_column_name="region",
                transform=IdentityTransform(),
                partition_field_name="region",
            )
        table = catalog.load_table("default.test_old_spec_only_overwrite")

        # Spec-1 data: category=A only, so B never appears in a spec-1 manifest.
        table.append(arrow_table([{"category": "A", "region": "us", "value": 100}]))

        # Overwrite category=B -- its files live exclusively in spec-0 manifests.
        table.dynamic_partition_overwrite(arrow_table([{"category": "B", "region": "us", "value": 999}]))

        table = catalog.load_table("default.test_old_spec_only_overwrite")
        result = table.scan().to_arrow().to_pydict()

        categories = result["category"]
        values = result["value"]
        pairs = list(zip(categories, values, strict=True))
        b_values = [v for c, v in pairs if c == "B"]
        a_values = [v for c, v in pairs if c == "A"]

        assert b_values == [999], (
            f"Expected B=[999] only, got {b_values}.\n"
            f"Stale rows {[v for v in b_values if v != 999]} were not deleted -- "
            f"spec-0 manifests were incorrectly skipped."
        )
        assert a_values == [100], f"A data unexpectedly modified: {a_values}"

0 commit comments

Comments
 (0)