Skip to content

Commit cfee910

Browse files
authored
fix: retry unprocessed requests in RequestQueue.add_request (#1976)
### Description

`RequestQueue.add_request` (singular) made a single best-effort `add_batch_of_requests([request])` call and, if the storage client returned the request as *unprocessed*, just logged a warning and returned `None` — silently dropping the request. `add_requests` (plural) already retries unprocessed requests via `_process_batch`, so the two adds had inconsistent durability against best-effort backends (e.g. the Apify platform's `batch_add_requests` endpoint, which may legitimately return a request as unprocessed).

This makes a single add as durable as a batched one:

- `_process_batch` now returns an `AddRequestsResponse` aggregating the requests processed across all attempts plus any still unprocessed after the retries are exhausted (it previously returned `None`).
- `add_request` routes its single request through `_process_batch` and returns `processed_requests[0]`, returning `None` only after retries are exhausted.

The `ProcessedRequest | None` return contract and the blocking semantics are preserved (for one request, `add_requests` already runs the first batch synchronously), and the retry is safe because adds are idempotent by `unique_key`.

Note: on the failure path `add_request` is now blocking-with-backoff, where it previously returned immediately. With the 1-second `base_retry_wait` and 5 retries, the worst case is about 15 seconds of retry sleeps (1 + 2 + 3 + 4 + 5 s) before it returns `None`. That is the intended trade — durability over a fast silent drop.

### Issues

- Closes: #1975
- Surfaced as an intermittent e2e flake in apify/apify-sdk-python#1000

### Testing

- Added `test_add_request_retries_unprocessed` (a request reported unprocessed on the first attempt is retried and survives) and `test_add_request_returns_none_after_exhausting_retries` (stays unprocessed across all attempts → returns `None`, 1 initial + 5 retries). Both fail on `master` and pass with this change.
- `uv run poe lint`, `uv run poe type-check`, and the `tests/unit/storages/test_request_queue.py` suite all pass.
1 parent 8c635e8 commit cfee910

2 files changed

Lines changed: 116 additions & 30 deletions

File tree

src/crawlee/storages/_request_queue.py

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from crawlee._utils.docs import docs_group
1212
from crawlee._utils.wait import wait_for_all_tasks_for_finish
1313
from crawlee.request_loaders import RequestManager
14+
from crawlee.storage_clients.models import AddRequestsResponse
1415

1516
from ._base import Storage
1617
from ._utils import validate_storage_name
@@ -181,19 +182,18 @@ async def add_request(
181182
forefront: bool = False,
182183
) -> ProcessedRequest | None:
183184
request = self._transform_request(request)
184-
response = await self._client.add_batch_of_requests([request], forefront=forefront)
185+
# Route through `_process_batch` so a single add retries unprocessed requests just like a batched one.
186+
response = await self._process_batch([request], base_retry_wait=timedelta(seconds=1), forefront=forefront)
185187

186188
if response.processed_requests:
187189
return response.processed_requests[0]
188190

189-
if response.unprocessed_requests:
191+
# `_process_batch` already warns about requests left unprocessed after retries; only an empty response
192+
# (neither processed nor unprocessed) is unexpected here.
193+
if not response.unprocessed_requests:
190194
logger.warning(
191-
f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}".'
192-
)
193-
else:
194-
logger.warning(
195-
f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}" '
196-
'received empty response.'
195+
f'Request {request.url} was not processed by storage client '
196+
f'"{self._client.__class__.__name__}" (received an empty response).'
197197
)
198198
return None
199199

@@ -352,33 +352,46 @@ async def _process_batch(
352352
base_retry_wait: timedelta,
353353
attempt: int = 1,
354354
forefront: bool = False,
355-
) -> None:
356-
"""Process a batch of requests with automatic retry mechanism."""
355+
) -> AddRequestsResponse:
356+
"""Process a batch of requests with automatic retry mechanism.
357+
358+
Returns:
359+
A response aggregating all requests processed across attempts plus any still unprocessed once the
360+
retries are exhausted.
361+
"""
357362
max_attempts = 5
358363
response = await self._client.add_batch_of_requests(batch, forefront=forefront)
359364

360-
if response.unprocessed_requests:
361-
logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
362-
if attempt > max_attempts:
363-
logger.warning(
364-
f'Following requests were not processed even after {max_attempts} attempts:\n'
365-
f'{response.unprocessed_requests}'
366-
)
367-
else:
368-
logger.debug('Retry to add requests.')
369-
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
370-
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
371-
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
372-
await self._process_batch(
373-
retry_batch,
374-
base_retry_wait=base_retry_wait,
375-
attempt=attempt + 1,
376-
forefront=forefront,
377-
)
378-
379365
request_count = len(batch) - len(response.unprocessed_requests)
380-
381366
if request_count:
382367
logger.debug(
383368
f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}'
384369
)
370+
371+
if not response.unprocessed_requests:
372+
return response
373+
374+
logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
375+
if attempt > max_attempts:
376+
logger.warning(
377+
f'Following requests were not processed even after {max_attempts} attempts:\n'
378+
f'{response.unprocessed_requests}'
379+
)
380+
return response
381+
382+
logger.debug('Retry to add requests.')
383+
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
384+
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
385+
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
386+
retry_response = await self._process_batch(
387+
retry_batch,
388+
base_retry_wait=base_retry_wait,
389+
attempt=attempt + 1,
390+
forefront=forefront,
391+
)
392+
393+
# Merge the retry outcome: processed requests accumulate, unprocessed is whatever the last attempt left.
394+
return AddRequestsResponse(
395+
processed_requests=[*response.processed_requests, *retry_response.processed_requests],
396+
unprocessed_requests=retry_response.unprocessed_requests,
397+
)

tests/unit/storages/test_request_queue.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,79 @@ async def patched_add_batch(
311311
)
312312

313313

314+
async def _no_sleep(_seconds: float) -> None:
315+
"""Drop-in replacement for `asyncio.sleep` that returns immediately, to keep retry tests fast."""
316+
317+
318+
async def test_add_request_retries_unprocessed(monkeypatch: pytest.MonkeyPatch) -> None:
319+
"""`add_request` must retry an unprocessed request (like `add_requests`) instead of silently dropping it."""
320+
rq = await RequestQueue.open(storage_client=MemoryStorageClient())
321+
monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)
322+
calls = 0
323+
324+
async def patched_add_batch(
325+
requests: Sequence[Request],
326+
*,
327+
forefront: bool = False, # noqa: ARG001
328+
) -> AddRequestsResponse:
329+
nonlocal calls
330+
calls += 1
331+
# First attempt reports the request as unprocessed; the retry succeeds.
332+
if calls == 1:
333+
return AddRequestsResponse(
334+
processed_requests=[],
335+
unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests],
336+
)
337+
return AddRequestsResponse(
338+
processed_requests=[
339+
ProcessedRequest(unique_key=r.unique_key, was_already_present=False, was_already_handled=False)
340+
for r in requests
341+
],
342+
unprocessed_requests=[],
343+
)
344+
345+
monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)
346+
347+
try:
348+
result = await rq.add_request('https://example.com/retry')
349+
finally:
350+
await rq.drop()
351+
352+
assert calls == 2, f'expected one retry after the unprocessed response, got {calls} calls'
353+
assert result is not None
354+
assert result.was_already_present is False
355+
356+
357+
async def test_add_request_returns_none_after_exhausting_retries(monkeypatch: pytest.MonkeyPatch) -> None:
358+
"""When a request stays unprocessed across all retries, `add_request` returns `None` rather than raising."""
359+
rq = await RequestQueue.open(storage_client=MemoryStorageClient())
360+
monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)
361+
calls = 0
362+
363+
async def patched_add_batch(
364+
requests: Sequence[Request],
365+
*,
366+
forefront: bool = False, # noqa: ARG001
367+
) -> AddRequestsResponse:
368+
nonlocal calls
369+
calls += 1
370+
return AddRequestsResponse(
371+
processed_requests=[],
372+
unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests],
373+
)
374+
375+
monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)
376+
377+
try:
378+
result = await rq.add_request('https://example.com/doomed')
379+
finally:
380+
await rq.drop()
381+
382+
assert result is None
383+
# One initial attempt plus five retries; the mechanism stops once `attempt` exceeds `max_attempts`.
384+
assert calls == 6, f'expected 6 attempts (1 initial + 5 retries), got {calls}'
385+
386+
314387
async def test_add_requests_mixed_forefront(rq: RequestQueue) -> None:
315388
"""Test the ordering when adding requests with mixed forefront values."""
316389
# Add normal requests

0 commit comments

Comments
 (0)