Skip to content

Commit b4d76c1

Browse files
Address review feedback (round 2)
- Drop the view-collision pre-flight check; it is deferred to a follow-up PR that ports the check across create/replace/rename in one pass. - Simplify RTAS tests + docs to use `txn.append(df)` instead of the verbose `update_snapshot().fast_append()` + `_dataframe_to_data_files` pattern. - Drop the parallel `new_schema` construction in RTAS tests; pass `df.schema` directly (`replace_table*` accepts `Schema | pa.Schema`). - Reword docs intro per reviewer suggestion. - Run formatter / linter; fix trailing whitespace and trailing newline. - mypy: cast `format_version` to `TableVersion`; extract the inner `with` block into a helper to silence the unreachable-code warning under `pytest.raises`.
1 parent fd7e11f commit b4d76c1

8 files changed

Lines changed: 38 additions & 149 deletions

File tree

mkdocs/docs/api.md

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -187,41 +187,28 @@ with catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
187187

188188
## Replace a table
189189

190-
Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). Use this when you want to redefine the table's metadata; pair it with `replace_table_transaction` to atomically write new data alongside the metadata change (RTAS-style).
190+
Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). `replace_table` redefines the table in this way; `replace_table_transaction` lets you write new data alongside this change to permit RTAS (replace-table-as-select) workflows.
191191

192192
```python
193-
from pyiceberg.schema import Schema
194-
from pyiceberg.types import NestedField, LongType, StringType, BooleanType
195-
196-
new_schema = Schema(
197-
NestedField(field_id=1, name="datetime", field_type=LongType(), required=False),
198-
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
199-
NestedField(field_id=3, name="active", field_type=BooleanType(), required=False),
200-
)
201-
catalog.replace_table(
202-
identifier="docs_example.bids",
203-
schema=new_schema,
204-
)
193+
catalog.replace_table(identifier="docs_example.bids", schema=df.schema)
205194
```
206195

207-
Field IDs from columns whose names appear in the previous schema are reused, so existing data files remain readable when the new schema is a compatible superset. New columns get fresh IDs above `last-column-id`.
196+
Here, `df` is a PyArrow table whose schema defines the new column set; the `schema` argument also accepts a PyIceberg `Schema` in place of `df.schema`. Field IDs from columns whose names appear in the previous schema are reused, so existing data files remain readable when the new schema is a compatible superset. New columns get fresh IDs above `last-column-id`.
208197

209198
Properties passed to `replace_table` are **merged** with the existing table properties (your values override; existing keys you don't pass are preserved). To remove a property as part of the replace, use `replace_table_transaction` and remove it explicitly within the transaction.
210199

211200
Use `replace_table_transaction` to stage additional changes (writes, property updates, schema evolution) before committing — for example, swap the schema and write new data atomically:
212201

213202
```python
214-
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=new_schema) as txn:
215-
with txn.update_snapshot().fast_append() as snap:
216-
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=df, io=txn._table.io):
217-
snap.append_data_file(data_file)
203+
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=df.schema) as txn:
204+
txn.append(df)
218205
txn.set_properties(write_replaced_at="2026-04-19T00:00:00Z")
219206
```
220207

221208
To upgrade the table's format version as part of the replace, pass `format-version` in `properties`:
222209

223210
```python
224-
catalog.replace_table(identifier="docs_example.bids", schema=new_schema, properties={"format-version": "2"})
211+
catalog.replace_table(identifier="docs_example.bids", schema=df.schema, properties={"format-version": "2"})
225212
```
226213

227214
## Register a table

pyiceberg/catalog/__init__.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -328,20 +328,6 @@ def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> No
328328
deleted_files[path] = True
329329

330330

331-
def _raise_if_view_exists(catalog: Catalog, identifier: str | Identifier) -> None:
332-
"""Raise TableAlreadyExistsError if a view exists at the same identifier.
333-
334-
Catalogs that don't support views raise `NotImplementedError` from `view_exists` —
335-
treat as "no view" in that case.
336-
"""
337-
try:
338-
view_collision = catalog.view_exists(identifier)
339-
except NotImplementedError:
340-
view_collision = False
341-
if view_collision:
342-
raise TableAlreadyExistsError(f"View with same name already exists: {identifier}")
343-
344-
345331
def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Catalog | None:
346332
try:
347333
path_parts = catalog_impl.split(".")
@@ -564,7 +550,7 @@ def _replace_staged_table(
564550
resolved_format_version = (
565551
int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version
566552
)
567-
iceberg_schema = self._convert_schema_if_needed(schema, resolved_format_version)
553+
iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version))
568554

569555
fresh_schema, _ = assign_fresh_schema_ids_for_replace(
570556
iceberg_schema, existing_metadata.schema(), existing_metadata.last_column_id
@@ -1083,7 +1069,6 @@ def replace_table_transaction(
10831069
sort_order: SortOrder = UNSORTED_SORT_ORDER,
10841070
properties: Properties = EMPTY_DICT,
10851071
) -> ReplaceTableTransaction:
1086-
_raise_if_view_exists(self, identifier)
10871072
staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table(
10881073
identifier, schema, location, partition_spec, sort_order, properties
10891074
)

pyiceberg/catalog/rest/__init__.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,7 @@
3030
from typing_extensions import override
3131

3232
from pyiceberg import __version__
33-
from pyiceberg.catalog import (
34-
BOTOCORE_SESSION,
35-
TOKEN,
36-
URI,
37-
WAREHOUSE_LOCATION,
38-
Catalog,
39-
PropertiesUpdateSummary,
40-
_raise_if_view_exists,
41-
)
33+
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
4234
from pyiceberg.catalog.rest.auth import AUTH_MANAGER, AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
4335
from pyiceberg.catalog.rest.response import _handle_non_200_response
4436
from pyiceberg.catalog.rest.scan_planning import (
@@ -977,7 +969,6 @@ def replace_table_transaction(
977969
sort_order: SortOrder = UNSORTED_SORT_ORDER,
978970
properties: Properties = EMPTY_DICT,
979971
) -> ReplaceTableTransaction:
980-
_raise_if_view_exists(self, identifier)
981972
staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table(
982973
identifier, schema, location, partition_spec, sort_order, properties
983974
)

tests/catalog/test_catalog_behaviors.py

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -419,9 +419,7 @@ def _simple_data(num_rows: int = 2) -> pa.Table:
419419
)
420420

421421

422-
def test_replace_table_preserves_uuid_and_clears_current_snapshot(
423-
catalog: Catalog, test_table_identifier: Identifier
424-
) -> None:
422+
def test_replace_table_preserves_uuid_and_clears_current_snapshot(catalog: Catalog, test_table_identifier: Identifier) -> None:
425423
"""Replace preserves table_uuid and snapshot history, but clears the main ref."""
426424
_create_simple_table(catalog, test_table_identifier)
427425
original = catalog.load_table(test_table_identifier)
@@ -553,9 +551,7 @@ def test_replace_table_merges_properties_with_overrides_and_additions(
553551
"""Properties are merged onto existing: new values override, untouched keys are preserved."""
554552
schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=False))
555553
_create_simple_table(catalog, test_table_identifier, schema=schema, properties={"keep": "yes", "override": "old"})
556-
replaced = catalog.replace_table(
557-
test_table_identifier, schema=schema, properties={"override": "new", "new_key": "v"}
558-
)
554+
replaced = catalog.replace_table(test_table_identifier, schema=schema, properties={"override": "new", "new_key": "v"})
559555
assert replaced.properties["keep"] == "yes"
560556
assert replaced.properties["override"] == "new"
561557
assert replaced.properties["new_key"] == "v"
@@ -569,19 +565,15 @@ def test_replace_table_upgrades_format_version(catalog: Catalog, test_table_iden
569565
assert replaced.format_version == 2
570566

571567

572-
def test_replace_table_rejects_format_version_downgrade(
573-
catalog: Catalog, test_table_identifier: Identifier
574-
) -> None:
568+
def test_replace_table_rejects_format_version_downgrade(catalog: Catalog, test_table_identifier: Identifier) -> None:
575569
"""A `format-version` lower than the existing one must be rejected to avoid silently
576570
running the schema-conversion path with the wrong semantics."""
577571
_, schema = _create_simple_table(catalog, test_table_identifier, format_version=2)
578572
with pytest.raises(ValueError, match="Cannot downgrade format-version"):
579573
catalog.replace_table(test_table_identifier, schema=schema, properties={"format-version": "1"})
580574

581575

582-
def test_replace_table_v1_carries_forward_partition_fields_as_void(
583-
catalog: Catalog, test_table_identifier: Identifier
584-
) -> None:
576+
def test_replace_table_v1_carries_forward_partition_fields_as_void(catalog: Catalog, test_table_identifier: Identifier) -> None:
585577
"""v1 specs are append-only; dropped partition fields must be carried forward as VoidTransform."""
586578
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, name="id_part", transform=IdentityTransform()))
587579
_, schema = _create_simple_table(catalog, test_table_identifier, partition_spec=spec, format_version=1)
@@ -602,9 +594,7 @@ def test_replace_table_raises_when_table_does_not_exist(catalog: Catalog, test_t
602594
catalog.replace_table(test_table_identifier, schema=schema)
603595

604596

605-
def test_replace_table_transaction_can_stage_additional_changes(
606-
catalog: Catalog, test_table_identifier: Identifier
607-
) -> None:
597+
def test_replace_table_transaction_can_stage_additional_changes(catalog: Catalog, test_table_identifier: Identifier) -> None:
608598
"""The transaction context lets callers stage extra updates (e.g. properties) before commit."""
609599
_, schema = _create_simple_table(catalog, test_table_identifier)
610600
with catalog.replace_table_transaction(test_table_identifier, schema=schema) as txn:
@@ -613,9 +603,7 @@ def test_replace_table_transaction_can_stage_additional_changes(
613603
assert replaced.properties.get("staged") == "yes"
614604

615605

616-
def test_replace_table_transaction_with_write_atomic_rtas(
617-
catalog: Catalog, test_table_identifier: Identifier
618-
) -> None:
606+
def test_replace_table_transaction_with_write_atomic_rtas(catalog: Catalog, test_table_identifier: Identifier) -> None:
619607
"""RTAS (Replace Table As Select): replace + write new data in one transaction.
620608
621609
The new schema and new data must land atomically. After commit:
@@ -626,18 +614,12 @@ def test_replace_table_transaction_with_write_atomic_rtas(
626614
catalog.load_table(test_table_identifier).append(_simple_data(num_rows=1))
627615
old_snapshot_id = catalog.load_table(test_table_identifier).current_snapshot().snapshot_id # type: ignore[union-attr]
628616

629-
new_schema = Schema(
630-
NestedField(field_id=1, name="id", field_type=LongType(), required=False),
631-
NestedField(field_id=2, name="name", field_type=StringType(), required=False),
632-
)
633617
new_data = pa.Table.from_pydict(
634618
{"id": [10, 20], "name": ["alice", "bob"]},
635619
schema=pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.large_string())]),
636620
)
637-
with catalog.replace_table_transaction(test_table_identifier, schema=new_schema) as txn:
638-
with txn.update_snapshot().fast_append() as snap:
639-
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=new_data, io=txn._table.io):
640-
snap.append_data_file(data_file)
621+
with catalog.replace_table_transaction(test_table_identifier, schema=new_data.schema) as txn:
622+
txn.append(new_data)
641623

642624
replaced = catalog.load_table(test_table_identifier)
643625
new_snapshot = replaced.current_snapshot()
@@ -649,9 +631,7 @@ def test_replace_table_transaction_with_write_atomic_rtas(
649631
assert replaced.scan().to_arrow().num_rows == 2
650632

651633

652-
def test_replace_table_transaction_rolls_back_on_failure(
653-
catalog: Catalog, test_table_identifier: Identifier
654-
) -> None:
634+
def test_replace_table_transaction_rolls_back_on_failure(catalog: Catalog, test_table_identifier: Identifier) -> None:
655635
"""If the body of the transaction raises, no metadata change is committed.
656636
657637
The table's UUID, schemas, current snapshot, and current schema id must all be unchanged."""
@@ -664,10 +644,14 @@ def test_replace_table_transaction_rolls_back_on_failure(
664644
NestedField(field_id=2, name="data", field_type=StringType(), required=False),
665645
NestedField(field_id=3, name="extra", field_type=BooleanType(), required=False),
666646
)
667-
with pytest.raises(RuntimeError, match="simulated failure inside replace transaction"):
647+
648+
def run_failing_replace() -> None:
668649
with catalog.replace_table_transaction(test_table_identifier, schema=new_schema):
669650
raise RuntimeError("simulated failure inside replace transaction")
670651

652+
with pytest.raises(RuntimeError, match="simulated failure inside replace transaction"):
653+
run_failing_replace()
654+
671655
after = catalog.load_table(test_table_identifier).metadata
672656
assert after.table_uuid == before.table_uuid
673657
assert after.current_snapshot_id == before.current_snapshot_id
@@ -696,9 +680,7 @@ def test_concurrent_replace_table(catalog: Catalog, test_table_identifier: Ident
696680
txn_b.commit_transaction()
697681

698682

699-
def test_replace_table_allows_subsequent_append(
700-
catalog: Catalog, test_table_identifier: Identifier
701-
) -> None:
683+
def test_replace_table_allows_subsequent_append(catalog: Catalog, test_table_identifier: Identifier) -> None:
702684
"""After `replace_table` clears the current snapshot, a separate `append` produces a new
703685
snapshot containing only the post-replace data — the pre-replace rows are not visible."""
704686
_, schema = _create_simple_table(catalog, test_table_identifier)

tests/catalog/test_rest.py

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2906,7 +2906,7 @@ def test_load_table_without_storage_credentials(
29062906
# Catalog-agnostic behavior (UUID preservation, snapshot clearing, schema/spec/sort-order reuse,
29072907
# format-version upgrade, etc.) is covered in tests/catalog/test_catalog_behaviors.py against
29082908
# both InMemoryCatalog and SqlCatalog. The tests below cover REST-specific concerns: the wire
2909-
# payload sent to the server, 404 handling, and the view-collision pre-flight check.
2909+
# payload sent to the server and 404 handling.
29102910

29112911

29122912
def _mock_replace_endpoints(
@@ -2915,13 +2915,7 @@ def _mock_replace_endpoints(
29152915
table: str,
29162916
load_response: dict[str, Any],
29172917
commit_response: dict[str, Any],
2918-
view_exists: bool = False,
29192918
) -> None:
2920-
rest_mock.head(
2921-
f"{TEST_URI}v1/namespaces/{namespace}/views/{table}",
2922-
status_code=204 if view_exists else 404,
2923-
request_headers=TEST_HEADERS,
2924-
)
29252919
rest_mock.get(
29262920
f"{TEST_URI}v1/namespaces/{namespace}/tables/{table}",
29272921
json=load_response,
@@ -3009,28 +3003,6 @@ def test_replace_table_transaction_404_raises(
30093003
)
30103004

30113005

3012-
def test_replace_table_transaction_rejects_view_collision(
3013-
rest_mock: Mocker,
3014-
example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
3015-
example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
3016-
) -> None:
3017-
"""A view at the same identifier must fail replace."""
3018-
_mock_replace_endpoints(
3019-
rest_mock,
3020-
"fokko",
3021-
"fokko2",
3022-
example_table_metadata_with_snapshot_v1_rest_json,
3023-
example_table_metadata_no_snapshot_v1_rest_json,
3024-
view_exists=True,
3025-
)
3026-
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3027-
with pytest.raises(TableAlreadyExistsError, match="View with same name already exists"):
3028-
catalog.replace_table_transaction(
3029-
identifier=("fokko", "fokko2"),
3030-
schema=Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=False)),
3031-
)
3032-
3033-
30343006
def test_replace_table_issues_commit_post_immediately(
30353007
rest_mock: Mocker,
30363008
example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
@@ -3064,5 +3036,3 @@ def test_replace_table_issues_commit_post_immediately(
30643036
assert replaced.metadata.table_uuid == uuid.UUID(table_uuid)
30653037
methods_after_replace = [r.method for r in rest_mock.request_history]
30663038
assert "POST" in methods_after_replace, "replace_table must commit immediately"
3067-
3068-

0 commit comments

Comments
 (0)