Skip to content

Commit 728a2c7

Browse files
Support replace_table and replace_table_transaction for REST catalog
1 parent 39e08a1 commit 728a2c7

File tree

9 files changed

+967
-3
lines changed

9 files changed

+967
-3
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
4848
CommitTableResponse,
4949
CreateTableTransaction,
50+
ReplaceTableTransaction,
5051
StagedTable,
5152
Table,
5253
TableProperties,
@@ -442,6 +443,66 @@ def create_table_if_not_exists(
442443
except TableAlreadyExistsError:
443444
return self.load_table(identifier)
444445

446+
@abstractmethod
def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Atomically swap in a new schema, partition spec, sort order, location, and properties.

    The table UUID and its history (snapshots, schemas, specs, sort orders) are
    preserved; the current snapshot is cleared by dropping the main branch ref.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        Table: the replaced table instance.

    Raises:
        NoSuchTableError: If the table does not exist.
    """
476+
@abstractmethod
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Stage a replace operation as a ReplaceTableTransaction.

    Additional changes (schema evolution, partition evolution, etc.) may be
    layered onto the transaction before it is committed.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
    """
505+
445506
@abstractmethod
446507
def load_table(self, identifier: str | Identifier) -> Table:
447508
"""Load the table's metadata and returns the table instance.
@@ -888,6 +949,28 @@ def create_table_transaction(
888949
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
889950
)
890951

952+
def replace_table(
953+
self,
954+
identifier: str | Identifier,
955+
schema: Schema | pa.Schema,
956+
location: str | None = None,
957+
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
958+
sort_order: SortOrder = UNSORTED_SORT_ORDER,
959+
properties: Properties = EMPTY_DICT,
960+
) -> Table:
961+
raise NotImplementedError("replace_table is not yet supported for this catalog type")
962+
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Default implementation for catalogs that have not implemented replace yet."""
    raise NotImplementedError("replace_table_transaction is not yet supported for this catalog type")
891974
def table_exists(self, identifier: str | Identifier) -> bool:
892975
try:
893976
self.load_table(identifier)

pyiceberg/catalog/noop.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from pyiceberg.table import (
2727
CommitTableResponse,
2828
CreateTableTransaction,
29+
ReplaceTableTransaction,
2930
Table,
3031
)
3132
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -64,6 +65,28 @@ def create_table_transaction(
6465
) -> CreateTableTransaction:
6566
raise NotImplementedError
6667

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """The no-op catalog does not support replacing tables."""
    raise NotImplementedError
78+
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """The no-op catalog does not support replace transactions."""
    raise NotImplementedError
89+
6790
def load_table(self, identifier: str | Identifier) -> Table:
6891
raise NotImplementedError
6992

pyiceberg/catalog/rest/__init__.py

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,19 @@
6767
FileIO,
6868
load_file_io,
6969
)
70-
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
71-
from pyiceberg.schema import Schema, assign_fresh_schema_ids
70+
from pyiceberg.partitioning import (
71+
UNPARTITIONED_PARTITION_SPEC,
72+
PartitionSpec,
73+
assign_fresh_partition_spec_ids,
74+
assign_fresh_partition_spec_ids_for_replace,
75+
)
76+
from pyiceberg.schema import Schema, assign_fresh_schema_ids, assign_fresh_schema_ids_for_replace
7277
from pyiceberg.table import (
7378
CommitTableRequest,
7479
CommitTableResponse,
7580
CreateTableTransaction,
7681
FileScanTask,
82+
ReplaceTableTransaction,
7783
StagedTable,
7884
Table,
7985
TableIdentifier,
@@ -930,6 +936,75 @@ def create_table_transaction(
930936
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
931937
return CreateTableTransaction(staged_table)
932938

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Replace the table and commit immediately.

    Convenience wrapper around ``replace_table_transaction``: stages the
    replace and commits it in a single step.
    """
    transaction = self.replace_table_transaction(
        identifier=identifier,
        schema=schema,
        location=location,
        partition_spec=partition_spec,
        sort_order=sort_order,
        properties=properties,
    )
    return transaction.commit_transaction()
@retry(**_RETRY_ARGS)
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Stage a replace of an existing table as a ReplaceTableTransaction.

    Loads the current table, re-binds the incoming schema/spec/sort-order to
    fresh IDs that reuse the existing table's IDs where possible, and wraps the
    result in a transaction that the caller commits.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
    """
    existing_table = self.load_table(identifier)
    existing_metadata = existing_table.metadata

    # Fix: when FORMAT_VERSION is absent from `properties`, convert the incoming
    # schema under the existing table's format version instead of the library-wide
    # default, so replacing e.g. a v1 table does not apply another version's
    # conversion semantics.
    format_version = int(properties.get(TableProperties.FORMAT_VERSION, existing_metadata.format_version))  # type: ignore
    iceberg_schema = self._convert_schema_if_needed(schema, format_version)

    # Reuse field IDs from the existing schema by full column name; brand-new
    # columns receive IDs above last_column_id.
    fresh_schema, _ = assign_fresh_schema_ids_for_replace(
        iceberg_schema, existing_metadata.schema(), existing_metadata.last_column_id
    )

    # Reuse partition field IDs from any historical spec with the same
    # (source_id, transform); new fields get IDs above last_partition_id.
    fresh_partition_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec, iceberg_schema, fresh_schema, existing_metadata.partition_specs, existing_metadata.last_partition_id
    )

    # Re-bind the sort order to the freshly assigned schema field IDs.
    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

    # Keep the current location unless the caller supplies a new one.
    resolved_location = location.rstrip("/") if location else existing_metadata.location

    # Stage the existing table so the transaction commits against its metadata.
    staged_table = StagedTable(
        identifier=existing_table.name(),
        metadata=existing_metadata,
        metadata_location=existing_table.metadata_location,
        io=existing_table.io,
        catalog=self,
    )

    return ReplaceTableTransaction(
        table=staged_table,
        new_schema=fresh_schema,
        new_spec=fresh_partition_spec,
        new_sort_order=fresh_sort_order,
        new_location=resolved_location,
        new_properties=properties,
    )
9331008
@retry(**_RETRY_ARGS)
9341009
def create_view(
9351010
self,

pyiceberg/partitioning.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,73 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
335335
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
336336

337337

def assign_fresh_partition_spec_ids_for_replace(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    existing_specs: list[PartitionSpec],
    last_partition_id: int | None,
) -> tuple[PartitionSpec, int]:
    """Re-bind a partition spec for a replace, keeping IDs stable where possible.

    A partition field whose (source_id, transform) pair appears in any of the
    table's historical specs keeps that pair's partition field ID (the highest,
    if several specs disagree); any other field is given a fresh ID above
    ``last_partition_id``.

    Args:
        spec: The new partition spec to assign IDs to.
        old_schema: The schema that the new spec's source_ids reference.
        fresh_schema: The schema with freshly assigned field IDs.
        existing_specs: All partition specs from the existing table metadata.
        last_partition_id: The current table's last_partition_id.

    Returns:
        A tuple of (fresh_spec, new_last_partition_id).
    """
    highest_assigned = PARTITION_FIELD_ID_START - 1 if last_partition_id is None else last_partition_id

    # (source_id, transform) -> highest partition field ID observed across all
    # historical specs; max() resolves the same pair appearing in several specs.
    known_ids: dict[tuple[int, str], int] = {}
    for historical_spec in existing_specs:
        for historical_field in historical_spec.fields:
            pair = (historical_field.source_id, str(historical_field.transform))
            known_ids[pair] = max(known_ids.get(pair, historical_field.field_id), historical_field.field_id)

    next_available = highest_assigned
    rebound_fields: list[PartitionField] = []
    for source_field in spec.fields:
        column_name = old_schema.find_column_name(source_field.source_id)
        if column_name is None:
            raise ValueError(f"Could not find in old schema: {source_field}")
        fresh_column = fresh_schema.find_field(column_name)
        if fresh_column is None:
            raise ValueError(f"Could not find field in fresh schema: {column_name}")

        validate_partition_name(source_field.name, source_field.transform, fresh_column.field_id, fresh_schema, set())

        pair = (fresh_column.field_id, str(source_field.transform))
        if pair not in known_ids:
            next_available += 1
            known_ids[pair] = next_available

        rebound_fields.append(
            PartitionField(
                name=source_field.name,
                source_id=fresh_column.field_id,
                field_id=known_ids[pair],
                transform=source_field.transform,
            )
        )

    return PartitionSpec(*rebound_fields, spec_id=INITIAL_PARTITION_SPEC_ID), max(next_available, highest_assigned)
403+
404+
338405
T = TypeVar("T")
339406

340407

pyiceberg/schema.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,58 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType:
13801380
return primitive
13811381

13821382

class _SetFreshIDsForReplace(_SetFreshIDs):
    """Assign fresh IDs for a replace operation, reusing base-schema IDs by field name.

    A field whose full path name exists in the base schema keeps that schema's
    ID; every other field receives a brand-new ID above ``starting_id``.
    """

    def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) -> None:
        # NOTE(review): deliberately does not call super().__init__() — this class
        # replaces the parent's ID-allocation state wholesale; confirm _SetFreshIDs
        # holds no other state this visitor relies on.
        self.old_id_to_new_id: dict[int, int] = {}
        self._old_id_to_base_id = old_id_to_base_id
        id_counter = itertools.count(starting_id + 1)
        self.next_id_func = lambda: next(id_counter)

    def _get_and_increment(self, current_id: int) -> int:
        # Prefer the ID the base schema already uses for this field path;
        # otherwise draw the next unused ID from the counter.
        if current_id in self._old_id_to_base_id:
            assigned = self._old_id_to_base_id[current_id]
        else:
            assigned = self.next_id_func()
        self.old_id_to_new_id[current_id] = assigned
        return assigned
1405+
def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, last_column_id: int) -> tuple[Schema, int]:
    """Assign fresh IDs to a schema for a replace operation, reusing IDs from the base schema.

    Fields whose full path name exists in the base schema keep the base schema's
    ID; new fields get IDs starting from ``last_column_id + 1``.

    Args:
        schema: The new schema to assign IDs to.
        base_schema: The existing table's schema (IDs are reused from here by name).
        last_column_id: The current table's last_column_id (new IDs start above this).

    Returns:
        A tuple of (fresh_schema, new_last_column_id).
    """
    existing_ids_by_name = index_by_name(base_schema)
    names_by_incoming_id = index_name_by_id(schema)

    # Map each incoming field ID to the base schema's ID for the same full
    # field path, where one exists.
    reuse_map: dict[int, int] = {
        incoming_id: existing_ids_by_name[full_name]
        for incoming_id, full_name in names_by_incoming_id.items()
        if full_name in existing_ids_by_name
    }

    refreshed_schema = pre_order_visit(schema, _SetFreshIDsForReplace(reuse_map, last_column_id))
    # last_column_id never decreases, even if the new schema drops columns.
    return refreshed_schema, max(refreshed_schema.highest_field_id, last_column_id)
1434+
13831435
# Implementation copied from Apache Iceberg repo.
13841436
def make_compatible_name(name: str) -> str:
13851437
"""Make a field name compatible with Avro specification.

0 commit comments

Comments
 (0)