Skip to content

Commit 9d5e86e

Browse files
authored: fix: Try to solve race conditions (#544)
- Closes: #540
1 parent 76eea47 commit 9d5e86e

3 files changed

Lines changed: 69 additions & 60 deletions

File tree

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from collections import deque
45
from datetime import datetime, timedelta, timezone
56
from logging import getLogger
@@ -84,6 +85,9 @@ def __init__(
8485
self._assumed_handled_count = 0
8586
"""The number of requests we assume have been handled (tracked manually for this instance)."""
8687

88+
self._fetch_lock = asyncio.Lock()
89+
"""Fetch lock to minimize race conditions when communicating with API."""
90+
8791
@override
8892
async def get_metadata(self) -> RequestQueueMetadata:
8993
total_count = self._initial_total_count + self._assumed_total_count
@@ -290,15 +294,17 @@ async def fetch_next_request(self) -> Request | None:
290294
Returns:
291295
The request or `None` if there are no more pending requests.
292296
"""
293-
# Ensure the queue head has requests if available
294-
await self._ensure_head_is_non_empty()
297+
# Ensure the queue head has requests if available. Fetching the head with lock to prevent race conditions.
298+
async with self._fetch_lock:
299+
await self._ensure_head_is_non_empty()
295300

296-
# If queue head is empty after ensuring, there are no requests
297-
if not self._queue_head:
298-
return None
301+
# If queue head is empty after ensuring, there are no requests
302+
if not self._queue_head:
303+
return None
304+
305+
# Get the next request ID from the queue head
306+
next_request_id = self._queue_head.popleft()
299307

300-
# Get the next request ID from the queue head
301-
next_request_id = self._queue_head.popleft()
302308
request = await self._get_or_hydrate_request(next_request_id)
303309

304310
# Handle potential inconsistency where request might not be in the main table yet
@@ -344,6 +350,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
344350
if request.handled_at is None:
345351
request.handled_at = datetime.now(tz=timezone.utc)
346352

353+
if cached_request := self._requests_cache[request.id]:
354+
cached_request.was_already_handled = request.was_already_handled
347355
try:
348356
# Update the request in the API
349357
processed_request = await self._update_request(request)
@@ -389,39 +397,41 @@ async def reclaim_request(
389397
if request.was_already_handled:
390398
request.handled_at = None
391399

392-
try:
393-
# Update the request in the API.
394-
processed_request = await self._update_request(request, forefront=forefront)
395-
processed_request.unique_key = request.unique_key
396-
397-
# If the request was previously handled, decrement our handled count since
398-
# we're putting it back for processing.
399-
if request.was_already_handled and not processed_request.was_already_handled:
400-
self._assumed_handled_count -= 1
401-
402-
# Update the cache
403-
cache_key = unique_key_to_request_id(request.unique_key)
404-
self._cache_request(
405-
cache_key,
406-
processed_request,
407-
hydrated_request=request,
408-
)
400+
# Reclaim with lock to prevent race conditions that could lead to double processing of the same request.
401+
async with self._fetch_lock:
402+
try:
403+
# Update the request in the API.
404+
processed_request = await self._update_request(request, forefront=forefront)
405+
processed_request.unique_key = request.unique_key
406+
407+
# If the request was previously handled, decrement our handled count since
408+
# we're putting it back for processing.
409+
if request.was_already_handled and not processed_request.was_already_handled:
410+
self._assumed_handled_count -= 1
411+
412+
# Update the cache
413+
cache_key = unique_key_to_request_id(request.unique_key)
414+
self._cache_request(
415+
cache_key,
416+
processed_request,
417+
hydrated_request=request,
418+
)
409419

410-
# If we're adding to the forefront, we need to check for forefront requests
411-
# in the next list_head call
412-
if forefront:
413-
self._should_check_for_forefront_requests = True
420+
# If we're adding to the forefront, we need to check for forefront requests
421+
# in the next list_head call
422+
if forefront:
423+
self._should_check_for_forefront_requests = True
414424

415-
# Try to release the lock on the request
416-
try:
417-
await self._delete_request_lock(request.id, forefront=forefront)
418-
except Exception as err:
419-
logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
420-
except Exception as exc:
421-
logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
422-
return None
423-
else:
424-
return processed_request
425+
# Try to release the lock on the request
426+
try:
427+
await self._delete_request_lock(request.id, forefront=forefront)
428+
except Exception as err:
429+
logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
430+
except Exception as exc:
431+
logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
432+
return None
433+
else:
434+
return processed_request
425435

426436
@override
427437
async def is_empty(self) -> bool:
@@ -430,9 +440,11 @@ async def is_empty(self) -> bool:
430440
Returns:
431441
True if the queue is empty, False otherwise.
432442
"""
433-
head = await self._list_head(limit=1, lock_time=None)
434-
435-
return len(head.items) == 0 and not self._queue_has_locked_requests
443+
# Check _list_head and self._queue_has_locked_requests with lock to make sure they are consistent.
444+
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
445+
async with self._fetch_lock:
446+
head = await self._list_head(limit=1, lock_time=None)
447+
return len(head.items) == 0 and not self._queue_has_locked_requests
436448

437449
async def _ensure_head_is_non_empty(self) -> None:
438450
"""Ensure that the queue head has requests if they are available in the queue."""
@@ -545,7 +557,6 @@ async def _list_head(
545557
# Return from cache if available and we're not checking for new forefront requests
546558
if self._queue_head and not self._should_check_for_forefront_requests:
547559
logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
548-
549560
# Create a list of requests from the cached queue head
550561
items = []
551562
for request_id in list(self._queue_head)[:limit]:
@@ -563,7 +574,6 @@ async def _list_head(
563574
queue_has_locked_requests=self._queue_has_locked_requests,
564575
lock_time=lock_time,
565576
)
566-
567577
leftover_buffer = list[str]()
568578
if self._should_check_for_forefront_requests:
569579
leftover_buffer = list(self._queue_head)
@@ -607,13 +617,11 @@ async def _list_head(
607617
),
608618
hydrated_request=request,
609619
)
610-
611620
self._queue_head.append(request.id)
612621

613622
for leftover_request_id in leftover_buffer:
614623
# After adding new requests to the forefront, any existing leftover locked request is kept in the end.
615624
self._queue_head.append(leftover_request_id)
616-
617625
return RequestQueueHead.model_validate(response)
618626

619627
async def _prolong_request_lock(

tests/integration/test_actor_request_queue.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,18 @@ async def test_request_queue_is_finished(
9898
request_queue_name = generate_unique_resource_name('request_queue')
9999

100100
async with Actor:
101-
request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
102-
await request_queue.add_request(Request.from_url('http://example.com'))
103-
assert not await request_queue.is_finished()
104-
105-
request = await request_queue.fetch_next_request()
106-
assert request is not None
107-
assert not await request_queue.is_finished(), (
108-
'RequestQueue should not be finished unless the request is marked as handled.'
109-
)
110-
111-
await request_queue.mark_request_as_handled(request)
112-
assert await request_queue.is_finished()
101+
try:
102+
request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
103+
await request_queue.add_request(Request.from_url('http://example.com'))
104+
assert not await request_queue.is_finished()
105+
106+
request = await request_queue.fetch_next_request()
107+
assert request is not None
108+
assert not await request_queue.is_finished(), (
109+
'RequestQueue should not be finished unless the request is marked as handled.'
110+
)
111+
112+
await request_queue.mark_request_as_handled(request)
113+
assert await request_queue.is_finished()
114+
finally:
115+
await request_queue.drop()

tests/integration/test_crawlers_with_storages.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
from typing import TYPE_CHECKING
44

5-
import pytest
6-
75
if TYPE_CHECKING:
86
from .conftest import MakeActorFunction, RunActorFunction
97

@@ -78,7 +76,6 @@ async def default_handler(context: ParselCrawlingContext) -> None:
7876
assert run_result.status == 'SUCCEEDED'
7977

8078

81-
@pytest.mark.skip(reason='Sometimes crawler does not respect max_request_retries argument, see issue #540')
8279
async def test_actor_on_platform_max_request_retries(
8380
make_actor: MakeActorFunction,
8481
run_actor: RunActorFunction,
@@ -87,6 +84,7 @@ async def test_actor_on_platform_max_request_retries(
8784

8885
async def main() -> None:
8986
"""The crawler entry point."""
87+
9088
from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext
9189

9290
from apify import Actor

0 commit comments

Comments (0)