Skip to content

Commit 4842ea4

Browse files
committed
Done for dataset as well. TODO: For KVS and RQ
1 parent 8954f4f commit 4842ea4

18 files changed

+396
-271
lines changed

src/apify_client/_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ListPage(Generic[T]):
3131
desc: bool
3232
"""Whether the listing is descending or not."""
3333

34-
def __init__(self: ListPage, data: dict) -> None:
34+
def __init__(self, data: dict) -> None:
3535
"""Initialize a ListPage instance from the API response data."""
3636
self.items = data.get('items', [])
3737
self.offset = data.get('offset', 0)

src/apify_client/clients/base/base_client.py

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
from __future__ import annotations
22

3-
from typing import TYPE_CHECKING, Any
3+
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Generator, Iterable, Iterator
4+
from typing import (
5+
TYPE_CHECKING,
6+
Any,
7+
Generic,
8+
Protocol,
9+
TypeVar,
10+
)
411

512
from apify_client._logging import WithLogDetailsClient
13+
from apify_client._types import ListPage
614
from apify_client._utils import to_safe_id
715

816
# Conditional import only executed when type checking, otherwise we'd get circular dependency issues
917
if TYPE_CHECKING:
1018
from apify_client import ApifyClient, ApifyClientAsync
1119
from apify_client._http_client import HTTPClient, HTTPClientAsync
20+
T = TypeVar('T')
1221

1322

1423
class _BaseBaseClient(metaclass=WithLogDetailsClient):
@@ -87,6 +96,42 @@ def __init__(
8796
self.safe_id = to_safe_id(self.resource_id)
8897
self.url = f'{self.url}/{self.safe_id}'
8998

99+
@staticmethod
100+
def _list_iterable_from_callback(callback: Callable[..., ListPage[T]], **kwargs: Any) -> ListPageProtocol[T]:
101+
"""Return object can be awaited or iterated over.
102+
103+
Not using total from the API response as it can change during iteration.
104+
"""
105+
chunk_size = kwargs.pop('chunk_size', 0) or 0
106+
offset = kwargs.get('offset') or 0
107+
limit = kwargs.get('limit') or 0
108+
109+
list_page = callback(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)})
110+
111+
def iterator() -> Iterator[T]:
112+
current_page = list_page
113+
for item in current_page.items:
114+
yield item
115+
116+
fetched_items = len(current_page.items)
117+
while (
118+
current_page.items # If there are any items left to fetch
119+
and (
120+
not limit or (limit > fetched_items)
121+
) # If there are is limit to fetch and have not reached it yet.
122+
):
123+
new_kwargs = {
124+
**kwargs,
125+
'offset': offset + fetched_items,
126+
'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
127+
}
128+
current_page = callback(**new_kwargs)
129+
for item in current_page.items:
130+
yield item
131+
fetched_items += len(current_page.items)
132+
133+
return IterableListPage[T](list_page, iterator())
134+
90135

91136
class BaseClientAsync(_BaseBaseClient):
92137
"""Base class for async sub-clients."""
@@ -127,3 +172,114 @@ def __init__(
127172
if self.resource_id is not None:
128173
self.safe_id = to_safe_id(self.resource_id)
129174
self.url = f'{self.url}/{self.safe_id}'
175+
176+
@staticmethod
177+
def _list_iterable_from_callback(
178+
callback: Callable[..., Awaitable[ListPage[T]]], **kwargs: Any
179+
) -> ListPageProtocolAsync[T]:
180+
"""Return object can be awaited or iterated over.
181+
182+
Not using total from the API response as it can change during iteration.
183+
"""
184+
chunk_size = kwargs.pop('chunk_size', 0) or 0
185+
offset = kwargs.get('offset') or 0
186+
limit = kwargs.get('limit') or 0
187+
188+
list_page_awaitable = callback(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)})
189+
190+
async def async_iterator() -> AsyncIterator[T]:
191+
current_page = await list_page_awaitable
192+
for item in current_page.items:
193+
yield item
194+
195+
fetched_items = len(current_page.items)
196+
while (
197+
current_page.items # If there are any items left to fetch
198+
and (
199+
not limit or (limit > fetched_items)
200+
) # If there are is limit to fetch and have not reached it yet.
201+
):
202+
new_kwargs = {
203+
**kwargs,
204+
'offset': offset + fetched_items,
205+
'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
206+
}
207+
current_page = await callback(**new_kwargs)
208+
for item in current_page.items:
209+
yield item
210+
fetched_items += len(current_page.items)
211+
212+
return IterableListPageAsync[T](list_page_awaitable, async_iterator())
213+
214+
215+
def _min_for_limit_param(a: int | None, b: int | None) -> int | None:
216+
"""Return minimum of two limit parameters, treating None or 0 as infinity. Return None for infinity."""
217+
# API treats 0 as None for limit parameter, in this context API understands 0 as infinity.
218+
if a == 0:
219+
a = None
220+
if b == 0:
221+
b = None
222+
if a is None:
223+
return b
224+
if b is None:
225+
return a
226+
return min(a, b)
227+
228+
229+
class ListPageProtocol(Iterable[T], Protocol[T]):
230+
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
231+
232+
items: list[T]
233+
"""List of returned objects on this page."""
234+
235+
count: int
236+
"""Count of the returned objects on this page."""
237+
238+
offset: int
239+
"""The limit on the number of returned objects offset specified in the API call."""
240+
241+
limit: int
242+
"""The offset of the first object specified in the API call"""
243+
244+
total: int
245+
"""Total number of objects matching the API call criteria."""
246+
247+
desc: bool
248+
"""Whether the listing is descending or not."""
249+
250+
251+
class ListPageProtocolAsync(AsyncIterable[T], Awaitable[ListPage[T]], Protocol[T]):
252+
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
253+
254+
255+
class IterableListPage(ListPage[T], Generic[T]):
256+
"""Can be called to get ListPage with items or iterated over to get individual items."""
257+
258+
def __init__(self, list_page: ListPage[T], iterator: Iterator[T]) -> None:
259+
self.items = list_page.items
260+
self.offset = list_page.offset
261+
self.limit = list_page.limit
262+
self.count = list_page.count
263+
self.total = list_page.total
264+
self.desc = list_page.desc
265+
self._iterator = iterator
266+
267+
def __iter__(self) -> Iterator[T]:
268+
"""Return an iterator over the items from API, possibly doing multiple API calls."""
269+
return self._iterator
270+
271+
272+
class IterableListPageAsync(Generic[T]):
273+
"""Can be awaited to get ListPage with items or asynchronously iterated over to get individual items."""
274+
275+
def __init__(self, awaitable: Awaitable[ListPage[T]], async_iterator: AsyncIterator[T]) -> None:
276+
self._awaitable = awaitable
277+
self._async_iterator = async_iterator
278+
279+
def __aiter__(self) -> AsyncIterator[T]:
280+
"""Return an asynchronous iterator over the items from API, possibly doing multiple API calls."""
281+
return self._async_iterator
282+
283+
def __await__(self) -> Generator[Any, Any, ListPage[T]]:
284+
"""Return an awaitable that resolves to the ListPage doing exactly one API call."""
285+
return self._awaitable.__await__()

src/apify_client/clients/base/resource_collection_client.py

Lines changed: 13 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,24 @@
11
from __future__ import annotations
22

3-
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Generator, Iterable, Iterator
4-
from typing import Any, Generic, Protocol, TypeVar
3+
from typing import TYPE_CHECKING, Any, TypeVar
54

65
from apify_client._utils import parse_date_fields, pluck_data
7-
from apify_client.clients.base.base_client import BaseClient, BaseClientAsync
6+
from apify_client.clients.base.base_client import (
7+
BaseClient,
8+
BaseClientAsync,
9+
IterableListPage,
10+
IterableListPageAsync,
11+
ListPage,
12+
ListPageProtocolAsync,
13+
_min_for_limit_param,
14+
)
15+
16+
if TYPE_CHECKING:
17+
from collections.abc import AsyncIterator, Iterator
818

919
T = TypeVar('T')
1020

1121

12-
class ListPage(Generic[T]):
13-
"""A single page of items returned from a list() method."""
14-
15-
items: list[T]
16-
"""List of returned objects on this page"""
17-
18-
count: int
19-
"""Count of the returned objects on this page"""
20-
21-
offset: int
22-
"""The limit on the number of returned objects offset specified in the API call"""
23-
24-
limit: int
25-
"""The offset of the first object specified in the API call"""
26-
27-
total: int
28-
"""Total number of objects matching the API call criteria"""
29-
30-
desc: bool
31-
"""Whether the listing is descending or not"""
32-
33-
def __init__(self, data: dict) -> None:
34-
"""Initialize a ListPage instance from the API response data."""
35-
self.items = data.get('items', [])
36-
self.offset = data.get('offset', 0)
37-
self.limit = data.get('limit', 0)
38-
self.count = data['count'] if 'count' in data else len(self.items)
39-
self.total = data['total'] if 'total' in data else self.offset + self.count
40-
self.desc = data.get('desc', False)
41-
42-
4322
class ResourceCollectionClient(BaseClient):
4423
"""Base class for sub-clients manipulating a resource collection."""
4524

@@ -178,76 +157,3 @@ async def _get_or_create(
178157
)
179158

180159
return parse_date_fields(pluck_data(response.json()))
181-
182-
183-
class ListPageProtocol(Iterable[T], Protocol[T]):
184-
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
185-
186-
items: list[T]
187-
"""List of returned objects on this page"""
188-
189-
count: int
190-
"""Count of the returned objects on this page"""
191-
192-
offset: int
193-
"""The limit on the number of returned objects offset specified in the API call"""
194-
195-
limit: int
196-
"""The offset of the first object specified in the API call"""
197-
198-
total: int
199-
"""Total number of objects matching the API call criteria"""
200-
201-
desc: bool
202-
"""Whether the listing is descending or not"""
203-
204-
205-
class IterableListPage(ListPage[T], Generic[T]):
206-
"""Can be called to get ListPage with items or iterated over to get individual items."""
207-
208-
def __init__(self, list_page: ListPage[T], iterator: Iterator[T]) -> None:
209-
self.items = list_page.items
210-
self.offset = list_page.offset
211-
self.limit = list_page.limit
212-
self.count = list_page.count
213-
self.total = list_page.total
214-
self.desc = list_page.desc
215-
self._iterator = iterator
216-
217-
def __iter__(self) -> Iterator[T]:
218-
"""Return an iterator over the items from API, possibly doing multiple API calls."""
219-
return self._iterator
220-
221-
222-
class ListPageProtocolAsync(AsyncIterable[T], Awaitable[ListPage[T]], Protocol[T]):
223-
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
224-
225-
226-
class IterableListPageAsync(Generic[T]):
227-
"""Can be awaited to get ListPage with items or asynchronously iterated over to get individual items."""
228-
229-
def __init__(self, awaitable: Awaitable[ListPage[T]], async_iterator: AsyncIterator[T]) -> None:
230-
self._awaitable = awaitable
231-
self._async_iterator = async_iterator
232-
233-
def __aiter__(self) -> AsyncIterator[T]:
234-
"""Return an asynchronous iterator over the items from API, possibly doing multiple API calls."""
235-
return self._async_iterator
236-
237-
def __await__(self) -> Generator[Any, Any, ListPage[T]]:
238-
"""Return an awaitable that resolves to the ListPage doing exactly one API call."""
239-
return self._awaitable.__await__()
240-
241-
242-
def _min_for_limit_param(a: int | None, b: int | None) -> int | None:
243-
"""Return minimum of two limit parameters, treating None or 0 as infinity. Return None for infinity."""
244-
# API treats 0 as None for limit parameter, in this context API understands 0 as infinity.
245-
if a == 0:
246-
a = None
247-
if b == 0:
248-
b = None
249-
if a is None:
250-
return b
251-
if b is None:
252-
return a
253-
return min(a, b)

src/apify_client/clients/resource_clients/actor_collection.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from apify_client.clients.resource_clients.actor import get_actor_representation
88

99
if TYPE_CHECKING:
10-
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync
10+
from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync
1111

1212

1313
class ActorCollectionClient(ResourceCollectionClient):
@@ -165,7 +165,9 @@ def list(
165165
Returns:
166166
The list of available Actors matching the specified filters.
167167
"""
168-
return self._list_iterable(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by)
168+
return self._list_iterable_from_callback(
169+
callback=self._list, my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by
170+
)
169171

170172
async def create(
171173
self,

src/apify_client/clients/resource_clients/actor_env_var_collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from apify_client.clients.resource_clients.actor_env_var import get_actor_env_var_representation
88

99
if TYPE_CHECKING:
10-
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync
10+
from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync
1111

1212

1313
class ActorEnvVarCollectionClient(ResourceCollectionClient):
@@ -70,7 +70,7 @@ def list(self) -> ListPageProtocolAsync[dict]:
7070
Returns:
7171
The list of available actor environment variables.
7272
"""
73-
return self._list_iterable()
73+
return self._list_iterable_from_callback(callback=self._list)
7474

7575
async def create(
7676
self,

src/apify_client/clients/resource_clients/actor_version_collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
if TYPE_CHECKING:
1010
from apify_shared.consts import ActorSourceType
1111

12-
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync
12+
from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync
1313

1414

1515
class ActorVersionCollectionClient(ResourceCollectionClient):
@@ -96,7 +96,7 @@ def list(self) -> ListPageProtocolAsync[dict]:
9696
Returns:
9797
The list of available Actor versions.
9898
"""
99-
return self._list_iterable()
99+
return self._list_iterable_from_callback(callback=self._list)
100100

101101
async def create(
102102
self,

src/apify_client/clients/resource_clients/build_collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync
66

77
if TYPE_CHECKING:
8-
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync
8+
from apify_client.clients.base.base_client import ListPageProtocol, ListPageProtocolAsync
99

1010

1111
class BuildCollectionClient(ResourceCollectionClient):
@@ -71,4 +71,4 @@ def list(
7171
Returns:
7272
The retrieved Actor builds.
7373
"""
74-
return self._list_iterable(limit=limit, offset=offset, desc=desc)
74+
return self._list_iterable_from_callback(callback=self._list, limit=limit, offset=offset, desc=desc)

0 commit comments

Comments
 (0)