Skip to content

Commit 2eefa1f

Browse files
committed
REST: Add support for replacing views
1 parent 5da8186 commit 2eefa1f

11 files changed

Lines changed: 283 additions & 0 deletions

File tree

pyiceberg/catalog/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,32 @@ def create_view(
744744
ViewAlreadyExistsError: If a view with the name already exists.
745745
"""
746746

747+
@abstractmethod
748+
def replace_view(
749+
self,
750+
identifier: str | Identifier,
751+
schema: Schema | pa.Schema,
752+
view_version: ViewVersion,
753+
location: str | None = None,
754+
properties: Properties = EMPTY_DICT,
755+
) -> View:
756+
"""Replace a view.
757+
758+
Args:
759+
identifier (str | Identifier): View identifier.
760+
schema (Schema): View's schema.
761+
view_version (ViewVersion): The format version for the view.
762+
location (str | None): Location for the view. Optional Argument.
763+
properties (Properties): View properties that can be a string based dictionary.
764+
765+
Returns:
766+
View: the created view instance.
767+
768+
Raises:
769+
TableAlreadyExistsError: If a table with the same name already exists.
770+
NoSuchViewError: If a view with the name does not exist.
771+
"""
772+
747773
@staticmethod
748774
def identifier_to_tuple(identifier: str | Identifier) -> Identifier:
749775
"""Parse an identifier to a tuple.
@@ -980,6 +1006,17 @@ def create_view(
9801006
) -> View:
9811007
raise NotImplementedError
9821008

1009+
@override
1010+
def replace_view(
1011+
self,
1012+
identifier: str | Identifier,
1013+
schema: Schema | pa.Schema,
1014+
view_version: ViewVersion,
1015+
location: str | None = None,
1016+
properties: Properties = EMPTY_DICT,
1017+
) -> View:
1018+
raise NotImplementedError
1019+
9831020
def _create_staged_table(
9841021
self,
9851022
identifier: str | Identifier,

pyiceberg/catalog/dynamodb.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,17 @@ def create_view(
564564
) -> View:
565565
raise NotImplementedError
566566

567+
@override
568+
def replace_view(
569+
self,
570+
identifier: str | Identifier,
571+
schema: Schema | pa.Schema,
572+
view_version: ViewVersion,
573+
location: str | None = None,
574+
properties: Properties = EMPTY_DICT,
575+
) -> View:
576+
raise NotImplementedError
577+
567578
@override
568579
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
569580
raise NotImplementedError

pyiceberg/catalog/glue.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,17 @@ def create_view(
981981
) -> View:
982982
raise NotImplementedError
983983

984+
@override
985+
def replace_view(
986+
self,
987+
identifier: str | Identifier,
988+
schema: Schema | pa.Schema,
989+
view_version: ViewVersion,
990+
location: str | None = None,
991+
properties: Properties = EMPTY_DICT,
992+
) -> View:
993+
raise NotImplementedError
994+
984995
@override
985996
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
986997
raise NotImplementedError

pyiceberg/catalog/hive.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,17 @@ def create_view(
443443
) -> View:
444444
raise NotImplementedError
445445

446+
@override
447+
def replace_view(
448+
self,
449+
identifier: str | Identifier,
450+
schema: Union[Schema, "pa.Schema"],
451+
view_version: ViewVersion,
452+
location: str | None = None,
453+
properties: Properties = EMPTY_DICT,
454+
) -> View:
455+
raise NotImplementedError
456+
446457
@override
447458
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
448459
"""Register a new table using existing metadata.

pyiceberg/catalog/noop.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,17 @@ def create_view(
172172
) -> View:
173173
raise NotImplementedError
174174

175+
@override
176+
def replace_view(
177+
self,
178+
identifier: str | Identifier,
179+
schema: Schema | pa.Schema,
180+
view_version: ViewVersion,
181+
location: str | None = None,
182+
properties: Properties = EMPTY_DICT,
183+
) -> View:
184+
raise NotImplementedError
185+
175186
@override
176187
def load_view(self, identifier: str | Identifier) -> View:
177188
raise NotImplementedError

pyiceberg/catalog/rest/__init__.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,16 @@
8383
from pyiceberg.table.metadata import TableMetadata
8484
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
8585
from pyiceberg.table.update import (
86+
AddSchemaUpdate,
87+
AddViewVersionUpdate,
88+
AssertViewUUID,
89+
SetCurrentViewVersionUpdate,
90+
SetLocationUpdate,
91+
SetPropertiesUpdate,
8692
TableRequirement,
8793
TableUpdate,
94+
ViewRequirement,
95+
ViewUpdate,
8896
)
8997
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
9098
from pyiceberg.types import transform_dict_value_to_str
@@ -155,6 +163,7 @@ class Endpoints:
155163
list_views: str = "namespaces/{namespace}/views"
156164
load_view: str = "namespaces/{namespace}/views/{view}"
157165
create_view: str = "namespaces/{namespace}/views"
166+
update_view: str = "namespaces/{namespace}/views/{view}"
158167
register_view: str = "namespaces/{namespace}/register-view"
159168
drop_view: str = "namespaces/{namespace}/views/{view}"
160169
view_exists: str = "namespaces/{namespace}/views/{view}"
@@ -185,6 +194,7 @@ class Capability:
185194
V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_views}")
186195
V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_view}")
187196
V1_VIEW_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path=f"{API_PREFIX}/{Endpoints.view_exists}")
197+
V1_UPDATE_VIEW = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.update_view}")
188198
V1_REGISTER_VIEW = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.register_view}")
189199
V1_DELETE_VIEW = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_view}")
190200
V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.plan_table_scan}")
@@ -215,6 +225,7 @@ class Capability:
215225
(
216226
Capability.V1_LIST_VIEWS,
217227
Capability.V1_LOAD_VIEW,
228+
Capability.V1_UPDATE_VIEW,
218229
Capability.V1_DELETE_VIEW,
219230
)
220231
)
@@ -337,6 +348,12 @@ class RegisterViewRequest(IcebergBaseModel):
337348
metadata_location: str = Field(..., alias="metadata-location")
338349

339350

351+
class CommitViewRequest(IcebergBaseModel):
352+
identifier: TableIdentifier = Field()
353+
requirements: tuple[ViewRequirement, ...] = Field(default_factory=tuple)
354+
updates: tuple[ViewUpdate, ...] = Field(default_factory=tuple)
355+
356+
340357
class ConfigResponse(IcebergBaseModel):
341358
defaults: Properties | None = Field(default_factory=dict)
342359
overrides: Properties | None = Field(default_factory=dict)
@@ -1000,6 +1017,75 @@ def create_view(
10001017
view_response = ViewResponse.model_validate_json(response.text)
10011018
return self._response_to_view(self.identifier_to_tuple(identifier), view_response)
10021019

1020+
@override
1021+
@retry(**_RETRY_ARGS)
1022+
def replace_view(
1023+
self,
1024+
identifier: str | Identifier,
1025+
schema: Schema | pa.Schema,
1026+
view_version: ViewVersion,
1027+
location: str | None = None,
1028+
properties: Properties = EMPTY_DICT,
1029+
) -> View:
1030+
self._check_endpoint(Capability.V1_UPDATE_VIEW)
1031+
iceberg_schema = self._convert_schema_if_needed(schema)
1032+
1033+
namespace_and_view = self._split_identifier_for_path(identifier, IdentifierKind.VIEW)
1034+
if self.table_exists(identifier):
1035+
raise TableAlreadyExistsError(f"Table with same name already exists: {identifier}")
1036+
if not self.view_exists(identifier):
1037+
raise NoSuchViewError(f"View does not exist: {identifier}")
1038+
1039+
current_view = self.load_view(identifier)
1040+
1041+
if location:
1042+
location = location.rstrip("/")
1043+
1044+
# Check if schema already exists in view metadata by comparing structure
1045+
schema_id = None
1046+
for existing_schema in current_view.metadata.schemas:
1047+
if existing_schema.as_struct() == iceberg_schema.as_struct():
1048+
schema_id = existing_schema.schema_id
1049+
break
1050+
1051+
updates: list[ViewUpdate] = []
1052+
if schema_id is None:
1053+
# Schema not found, add new schema with next schema_id
1054+
next_schema_id = max((s.schema_id for s in current_view.metadata.schemas), default=0) + 1
1055+
schema_to_add = iceberg_schema.model_copy(update={"schema_id": next_schema_id})
1056+
updates.append(AddSchemaUpdate(schema_=schema_to_add))
1057+
schema_id = next_schema_id
1058+
1059+
fresh_view_version = view_version.model_copy(update={"schema_id": schema_id})
1060+
updates.append(AddViewVersionUpdate(view_version=fresh_view_version))
1061+
updates.append(SetCurrentViewVersionUpdate(view_version_id=fresh_view_version.version_id))
1062+
1063+
updates_tuple: tuple[ViewUpdate, ...] = tuple(updates)
1064+
if location:
1065+
updates_tuple = updates_tuple + (SetLocationUpdate(location=location),)
1066+
if properties:
1067+
updates_tuple = updates_tuple + (SetPropertiesUpdate(updates=properties),)
1068+
1069+
requirements: tuple[ViewRequirement, ...] = (AssertViewUUID(uuid=current_view.metadata.view_uuid),)
1070+
1071+
identifier = current_view.name()
1072+
view_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
1073+
request = CommitViewRequest(identifier=view_identifier, requirements=requirements, updates=updates_tuple)
1074+
1075+
serialized_json = request.model_dump_json().encode(UTF8)
1076+
response = self._session.post(
1077+
self.url(Endpoints.update_view, **namespace_and_view),
1078+
data=serialized_json,
1079+
)
1080+
1081+
try:
1082+
response.raise_for_status()
1083+
except HTTPError as exc:
1084+
_handle_non_200_response(exc, {409: CommitFailedException, 404: NoSuchViewError})
1085+
1086+
view_response = ViewResponse.model_validate_json(response.text)
1087+
return self._response_to_view(self.identifier_to_tuple(identifier), view_response)
1088+
10031089
@retry(**_RETRY_ARGS)
10041090
@override
10051091
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:

pyiceberg/catalog/sql.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,17 @@ def create_view(
754754
) -> View:
755755
raise NotImplementedError
756756

757+
@override
758+
def replace_view(
759+
self,
760+
identifier: str | Identifier,
761+
schema: Schema | pa.Schema,
762+
view_version: ViewVersion,
763+
location: str | None = None,
764+
properties: Properties = EMPTY_DICT,
765+
) -> View:
766+
raise NotImplementedError
767+
757768
@override
758769
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
759770
raise NotImplementedError

pyiceberg/table/update/__init__.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,16 @@ class RemovePartitionStatisticsUpdate(IcebergBaseModel):
222222
snapshot_id: int = Field(alias="snapshot-id")
223223

224224

225+
class AddViewVersionUpdate(IcebergBaseModel):
226+
action: Literal["add-view-version"] = Field(default="add-view-version")
227+
view_version: Any = Field(alias="view-version")
228+
229+
230+
class SetCurrentViewVersionUpdate(IcebergBaseModel):
231+
action: Literal["set-current-view-version"] = Field(default="set-current-view-version")
232+
view_version_id: int = Field(alias="view-version-id")
233+
234+
225235
TableUpdate = Annotated[
226236
AssignUUIDUpdate
227237
| UpgradeFormatVersionUpdate
@@ -791,6 +801,19 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
791801
raise CommitFailedException(f"Table UUID does not match: {self.uuid} != {base_metadata.table_uuid}")
792802

793803

804+
class AssertViewUUID(ValidatableTableRequirement):
805+
"""The view UUID must match the requirement's `uuid`."""
806+
807+
type: Literal["assert-view-uuid"] = Field(default="assert-view-uuid")
808+
uuid: uuid.UUID
809+
810+
def validate(self, base_metadata: TableMetadata | None) -> None:
811+
if base_metadata is None:
812+
raise CommitFailedException("Requirement failed: current view metadata is missing")
813+
elif self.uuid != base_metadata.table_uuid:
814+
raise CommitFailedException(f"View UUID does not match: {self.uuid} != {base_metadata.table_uuid}")
815+
816+
794817
class AssertRefSnapshotId(ValidatableTableRequirement):
795818
"""The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`.
796819
@@ -919,4 +942,21 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
919942
Field(discriminator="type"),
920943
]
921944

945+
ViewUpdate = Annotated[
946+
AssignUUIDUpdate
947+
| UpgradeFormatVersionUpdate
948+
| AddSchemaUpdate
949+
| SetLocationUpdate
950+
| SetPropertiesUpdate
951+
| RemovePropertiesUpdate
952+
| AddViewVersionUpdate
953+
| SetCurrentViewVersionUpdate,
954+
Field(discriminator="action"),
955+
]
956+
957+
ViewRequirement = Annotated[
958+
AssertViewUUID,
959+
Field(discriminator="type"),
960+
]
961+
922962
UpdatesAndRequirements = tuple[tuple[TableUpdate, ...], tuple[TableRequirement, ...]]

pyiceberg/view/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@
2323
from pyiceberg.typedef import Identifier
2424
from pyiceberg.view.metadata import SQLViewRepresentation, ViewHistoryEntry, ViewMetadata, ViewVersion
2525

26+
__all__ = [
27+
"View",
28+
"ViewMetadata",
29+
"ViewVersion",
30+
"ViewHistoryEntry",
31+
"SQLViewRepresentation",
32+
]
33+
2634

2735
class View:
2836
"""An Iceberg view."""

tests/catalog/test_rest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
Capability.V1_LIST_VIEWS,
108108
Capability.V1_LOAD_VIEW,
109109
Capability.V1_VIEW_EXISTS,
110+
Capability.V1_UPDATE_VIEW,
110111
Capability.V1_REGISTER_VIEW,
111112
Capability.V1_DELETE_VIEW,
112113
Capability.V1_SUBMIT_TABLE_SCAN_PLAN,

0 commit comments

Comments
 (0)