Skip to content

Commit 5b4a49a

Browse files
Drop Catalog.replace_table shortcut, keep replace_table_transaction only
Mirrors Java, which exposes newReplaceTableTransaction only and has no top-level Catalog.replaceTable. Callers go through the transaction explicitly and follow up with load_table() when a fresh handle is needed, sidestepping the StagedTable return-type wart on the shortcut. Also collapse the docs section to the single RTAS-flow example.
1 parent 739104e commit 5b4a49a

5 files changed

Lines changed: 35 additions & 104 deletions

File tree

mkdocs/docs/api.md

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -187,36 +187,18 @@ with catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
187187

188188
## Replace a table
189189

190-
Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). `replace_table` redefines the table in this way; `replace_table_transaction` lets you write new data alongside this change to permit RTAS (replace-table-as-select) workflows.
191-
192-
```python
193-
from pyiceberg.schema import Schema
194-
from pyiceberg.types import NestedField, LongType, StringType, BooleanType
195-
196-
new_schema = Schema(
197-
NestedField(field_id=1, name="datetime", field_type=LongType(), required=False),
198-
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
199-
NestedField(field_id=3, name="active", field_type=BooleanType(), required=False),
200-
)
201-
catalog.replace_table(identifier="docs_example.bids", schema=new_schema)
202-
```
203-
204-
Field IDs are reused by name from the previous schema; new columns get fresh IDs above `last-column-id`.
205-
206-
Unlike the other fields, table properties are *merged* on replace: properties you don't pass are preserved on the table. To remove a property as part of the replace, use `replace_table_transaction` and drop it explicitly within the transaction.
207-
208-
Use `replace_table_transaction` to stage additional changes (writes, property updates, schema evolution) before committing — for example, swap the schema and write new data atomically:
190+
Atomically replace an existing table's schema, partition spec, sort order, location, and properties via `replace_table_transaction`. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). Open the transaction with the new definition, stage any additional changes (writes, property updates, schema evolution), and commit — for example, an RTAS (replace-table-as-select) that swaps the schema and writes the new data atomically:
209191

210192
```python
211193
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=df.schema) as txn:
212194
txn.append(df)
213195
```
214196

215-
To upgrade the table's format version as part of the replace, pass `format-version` in `properties`:
197+
Field IDs are reused by name from the previous schema; new columns get fresh IDs above `last-column-id`.
216198

217-
```python
218-
catalog.replace_table(identifier="docs_example.bids", schema=new_schema, properties={"format-version": "2"})
219-
```
199+
Table properties are *merged* on replace: properties you pass override the existing values, and properties you don't pass are preserved on the table. To remove a property, remove it explicitly within the transaction.
200+
201+
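Pass `format-version` in `properties` to upgrade the table's format version as part of the replace. The key is treated as a control input and is not stored as a table property; requesting a version lower than the table's current one raises a `ValueError`.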
220202

221203
## Register a table
222204

pyiceberg/catalog/__init__.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -449,41 +449,6 @@ def create_table_if_not_exists(
449449
except TableAlreadyExistsError:
450450
return self.load_table(identifier)
451451

452-
def replace_table(
453-
self,
454-
identifier: str | Identifier,
455-
schema: Schema | pa.Schema,
456-
location: str | None = None,
457-
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
458-
sort_order: SortOrder = UNSORTED_SORT_ORDER,
459-
properties: Properties = EMPTY_DICT,
460-
) -> Table:
461-
"""Atomically replace a table's schema, spec, sort order, location, and properties.
462-
463-
The table UUID and history (snapshots, schemas, specs, sort orders) are preserved.
464-
The current snapshot is cleared (main branch ref is removed).
465-
466-
Args:
467-
identifier (str | Identifier): Table identifier.
468-
schema (Schema): New table schema.
469-
location (str | None): New table location. Defaults to the existing location.
470-
partition_spec (PartitionSpec): New partition spec.
471-
sort_order (SortOrder): New sort order.
472-
properties (Properties): Properties to apply. Merged on top of the existing
473-
table properties: keys present here override existing values; existing keys
474-
not present here are preserved. To remove a property, follow up with a
475-
transaction that removes it explicitly.
476-
477-
Returns:
478-
Table: the replaced table instance.
479-
480-
Raises:
481-
NoSuchTableError: If the table does not exist.
482-
"""
483-
return self.replace_table_transaction(
484-
identifier, schema, location, partition_spec, sort_order, properties
485-
).commit_transaction()
486-
487452
@abstractmethod
488453
def replace_table_transaction(
489454
self,

tests/catalog/test_catalog_behaviors.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier
432432
snapshot_log_before = list(original.metadata.snapshot_log)
433433
assert len(snapshot_log_before) == 1
434434

435-
catalog.replace_table(test_table_identifier, schema=_REPLACE_SCHEMA)
435+
catalog.replace_table_transaction(test_table_identifier, schema=_REPLACE_SCHEMA).commit_transaction()
436436
replaced = catalog.load_table(test_table_identifier)
437437

438438
# UUID + history preserved, current snapshot cleared, current schema swapped.
@@ -518,12 +518,13 @@ def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: I
518518
def test_replace_transaction_requires_table_exists(catalog: Catalog, test_table_identifier: Identifier) -> None:
519519
schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=False))
520520
with pytest.raises(NoSuchTableError):
521-
catalog.replace_table(test_table_identifier, schema=schema)
521+
catalog.replace_table_transaction(test_table_identifier, schema=schema)
522522

523523

524524
def test_replace_table_reuses_schema_id_when_identical(catalog: Catalog, test_table_identifier: Identifier) -> None:
525525
_, base_schema = _create_simple_table(catalog, test_table_identifier)
526-
replaced = catalog.replace_table(test_table_identifier, schema=base_schema)
526+
catalog.replace_table_transaction(test_table_identifier, schema=base_schema).commit_transaction()
527+
replaced = catalog.load_table(test_table_identifier)
527528
# Identical shape -> no new schema appended, current points back at id 0.
528529
assert [s.schema_id for s in replaced.metadata.schemas] == [0]
529530
assert replaced.metadata.current_schema_id == 0
@@ -537,17 +538,24 @@ def test_replace_table_reuses_partition_spec_and_sort_order_when_identical(
537538
sort = SortOrder(SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC))
538539
_, schema = _create_simple_table(catalog, test_table_identifier, partition_spec=spec)
539540
# Introduce a sort order then replay both spec and sort — neither should append a new entry.
540-
sorted_first = catalog.replace_table(test_table_identifier, schema=schema, partition_spec=spec, sort_order=sort)
541+
catalog.replace_table_transaction(
542+
test_table_identifier, schema=schema, partition_spec=spec, sort_order=sort
543+
).commit_transaction()
544+
sorted_first = catalog.load_table(test_table_identifier)
541545
sorted_order_id = sorted_first.metadata.default_sort_order_id
542546
assert sorted_order_id != 0
543547

544-
replayed = catalog.replace_table(test_table_identifier, schema=schema, partition_spec=spec, sort_order=sort)
548+
catalog.replace_table_transaction(
549+
test_table_identifier, schema=schema, partition_spec=spec, sort_order=sort
550+
).commit_transaction()
551+
replayed = catalog.load_table(test_table_identifier)
545552
assert [s.spec_id for s in replayed.metadata.partition_specs] == [0]
546553
assert replayed.metadata.default_spec_id == 0
547554
assert replayed.metadata.default_sort_order_id == sorted_order_id
548555

549556
# Dropping the sort order falls back to the unsorted order_id 0 (also reused, not appended).
550-
unsorted = catalog.replace_table(test_table_identifier, schema=schema, partition_spec=spec)
557+
catalog.replace_table_transaction(test_table_identifier, schema=schema, partition_spec=spec).commit_transaction()
558+
unsorted = catalog.load_table(test_table_identifier)
551559
assert unsorted.sort_order().is_unsorted
552560
assert unsorted.metadata.default_sort_order_id == 0
553561

@@ -573,7 +581,8 @@ def test_replace_table_identifier_field_ids(catalog: Catalog, test_table_identif
573581
NestedField(field_id=2, name="data", field_type=StringType(), required=False),
574582
)
575583
)
576-
replaced = catalog.replace_table(test_table_identifier, schema=new_schema)
584+
catalog.replace_table_transaction(test_table_identifier, schema=new_schema).commit_transaction()
585+
replaced = catalog.load_table(test_table_identifier)
577586
expected = [1] if keep_identifier else []
578587
assert list(replaced.schema().identifier_field_ids) == expected
579588

@@ -591,7 +600,8 @@ def test_replace_table_partition_field_carry_forward(
591600
) -> None:
592601
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, name="id_part", transform=IdentityTransform()))
593602
_, schema = _create_simple_table(catalog, test_table_identifier, partition_spec=spec, format_version=format_version)
594-
replaced = catalog.replace_table(test_table_identifier, schema=schema)
603+
catalog.replace_table_transaction(test_table_identifier, schema=schema).commit_transaction()
604+
replaced = catalog.load_table(test_table_identifier)
595605
new_spec = replaced.spec()
596606
if expect_void_carry_forward:
597607
void_field = next(f for f in new_spec.fields if f.field_id == 1000)
@@ -606,22 +616,26 @@ def test_replace_table_upgrades_format_version(catalog: Catalog, test_table_iden
606616
_, schema = _create_simple_table(catalog, test_table_identifier, format_version=1)
607617
assert catalog.load_table(test_table_identifier).format_version == 1
608618

609-
upgraded = catalog.replace_table(test_table_identifier, schema=schema, properties={"format-version": "2"})
619+
catalog.replace_table_transaction(
620+
test_table_identifier, schema=schema, properties={"format-version": "2"}
621+
).commit_transaction()
622+
upgraded = catalog.load_table(test_table_identifier)
610623
assert upgraded.format_version == 2
611624
# `format-version` is a control input, not a persisted property.
612625
assert "format-version" not in upgraded.properties
613626

614627
# A follow-up replace stays at the upgraded version.
615628
new_schema = Schema(*schema.fields, NestedField(field_id=3, name="extra", field_type=BooleanType(), required=False))
616-
replayed = catalog.replace_table(test_table_identifier, schema=new_schema)
629+
catalog.replace_table_transaction(test_table_identifier, schema=new_schema).commit_transaction()
630+
replayed = catalog.load_table(test_table_identifier)
617631
assert replayed.format_version == 2
618632
assert {f.name for f in replayed.schema().fields} == {"id", "data", "extra"}
619633

620634

621635
def test_replace_table_rejects_format_version_downgrade(catalog: Catalog, test_table_identifier: Identifier) -> None:
622636
_, schema = _create_simple_table(catalog, test_table_identifier, format_version=2)
623637
with pytest.raises(ValueError, match="Cannot downgrade format-version"):
624-
catalog.replace_table(test_table_identifier, schema=schema, properties={"format-version": "1"})
638+
catalog.replace_table_transaction(test_table_identifier, schema=schema, properties={"format-version": "1"})
625639

626640

627641
@pytest.mark.parametrize("location_kind", ["inherit", "explicit", "trailing-slash"])
@@ -630,12 +644,14 @@ def test_replace_table_location(catalog: Catalog, test_table_identifier: Identif
630644
existing = catalog.load_table(test_table_identifier).metadata.location
631645

632646
if location_kind == "inherit":
633-
replaced = catalog.replace_table(test_table_identifier, schema=schema)
647+
catalog.replace_table_transaction(test_table_identifier, schema=schema).commit_transaction()
648+
replaced = catalog.load_table(test_table_identifier)
634649
assert replaced.metadata.location == existing
635650
else:
636651
bare = f"file://{tmp_path}/relocated"
637652
arg = bare + "/" if location_kind == "trailing-slash" else bare
638-
replaced = catalog.replace_table(test_table_identifier, schema=schema, location=arg)
653+
catalog.replace_table_transaction(test_table_identifier, schema=schema, location=arg).commit_transaction()
654+
replaced = catalog.load_table(test_table_identifier)
639655
assert replaced.metadata.location == bare
640656

641657

tests/catalog/test_rest.py

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import base64
2121
import os
22-
import uuid
2322
from collections.abc import Callable
2423
from typing import Any, cast
2524
from unittest import mock
@@ -2981,34 +2980,3 @@ def test_replace_table_transaction_404_raises(
29812980
identifier=("fokko", "missing"),
29822981
schema=Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=False)),
29832982
)
2984-
2985-
2986-
def test_replace_table_issues_commit_post_immediately(
2987-
rest_mock: Mocker,
2988-
example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
2989-
example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
2990-
) -> None:
2991-
"""`replace_table` commits during the call; `replace_table_transaction` defers the POST until the caller commits."""
2992-
table_uuid = example_table_metadata_with_snapshot_v1_rest_json["metadata"]["table-uuid"]
2993-
example_table_metadata_no_snapshot_v1_rest_json["metadata"]["table-uuid"] = table_uuid
2994-
_mock_replace_endpoints(
2995-
rest_mock,
2996-
"fokko",
2997-
"fokko2",
2998-
example_table_metadata_with_snapshot_v1_rest_json,
2999-
example_table_metadata_no_snapshot_v1_rest_json,
3000-
)
3001-
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3002-
new_schema = Schema(
3003-
NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
3004-
NestedField(field_id=2, name="data", field_type=StringType(), required=False),
3005-
)
3006-
3007-
catalog.replace_table_transaction(identifier=("fokko", "fokko2"), schema=new_schema)
3008-
methods_after_open = [r.method for r in rest_mock.request_history]
3009-
assert "POST" not in methods_after_open
3010-
3011-
replaced = catalog.replace_table(identifier=("fokko", "fokko2"), schema=new_schema)
3012-
assert replaced.metadata.table_uuid == uuid.UUID(table_uuid)
3013-
methods_after_replace = [r.method for r in rest_mock.request_history]
3014-
assert "POST" in methods_after_replace, "replace_table must commit immediately"

tests/integration/test_catalog.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ def test_replace_table(test_catalog: Catalog, database_name: str, table_name: st
902902
NestedField(field_id=2, name="name", field_type=StringType(), required=False),
903903
NestedField(field_id=3, name="active", field_type=BooleanType(), required=False),
904904
)
905-
test_catalog.replace_table(identifier, schema=new_schema)
905+
test_catalog.replace_table_transaction(identifier, schema=new_schema).commit_transaction()
906906
replaced = test_catalog.load_table(identifier)
907907

908908
assert replaced.metadata.table_uuid == original.metadata.table_uuid

0 commit comments

Comments
 (0)