Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 126 additions & 9 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Optional,
Set,
Expand Down Expand Up @@ -184,6 +185,7 @@ class ConfigResponse(IcebergBaseModel):

class ListNamespaceResponse(IcebergBaseModel):
    """One page of a namespace listing, as parsed from the response body."""

    # Namespace identifiers contained in this page.
    namespaces: List[Identifier] = Field()
    # Populated from the "next-page-token" JSON key; None when the key is absent.
    next_page_token: Optional[str] = Field(alias="next-page-token", default=None)


class NamespaceResponse(IcebergBaseModel):
Expand All @@ -209,10 +211,12 @@ class ListViewResponseEntry(IcebergBaseModel):

class ListTablesResponse(IcebergBaseModel):
    """One page of a table listing, as parsed from the response body."""

    # Table entries (namespace + name) contained in this page.
    identifiers: List[ListTableResponseEntry] = Field()
    # Populated from the "next-page-token" JSON key; None when the key is absent.
    next_page_token: Optional[str] = Field(alias="next-page-token", default=None)


class ListViewsResponse(IcebergBaseModel):
    """One page of a view listing, as parsed from the response body."""

    # View entries (namespace + name) contained in this page.
    identifiers: List[ListViewResponseEntry] = Field()
    # Populated from the "next-page-token" JSON key; None when the key is absent.
    next_page_token: Optional[str] = Field(alias="next-page-token", default=None)


class RestCatalog(Catalog):
Expand Down Expand Up @@ -584,15 +588,51 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)

@retry(**_RETRY_ARGS)
def list_tables_raw(
    self, namespace: Union[str, Identifier], page_size: Optional[int] = None, next_page_token: Optional[str] = None
) -> ListTablesResponse:
    """Fetch a single page of the table listing for a namespace.

    Args:
        namespace (Union[str, Identifier]): Namespace to list tables in.
        page_size (Optional[int]): Requested number of tables per page; sent as ``pageSize`` only when not None.
        next_page_token (Optional[str]): Token taken from a previous page; sent as ``pageToken`` only when not None.

    Returns:
        ListTablesResponse: The parsed page, including the token for the next page (None when there is none).

    Raises:
        NoSuchNamespaceError: Mapped from a 404 response by ``_handle_non_200_response``.
    """
    # Validate the namespace the same way list_views_raw does before building the URL.
    namespace_tuple = self._check_valid_namespace_identifier(namespace)
    namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)

    # Only send pagination parameters the caller actually supplied.
    params: Dict[str, Any] = {}
    if next_page_token is not None:
        params["pageToken"] = next_page_token
    if page_size is not None:
        params["pageSize"] = page_size

    # Exactly one request per call: the paginated one.
    response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})

    # Return the whole response model so callers can read next_page_token.
    return ListTablesResponse.model_validate_json(response.text)

def iter_tables(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> Iterator[Identifier]:
    """Yield table identifiers of a namespace, fetching pages on demand.

    Each page is requested through ``list_tables_raw`` only once the caller
    has consumed the previous one. Iteration stops after the first page
    whose ``next_page_token`` is None.
    """
    page_token: Optional[str] = None
    more_pages = True
    while more_pages:
        page = self.list_tables_raw(namespace=namespace, page_size=page_size, next_page_token=page_token)
        for entry in page.identifiers:
            # Flatten (namespace levels..., table name) into one identifier tuple.
            yield (*entry.namespace, entry.name)
        page_token = page.next_page_token
        more_pages = page_token is not None

def list_tables(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> List[Identifier]:
    """Return every table identifier in a namespace as a list.

    Note: the full result of ``iter_tables()`` is collected in memory. For
    catalogs with many tables, iterate over ``iter_tables()`` instead.
    """
    tables: List[Identifier] = []
    tables.extend(self.iter_tables(namespace, page_size))
    return tables

@retry(**_RETRY_ARGS)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
Expand Down Expand Up @@ -655,15 +695,52 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
return table_request

@retry(**_RETRY_ARGS)
def list_views_raw(
    self, namespace: Union[str, Identifier], page_size: Optional[int] = None, next_page_token: Optional[str] = None
) -> ListViewsResponse:
    """Fetch a single page of the view listing for a namespace.

    Args:
        namespace (Union[str, Identifier]): Namespace to list views in.
        page_size (Optional[int]): Requested number of views per page; sent as ``pageSize`` only when not None.
        next_page_token (Optional[str]): Token taken from a previous page; sent as ``pageToken`` only when not None.

    Returns:
        ListViewsResponse: The parsed page, including the token for the next page (None when there is none).

    Raises:
        NoSuchNamespaceError: Mapped from a 404 response by ``_handle_non_200_response``.
    """
    namespace_tuple = self._check_valid_namespace_identifier(namespace)
    namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)

    # Only send pagination parameters the caller actually supplied.
    params: Dict[str, Any] = {}
    if next_page_token is not None:
        params["pageToken"] = next_page_token
    if page_size is not None:
        params["pageSize"] = page_size

    # Exactly one request per call: the paginated one.
    response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})

    # Return the whole response model so callers can read next_page_token.
    return ListViewsResponse.model_validate_json(response.text)

def iter_views(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> Iterator[Identifier]:
    """Yield view identifiers of a namespace, fetching pages on demand.

    Each page is requested through ``list_views_raw`` only once the caller
    has consumed the previous one. Iteration stops after the first page
    whose ``next_page_token`` is None.
    """
    page_token: Optional[str] = None
    more_pages = True
    while more_pages:
        page = self.list_views_raw(namespace=namespace, page_size=page_size, next_page_token=page_token)
        for entry in page.identifiers:
            # Flatten (namespace levels..., view name) into one identifier tuple.
            yield (*entry.namespace, entry.name)
        page_token = page.next_page_token
        more_pages = page_token is not None

def list_views(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> List[Identifier]:
    """Return every view identifier in a namespace as a list.

    Note: the full result of ``iter_views()`` is collected in memory. For
    catalogs with many views, iterate over ``iter_views()`` instead.
    """
    views: List[Identifier] = []
    views.extend(self.iter_views(namespace, page_size))
    return views

@retry(**_RETRY_ARGS)
def commit_table(
Expand Down Expand Up @@ -732,21 +809,61 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
_handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError})

@retry(**_RETRY_ARGS)
def list_namespaces_raw(
    self, namespace: Union[str, Identifier] = (), page_size: Optional[int] = None, next_page_token: Optional[str] = None
) -> ListNamespaceResponse:
    """Fetch a single page of the namespace listing.

    Args:
        namespace (Union[str, Identifier]): Parent namespace to list sub-namespaces for; when empty, no ``parent`` is sent.
        page_size (Optional[int]): Requested number of namespaces per page; sent as ``pageSize`` only when not None.
        next_page_token (Optional[str]): Token taken from a previous page; sent as ``pageToken`` only when not None.

    Returns:
        ListNamespaceResponse: The parsed page, including the token for the next page (None when there is none).

    Raises:
        NoSuchNamespaceError: Mapped from a 404 response by ``_handle_non_200_response``.
    """
    namespace_tuple = self.identifier_to_tuple(namespace)

    # Only send pagination parameters the caller actually supplied.
    params: Dict[str, Any] = {}
    if next_page_token is not None:
        params["pageToken"] = next_page_token
    if page_size is not None:
        params["pageSize"] = page_size

    # The parent filter is embedded in the endpoint string; pagination goes through ``params``.
    response = self._session.get(
        self.url(
            f"{Endpoints.list_namespaces}?parent={NAMESPACE_SEPARATOR.join(namespace_tuple)}"
            if namespace_tuple
            else Endpoints.list_namespaces
        ),
        params=params,
    )
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})

    # Return the whole response model (not just .namespaces) so callers can read next_page_token.
    return ListNamespaceResponse.model_validate_json(response.text)

def iter_namespaces(self, namespace: Union[str, Identifier] = (), page_size: Optional[int] = None) -> Iterator[Identifier]:
    """Yield namespace identifiers, fetching pages on demand.

    Each page is requested through ``list_namespaces_raw`` only once the
    caller has consumed the previous one. Iteration stops after the first
    page whose ``next_page_token`` is None.
    """
    page_token: Optional[str] = None
    more_pages = True
    while more_pages:
        page = self.list_namespaces_raw(namespace=namespace, page_size=page_size, next_page_token=page_token)
        for found in page.namespaces:
            yield found
        page_token = page.next_page_token
        more_pages = page_token is not None

def list_namespaces(self, namespace: Union[str, Identifier] = (), page_size: Optional[int] = None) -> List[Identifier]:
    """Return every namespace identifier as a list.

    Note: the full result of ``iter_namespaces()`` is collected in memory. For
    catalogs with many namespaces, iterate over ``iter_namespaces()`` instead.
    """
    found: List[Identifier] = []
    found.extend(self.iter_namespaces(namespace, page_size))
    return found

@retry(**_RETRY_ARGS)
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
Expand Down
73 changes: 73 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,28 @@ def test_list_tables_200(rest_mock: Mocker) -> None:
assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [("examples", "fooshare")]


def test_list_tables_paginated_200(rest_mock: Mocker) -> None:
    """list_tables follows ``next-page-token`` and concatenates both pages in order."""
    namespace = "examples"
    first_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}
    last_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}

    # First page: carries a token pointing at the second page.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/tables",
        json=first_page,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    # Second page: no token, so pagination stops here.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/tables?pageToken=page2",
        json=last_page,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    tables = catalog.list_tables(namespace)

    assert tables == [
        ("examples", "fooshare"),
        ("examples", "fooshare2"),
    ]
    # Two page requests; the third call is presumably made while constructing the catalog (verify against the fixture).
    assert rest_mock.call_count == 3


def test_list_tables_200_sigv4(rest_mock: Mocker) -> None:
namespace = "examples"
rest_mock.get(
Expand Down Expand Up @@ -458,6 +480,30 @@ def test_list_views_200(rest_mock: Mocker) -> None:
assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")]


def test_list_views_paginated_200(rest_mock: Mocker) -> None:
    """list_views follows ``next-page-token`` and concatenates both pages in order."""
    namespace = "examples"
    first_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}
    last_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}

    # First page: carries a token pointing at the second page.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/views",
        json=first_page,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    # Second page: no token, so pagination stops here.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/views?pageToken=page2",
        json=last_page,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    views = catalog.list_views(namespace)

    assert views == [
        ("examples", "fooshare"),
        ("examples", "fooshare2"),
    ]
    # Two page requests; the third call is presumably made while constructing the catalog (verify against the fixture).
    assert rest_mock.call_count == 3


def test_list_views_200_sigv4(rest_mock: Mocker) -> None:
namespace = "examples"
rest_mock.get(
Expand Down Expand Up @@ -543,6 +589,33 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None:
]


def test_list_namespaces_paginated_200(rest_mock: Mocker) -> None:
    """list_namespaces follows ``next-page-token`` and concatenates both pages in order."""
    first_page = {"namespaces": [["default"], ["examples"], ["fokko"], ["system"]], "next-page-token": "page2"}
    last_page = {"namespaces": [["default2"], ["examples2"], ["fokko2"], ["system2"]]}

    # First page: carries a token pointing at the second page.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces",
        json=first_page,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    # Second page: no token, so pagination stops here.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces?pageToken=page2",
        json=last_page,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    namespaces = catalog.list_namespaces()

    assert namespaces == [
        ("default",),
        ("examples",),
        ("fokko",),
        ("system",),
        ("default2",),
        ("examples2",),
        ("fokko2",),
        ("system2",),
    ]
    # Two page requests; the third call is presumably made while constructing the catalog (verify against the fixture).
    assert rest_mock.call_count == 3


def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces?parent=accounting",
Expand Down