Skip to content

Commit 164e427

Browse files
committed
fix: retry unprocessed requests in RequestQueue.add_request
1 parent 970f93b commit 164e427

2 files changed

Lines changed: 119 additions & 30 deletions

File tree

src/crawlee/storages/_request_queue.py

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from crawlee._utils.docs import docs_group
1212
from crawlee._utils.wait import wait_for_all_tasks_for_finish
1313
from crawlee.request_loaders import RequestManager
14+
from crawlee.storage_clients.models import AddRequestsResponse
1415

1516
from ._base import Storage
1617
from ._utils import validate_storage_name
@@ -181,19 +182,20 @@ async def add_request(
181182
forefront: bool = False,
182183
) -> ProcessedRequest | None:
183184
request = self._transform_request(request)
184-
response = await self._client.add_batch_of_requests([request], forefront=forefront)
185+
# Route the single request through the same retry mechanism as `add_requests`, so that adding one
186+
# request is just as durable as a batched add against best-effort backends that may return a request
187+
# as unprocessed.
188+
response = await self._process_batch([request], base_retry_wait=timedelta(seconds=1), forefront=forefront)
185189

186190
if response.processed_requests:
187191
return response.processed_requests[0]
188192

189-
if response.unprocessed_requests:
193+
# `_process_batch` already warns when requests remain unprocessed after the retries are exhausted, so
194+
# only the empty-response case (neither processed nor unprocessed) needs a warning here.
195+
if not response.unprocessed_requests:
190196
logger.warning(
191-
f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}".'
192-
)
193-
else:
194-
logger.warning(
195-
f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}" '
196-
'received empty response.'
197+
f'Request {request.url} was not processed by storage client '
198+
f'"{self._client.__class__.__name__}" (received an empty response).'
197199
)
198200
return None
199201

@@ -352,33 +354,47 @@ async def _process_batch(
352354
base_retry_wait: timedelta,
353355
attempt: int = 1,
354356
forefront: bool = False,
355-
) -> None:
356-
"""Process a batch of requests with automatic retry mechanism."""
357+
) -> AddRequestsResponse:
358+
"""Process a batch of requests with automatic retry mechanism.
359+
360+
Returns:
361+
A response aggregating the requests processed across all attempts together with any that remained
362+
unprocessed after the retries were exhausted.
363+
"""
357364
max_attempts = 5
358365
response = await self._client.add_batch_of_requests(batch, forefront=forefront)
359366

360-
if response.unprocessed_requests:
361-
logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
362-
if attempt > max_attempts:
363-
logger.warning(
364-
f'Following requests were not processed even after {max_attempts} attempts:\n'
365-
f'{response.unprocessed_requests}'
366-
)
367-
else:
368-
logger.debug('Retry to add requests.')
369-
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
370-
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
371-
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
372-
await self._process_batch(
373-
retry_batch,
374-
base_retry_wait=base_retry_wait,
375-
attempt=attempt + 1,
376-
forefront=forefront,
377-
)
378-
379367
request_count = len(batch) - len(response.unprocessed_requests)
380-
381368
if request_count:
382369
logger.debug(
383370
f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}'
384371
)
372+
373+
if not response.unprocessed_requests:
374+
return response
375+
376+
logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
377+
if attempt > max_attempts:
378+
logger.warning(
379+
f'Following requests were not processed even after {max_attempts} attempts:\n'
380+
f'{response.unprocessed_requests}'
381+
)
382+
return response
383+
384+
logger.debug('Retry to add requests.')
385+
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
386+
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
387+
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
388+
retry_response = await self._process_batch(
389+
retry_batch,
390+
base_retry_wait=base_retry_wait,
391+
attempt=attempt + 1,
392+
forefront=forefront,
393+
)
394+
395+
# Fold the retry outcome back in: requests processed on retry join this attempt's processed requests,
396+
# and only those still unprocessed after the final attempt remain.
397+
return AddRequestsResponse(
398+
processed_requests=[*response.processed_requests, *retry_response.processed_requests],
399+
unprocessed_requests=retry_response.unprocessed_requests,
400+
)

tests/unit/storages/test_request_queue.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,79 @@ async def patched_add_batch(
311311
)
312312

313313

314+
async def _no_sleep(_seconds: float) -> None:
315+
"""Drop-in replacement for `asyncio.sleep` that returns immediately, to keep retry tests fast."""
316+
317+
318+
async def test_add_request_retries_unprocessed(monkeypatch: pytest.MonkeyPatch) -> None:
319+
"""`add_request` must retry an unprocessed request (like `add_requests`) instead of silently dropping it."""
320+
rq = await RequestQueue.open(storage_client=MemoryStorageClient())
321+
monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)
322+
calls = 0
323+
324+
async def patched_add_batch(
325+
requests: Sequence[Request],
326+
*,
327+
forefront: bool = False, # noqa: ARG001
328+
) -> AddRequestsResponse:
329+
nonlocal calls
330+
calls += 1
331+
# First attempt reports the request as unprocessed; the retry succeeds.
332+
if calls == 1:
333+
return AddRequestsResponse(
334+
processed_requests=[],
335+
unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests],
336+
)
337+
return AddRequestsResponse(
338+
processed_requests=[
339+
ProcessedRequest(unique_key=r.unique_key, was_already_present=False, was_already_handled=False)
340+
for r in requests
341+
],
342+
unprocessed_requests=[],
343+
)
344+
345+
monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)
346+
347+
try:
348+
result = await rq.add_request('https://example.com/retry')
349+
finally:
350+
await rq.drop()
351+
352+
assert calls == 2, f'expected one retry after the unprocessed response, got {calls} calls'
353+
assert result is not None
354+
assert result.was_already_present is False
355+
356+
357+
async def test_add_request_returns_none_after_exhausting_retries(monkeypatch: pytest.MonkeyPatch) -> None:
358+
"""When a request stays unprocessed across all retries, `add_request` returns `None` rather than raising."""
359+
rq = await RequestQueue.open(storage_client=MemoryStorageClient())
360+
monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)
361+
calls = 0
362+
363+
async def patched_add_batch(
364+
requests: Sequence[Request],
365+
*,
366+
forefront: bool = False, # noqa: ARG001
367+
) -> AddRequestsResponse:
368+
nonlocal calls
369+
calls += 1
370+
return AddRequestsResponse(
371+
processed_requests=[],
372+
unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests],
373+
)
374+
375+
monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)
376+
377+
try:
378+
result = await rq.add_request('https://example.com/doomed')
379+
finally:
380+
await rq.drop()
381+
382+
assert result is None
383+
# One initial attempt plus five retries; the mechanism stops once `attempt` exceeds `max_attempts`.
384+
assert calls == 6, f'expected 6 attempts (1 initial + 5 retries), got {calls}'
385+
386+
314387
async def test_add_requests_mixed_forefront(rq: RequestQueue) -> None:
315388
"""Test the ordering when adding requests with mixed forefront values."""
316389
# Add normal requests

0 commit comments

Comments
 (0)