|
6 | 6 | from logging import getLogger |
7 | 7 | from typing import Annotated |
8 | 8 |
|
9 | | -from pydantic import BaseModel, ConfigDict, Field |
| 9 | +from pydantic import BaseModel, ConfigDict, Field, ValidationError |
10 | 10 | from typing_extensions import override |
11 | 11 |
|
12 | 12 | from crawlee._request import Request |
@@ -106,10 +106,14 @@ async def _get_state(self) -> RequestListState: |
106 | 106 | if self._persist_request_data: |
107 | 107 | async with self._requests_lock: |
108 | 108 | if not await self._requests_data.has_persisted_state(): |
109 | | - self._requests_data.current_value.requests = [ |
110 | | - request if isinstance(request, Request) else Request.from_url(request) |
111 | | - async for request in self._requests |
112 | | - ] |
| 109 | + self._requests_data.current_value.requests = [] |
| 110 | + async for processing_request in self._requests: |
| 111 | + try: |
| 112 | + request = self._transform_request(processing_request) |
| 113 | + except ValidationError: |
| 114 | + logger.warning(f'Invalid request encountered in the request list: {processing_request}') |
| 115 | + continue |
| 116 | + self._requests_data.current_value.requests.append(request) |
113 | 117 | await self._requests_data.persist_state() |
114 | 118 |
|
115 | 119 | self._requests = self._iterate_in_threadpool( |
@@ -202,11 +206,18 @@ async def _ensure_next_request(self) -> None: |
202 | 206 | self._next = (self._next[0], to_enqueue[0]) |
203 | 207 |
|
async def _dequeue_requests(self, count: int) -> AsyncGenerator[Request | None]:
    """Yield up to `count` items pulled from the underlying request iterator.

    Each raw item is passed through `self._transform_request`. Items that fail
    with a `ValidationError` are logged and skipped; they do not count towards
    `count`. Once the underlying iterator is exhausted, `None` is yielded for
    every remaining slot.

    Args:
        count: Number of items (requests or `None` placeholders) to yield.

    Yields:
        The transformed request, or `None` when the source has no more items.
    """
    while count > 0:
        # Guard only the fetch: exhaustion of the source is the sole condition
        # that should produce a `None` placeholder.
        try:
            processing_request = await self._requests.__anext__()
        except StopAsyncIteration:
            yield None
            count -= 1
            continue

        # Guard only the transformation: an invalid entry is reported and
        # dropped without consuming one of the requested slots.
        try:
            request = self._transform_request(processing_request)
        except ValidationError:
            logger.warning(f'Invalid request encountered in the request list: {processing_request}')
            continue

        yield request
        count -= 1
210 | 221 |
|
211 | 222 | async def _iterate_in_threadpool(self, iterable: Iterable[str | Request]) -> AsyncIterator[str | Request]: |
212 | 223 | """Inspired by a function of the same name from encode/starlette.""" |
|
0 commit comments