Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 111 additions & 9 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class ConfigResponse(IcebergBaseModel):

class ListNamespaceResponse(IcebergBaseModel):
    """One page of a namespace listing as returned by the REST catalog.

    Besides the namespace identifiers, the model captures the optional
    ``next-page-token`` field of the payload so callers can request the
    following page.
    """

    namespaces: List[Identifier] = Field()
    # Populated from the "next-page-token" key; stays None when the key is absent.
    next_page_token: Optional[str] = Field(default=None, alias="next-page-token")


class NamespaceResponse(IcebergBaseModel):
Expand All @@ -209,10 +210,12 @@ class ListViewResponseEntry(IcebergBaseModel):

class ListTablesResponse(IcebergBaseModel):
    """One page of a table listing as returned by the REST catalog.

    Besides the table entries, the model captures the optional
    ``next-page-token`` field of the payload so callers can request the
    following page.
    """

    identifiers: List[ListTableResponseEntry] = Field()
    # Populated from the "next-page-token" key; stays None when the key is absent.
    next_page_token: Optional[str] = Field(default=None, alias="next-page-token")


class ListViewsResponse(IcebergBaseModel):
    """One page of a view listing as returned by the REST catalog.

    Besides the view entries, the model captures the optional
    ``next-page-token`` field of the payload so callers can request the
    following page.
    """

    identifiers: List[ListViewResponseEntry] = Field()
    # Populated from the "next-page-token" key; stays None when the key is absent.
    next_page_token: Optional[str] = Field(default=None, alias="next-page-token")


class RestCatalog(Catalog):
Expand Down Expand Up @@ -584,15 +587,47 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)

@retry(**_RETRY_ARGS)
def list_tables_raw(
    self, namespace: Union[str, Identifier], page_size: Optional[int] = None, next_page_token: Optional[str] = None
) -> ListTablesResponse:
    """Fetch a single page of the table listing for a namespace.

    Args:
        namespace (Union[str, Identifier]): Namespace to list against.
        page_size (Optional[int]): Number of tables to request per page. Not sent when None.
        next_page_token (Optional[str]): Token taken from the previous page. Not sent when None.

    Returns:
        ListTablesResponse: The parsed response, including its ``next_page_token``.

    Raises:
        NoSuchNamespaceError: Expected for a 404 response, per the mapping handed to
            ``_handle_non_200_response``.
    """
    # Same validating helper as list_views_raw, so both listings treat the namespace alike.
    namespace_tuple = self._check_valid_namespace_identifier(namespace)
    namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)

    # Only forward pagination arguments the caller actually supplied.
    params: Dict[str, Any] = {}
    if next_page_token is not None:
        params["pageToken"] = next_page_token
    if page_size is not None:
        params["pageSize"] = page_size

    # Exactly one request per page; the response is returned whole so that
    # the caller can read next_page_token and keep paginating.
    response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})
    return ListTablesResponse.model_validate_json(response.text)

def list_tables(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> List[Identifier]:
    """List every table in a namespace, following pagination to the end.

    Pages are fetched through ``list_tables_raw`` until a response carries no
    ``next_page_token``. This wrapper is deliberately not decorated with
    ``@retry``: each page request is already retried inside
    ``list_tables_raw``, and retrying the whole loop would start over from the
    first page. This also matches ``list_views`` and ``list_namespaces``.

    Args:
        namespace (Union[str, Identifier]): Namespace to list against.
        page_size (Optional[int]): Number of tables to request per page, passed through unchanged.

    Returns:
        List[Identifier]: One ``(*namespace, name)`` tuple per table, in the order received.
    """
    tables: List[Identifier] = []
    next_page_token: Optional[str] = None
    while True:
        page = self.list_tables_raw(namespace=namespace, page_size=page_size, next_page_token=next_page_token)
        tables.extend((*table.namespace, table.name) for table in page.identifiers)
        next_page_token = page.next_page_token
        if next_page_token is None:
            # No token on the page just read: nothing further to request.
            return tables
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we want to return a special Iterable[Identifier] that calls the next page when the current page is exhausted. This avoids pulling in all the Identifiers right away, reducing memory pressure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense; I think I can implement something. That will be a breaking change though, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an iter_* method for each listable and opted to just call list(iter) in the existing list_* methods to avoid a breaking change. If you have suggestions or guidance on how to accomplish both without a breaking change, I will happily implement them, as that would be much better, but I didn't see an obvious way to do it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be partially breaking, but Iterable is pretty close to List, so I think the community might be okay with the change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is swapping to Iterable what we want to do then? It makes the rest catalog break the interface as every other catalog returns lists -- so would need to change every other catalog to also return an Iterable in the type signature

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overnight, I was thinking that maybe we could subclass List?

Otherwise, switching to the Iterable makes the most sense to me. We can also split out the change in a separate PR and send out an email to the devlist to see what others think.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed on switching to iterable, subclassing list seems sort of like a lot just to support pagination. I will have a commit that makes all list responses from catalogs iterables later...fighting one issue with click and how it handles the context but other than that all tests are passing with that swap

Copy link
Copy Markdown
Contributor Author

@jayceslesar jayceslesar Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, all iterators now -- seriously would have been impossible without the test coverage we have

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's even worth making a separate issue for just the iterator change, as we will still have to modify the list_* methods in the rest catalog: because of the retry decorator, they essentially have to be wrappers around a similar list_raw call. Should I just send this PR to the dev list to get some more eyes on it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@retry(**_RETRY_ARGS)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
Expand Down Expand Up @@ -655,15 +690,47 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
return table_request

@retry(**_RETRY_ARGS)
def list_views_raw(
    self, namespace: Union[str, Identifier], page_size: Optional[int] = None, next_page_token: Optional[str] = None
) -> ListViewsResponse:
    """Fetch a single page of the view listing for a namespace.

    Args:
        namespace (Union[str, Identifier]): Namespace to list against.
        page_size (Optional[int]): Number of views to request per page. Not sent when None.
        next_page_token (Optional[str]): Token taken from the previous page. Not sent when None.

    Returns:
        ListViewsResponse: The parsed response, including its ``next_page_token``.

    Raises:
        NoSuchNamespaceError: Expected for a 404 response, per the mapping handed to
            ``_handle_non_200_response``.
    """
    namespace_tuple = self._check_valid_namespace_identifier(namespace)
    namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)

    # Only forward pagination arguments the caller actually supplied.
    params: Dict[str, Any] = {}
    if next_page_token is not None:
        params["pageToken"] = next_page_token
    if page_size is not None:
        params["pageSize"] = page_size

    # Exactly one request per page; the response is returned whole so that
    # the caller can read next_page_token and keep paginating.
    response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})
    return ListViewsResponse.model_validate_json(response.text)

def list_views(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> List[Identifier]:
    """List every view in a namespace, following pagination to the end.

    Args:
        namespace (Union[str, Identifier]): Namespace to list against.
        page_size (Optional[int]): Number of views to request per page, passed through unchanged.

    Returns:
        List[Identifier]: One ``(*namespace, name)`` tuple per view, in the order received.
    """
    collected = []
    token = None
    more_pages = True
    while more_pages:
        page = self.list_views_raw(namespace=namespace, page_size=page_size, next_page_token=token)
        for view in page.identifiers:
            collected.append((*view.namespace, view.name))
        # A page without a token is the last one.
        token = page.next_page_token
        more_pages = token is not None
    return collected

@retry(**_RETRY_ARGS)
def commit_table(
Expand Down Expand Up @@ -732,21 +799,56 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
_handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError})

@retry(**_RETRY_ARGS)
def list_namespaces_raw(
    self, namespace: Union[str, Identifier] = (), page_size: Optional[int] = None, next_page_token: Optional[str] = None
) -> ListNamespaceResponse:
    """Fetch a single page of the namespace listing.

    Args:
        namespace (Union[str, Identifier]): Namespace to list sub-namespaces for.
            An empty identifier requests the listing without a ``parent`` filter.
        page_size (Optional[int]): Number of namespaces to request per page. Not sent when None.
        next_page_token (Optional[str]): Token taken from the previous page. Not sent when None.

    Returns:
        ListNamespaceResponse: The parsed response, including its ``next_page_token``.

    Raises:
        NoSuchNamespaceError: Expected for a 404 response, per the mapping handed to
            ``_handle_non_200_response``.
    """
    namespace_tuple = self.identifier_to_tuple(namespace)

    # Only forward pagination arguments the caller actually supplied.
    params: Dict[str, Any] = {}
    if next_page_token is not None:
        params["pageToken"] = next_page_token
    if page_size is not None:
        params["pageSize"] = page_size

    # The parent filter is embedded in the endpoint string; pagination
    # arguments travel separately through ``params``.
    if namespace_tuple:
        endpoint = f"{Endpoints.list_namespaces}?parent={NAMESPACE_SEPARATOR.join(namespace_tuple)}"
    else:
        endpoint = Endpoints.list_namespaces

    response = self._session.get(self.url(endpoint), params=params)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchNamespaceError})

    # Return the whole model (not just .namespaces) so the token is not lost.
    return ListNamespaceResponse.model_validate_json(response.text)

def list_namespaces(self, namespace: Union[str, Identifier] = (), page_size: Optional[int] = None) -> List[Identifier]:
    """List namespaces, following pagination to the end.

    Args:
        namespace (Union[str, Identifier]): Namespace to list sub-namespaces for.
        page_size (Optional[int]): Number of namespaces to request per page, passed through unchanged.

    Returns:
        List[Identifier]: The namespaces of every page, concatenated in the order received.
    """
    found = []
    token = None
    while True:
        page = self.list_namespaces_raw(namespace=namespace, page_size=page_size, next_page_token=token)
        found += page.namespaces
        token = page.next_page_token
        if token is None:
            # No token on the page just read: nothing further to request.
            return found

@retry(**_RETRY_ARGS)
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
Expand Down
73 changes: 73 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,28 @@ def test_list_tables_200(rest_mock: Mocker) -> None:
assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [("examples", "fooshare")]


def test_list_tables_paginated_200(rest_mock: Mocker) -> None:
    """list_tables keeps requesting pages while a ``next-page-token`` is returned."""
    namespace = "examples"
    tables_url = f"{TEST_URI}v1/namespaces/{namespace}/tables"
    first_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}
    second_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}

    # Registration order is kept: the token-less URL first, the page-2 URL second.
    rest_mock.get(tables_url, json=first_page, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.get(f"{tables_url}?pageToken=page2", json=second_page, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    listed = catalog.list_tables(namespace)

    assert listed == [("examples", "fooshare"), ("examples", "fooshare2")]
    # NOTE(review): presumably one request issued while constructing the catalog
    # plus the two page requests; confirm against the rest_mock fixture.
    assert rest_mock.call_count == 3


def test_list_tables_200_sigv4(rest_mock: Mocker) -> None:
namespace = "examples"
rest_mock.get(
Expand Down Expand Up @@ -458,6 +480,30 @@ def test_list_views_200(rest_mock: Mocker) -> None:
assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")]


def test_list_views_paginated_200(rest_mock: Mocker) -> None:
    """list_views keeps requesting pages while a ``next-page-token`` is returned."""
    namespace = "examples"
    views_url = f"{TEST_URI}v1/namespaces/{namespace}/views"
    first_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}
    second_page = {"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}

    # Registration order is kept: the token-less URL first, the page-2 URL second.
    rest_mock.get(views_url, json=first_page, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.get(f"{views_url}?pageToken=page2", json=second_page, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    listed = catalog.list_views(namespace)

    assert listed == [("examples", "fooshare"), ("examples", "fooshare2")]
    # NOTE(review): presumably one request issued while constructing the catalog
    # plus the two page requests; confirm against the rest_mock fixture.
    assert rest_mock.call_count == 3


def test_list_views_200_sigv4(rest_mock: Mocker) -> None:
namespace = "examples"
rest_mock.get(
Expand Down Expand Up @@ -543,6 +589,33 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None:
]


def test_list_namespaces_paginated_200(rest_mock: Mocker) -> None:
    """list_namespaces keeps requesting pages while a ``next-page-token`` is returned."""
    namespaces_url = f"{TEST_URI}v1/namespaces"
    first_page = {"namespaces": [["default"], ["examples"], ["fokko"], ["system"]], "next-page-token": "page2"}
    second_page = {"namespaces": [["default2"], ["examples2"], ["fokko2"], ["system2"]]}

    # Registration order is kept: the token-less URL first, the page-2 URL second.
    rest_mock.get(namespaces_url, json=first_page, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.get(f"{namespaces_url}?pageToken=page2", json=second_page, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    listed = catalog.list_namespaces()

    expected = [
        ("default",),
        ("examples",),
        ("fokko",),
        ("system",),
        ("default2",),
        ("examples2",),
        ("fokko2",),
        ("system2",),
    ]
    assert listed == expected
    # NOTE(review): presumably one request issued while constructing the catalog
    # plus the two page requests; confirm against the rest_mock fixture.
    assert rest_mock.call_count == 3


def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces?parent=accounting",
Expand Down