Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ async def add_batch_of_requests(
forefront: bool = False,
) -> AddRequestsResponse:
async with self._lock:
self._is_empty_cache = None
new_total_request_count = self._metadata.total_request_count
new_pending_request_count = self._metadata.pending_request_count
processed_requests = list[ProcessedRequest]()
Expand Down Expand Up @@ -477,6 +476,8 @@ async def fetch_next_request(self) -> Request | None:

if next_request is not None:
state.in_progress_requests.add(next_request.unique_key)
# Invalidate is_empty cache since in-progress state changed.
self._is_empty_cache = None

return next_request

Expand Down Expand Up @@ -598,14 +599,14 @@ async def is_empty(self) -> bool:
self._is_empty_cache = False
return False

# If we have a cached requests, check them first (fast path).
# If we have cached requests, check them first (fast path).
if self._request_cache:
for req in self._request_cache:
if req.unique_key not in state.handled_requests:
self._is_empty_cache = False
return False
self._is_empty_cache = True
return len(state.in_progress_requests) == 0
return True

# Fallback: check state for unhandled requests.
await self._update_metadata(update_accessed_at=True)
Expand Down
84 changes: 83 additions & 1 deletion tests/unit/storage_clients/_file_system/test_fs_rq_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import asyncio
import json
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -232,3 +233,84 @@ async def test_get_request_does_not_mark_in_progress(rq_client: FileSystemReques
next_request = await rq_client.fetch_next_request()
assert next_request is not None
assert next_request.unique_key == request.unique_key


async def test_is_empty_cache_stale_after_full_lifecycle(rq_client: FileSystemRequestQueueClient) -> None:
"""Test that the is_empty cache stays correct through add -> fetch -> handle -> add cycle.

This exercises the scenario where a queue becomes empty, then new requests arrive.
The cache must be invalidated so the crawler doesn't shut down.
"""
# Add and fully process a request.
await rq_client.add_batch_of_requests([Request.from_url('https://example.com/1')])
request = await rq_client.fetch_next_request()
assert request is not None
await rq_client.mark_request_as_handled(request)

# Queue is now empty.
assert await rq_client.is_empty() is True

# Add a new request - cache must be invalidated.
await rq_client.add_batch_of_requests([Request.from_url('https://example.com/2')])

# Must not return the stale cached True.
assert await rq_client.is_empty() is False


async def test_is_empty_with_interleaved_operations(rq_client: FileSystemRequestQueueClient) -> None:
"""Test is_empty correctness with interleaved add, fetch, reclaim, and handle operations."""
await rq_client.add_batch_of_requests(
[
Request.from_url('https://example.com/1'),
Request.from_url('https://example.com/2'),
]
)
assert await rq_client.is_empty() is False

# Fetch first request.
req1 = await rq_client.fetch_next_request()
assert req1 is not None
assert await rq_client.is_empty() is False

# Reclaim it (put back in queue).
await rq_client.reclaim_request(req1)
assert await rq_client.is_empty() is False

# Fetch and handle both requests.
for _ in range(2):
req = await rq_client.fetch_next_request()
assert req is not None
assert await rq_client.is_empty() is False
await rq_client.mark_request_as_handled(req)

# Now should be empty.
assert await rq_client.is_empty() is True


async def test_is_empty_no_stale_true_during_concurrent_add(rq_client: FileSystemRequestQueueClient) -> None:
"""Test that is_empty never returns a stale True while requests are being added.

Uses an asyncio.Event to deterministically ensure is_empty contends on the lock
while add_batch_of_requests is mid-operation.
"""
assert await rq_client.is_empty() is True

add_holding_lock = asyncio.Event()
original_update_metadata = rq_client._update_metadata

async def slow_update_metadata(**kwargs: Any) -> None:
add_holding_lock.set()
await asyncio.sleep(0)
await original_update_metadata(**kwargs)

async def check_empty_after_add_starts() -> bool:
await add_holding_lock.wait()
return await rq_client.is_empty()

with patch.object(rq_client, '_update_metadata', side_effect=slow_update_metadata):
_, is_empty_result = await asyncio.gather(
rq_client.add_batch_of_requests([Request.from_url('https://example.com/race')]),
check_empty_after_add_starts(),
)

assert is_empty_result is False
Loading