Skip to content

Commit 2f9ad30

Browse files
committed
Introduce CommitWindow to fix validation in multi-producer retry
`_validate_concurrency` was resolving the catalog HEAD via `_parent_snapshot_id`, which breaks when a Transaction has multiple producers (`_DeleteFiles` + `_OverwriteFiles`). On retry, the second producer's parent points to the first producer's uncommitted snapshot, which does not exist in the catalog metadata, causing a spurious `ValidationException`.

`CommitWindow` explicitly separates the two snapshot references that validation needs: `starting_snapshot_id` (fixed at operation start) and `catalog_head_snapshot_id` (the actual catalog HEAD after refresh). `_rebuild_snapshot_updates` builds one `CommitWindow` and shares it across all producers, so each validates against the real catalog state.

Add `test_mixed_delete_overwrite_retries_successfully` to cover the multi-producer retry path end-to-end.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent 58ba6b0 commit 2f9ad30

3 files changed

Lines changed: 99 additions & 19 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,11 +1133,20 @@ def _cleanup_uncommitted_manifests(self) -> None:
11331133
def _rebuild_snapshot_updates(self) -> None:
11341134
"""Rebuild snapshot updates for retry by re-executing registered producers."""
11351135
from pyiceberg.table.update import AddSnapshotUpdate, AssertRefSnapshotId, SetSnapshotRefUpdate
1136+
from pyiceberg.table.update.snapshot import CommitWindow
11361137

11371138
self._updates = tuple(u for u in self._updates if not isinstance(u, (AddSnapshotUpdate, SetSnapshotRefUpdate)))
11381139
self._requirements = tuple(r for r in self._requirements if not isinstance(r, AssertRefSnapshotId))
11391140

1141+
# Build CommitWindow: starting_snapshot_id is from the first producer (fixed at operation start),
1142+
# catalog_head_snapshot_id is the current catalog HEAD after refresh.
1143+
starting_id = self._snapshot_producers[0]._starting_snapshot_id if self._snapshot_producers else None
1144+
current_snapshot = self._table.metadata.current_snapshot()
1145+
catalog_head_id = current_snapshot.snapshot_id if current_snapshot else None
1146+
commit_window = CommitWindow(starting_snapshot_id=starting_id, catalog_head_snapshot_id=catalog_head_id)
1147+
11401148
for producer in self._snapshot_producers:
1149+
producer._commit_window = commit_window
11411150
producer._refresh_for_retry()
11421151
producer._validate_concurrency()
11431152
updates, requirements = producer._commit()

pyiceberg/table/update/snapshot.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from abc import abstractmethod
2323
from collections import defaultdict
2424
from collections.abc import Callable
25+
from dataclasses import dataclass
2526
from datetime import datetime
2627
from functools import cached_property
2728
from typing import TYPE_CHECKING, Generic
@@ -95,6 +96,18 @@ def _new_manifest_list_file_name(snapshot_id: int, attempt: int, commit_uuid: uu
9596
return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
9697

9798

99+
@dataclass
100+
class CommitWindow:
101+
"""Tracks the commit range to validate against during retry.
102+
103+
starting_snapshot_id: The snapshot when the operation began (fixed across retries).
104+
catalog_head_snapshot_id: The catalog's latest HEAD snapshot (updated on each retry).
105+
"""
106+
107+
starting_snapshot_id: int | None
108+
catalog_head_snapshot_id: int | None
109+
110+
98111
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
99112
commit_uuid: uuid.UUID
100113
_io: FileIO
@@ -109,6 +122,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
109122
_target_branch: str | None
110123
_predicate: BooleanExpression
111124
_case_sensitive: bool
125+
_commit_window: CommitWindow | None
112126
_written_manifests: list[str]
113127
_uncommitted_manifests: list[str]
114128

@@ -144,6 +158,7 @@ def __init__(
144158
self._starting_snapshot_id = self._parent_snapshot_id
145159
self._predicate = AlwaysFalse()
146160
self._case_sensitive = True
161+
self._commit_window = None
147162
self._isolation_level_property: str = TableProperties.WRITE_DELETE_ISOLATION_LEVEL
148163

149164
def _validate_target_branch(self, branch: str | None) -> str | None:
@@ -407,8 +422,10 @@ def _refresh_for_retry(self) -> None:
407422
def _validate_concurrency(self) -> None:
408423
"""Validate that concurrent changes do not conflict with this operation.
409424
410-
Checks isolation level and uses the conflict detection filter to determine
411-
whether concurrent commits introduced conflicting data or delete files.
425+
Uses the CommitWindow to determine which catalog commits to validate against.
426+
The window spans from starting_snapshot (when the operation began) to the
427+
catalog HEAD (latest committed snapshot), covering all external concurrent commits.
428+
412429
Subclasses that do not require validation (e.g. fast append) should override
413430
with a no-op.
414431
"""
@@ -421,8 +438,11 @@ def _validate_concurrency(self) -> None:
421438
_validate_no_new_deletes_for_data_files,
422439
)
423440

424-
parent_snapshot = self._resolve_parent_snapshot()
425-
if parent_snapshot is None:
441+
if self._commit_window is None:
442+
return
443+
444+
catalog_head = self._resolve_catalog_head_snapshot()
445+
if catalog_head is None:
426446
return
427447

428448
starting_snapshot = self._resolve_starting_snapshot()
@@ -435,16 +455,27 @@ def _validate_concurrency(self) -> None:
435455
conflict_detection_filter = self._predicate if self._predicate != AlwaysFalse() else None
436456

437457
if isolation_level == IsolationLevel.SERIALIZABLE:
438-
_validate_added_data_files(table, parent_snapshot, conflict_detection_filter, starting_snapshot)
458+
_validate_added_data_files(table, catalog_head, conflict_detection_filter, starting_snapshot)
439459

440460
if conflict_detection_filter is not None:
441-
_validate_no_new_delete_files(table, parent_snapshot, conflict_detection_filter, None, starting_snapshot)
442-
_validate_deleted_data_files(table, parent_snapshot, conflict_detection_filter, starting_snapshot)
461+
_validate_no_new_delete_files(table, catalog_head, conflict_detection_filter, None, starting_snapshot)
462+
_validate_deleted_data_files(table, catalog_head, conflict_detection_filter, starting_snapshot)
443463

444464
if self._deleted_data_files:
445465
_validate_no_new_deletes_for_data_files(
446-
table, parent_snapshot, conflict_detection_filter, self._deleted_data_files, starting_snapshot
466+
table, catalog_head, conflict_detection_filter, self._deleted_data_files, starting_snapshot
467+
)
468+
469+
def _resolve_catalog_head_snapshot(self) -> Snapshot | None:
    """Look up the snapshot named by the commit window's catalog head ID.

    Returns:
        The matching snapshot from the transaction's table metadata, or None
        when no commit window is set or the window carries no catalog head ID.

    Raises:
        ValidationException: If a catalog head ID is set but the table
            metadata has no snapshot with that ID.
    """
    window = self._commit_window
    head_id = None if window is None else window.catalog_head_snapshot_id
    if head_id is None:
        # Nothing to resolve: either no window was attached or it has no head ID.
        return None

    head = self._transaction._table.metadata.snapshot_by_id(head_id)
    if head is not None:
        return head

    # An ID that cannot be resolved is reported as an error, not treated as "no head".
    raise ValidationException(f"Cannot find catalog head snapshot {head_id} in table metadata")
448479

449480
def _resolve_parent_snapshot(self) -> Snapshot | None:
450481
"""Resolve parent snapshot, raising ValidationException if ID is set but snapshot is missing."""
@@ -456,8 +487,8 @@ def _resolve_parent_snapshot(self) -> Snapshot | None:
456487
return snapshot
457488

458489
def _resolve_starting_snapshot(self) -> Snapshot:
459-
"""Resolve starting snapshot for the conflict detection window."""
460-
starting_id = self._starting_snapshot_id if self._starting_snapshot_id is not None else self._parent_snapshot_id
490+
"""Resolve starting snapshot for the conflict detection window from the CommitWindow."""
491+
starting_id = self._commit_window.starting_snapshot_id if self._commit_window else self._starting_snapshot_id
461492
if starting_id is None:
462493
raise ValidationException("Cannot resolve starting snapshot: both starting and parent snapshot IDs are None")
463494
snapshot = self._transaction._table.metadata.snapshot_by_id(starting_id)

tests/table/test_commit_retry.py

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,8 @@ def test_mixed_delete_overwrite_starts_from_catalog_snapshot(catalog: Catalog) -
632632
assert overwrite_producer._starting_snapshot_id == base_snapshot_id
633633

634634

635-
def test_validate_concurrency_raises_on_missing_parent_snapshot(catalog: Catalog) -> None:
636-
"""Validation should raise when parent_snapshot_id is non-null but cannot be resolved."""
635+
def test_validate_concurrency_raises_on_missing_catalog_head_snapshot(catalog: Catalog) -> None:
636+
"""Validation should raise when catalog_head_snapshot_id is non-null but cannot be resolved."""
637637
catalog.create_namespace("default")
638638
schema = _test_schema()
639639
table = catalog.create_table("default.missing_parent_test", schema=schema)
@@ -642,7 +642,7 @@ def test_validate_concurrency_raises_on_missing_parent_snapshot(catalog: Catalog
642642

643643
table.append(pa.table({"x": [1, 2, 3]}))
644644

645-
from pyiceberg.table.update.snapshot import _DeleteFiles
645+
from pyiceberg.table.update.snapshot import CommitWindow, _DeleteFiles
646646

647647
tx = Transaction(table, autocommit=False)
648648
producer = _DeleteFiles(
@@ -651,10 +651,12 @@ def test_validate_concurrency_raises_on_missing_parent_snapshot(catalog: Catalog
651651
io=table.io,
652652
)
653653

654-
# Artificially set a non-existent parent snapshot ID
655-
producer._parent_snapshot_id = 99999999
654+
# Set a CommitWindow with a non-existent catalog head snapshot ID
655+
producer._commit_window = CommitWindow(
656+
starting_snapshot_id=table.metadata.current_snapshot_id, catalog_head_snapshot_id=99999999
657+
)
656658

657-
with pytest.raises(ValidationException, match="Cannot find parent snapshot"):
659+
with pytest.raises(ValidationException, match="Cannot find catalog head snapshot"):
658660
producer._validate_concurrency()
659661

660662

@@ -668,7 +670,7 @@ def test_validate_concurrency_raises_on_missing_starting_snapshot(catalog: Catal
668670

669671
table.append(pa.table({"x": [1, 2, 3]}))
670672

671-
from pyiceberg.table.update.snapshot import _DeleteFiles
673+
from pyiceberg.table.update.snapshot import CommitWindow, _DeleteFiles
672674

673675
tx = Transaction(table, autocommit=False)
674676
producer = _DeleteFiles(
@@ -677,8 +679,46 @@ def test_validate_concurrency_raises_on_missing_starting_snapshot(catalog: Catal
677679
io=table.io,
678680
)
679681

680-
# parent is valid but starting is corrupted
681-
producer._starting_snapshot_id = 99999999
682+
# Set a CommitWindow with a non-existent starting snapshot ID
683+
producer._commit_window = CommitWindow(
684+
starting_snapshot_id=99999999, catalog_head_snapshot_id=table.metadata.current_snapshot_id
685+
)
682686

683687
with pytest.raises(ValidationException, match="Cannot find starting snapshot"):
684688
producer._validate_concurrency()
689+
690+
691+
def test_mixed_delete_overwrite_retries_successfully(catalog: Catalog) -> None:
    """A mixed full-file + partial delete should succeed via retry, not raise ValidationException.

    A stale table handle issues a delete that mixes a partial rewrite and a
    full-partition delete after another handle has already appended to a
    different partition. The delete is expected to commit, and the final scan
    must reflect both the append and the delete.
    """
    import pyarrow as pa

    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.transforms import IdentityTransform

    identifier = "default.mixed_retry_test"

    catalog.create_namespace("default")
    catalog.create_table(
        identifier,
        schema=Schema(
            NestedField(1, "category", StringType(), required=False),
            NestedField(2, "value", LongType(), required=False),
        ),
        partition_spec=PartitionSpec(
            PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category")
        ),
    )

    # 3 partitions, one data file each: a->[1,2], b->[3,4], c->[5,6]
    seed_rows = pa.table({"category": ["a", "a", "b", "b", "c", "c"], "value": [1, 2, 3, 4, 5, 6]})
    catalog.load_table(identifier).append(seed_rows)

    # Both handles are loaded before either writes, so the second one goes stale.
    concurrent_writer = catalog.load_table(identifier)
    stale_writer = catalog.load_table(identifier)

    # Concurrent append to partition 'c' (commits first, advances the HEAD)
    concurrent_writer.append(pa.table({"category": ["c"], "value": [7]}))

    # Mixed delete on the stale handle:
    #   partition 'a' is a partial rewrite (value==1 deleted, value==2 remains) -> _OverwriteFiles
    #   partition 'b' is a full delete (category == 'b') -> _DeleteFiles
    # This should NOT conflict with the append to 'c', so retry should succeed.
    stale_writer.delete("value == 1 or category == 'b'")

    remaining = catalog.load_table(identifier).scan().to_arrow().column("value").to_pylist()
    assert sorted(remaining) == [2, 5, 6, 7]

0 commit comments

Comments
 (0)