Skip to content

Commit 4d29a07

Browse files
committed
Fix case-insensitive delete regression and improve retry loop
Fix _build_delete_files_partition_predicate resetting _case_sensitive to True; it now passes the current value to delete_by_predicate. The reset caused case-insensitive deletes to fail when _OverwriteFiles was used with a user-specified predicate.

Move the random and time imports to the file's top level.

Add a total timeout (commit.retry.total-timeout-ms) to the retry loop.

Add comments explaining the intentional validation duplication and the cached_property clearing.

Stabilize test_commit_retry_on_commit_failed by removing a flaky patch.object assertion.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent 3d32e32 commit 4d29a07

2 files changed

Lines changed: 27 additions & 6 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import itertools
2020
import os
21+
import random
22+
import time
2123
import uuid
2224
import warnings
2325
from abc import ABC, abstractmethod
@@ -988,6 +990,13 @@ def commit_transaction(self) -> Table:
988990
properties, TableProperties.COMMIT_MAX_RETRY_WAIT_MS, TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
989991
)
990992
max_wait_ms = max_wait_val if max_wait_val is not None else TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
993+
total_timeout_val = property_as_int(
994+
properties, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
995+
)
996+
total_timeout_ms = (
997+
total_timeout_val if total_timeout_val is not None else TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
998+
)
999+
start_time = time.monotonic()
9911000

9921001
for attempt in range(num_retries + 1):
9931002
try:
@@ -999,10 +1008,9 @@ def commit_transaction(self) -> Table:
9991008
self._cleanup_uncommitted_manifests()
10001009
break
10011010
except CommitFailedException:
1002-
if attempt == num_retries or not self._snapshot_producers:
1011+
elapsed_ms = (time.monotonic() - start_time) * 1000
1012+
if attempt == num_retries or not self._snapshot_producers or elapsed_ms >= total_timeout_ms:
10031013
raise
1004-
import random
1005-
import time
10061014

10071015
wait = min(min_wait_ms * (2**attempt), max_wait_ms)
10081016
jitter = random.uniform(0, 0.25 * wait)

pyiceberg/table/update/snapshot.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,8 @@ def _build_delete_files_partition_predicate(self) -> None:
418418
self.delete_by_predicate(
419419
self._transaction._build_partition_predicate(
420420
partition_records=partition_records, schema=self.schema(), spec=self.spec(spec_id)
421-
)
421+
),
422+
self._case_sensitive,
422423
)
423424

424425

@@ -534,11 +535,19 @@ def files_affected(self) -> bool:
534535
def _refresh_for_retry(self) -> None:
535536
"""Reset state for a retry attempt, clearing the cached delete computation."""
536537
super()._refresh_for_retry()
538+
# Clear @cached_property by removing it from the instance __dict__.
539+
# _compute_deletes depends on _parent_snapshot_id which changes on retry.
537540
if "_compute_deletes" in self.__dict__:
538541
del self.__dict__["_compute_deletes"]
539542

540543
def _validate_concurrency(self) -> None:
541-
"""Validate that concurrent changes do not conflict with this delete."""
544+
"""Validate that concurrent changes do not conflict with this delete.
545+
546+
Note: This method is intentionally duplicated in _OverwriteFiles rather than
547+
extracted to the base class. While the logic is currently identical, Java Iceberg's
548+
BaseOverwriteFiles and BaseRowDelta have divergent validation. Keeping them separate
549+
makes it easier to add RowDelta-specific validation in the future.
550+
"""
542551
from pyiceberg.table import TableProperties
543552
from pyiceberg.table.snapshots import IsolationLevel
544553
from pyiceberg.table.update.validate import (
@@ -746,7 +755,11 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
746755
return []
747756

748757
def _validate_concurrency(self) -> None:
749-
"""Validate that concurrent changes do not conflict with this overwrite."""
758+
"""Validate that concurrent changes do not conflict with this overwrite.
759+
760+
Note: See _DeleteFiles._validate_concurrency() for why this is intentionally
761+
duplicated rather than extracted to the base class.
762+
"""
750763
from pyiceberg.table import TableProperties
751764
from pyiceberg.table.snapshots import IsolationLevel
752765
from pyiceberg.table.update.validate import (

0 commit comments

Comments
 (0)