Skip to content

Commit e649f95

Browse files
Extend replace_table to all catalogs, harden semantics
- Move replace_table_transaction implementations to MetastoreCatalog and RestCatalog; keep stub + shared _replace_staged_table helper on Catalog so out-of-tree subclasses don't break.
- Emit UpgradeFormatVersionUpdate when properties bump format-version, matching Java's TableMetadata.buildReplacement.
- Always emit SetCurrentSchema / SetDefaultPartitionSpec / SetDefaultSortOrder, mirroring RESTSessionCatalog.replaceTransaction.
- Handle v1 partition specs by carrying forward removed fields as VoidTransform (v1 specs are append-only).
- Reject view collisions before replacing.
- Assign fresh schema_id / spec_id / order_id on Add* updates so AddSchemaUpdate / AddPartitionSpecUpdate / AddSortOrderUpdate produce uniquely-keyed entries.
- Tests: behaviour now covered in tests/catalog/test_catalog_behaviors.py parametrized across InMemoryCatalog + SqlCatalog. test_rest.py keeps only REST wire-specific cases (payload shape, 404, view collision, immediate commit). One end-to-end smoke test remains in tests/integration/test_rest_catalog.py.
- Add docs section in mkdocs/docs/api.md.
1 parent 41da8bd commit e649f95

10 files changed

Lines changed: 539 additions & 996 deletions

File tree

mkdocs/docs/api.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,43 @@ with catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
185185
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
186186
```
187187

188+
## Replace a table
189+
190+
Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). This is the table-metadata analog of Spark/Trino's `CREATE OR REPLACE TABLE`, and it supports RTAS-style (`REPLACE TABLE AS SELECT`) workflows when combined with subsequent writes.
191+
192+
```python
193+
from pyiceberg.schema import Schema
194+
from pyiceberg.types import NestedField, LongType, StringType, BooleanType
195+
196+
new_schema = Schema(
197+
NestedField(field_id=1, name="datetime", field_type=LongType(), required=False),
198+
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
199+
NestedField(field_id=3, name="active", field_type=BooleanType(), required=False),
200+
)
201+
catalog.replace_table(
202+
identifier="docs_example.bids",
203+
schema=new_schema,
204+
)
205+
```
206+
207+
Columns whose names match a column in the previous schema keep their existing field IDs, so existing data files remain readable when the new schema is a compatible superset. New columns are assigned fresh IDs above `last-column-id`.
208+
209+
Use `replace_table_transaction` to stage additional changes (writes, property updates, schema evolution) before committing — the equivalent of `CREATE OR REPLACE TABLE AS SELECT`:
210+
211+
```python
212+
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=new_schema) as txn:
213+
with txn.update_snapshot().fast_append() as snap:
214+
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=df, io=txn._table.io):
215+
snap.append_data_file(data_file)
216+
txn.set_properties(write_replaced_at="2026-04-19T00:00:00Z")
217+
```
218+
219+
To upgrade the table's format version as part of the replace, pass `format-version` in `properties`:
220+
221+
```python
222+
catalog.replace_table(identifier="docs_example.bids", schema=new_schema, properties={"format-version": "2"})
223+
```
224+
188225
## Register a table
189226

190227
To register a table using existing metadata:

pyiceberg/catalog/__init__.py

Lines changed: 93 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,12 @@
4242
)
4343
from pyiceberg.io import FileIO, load_file_io
4444
from pyiceberg.manifest import ManifestFile
45-
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
46-
from pyiceberg.schema import Schema
45+
from pyiceberg.partitioning import (
46+
UNPARTITIONED_PARTITION_SPEC,
47+
PartitionSpec,
48+
assign_fresh_partition_spec_ids_for_replace,
49+
)
50+
from pyiceberg.schema import Schema, assign_fresh_schema_ids_for_replace
4751
from pyiceberg.serializers import ToOutputFile
4852
from pyiceberg.table import (
4953
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
@@ -56,7 +60,7 @@
5660
)
5761
from pyiceberg.table.locations import load_location_provider
5862
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
59-
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
63+
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
6064
from pyiceberg.table.update import (
6165
TableRequirement,
6266
TableUpdate,
@@ -324,6 +328,20 @@ def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> No
324328
deleted_files[path] = True
325329

326330

331+
def _raise_if_view_exists(catalog: Catalog, identifier: str | Identifier) -> None:
332+
"""Raise TableAlreadyExistsError if a view exists at the same identifier.
333+
334+
Mirrors Java's `RESTSessionCatalog.replaceTransaction()` precondition. Catalogs that
335+
don't support views raise `NotImplementedError` from `view_exists` — treat as "no view".
336+
"""
337+
try:
338+
view_collision = catalog.view_exists(identifier)
339+
except NotImplementedError:
340+
view_collision = False
341+
if view_collision:
342+
raise TableAlreadyExistsError(f"View with same name already exists: {identifier}")
343+
344+
327345
def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Catalog | None:
328346
try:
329347
path_parts = catalog_impl.split(".")
@@ -445,7 +463,6 @@ def create_table_if_not_exists(
445463
except TableAlreadyExistsError:
446464
return self.load_table(identifier)
447465

448-
@abstractmethod
449466
def replace_table(
450467
self,
451468
identifier: str | Identifier,
@@ -473,9 +490,12 @@ def replace_table(
473490
474491
Raises:
475492
NoSuchTableError: If the table does not exist.
493+
TableAlreadyExistsError: If a view exists at the same identifier.
476494
"""
495+
return self.replace_table_transaction(
496+
identifier, schema, location, partition_spec, sort_order, properties
497+
).commit_transaction()
477498

478-
@abstractmethod
479499
def replace_table_transaction(
480500
self,
481501
identifier: str | Identifier,
@@ -503,7 +523,63 @@ def replace_table_transaction(
503523
504524
Raises:
505525
NoSuchTableError: If the table does not exist.
526+
TableAlreadyExistsError: If a view exists at the same identifier.
506527
"""
528+
raise NotImplementedError("replace_table_transaction is not supported for this catalog type")
529+
530+
def _replace_staged_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None,
    partition_spec: PartitionSpec,
    sort_order: SortOrder,
    properties: Properties,
) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
    """Prepare everything a replace-table transaction needs from the existing table.

    Loads the table at `identifier` and derives the replacement schema, partition
    spec, sort order and location. Follows the bookkeeping of
    `TableMetadata.buildReplacement` (iceberg-java):
    - field IDs are reused by name from the current schema
    - partition field IDs are reused by `(source, transform)` across all specs (v2+),
      or the current spec is carried forward with `VoidTransform`s (v1)
    - sort field IDs are reassigned against the fresh schema
    - an omitted `location` resolves to the existing table's location

    Args:
        identifier: Identifier of the table being replaced.
        schema: Requested schema; converted via `_convert_schema_if_needed`.
        location: New table location, or a falsy value to keep the current one.
        partition_spec: Requested partition spec, expressed against `schema`.
        sort_order: Requested sort order, expressed against `schema`.
        properties: Replacement properties; a `format-version` entry overrides the
            existing table's format version for the schema conversion.

    Returns:
        A tuple `(staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location)`.
        The staged table still carries the existing table's metadata.

    Any exception raised by `load_table` propagates to the caller.
    """
    current_table = self.load_table(identifier)
    current_metadata = current_table.metadata

    # A format-version supplied in `properties` wins over the table's current version.
    target_format_version = int(properties.get(TableProperties.FORMAT_VERSION, current_metadata.format_version))  # type: ignore
    requested_schema = self._convert_schema_if_needed(schema, target_format_version)

    new_schema, _ = assign_fresh_schema_ids_for_replace(
        requested_schema,
        current_metadata.schema(),
        current_metadata.last_column_id,
    )

    # The spec helper receives the table's *existing* format version, not the target one.
    new_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec,
        requested_schema,
        new_schema,
        current_metadata.partition_specs,
        current_metadata.last_partition_id,
        format_version=current_metadata.format_version,
        current_spec=current_metadata.spec(),
    )

    new_sort_order = assign_fresh_sort_order_ids(sort_order, requested_schema, new_schema)

    if location:
        new_location = location.rstrip("/")
    else:
        new_location = current_metadata.location

    return (
        StagedTable(
            identifier=current_table.name(),
            metadata=current_metadata,
            metadata_location=current_table.metadata_location,
            io=current_table.io,
            catalog=self,
        ),
        new_schema,
        new_spec,
        new_sort_order,
        new_location,
    )
507583

508584
@abstractmethod
509585
def load_table(self, identifier: str | Identifier) -> Table:
@@ -985,18 +1061,6 @@ def create_table_transaction(
9851061
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
9861062
)
9871063

988-
@override
989-
def replace_table(
990-
self,
991-
identifier: str | Identifier,
992-
schema: Schema | pa.Schema,
993-
location: str | None = None,
994-
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
995-
sort_order: SortOrder = UNSORTED_SORT_ORDER,
996-
properties: Properties = EMPTY_DICT,
997-
) -> Table:
998-
raise NotImplementedError("replace_table is not yet supported for this catalog type")
999-
10001064
@override
10011065
def replace_table_transaction(
10021066
self,
@@ -1007,7 +1071,18 @@ def replace_table_transaction(
10071071
sort_order: SortOrder = UNSORTED_SORT_ORDER,
10081072
properties: Properties = EMPTY_DICT,
10091073
) -> ReplaceTableTransaction:
1010-
raise NotImplementedError("replace_table_transaction is not yet supported for this catalog type")
1074+
_raise_if_view_exists(self, identifier)
1075+
staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table(
1076+
identifier, schema, location, partition_spec, sort_order, properties
1077+
)
1078+
return ReplaceTableTransaction(
1079+
table=staged_table,
1080+
new_schema=fresh_schema,
1081+
new_spec=fresh_spec,
1082+
new_sort_order=fresh_sort_order,
1083+
new_location=resolved_location,
1084+
new_properties=properties,
1085+
)
10111086

10121087
@override
10131088
def table_exists(self, identifier: str | Identifier) -> bool:

pyiceberg/catalog/noop.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
from pyiceberg.table import (
2929
CommitTableResponse,
3030
CreateTableTransaction,
31-
ReplaceTableTransaction,
3231
Table,
3332
)
3433
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -69,30 +68,6 @@ def create_table_transaction(
6968
) -> CreateTableTransaction:
7069
raise NotImplementedError
7170

72-
@override
73-
def replace_table(
74-
self,
75-
identifier: str | Identifier,
76-
schema: Schema | pa.Schema,
77-
location: str | None = None,
78-
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
79-
sort_order: SortOrder = UNSORTED_SORT_ORDER,
80-
properties: Properties = EMPTY_DICT,
81-
) -> Table:
82-
raise NotImplementedError
83-
84-
@override
85-
def replace_table_transaction(
86-
self,
87-
identifier: str | Identifier,
88-
schema: Schema | pa.Schema,
89-
location: str | None = None,
90-
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
91-
sort_order: SortOrder = UNSORTED_SORT_ORDER,
92-
properties: Properties = EMPTY_DICT,
93-
) -> ReplaceTableTransaction:
94-
raise NotImplementedError
95-
9671
@override
9772
def load_table(self, identifier: str | Identifier) -> Table:
9873
raise NotImplementedError

pyiceberg/catalog/rest/__init__.py

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,15 @@
3030
from typing_extensions import override
3131

3232
from pyiceberg import __version__
33-
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
33+
from pyiceberg.catalog import (
34+
BOTOCORE_SESSION,
35+
TOKEN,
36+
URI,
37+
WAREHOUSE_LOCATION,
38+
Catalog,
39+
PropertiesUpdateSummary,
40+
_raise_if_view_exists,
41+
)
3442
from pyiceberg.catalog.rest.auth import AUTH_MANAGER, AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
3543
from pyiceberg.catalog.rest.response import _handle_non_200_response
3644
from pyiceberg.catalog.rest.scan_planning import (
@@ -72,9 +80,8 @@
7280
UNPARTITIONED_PARTITION_SPEC,
7381
PartitionSpec,
7482
assign_fresh_partition_spec_ids,
75-
assign_fresh_partition_spec_ids_for_replace,
7683
)
77-
from pyiceberg.schema import Schema, assign_fresh_schema_ids, assign_fresh_schema_ids_for_replace
84+
from pyiceberg.schema import Schema, assign_fresh_schema_ids
7885
from pyiceberg.table import (
7986
CommitTableRequest,
8087
CommitTableResponse,
@@ -990,43 +997,14 @@ def replace_table_transaction(
990997
sort_order: SortOrder = UNSORTED_SORT_ORDER,
991998
properties: Properties = EMPTY_DICT,
992999
) -> ReplaceTableTransaction:
993-
existing_table = self.load_table(identifier)
994-
existing_metadata = existing_table.metadata
995-
996-
iceberg_schema = self._convert_schema_if_needed(
997-
schema,
998-
int(properties.get(TableProperties.FORMAT_VERSION, existing_metadata.format_version)), # type: ignore
999-
)
1000-
1001-
# Assign fresh schema IDs, reusing IDs from the existing schema by field name
1002-
fresh_schema, _ = assign_fresh_schema_ids_for_replace(
1003-
iceberg_schema, existing_metadata.schema(), existing_metadata.last_column_id
1004-
)
1005-
1006-
# Assign fresh partition spec IDs, reusing IDs from existing specs
1007-
fresh_partition_spec, _ = assign_fresh_partition_spec_ids_for_replace(
1008-
partition_spec, iceberg_schema, fresh_schema, existing_metadata.partition_specs, existing_metadata.last_partition_id
1000+
_raise_if_view_exists(self, identifier)
1001+
staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table(
1002+
identifier, schema, location, partition_spec, sort_order, properties
10091003
)
1010-
1011-
# Assign fresh sort order IDs
1012-
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)
1013-
1014-
# Use existing location if not specified
1015-
resolved_location = location.rstrip("/") if location else existing_metadata.location
1016-
1017-
# Create a StagedTable from the existing table
1018-
staged_table = StagedTable(
1019-
identifier=existing_table.name(),
1020-
metadata=existing_metadata,
1021-
metadata_location=existing_table.metadata_location,
1022-
io=existing_table.io,
1023-
catalog=self,
1024-
)
1025-
10261004
return ReplaceTableTransaction(
10271005
table=staged_table,
10281006
new_schema=fresh_schema,
1029-
new_spec=fresh_partition_spec,
1007+
new_spec=fresh_spec,
10301008
new_sort_order=fresh_sort_order,
10311009
new_location=resolved_location,
10321010
new_properties=properties,

0 commit comments

Comments
 (0)