diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 4954d8a4d2..de9fef1b63 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -327,7 +327,6 @@ async def add_batch_of_requests( forefront: bool = False, ) -> AddRequestsResponse: async with self._lock: - self._is_empty_cache = None new_total_request_count = self._metadata.total_request_count new_pending_request_count = self._metadata.pending_request_count processed_requests = list[ProcessedRequest]() @@ -477,6 +476,8 @@ async def fetch_next_request(self) -> Request | None: if next_request is not None: state.in_progress_requests.add(next_request.unique_key) + # Invalidate is_empty cache since in-progress state changed. + self._is_empty_cache = None return next_request @@ -598,14 +599,14 @@ async def is_empty(self) -> bool: self._is_empty_cache = False return False - # If we have a cached requests, check them first (fast path). + # If we have cached requests, check them first (fast path). if self._request_cache: for req in self._request_cache: if req.unique_key not in state.handled_requests: self._is_empty_cache = False return False self._is_empty_cache = True - return len(state.in_progress_requests) == 0 + return True # Fallback: check state for unhandled requests. await self._update_metadata(update_accessed_at=True) diff --git a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py index 275665d9d5..2ff4ca015b 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py @@ -2,7 +2,8 @@ import asyncio import json -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any +from unittest.mock import patch import pytest @@ -232,3 +233,84 @@ async def test_get_request_does_not_mark_in_progress(rq_client: FileSystemReques next_request = await rq_client.fetch_next_request() assert next_request is not None assert next_request.unique_key == request.unique_key + + +async def test_is_empty_cache_stale_after_full_lifecycle(rq_client: FileSystemRequestQueueClient) -> None: + """Test that the is_empty cache stays correct through add -> fetch -> handle -> add cycle. + + This exercises the scenario where a queue becomes empty, then new requests arrive. + The cache must be invalidated so the crawler doesn't shut down. + """ + # Add and fully process a request. + await rq_client.add_batch_of_requests([Request.from_url('https://example.com/1')]) + request = await rq_client.fetch_next_request() + assert request is not None + await rq_client.mark_request_as_handled(request) + + # Queue is now empty. + assert await rq_client.is_empty() is True + + # Add a new request - cache must be invalidated. + await rq_client.add_batch_of_requests([Request.from_url('https://example.com/2')]) + + # Must not return the stale cached True. + assert await rq_client.is_empty() is False + + +async def test_is_empty_with_interleaved_operations(rq_client: FileSystemRequestQueueClient) -> None: + """Test is_empty correctness with interleaved add, fetch, reclaim, and handle operations.""" + await rq_client.add_batch_of_requests( + [ + Request.from_url('https://example.com/1'), + Request.from_url('https://example.com/2'), + ] + ) + assert await rq_client.is_empty() is False + + # Fetch first request. + req1 = await rq_client.fetch_next_request() + assert req1 is not None + assert await rq_client.is_empty() is False + + # Reclaim it (put back in queue). + await rq_client.reclaim_request(req1) + assert await rq_client.is_empty() is False + + # Fetch and handle both requests. + for _ in range(2): + req = await rq_client.fetch_next_request() + assert req is not None + assert await rq_client.is_empty() is False + await rq_client.mark_request_as_handled(req) + + # Now should be empty. + assert await rq_client.is_empty() is True + + +async def test_is_empty_no_stale_true_during_concurrent_add(rq_client: FileSystemRequestQueueClient) -> None: + """Test that is_empty never returns a stale True while requests are being added. + + Uses an asyncio.Event to deterministically ensure is_empty contends on the lock + while add_batch_of_requests is mid-operation. + """ + assert await rq_client.is_empty() is True + + add_holding_lock = asyncio.Event() + original_update_metadata = rq_client._update_metadata + + async def slow_update_metadata(**kwargs: Any) -> None: + add_holding_lock.set() + await asyncio.sleep(0) + await original_update_metadata(**kwargs) + + async def check_empty_after_add_starts() -> bool: + await add_holding_lock.wait() + return await rq_client.is_empty() + + with patch.object(rq_client, '_update_metadata', side_effect=slow_update_metadata): + _, is_empty_result = await asyncio.gather( + rq_client.add_batch_of_requests([Request.from_url('https://example.com/race')]), + check_empty_after_add_starts(), + ) + + assert is_empty_result is False