Skip to content

Commit b81a1c1

Browse files
committed
Add validation guards and isolation level property routing
Skip _validate_no_new_delete_files and _validate_deleted_data_files when conflict_detection_filter is None, matching Java's BaseOverwriteFiles.validate() behavior for rowFilter == AlwaysFalse(). Route isolation level property based on the calling operation. Transaction.delete() uses write.delete.isolation-level (default). Transaction.overwrite(), dynamic_partition_overwrite(), and upsert() use write.update.isolation-level via _isolation_level_property on the snapshot producer. Remove unused WRITE_MERGE_ISOLATION_LEVEL constant. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent 43337e6 commit b81a1c1

3 files changed

Lines changed: 84 additions & 9 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ class TableProperties:
220220

221221
WRITE_DELETE_ISOLATION_LEVEL = "write.delete.isolation-level"
222222
WRITE_UPDATE_ISOLATION_LEVEL = "write.update.isolation-level"
223-
WRITE_MERGE_ISOLATION_LEVEL = "write.merge.isolation-level"
224223
WRITE_ISOLATION_LEVEL_DEFAULT = "serializable"
225224

226225

@@ -569,7 +568,12 @@ def dynamic_partition_overwrite(
569568
delete_filter = self._build_partition_predicate(
570569
partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
571570
)
572-
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
571+
self.delete(
572+
delete_filter=delete_filter,
573+
snapshot_properties=snapshot_properties,
574+
branch=branch,
575+
_isolation_level_property=TableProperties.WRITE_UPDATE_ISOLATION_LEVEL,
576+
)
573577

574578
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
575579
append_files.commit_uuid = append_snapshot_commit_uuid
@@ -626,6 +630,7 @@ def overwrite(
626630
case_sensitive=case_sensitive,
627631
snapshot_properties=snapshot_properties,
628632
branch=branch,
633+
_isolation_level_property=TableProperties.WRITE_UPDATE_ISOLATION_LEVEL,
629634
)
630635

631636
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -643,6 +648,7 @@ def delete(
643648
snapshot_properties: dict[str, str] = EMPTY_DICT,
644649
case_sensitive: bool = True,
645650
branch: str | None = MAIN_BRANCH,
651+
_isolation_level_property: str | None = None,
646652
) -> None:
647653
"""
648654
Shorthand for deleting record from a table.
@@ -670,6 +676,8 @@ def delete(
670676
delete_filter = _parse_row_filter(delete_filter)
671677

672678
with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
679+
if _isolation_level_property is not None:
680+
delete_snapshot._isolation_level_property = _isolation_level_property
673681
delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)
674682

675683
# Check if there are any files that require an actual rewrite of a data file
@@ -725,6 +733,8 @@ def delete(
725733
with self.update_snapshot(
726734
snapshot_properties=snapshot_properties, branch=branch
727735
).overwrite() as overwrite_snapshot:
736+
if _isolation_level_property is not None:
737+
overwrite_snapshot._isolation_level_property = _isolation_level_property
728738
overwrite_snapshot.commit_uuid = commit_uuid
729739
overwrite_snapshot.delete_by_predicate(delete_filter, case_sensitive)
730740
for original_data_file, replaced_data_files in replaced_files:

pyiceberg/table/update/snapshot.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ def __init__(
141141
)
142142
self._predicate = AlwaysFalse()
143143
self._case_sensitive = True
144+
self._isolation_level_property: str = TableProperties.WRITE_DELETE_ISOLATION_LEVEL
144145

145146
def _validate_target_branch(self, branch: str | None) -> str | None:
146147
# if branch is none, write will be written into a staging snapshot
@@ -556,16 +557,17 @@ def _validate_concurrency(self) -> None:
556557
return
557558

558559
isolation_level_str = table.metadata.properties.get(
559-
TableProperties.WRITE_DELETE_ISOLATION_LEVEL, TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
560+
self._isolation_level_property, TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
560561
)
561562
isolation_level = IsolationLevel(isolation_level_str)
562563
conflict_detection_filter = self._predicate if self._predicate != AlwaysFalse() else None
563564

564565
if isolation_level == IsolationLevel.SERIALIZABLE:
565566
_validate_added_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot)
566567

567-
_validate_no_new_delete_files(table, parent_snapshot, conflict_detection_filter, None, parent_snapshot)
568-
_validate_deleted_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot)
568+
if conflict_detection_filter is not None:
569+
_validate_no_new_delete_files(table, parent_snapshot, conflict_detection_filter, None, parent_snapshot)
570+
_validate_deleted_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot)
569571

570572
if self._deleted_data_files:
571573
_validate_no_new_deletes_for_data_files(
@@ -763,16 +765,17 @@ def _validate_concurrency(self) -> None:
763765
return
764766

765767
isolation_level_str = table.metadata.properties.get(
766-
TableProperties.WRITE_DELETE_ISOLATION_LEVEL, TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
768+
self._isolation_level_property, TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
767769
)
768770
isolation_level = IsolationLevel(isolation_level_str)
769771
conflict_detection_filter = self._predicate if self._predicate != AlwaysFalse() else None
770772

771773
if isolation_level == IsolationLevel.SERIALIZABLE:
772774
_validate_added_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot)
773775

774-
_validate_no_new_delete_files(table, parent_snapshot, conflict_detection_filter, None, parent_snapshot)
775-
_validate_deleted_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot)
776+
if conflict_detection_filter is not None:
777+
_validate_no_new_delete_files(table, parent_snapshot, conflict_detection_filter, None, parent_snapshot)
778+
_validate_deleted_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot)
776779

777780
if self._deleted_data_files:
778781
_validate_no_new_deletes_for_data_files(

tests/table/test_commit_retry.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ def test_commit_retry_table_properties() -> None:
4747
def test_isolation_level_table_properties() -> None:
4848
assert TableProperties.WRITE_DELETE_ISOLATION_LEVEL == "write.delete.isolation-level"
4949
assert TableProperties.WRITE_UPDATE_ISOLATION_LEVEL == "write.update.isolation-level"
50-
assert TableProperties.WRITE_MERGE_ISOLATION_LEVEL == "write.merge.isolation-level"
5150
assert TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT == "serializable"
5251

5352

@@ -484,3 +483,66 @@ def test_concurrent_partial_deletes_on_different_partitions_succeed(catalog: Cat
484483
result = refreshed.scan().to_arrow()
485484
# Original 4 rows, minus value==1 and value==3 = 2 rows remaining
486485
assert len(result) == 2
486+
487+
488+
def test_overwrite_uses_update_isolation_level(catalog: Catalog) -> None:
489+
"""Verify that overwrite() reads write.update.isolation-level, not write.delete.isolation-level."""
490+
catalog.create_namespace("default")
491+
schema = _test_schema()
492+
catalog.create_table(
493+
"default.update_iso_test",
494+
schema=schema,
495+
properties={
496+
"write.delete.isolation-level": "serializable",
497+
"write.update.isolation-level": "snapshot",
498+
},
499+
)
500+
501+
import pyarrow as pa
502+
503+
df = pa.table({"x": [1, 2, 3]})
504+
505+
tbl = catalog.load_table("default.update_iso_test")
506+
tbl.append(df)
507+
508+
tbl1 = catalog.load_table("default.update_iso_test")
509+
tbl2 = catalog.load_table("default.update_iso_test")
510+
511+
tbl1.append(df)
512+
513+
# Under write.delete.isolation-level=serializable this would raise ValidationException.
514+
# But overwrite() uses write.update.isolation-level=snapshot, so it succeeds.
515+
tbl2.overwrite(pa.table({"x": [10, 20, 30]}), overwrite_filter="x > 0")
516+
517+
refreshed = catalog.load_table("default.update_iso_test")
518+
result = refreshed.scan().to_arrow()
519+
# overwrite with x > 0 deletes all rows (including tbl1's append), then adds 3 new rows
520+
assert len(result) == 3
521+
522+
523+
def test_overwrite_with_serializable_update_isolation_raises(catalog: Catalog) -> None:
524+
"""Verify that overwrite() raises ValidationException when write.update.isolation-level=serializable."""
525+
catalog.create_namespace("default")
526+
schema = _test_schema()
527+
catalog.create_table(
528+
"default.update_serial_test",
529+
schema=schema,
530+
properties={
531+
"write.update.isolation-level": "serializable",
532+
},
533+
)
534+
535+
import pyarrow as pa
536+
537+
df = pa.table({"x": [1, 2, 3]})
538+
539+
tbl = catalog.load_table("default.update_serial_test")
540+
tbl.append(df)
541+
542+
tbl1 = catalog.load_table("default.update_serial_test")
543+
tbl2 = catalog.load_table("default.update_serial_test")
544+
545+
tbl1.append(df)
546+
547+
with pytest.raises(ValidationException):
548+
tbl2.overwrite(pa.table({"x": [10, 20, 30]}), overwrite_filter="x > 0")

0 commit comments

Comments
 (0)