Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
91f3640
feat: add PaginationList for lazy page fetching in catalog list opera…
stark256-spec Jun 3, 2026
5a01899
fix(pagination): resolve mypy and ruff lint errors
stark256-spec Jun 3, 2026
98d6900
fix(tests): resolve ruff lint errors and add len() performance tests
stark256-spec Jun 3, 2026
9fbcdd3
fix(pagination): add docstrings to __getitem__ overload stubs for pyd…
stark256-spec Jun 3, 2026
10c2ba0
fix(pagination): use noqa:D105 on overload stubs instead of docstrings
stark256-spec Jun 3, 2026
0b08854
fix(pagination): drop @overload stubs to resolve D105/D418 pydocstyle…
stark256-spec Jun 3, 2026
6a7b6d0
fix(pagination): add D105/D418 to pydocstyle ignore list; restore @ov…
stark256-spec Jun 3, 2026
d71317c
fix(tests): move PaginationList import to correct alphabetical positi…
stark256-spec Jun 4, 2026
1780ae6
fix(tests): account for RestCatalog config endpoint call in call_coun…
stark256-spec Jun 4, 2026
e2bb799
refactor(pagination): move PaginationList to typedef, add PageFetchRe…
stark256-spec Jun 6, 2026
392a8f5
fix: update PaginationList import in test_rest.py
stark256-spec Jun 6, 2026
7ab945a
fix(lint): add D105 back to pydocstyle ignore for @overload stubs
stark256-spec Jun 10, 2026
329b1bd
fix: resolve ruff import ordering and pagination cleanup bug
stark256-spec Jun 10, 2026
769728d
feat(pagination): add count, index, reversed, copy, and + to Paginati…
stark256-spec Jun 11, 2026
3883280
Address review nits: named FetchNextPage alias and HTTP call count as…
stark256-spec Jun 19, 2026
1c3abd9
Add __mul__ and __rmul__ overrides to PaginationList
stark256-spec Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ repos:
- id: pydocstyle
args:
[
"--ignore=D100,D102,D101,D103,D104,D107,D203,D212,D213,D404,D405,D406,D407,D411,D413,D415,D417",
"--ignore=D100,D102,D101,D103,D104,D105,D107,D203,D212,D213,D404,D405,D406,D407,D411,D413,D415,D417,D418",
Comment thread
Fokko marked this conversation as resolved.
Outdated
]
additional_dependencies:
- tomli==2.0.1
Expand Down
81 changes: 37 additions & 44 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.pagination import PaginationList
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool, property_as_int
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewMetadata, ViewVersion
Expand Down Expand Up @@ -1051,26 +1052,24 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
raise ValueError(f"{PAGE_SIZE} must be a positive integer")
params["pageSize"] = str(page_size)

tables: list[Identifier] = []
page_token: str | None = None

while True:
if page_token:
params["pageToken"] = page_token
def _fetch_page(page_token: str) -> tuple[list[Identifier], str | None]:
Comment thread
Fokko marked this conversation as resolved.
Outdated
params["pageToken"] = page_token
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})

parsed = ListTablesResponse.model_validate_json(response.text)
tables.extend([(*table.namespace, table.name) for table in parsed.identifiers])

if not parsed.next_page_token:
break
page_token = parsed.next_page_token
return [(*t.namespace, t.name) for t in parsed.identifiers], parsed.next_page_token

return tables
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
parsed = ListTablesResponse.model_validate_json(response.text)
first_page: list[Identifier] = [(*t.namespace, t.name) for t in parsed.identifiers]
return PaginationList(first_page, parsed.next_page_token, _fetch_page)

@retry(**_RETRY_ARGS)
@override
Expand Down Expand Up @@ -1165,27 +1164,24 @@ def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise ValueError(f"{PAGE_SIZE} must be a positive integer")
params["pageSize"] = str(page_size)

views: list[Identifier] = []
page_token: str | None = None

while True:
if page_token:
params["pageToken"] = page_token

def _fetch_page(page_token: str) -> tuple[list[Identifier], str | None]:
params["pageToken"] = page_token
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})

parsed = ListViewsResponse.model_validate_json(response.text)
views.extend([(*view.namespace, view.name) for view in parsed.identifiers])

if not parsed.next_page_token:
break
page_token = parsed.next_page_token
return [(*v.namespace, v.name) for v in parsed.identifiers], parsed.next_page_token

return views
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
parsed = ListViewsResponse.model_validate_json(response.text)
first_page: list[Identifier] = [(*v.namespace, v.name) for v in parsed.identifiers]
return PaginationList(first_page, parsed.next_page_token, _fetch_page)

@retry(**_RETRY_ARGS)
@override
Expand Down Expand Up @@ -1279,37 +1275,34 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
self._check_endpoint(Capability.V1_LIST_NAMESPACES)
namespace_tuple = self.identifier_to_tuple(namespace)
namespaces_url = self.url(Endpoints.list_namespaces)

params: dict[str, str] = {}
page_size = property_as_int(self.properties, PAGE_SIZE, None)
if page_size is not None:
if page_size <= 0:
raise ValueError(f"{PAGE_SIZE} must be a positive integer")
params["pageSize"] = str(page_size)
if namespace_tuple:
params["parent"] = self._encode_namespace_path(namespace_tuple)

namespaces: list[Identifier] = []
page_token: str | None = None

while True:
if namespace_tuple:
params["parent"] = self._encode_namespace_path(namespace_tuple)
if page_token:
params["pageToken"] = page_token
response = self._session.get(self.url(Endpoints.list_namespaces), params=params)

def _fetch_page(page_token: str) -> tuple[list[Identifier], str | None]:
params["pageToken"] = page_token
response = self._session.get(namespaces_url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})

parsed = ListNamespaceResponse.model_validate_json(response.text)
namespaces.extend(parsed.namespaces)
return list(parsed.namespaces), parsed.next_page_token

if not parsed.next_page_token:
break
page_token = parsed.next_page_token

return namespaces
response = self._session.get(namespaces_url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
parsed = ListNamespaceResponse.model_validate_json(response.text)
return PaginationList(list(parsed.namespaces), parsed.next_page_token, _fetch_page)

@retry(**_RETRY_ARGS)
@override
Expand Down
135 changes: 135 additions & 0 deletions pyiceberg/utils/pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Lazy-loading pagination utilities."""

from __future__ import annotations

from collections.abc import Callable, Iterator
from typing import SupportsIndex, TypeVar, overload

T = TypeVar("T")


class PaginationList(list[T]):
Comment thread
Fokko marked this conversation as resolved.
Outdated
"""A list that lazily fetches subsequent pages from a paginated API.

The first page is pre-loaded on construction. Subsequent pages are only
fetched when the caller iterates past items already in memory. Operations
that require the complete result set — ``len()``, ``in``, slicing,
``repr()`` — trigger a full fetch of all remaining pages.

Args:
first_page: Items from the first API response.
next_page_token: Pagination token returned with the first response,
or ``None`` if no further pages exist.
fetch_next_page: Callable that accepts a page token and returns a
tuple of ``(items, next_page_token_or_None)``.
"""

def __init__(
self,
first_page: list[T],
next_page_token: str | None,
fetch_next_page: Callable[[str], tuple[list[T], str | None]],
) -> None:
super().__init__(first_page)
self._next_page_token = next_page_token
self._fetch_next_page = fetch_next_page

# ------------------------------------------------------------------
# Internal helpers — use list's own methods to avoid infinite loops.
# ------------------------------------------------------------------

def _fetch_all(self) -> None:
"""Fetch all remaining pages into the list."""
while self._next_page_token:
items, self._next_page_token = self._fetch_next_page(self._next_page_token)
list.extend(self, items)

def _fetch_through_index(self, idx: int) -> None:
"""Fetch pages until the list contains at least *idx + 1* items."""
while list.__len__(self) <= idx and self._next_page_token:
items, self._next_page_token = self._fetch_next_page(self._next_page_token)
list.extend(self, items)

# ------------------------------------------------------------------
# Lazy iteration
# ------------------------------------------------------------------

def __iter__(self) -> Iterator[T]:
"""Iterate lazily, fetching pages only as the caller advances."""
idx = 0
while True:
if idx < list.__len__(self):
yield list.__getitem__(self, idx)
idx += 1
elif self._next_page_token:
items, self._next_page_token = self._fetch_next_page(self._next_page_token)
list.extend(self, items)
else:
return

# ------------------------------------------------------------------
# Operations that require the complete result set
# ------------------------------------------------------------------

def __len__(self) -> int:
"""Return the total number of items, fetching all pages first."""
self._fetch_all()
return list.__len__(self)

def __contains__(self, item: object) -> bool:
"""Return True if item is present, fetching all pages first."""
self._fetch_all()
return list.__contains__(self, item)

def __repr__(self) -> str:
"""Return string representation after fetching all pages."""
self._fetch_all()
return f"PaginationList({list.__repr__(self)})"

def __eq__(self, other: object) -> bool:
"""Compare equality after fetching all pages."""
self._fetch_all()
return list.__eq__(self, other)

def __ne__(self, other: object) -> bool:
"""Compare inequality after fetching all pages."""
return not self.__eq__(other)

# ------------------------------------------------------------------
# Index / slice access
# ------------------------------------------------------------------

@overload
def __getitem__(self, idx: SupportsIndex) -> T: ...

@overload
def __getitem__(self, idx: slice) -> list[T]: ...

def __getitem__(self, idx: SupportsIndex | slice) -> T | list[T]:
"""Fetch pages as needed before returning the requested item(s)."""
if isinstance(idx, slice):
self._fetch_all()
else:
i = idx.__index__()
if i < 0:
self._fetch_all()
else:
self._fetch_through_index(i)
return list.__getitem__(self, idx)
57 changes: 57 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from pyiceberg.typedef import RecursiveDict
from pyiceberg.types import StringType
from pyiceberg.utils.config import Config
from pyiceberg.utils.pagination import PaginationList
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewMetadata, ViewVersion

Expand Down Expand Up @@ -529,6 +530,62 @@ def test_list_tables_paginated_200(rest_mock: Mocker) -> None:
]


def test_list_tables_returns_pagination_list(rest_mock: Mocker) -> None:
"""list_tables returns a PaginationList that defers fetching page 2."""
namespace = "examples"

rest_mock.get(
f"{TEST_URI}v1/namespaces/{namespace}/tables",
json={
"identifiers": [
{"namespace": ["examples"], "name": "table1"},
{"namespace": ["examples"], "name": "table2"},
],
"next-page-token": "pagetoken",
},
status_code=200,
request_headers=TEST_HEADERS,
)
# Second page — registered but should only be called when iterated past page 1.
rest_mock.get(
f"{TEST_URI}v1/namespaces/{namespace}/tables?pageToken=pagetoken",
json={
"identifiers": [
{"namespace": ["examples"], "name": "table3"},
],
},
status_code=200,
request_headers=TEST_HEADERS,
)

catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
calls_after_init = rest_mock.call_count # config endpoint called during __init__

result = catalog.list_tables(namespace)

assert isinstance(result, PaginationList)

# Consuming only the first two items must not trigger the second HTTP request.
first_two = []
for item in result:
first_two.append(item)
if len(first_two) == 2:
break

assert first_two == [("examples", "table1"), ("examples", "table2")]
# Only the initial list_tables request should have been made beyond __init__.
assert rest_mock.call_count == calls_after_init + 1

# Consuming all items forces the second request.
all_tables = list(result)
assert all_tables == [
("examples", "table1"),
("examples", "table2"),
("examples", "table3"),
]
assert rest_mock.call_count == calls_after_init + 2


def test_list_tables_paginated_200_none_next_page_token(rest_mock: Mocker) -> None:
namespace = "examples"
# First page with next-page-token
Expand Down
Loading