4141from pyiceberg .io .pyarrow import _dataframe_to_data_files , schema_to_pyarrow
4242from pyiceberg .partitioning import UNPARTITIONED_PARTITION_SPEC , PartitionField , PartitionSpec
4343from pyiceberg .schema import Schema
44- from pyiceberg .table import TableProperties
44+ from pyiceberg .table import Table , TableProperties
4545from pyiceberg .table .snapshots import Operation
4646from pyiceberg .table .sorting import NullOrder , SortDirection , SortField , SortOrder
4747from pyiceberg .table .update import AddSchemaUpdate , SetCurrentSchemaUpdate
@@ -450,10 +450,17 @@ def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier
450450 assert replaced .scan (snapshot_id = old_snapshot_id ).to_arrow ().equals (_simple_data ())
451451
452452
453- def test_complete_replace_transaction (catalog : Catalog , test_table_identifier : Identifier , tmp_path : Path ) -> None :
454- _create_simple_table (catalog , test_table_identifier , properties = {"keep" : "yes" , "override" : "old" })
455- catalog .load_table (test_table_identifier ).append (_simple_data ())
456- original = catalog .load_table (test_table_identifier )
453+ def _run_complete_replace (
454+ catalog : Catalog , identifier : Identifier , tmp_path : Path
455+ ) -> tuple [Table , Table , SortOrder , pa .Table , int ]:
456+ """Set up a table, run a full-six-args RTAS replace, and return the handles needed for assertions.
457+
458+ Returns:
459+ (original, replaced, new_sort, original_data, old_snapshot_id)
460+ """
461+ _create_simple_table (catalog , identifier , properties = {"keep" : "yes" , "override" : "old" })
462+ catalog .load_table (identifier ).append (_simple_data ())
463+ original = catalog .load_table (identifier )
457464 old_snapshot_id = original .current_snapshot ().snapshot_id # type: ignore[union-attr]
458465 original_data = original .scan ().to_arrow ()
459466
@@ -471,7 +478,7 @@ def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: I
471478 )
472479
473480 with catalog .replace_table_transaction (
474- test_table_identifier ,
481+ identifier ,
475482 schema = new_schema ,
476483 partition_spec = new_spec ,
477484 sort_order = new_sort ,
@@ -480,12 +487,16 @@ def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: I
480487 ) as txn :
481488 txn .append (new_data )
482489
483- replaced = catalog .load_table (test_table_identifier )
490+ return original , catalog .load_table (identifier ), new_sort , original_data , old_snapshot_id
491+
484492
493+ def test_complete_replace_transaction_applies_new_schema_spec_and_sort (
494+ catalog : Catalog , test_table_identifier : Identifier , tmp_path : Path
495+ ) -> None :
496+ original , replaced , new_sort , _ , _ = _run_complete_replace (catalog , test_table_identifier , tmp_path )
485497 # Identity invariants.
486498 assert replaced .metadata .table_uuid == original .metadata .table_uuid
487- assert replaced .metadata .location == new_location
488-
499+ assert replaced .metadata .location == f"file://{ tmp_path } /replaced"
489500 # New schema / spec / sort applied; old entries retained in history.
490501 assert {f .name for f in replaced .schema ().fields } == {"id" , "data" , "extra" }
491502 assert sorted (s .schema_id for s in replaced .metadata .schemas ) == [0 , 1 ]
@@ -495,14 +506,23 @@ def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: I
495506 assert replaced .sort_order ().fields == new_sort .fields
496507 assert {s .order_id for s in replaced .metadata .sort_orders } == {0 , replaced .metadata .default_sort_order_id }
497508
498- # Property merge: kept, overridden, added — and `format-version` does not leak.
509+
510+ def test_complete_replace_transaction_merges_properties (
511+ catalog : Catalog , test_table_identifier : Identifier , tmp_path : Path
512+ ) -> None :
513+ _ , replaced , _ , _ , _ = _run_complete_replace (catalog , test_table_identifier , tmp_path )
514+ # `keep` is preserved, `override` is updated, `added` is new, and `format-version` does not leak.
499515 assert replaced .properties ["keep" ] == "yes"
500516 assert replaced .properties ["override" ] == "new"
501517 assert replaced .properties ["added" ] == "v"
502518 assert "format-version" not in replaced .properties
503519
504- # RTAS atomicity: new snapshot exists, has no parent (fresh start), old snapshot is still
505- # in the snapshot list, and time-travel reads return the original rows.
520+
521+ def test_complete_replace_transaction_rtas_preserves_old_snapshot (
522+ catalog : Catalog , test_table_identifier : Identifier , tmp_path : Path
523+ ) -> None :
524+ _ , replaced , _ , original_data , old_snapshot_id = _run_complete_replace (catalog , test_table_identifier , tmp_path )
525+ # New snapshot exists, has no parent (fresh start), old snapshot is still in the snapshot list.
506526 new_snapshot = replaced .current_snapshot ()
507527 assert new_snapshot is not None
508528 assert new_snapshot .snapshot_id != old_snapshot_id
0 commit comments