Skip to content

Commit fd7e11f

Browse files
Address review feedback (see PR description for full details)
1 parent 8675256 commit fd7e11f

10 files changed

Lines changed: 402 additions & 204 deletions

File tree

mkdocs/docs/api.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ with catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
187187

188188
## Replace a table
189189

190-
Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). This is the analog of Spark/Trino's `CREATE OR REPLACE TABLE` for the table-metadata side, and supports RTAS-style workflows when combined with subsequent writes.
190+
Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). Use this when you want to redefine the table's metadata; pair it with `replace_table_transaction` to atomically write new data alongside the metadata change (RTAS-style).
191191

192192
```python
193193
from pyiceberg.schema import Schema
@@ -206,7 +206,9 @@ catalog.replace_table(
206206

207207
Field IDs from columns whose names appear in the previous schema are reused, so existing data files remain readable when the new schema is a compatible superset. New columns get fresh IDs above `last-column-id`.
208208

209-
Use `replace_table_transaction` to stage additional changes (writes, property updates, schema evolution) before committing — the equivalent of `CREATE OR REPLACE TABLE AS SELECT`:
209+
Properties passed to `replace_table` are **merged** with the existing table properties (your values override; existing keys you don't pass are preserved). To remove a property as part of the replace, use `replace_table_transaction` and remove it explicitly within the transaction.
210+
211+
Use `replace_table_transaction` to stage additional changes (writes, property updates, schema evolution) before committing — for example, swap the schema and write new data atomically:
210212

211213
```python
212214
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=new_schema) as txn:

pyiceberg/catalog/__init__.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,8 @@ def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> No
331331
def _raise_if_view_exists(catalog: Catalog, identifier: str | Identifier) -> None:
332332
"""Raise TableAlreadyExistsError if a view exists at the same identifier.
333333
334-
Mirrors Java's `RESTSessionCatalog.replaceTransaction()` precondition. Catalogs that
335-
don't support views raise `NotImplementedError` from `view_exists` — treat as "no view".
334+
Catalogs that don't support views raise `NotImplementedError` from `view_exists` —
335+
treat as "no view" in that case.
336336
"""
337337
try:
338338
view_collision = catalog.view_exists(identifier)
@@ -483,7 +483,10 @@ def replace_table(
483483
location (str | None): New table location. Defaults to the existing location.
484484
partition_spec (PartitionSpec): New partition spec.
485485
sort_order (SortOrder): New sort order.
486-
properties (Properties): New table properties (merged with existing).
486+
properties (Properties): Properties to apply. Merged on top of the existing
487+
table properties: keys present here override existing values; existing keys
488+
not present here are preserved. To remove a property, follow up with a
489+
transaction that removes it explicitly.
487490
488491
Returns:
489492
Table: the replaced table instance.
@@ -516,7 +519,10 @@ def replace_table_transaction(
516519
location (str | None): New table location. Defaults to the existing location.
517520
partition_spec (PartitionSpec): New partition spec.
518521
sort_order (SortOrder): New sort order.
519-
properties (Properties): New table properties (merged with existing).
522+
properties (Properties): Properties to apply. Merged on top of the existing
523+
table properties: keys present here override existing values; existing keys
524+
not present here are preserved. To remove a property, follow up with a
525+
transaction that removes it explicitly.
520526
521527
Returns:
522528
ReplaceTableTransaction: A transaction for the replace operation.
@@ -538,8 +544,7 @@ def _replace_staged_table(
538544
) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
539545
"""Load the existing table and build fresh schema/spec/sort-order for replacement.
540546
541-
Mirrors the bookkeeping in `TableMetadata.buildReplacement` (iceberg-java):
542-
- reuses existing field IDs by name (current schema)
547+
- reuses existing field IDs by name (from the current schema)
543548
- reuses partition field IDs by `(source, transform)` across all specs (v2+),
544549
or carries forward the current spec with `VoidTransform`s (v1)
545550
- reassigns sort field IDs against the fresh schema
@@ -551,7 +556,14 @@ def _replace_staged_table(
551556
existing_table = self.load_table(identifier)
552557
existing_metadata = existing_table.metadata
553558

554-
resolved_format_version = int(properties.get(TableProperties.FORMAT_VERSION, existing_metadata.format_version)) # type: ignore
559+
requested_format_version = properties.get(TableProperties.FORMAT_VERSION)
560+
if requested_format_version is not None and int(requested_format_version) < existing_metadata.format_version:
561+
raise ValueError(
562+
f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"
563+
)
564+
resolved_format_version = (
565+
int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version
566+
)
555567
iceberg_schema = self._convert_schema_if_needed(schema, resolved_format_version)
556568

557569
fresh_schema, _ = assign_fresh_schema_ids_for_replace(

pyiceberg/catalog/rest/__init__.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -966,26 +966,6 @@ def create_table_transaction(
966966
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
967967
return CreateTableTransaction(staged_table)
968968

969-
@override
970-
def replace_table(
971-
self,
972-
identifier: str | Identifier,
973-
schema: Schema | pa.Schema,
974-
location: str | None = None,
975-
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
976-
sort_order: SortOrder = UNSORTED_SORT_ORDER,
977-
properties: Properties = EMPTY_DICT,
978-
) -> Table:
979-
txn = self.replace_table_transaction(
980-
identifier=identifier,
981-
schema=schema,
982-
location=location,
983-
partition_spec=partition_spec,
984-
sort_order=sort_order,
985-
properties=properties,
986-
)
987-
return txn.commit_transaction()
988-
989969
@override
990970
@retry(**_RETRY_ARGS)
991971
def replace_table_transaction(

pyiceberg/partitioning.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@ def assign_fresh_partition_spec_ids_for_replace(
346346
) -> tuple[PartitionSpec, int]:
347347
"""Assign partition field IDs for a replace operation, reusing IDs from existing specs.
348348
349-
Mirrors `TableMetadata.reassignPartitionIds` in iceberg-java:
350349
- For v2+, reuse partition field IDs by `(source_id, transform)` across all existing specs.
351350
New fields get IDs starting from `last_partition_id + 1`.
352351
- For v1, the current spec's fields must be preserved (v1 specs are append-only). Fields
@@ -374,8 +373,8 @@ def assign_fresh_partition_spec_ids_for_replace(
374373
spec, old_schema, fresh_schema, current_spec, effective_last_partition_id
375374
)
376375

377-
# v2+: reuse field IDs by (source_id, transform) across all specs.
378-
# Use max() for dedup when the same (source_id, transform) appears in multiple specs.
376+
# v2+: reuse field IDs by (source_id, transform) across all specs. When the same
377+
# (source_id, transform) appears in multiple specs, prefer the highest field_id.
379378
transform_to_field_id: dict[tuple[int, str], int] = {}
380379
for existing_spec in existing_specs:
381380
for field in existing_spec.fields:
@@ -412,8 +411,9 @@ def assign_fresh_partition_spec_ids_for_replace(
412411
)
413412
)
414413

415-
new_last_partition_id = max(next_id, effective_last_partition_id)
416-
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), new_last_partition_id
414+
# `next_id` starts at `effective_last_partition_id` and only increments, so it is the
415+
# new last partition id.
416+
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id
417417

418418

419419
def _assign_fresh_partition_spec_ids_for_replace_v1(
@@ -442,6 +442,7 @@ def _assign_fresh_partition_spec_ids_for_replace_v1(
442442

443443
# Walk current spec, carrying forward each field. Matching new fields consume their key;
444444
# missing fields become void transforms.
445+
used_names: set[str] = set(new_field_names)
445446
partition_fields = []
446447
for cur_field in current_spec.fields:
447448
key = (cur_field.source_id, str(cur_field.transform))
@@ -456,8 +457,10 @@ def _assign_fresh_partition_spec_ids_for_replace_v1(
456457
transform=new_field.transform,
457458
)
458459
)
460+
used_names.add(new_field.name)
459461
else:
460-
void_name = f"{cur_field.name}_{cur_field.field_id}" if cur_field.name in new_field_names else cur_field.name
462+
void_name = _unique_void_name(cur_field.name, cur_field.field_id, used_names)
463+
used_names.add(void_name)
461464
partition_fields.append(
462465
PartitionField(
463466
name=void_name,
@@ -480,8 +483,25 @@ def _assign_fresh_partition_spec_ids_for_replace_v1(
480483
)
481484
)
482485

483-
new_last_partition_id = max(next_id, effective_last_partition_id)
484-
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), new_last_partition_id
486+
# `next_id` starts at `effective_last_partition_id` and only increments, so it is the
487+
# new last partition id.
488+
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id
489+
490+
491+
def _unique_void_name(base_name: str, field_id: int, used_names: set[str]) -> str:
492+
"""Pick a void-transform name that does not collide with already-used names.
493+
494+
First tries `base_name`; if taken, tries `base_name_{field_id}`; if still taken,
495+
appends `_2`, `_3`, ... until unique.
496+
"""
497+
if base_name not in used_names:
498+
return base_name
499+
candidate = f"{base_name}_{field_id}"
500+
suffix = 2
501+
while candidate in used_names:
502+
candidate = f"{base_name}_{field_id}_{suffix}"
503+
suffix += 1
504+
return candidate
485505

486506

487507
T = TypeVar("T")

pyiceberg/schema.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1386,6 +1386,10 @@ class _SetFreshIDsForReplace(_SetFreshIDs):
13861386
For each field in the new schema, if a field with the same full name exists in the
13871387
base schema, its ID is reused; otherwise a fresh ID is allocated starting from
13881388
last_column_id + 1.
1389+
1390+
Note: ID reuse is purely name-based — a field whose name matches but whose type differs
1391+
(e.g. `int` → `string`) will reuse the base ID. This is intentional: replace allows
1392+
arbitrary schema changes; type compatibility is the caller's responsibility.
13891393
"""
13901394

13911395
def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) -> None:

pyiceberg/table/__init__.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
AddSchemaUpdate,
5959
AddSortOrderUpdate,
6060
AssertCreate,
61+
AssertLastAssignedFieldId,
62+
AssertLastAssignedPartitionId,
6163
AssertRefSnapshotId,
6264
AssertTableUUID,
6365
AssignUUIDUpdate,
@@ -1018,6 +1020,18 @@ class ReplaceTableTransaction(Transaction):
10181020
schema/spec/sort-order/location/properties are applied.
10191021
"""
10201022

1023+
def __init__(
1024+
self,
1025+
table: StagedTable,
1026+
new_schema: Schema,
1027+
new_spec: PartitionSpec,
1028+
new_sort_order: SortOrder,
1029+
new_location: str,
1030+
new_properties: Properties,
1031+
) -> None:
1032+
super().__init__(table, autocommit=False)
1033+
self._initial_changes(table.metadata, new_schema, new_spec, new_sort_order, new_location, new_properties)
1034+
10211035
def _initial_changes(
10221036
self,
10231037
table_metadata: TableMetadata,
@@ -1029,11 +1043,11 @@ def _initial_changes(
10291043
) -> None:
10301044
"""Set the initial changes that transform the existing table into the replacement.
10311045
1032-
Mirrors Java's `TableMetadata.buildReplacement` + `RESTSessionCatalog.replaceTransaction`:
1033-
ensures `SetCurrentSchema` / `SetDefaultPartitionSpec` / `SetDefaultSortOrder` are
1034-
always emitted (even when reused), and bumps `format-version` when requested.
1046+
Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / `SetDefaultSortOrder`
1047+
(even when the resulting id is reused) so the request body unambiguously signals a
1048+
replace. Bumps `format-version` when the new properties request it.
10351049
"""
1036-
# Upgrade format-version if requested via properties (matches Java's buildReplacement).
1050+
# Upgrade format-version if requested via properties.
10371051
requested_format_version_str = new_properties.get(TableProperties.FORMAT_VERSION)
10381052
if requested_format_version_str is not None:
10391053
requested_format_version = int(requested_format_version_str)
@@ -1115,30 +1129,30 @@ def _find_matching_sort_order_id(table_metadata: TableMetadata, sort_order: Sort
11151129
return existing.order_id
11161130
return None
11171131

1118-
def __init__(
1119-
self,
1120-
table: StagedTable,
1121-
new_schema: Schema,
1122-
new_spec: PartitionSpec,
1123-
new_sort_order: SortOrder,
1124-
new_location: str,
1125-
new_properties: Properties,
1126-
) -> None:
1127-
super().__init__(table, autocommit=False)
1128-
self._initial_changes(table.metadata, new_schema, new_spec, new_sort_order, new_location, new_properties)
1129-
11301132
def commit_transaction(self) -> Table:
11311133
"""Commit the replace changes to the catalog.
11321134
1133-
Uses AssertTableUUID as the only requirement.
1135+
Requirements:
1136+
- `AssertTableUUID` — the table identity hasn't changed since load.
1137+
- `AssertLastAssignedFieldId` — guards against a concurrent commit bumping
1138+
`last-column-id` between load and commit (which would cause our newly-assigned
1139+
field IDs to collide).
1140+
- `AssertLastAssignedPartitionId` — same guard for partition field IDs.
11341141
11351142
Returns:
11361143
The table with the updates applied.
11371144
"""
11381145
if len(self._updates) > 0:
1146+
base = self._table.metadata
1147+
requirements: tuple[TableRequirement, ...] = (
1148+
AssertTableUUID(uuid=base.table_uuid),
1149+
AssertLastAssignedFieldId(last_assigned_field_id=base.last_column_id),
1150+
)
1151+
if base.last_partition_id is not None:
1152+
requirements += (AssertLastAssignedPartitionId(last_assigned_partition_id=base.last_partition_id),)
11391153
self._table._do_commit( # pylint: disable=W0212
11401154
updates=self._updates,
1141-
requirements=(AssertTableUUID(uuid=self._table.metadata.table_uuid),),
1155+
requirements=requirements,
11421156
)
11431157

11441158
self._updates = ()

0 commit comments

Comments
 (0)