Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/crawlee/storage_clients/_base/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,16 @@ async def reclaim_request(

@abstractmethod
async def is_empty(self) -> bool:
"""Check if the request queue is empty.
"""Check if the request queue is empty. That means there are no requests available to fetch.

Returns:
True if the request queue is empty, False otherwise.
"""

async def is_finished(self) -> bool:
"""Check if the request queue is finished. That means the queue is empty and no requests are being processed.

Returns:
True if the request queue is finished, False otherwise.
"""
return await self.is_empty()
Comment thread
Mantisus marked this conversation as resolved.
34 changes: 24 additions & 10 deletions src/crawlee/storage_clients/_file_system/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ async def fetch_next_request(self) -> Request | None:

if next_request is not None:
state.in_progress_requests.add(next_request.unique_key)
# `in_progress_requests` is updated, so we need to invalidate the `is_empty` cache.
self._is_empty_cache = None

return next_request

Expand Down Expand Up @@ -592,34 +594,46 @@ async def is_empty(self) -> bool:

state = self._state.current_value

# If there are in-progress requests, return False immediately.
if len(state.in_progress_requests) > 0:
self._is_empty_cache = False
return False

# If we have a cached requests, check them first (fast path).
if self._request_cache:
for req in self._request_cache:
if req.unique_key not in state.handled_requests:
self._is_empty_cache = False
return False
self._is_empty_cache = True
return len(state.in_progress_requests) == 0
return True

# Fallback: check state for unhandled requests.
await self._update_metadata(update_accessed_at=True)

# Check if there are any requests that are not handled
all_requests = set(state.forefront_requests.keys()) | set(state.regular_requests.keys())
unhandled_requests = all_requests - state.handled_requests
# Check pending requests is state.
Comment thread
Mantisus marked this conversation as resolved.
Outdated
queue_requests = (
set(state.forefront_requests.keys()) | set(state.regular_requests.keys())
) - state.in_progress_requests

if unhandled_requests:
pending_requests = queue_requests - state.handled_requests

if pending_requests:
self._is_empty_cache = False
return False

self._is_empty_cache = True
return True

@override
async def is_finished(self) -> bool:
# If there are requests available to fetch, the queue is not finished.
if not await self.is_empty():
return False

async with self._lock:
# Check, if cache changed while waiting for the lock.
if self._is_empty_cache is not True:
return False

# If there are any in-progress requests, the queue is not finished.
return not self._state.current_value.in_progress_requests

def _get_request_path(self, unique_key: str) -> Path:
"""Get the path to a specific request file.

Expand Down
14 changes: 12 additions & 2 deletions src/crawlee/storage_clients/_memory/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,18 @@ async def is_empty(self) -> bool:
"""
await self._update_metadata(update_accessed_at=True)

# Queue is empty if there are no pending requests and no requests in progress.
return len(self._pending_requests) == 0 and len(self._in_progress_requests) == 0
# Queue is empty if there are no pending requests.
return len(self._pending_requests) == 0

@override
async def is_finished(self) -> bool:
"""Check if the queue is finished.
Comment thread
Mantisus marked this conversation as resolved.
Outdated

Returns:
True if the queue is finished, False otherwise.
"""
# Queue is finished if it is empty and there are no in-progress requests.
return await self.is_empty() and len(self._in_progress_requests) == 0

async def _update_metadata(
self,
Expand Down
17 changes: 11 additions & 6 deletions src/crawlee/storage_clients/_redis/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,7 @@ async def reclaim_request(
@retry_on_error(RedisError)
@override
async def is_empty(self) -> bool:
"""Check if the queue is empty.

Returns:
True if the queue is empty, False otherwise.
"""
# Requests buffered for fetching mean the queue is not empty.
if self._pending_fetch_cache:
return False

Expand All @@ -509,9 +505,18 @@ async def is_empty(self) -> bool:
await self._reclaim_stale_requests()
self._next_reclaim_stale = datetime.now(tz=timezone.utc) + self._RECLAIM_INTERVAL

# Check if there are any requests in the queue.
requests_in_queue = await await_redis_response(self._redis.llen(self._queue_key))
return requests_in_queue == 0

@retry_on_error(RedisError)
@override
async def is_finished(self) -> bool:
is_empty = await self.is_empty()

metadata = await self.get_metadata()

return metadata.pending_request_count == 0
return is_empty and metadata.pending_request_count == 0
Comment thread
Mantisus marked this conversation as resolved.
Outdated

async def _load_scripts(self) -> None:
"""Ensure Lua scripts are loaded in Redis."""
Expand Down
29 changes: 27 additions & 2 deletions src/crawlee/storage_clients/_sql/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,34 @@ async def reclaim_request(
@retry_on_error(SQLAlchemyError)
@override
async def is_empty(self) -> bool:
# Check in-memory cache for requests
# Requests buffered for fetching mean the queue is not empty.
if self._pending_fetch_cache:
return False

now = datetime.now(timezone.utc)

# Check if there are any unhandled requests that are not blocked.
async with self.get_session(with_simple_commit=True) as session:
stmt = select(
exists().where(
self._ITEM_TABLE.request_queue_id == self._id,
self._ITEM_TABLE.is_handled == False, # noqa: E712
or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now),
)
)
result = await session.execute(stmt)

await self._add_buffer_record(session)

return not result.scalar()

@retry_on_error(SQLAlchemyError)
@override
async def is_finished(self) -> bool:
# If the queue is not empty, it is not finished
if not await self.is_empty():

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_finished calls await self.is_empty() here, which opens a DB session, and then opens a second session below for get_metadata() and the remaining queries. Since the autoscaled pool polls is_finished/is_empty while scheduling requests, this could have a performance impact.

Maybe we could handle the empty check directly inside is_finished, using only a single session?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting a session from the pool is quite cheap, but it can actually impact performance if the pool is small. However, this will only become apparent when the queue is empty but not yet finished.

But merging is_empty and get_metadata into a single session in is_finished will make the method harder to read.

If we need to optimize these methods, I would consider removing the _add_buffer_record calls in is_empty and is_finished, just to update accessed_at in the metadata. This will have a greater impact on performance.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, could you please write these optimization ideas down in an issue?

return False

metadata = await self.get_metadata()

async with self.get_session(with_simple_commit=True) as session:
Expand Down Expand Up @@ -629,7 +653,8 @@ async def is_empty(self) -> bool:
has_pending_buffer_updates = buffer_result.scalar()

await self._add_buffer_record(session)
# If there are no pending requests and no buffered updates, the queue is empty

# If there are no pending requests and no buffered updates, the queue is finished
return not has_pending_buffer_updates

# There are pending requests (may be inaccurate), ensure recalculated metadata
Expand Down
17 changes: 8 additions & 9 deletions src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ async def reclaim_request(
async def is_empty(self) -> bool:
"""Check if the request queue is empty.

An empty queue means that there are no requests currently in the queue, either pending or being processed.
However, this does not necessarily mean that the crawling operation is finished, as there still might be
tasks that could add additional requests to the queue.
An empty queue means that there are no requests currently available to fetch. However, this does not
necessarily mean that the crawling operation is finished, as there still might be requests being processed
or tasks that could add additional requests to the queue.

Returns:
True if the request queue is empty, False otherwise.
Expand All @@ -328,19 +328,18 @@ async def is_empty(self) -> bool:
async def is_finished(self) -> bool:
"""Check if the request queue is finished.

A finished queue means that all requests in the queue have been processed (the queue is empty) and there
are no more tasks that could add additional requests to the queue. This is the definitive way to check
if a crawling operation is complete.
A finished queue means that all requests have been processed and there are no more tasks that could add
additional requests to the queue. This is the definitive way to check if a crawling operation is complete.

Returns:
True if the request queue is finished (empty and no pending add operations), False otherwise.
True if the request queue is finished and no pending add operations, False otherwise.
"""
if self._add_requests_tasks:
logger.debug('Background add requests tasks are still in progress.')
return False

if await self.is_empty():
logger.debug('The request queue is empty.')
if await self._client.is_finished():
logger.debug('The request queue is finished.')
return True

return False
Expand Down
37 changes: 33 additions & 4 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,29 @@ async def handler(context: BasicCrawlingContext) -> None:
]


async def test_no_new_tasks_while_only_request_in_progress() -> None:
Comment thread
Mantisus marked this conversation as resolved.
concurrency = 4
crawler = BasicCrawler(
concurrency_settings=ConcurrencySettings(desired_concurrency=concurrency, max_concurrency=concurrency),
)

request_manager = await crawler.get_request_manager()

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
await asyncio.sleep(0.2)

with patch.object(
request_manager,
'fetch_next_request',
wraps=request_manager.fetch_next_request,
) as fetch_counter:
await crawler.run(['https://a.placeholder.com'])

# `concurrency` tasks can be scheduled if control was never yielded with `await` during task scheduling
assert fetch_counter.call_count <= 4
Comment thread
Mantisus marked this conversation as resolved.
Outdated


async def test_respects_no_retry() -> None:
crawler = BasicCrawler(max_request_retries=2)
calls = list[str]()
Expand Down Expand Up @@ -1883,10 +1906,16 @@ class _CrawlerInput:


def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]:
return [
asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir))
for crawler_input in crawler_inputs
]
states = list[StatisticsState]()
for crawler_input in crawler_inputs:
states.append(
asyncio.run(
_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir)
)
)
# Each crawler runs in its own event loop. Drop the cached storage instances between runs.
service_locator.storage_instance_manager.clear_cache()
return states


async def test_crawler_state_persistence(tmp_path: Path) -> None:
Expand Down
20 changes: 15 additions & 5 deletions tests/unit/storages/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,23 +544,33 @@ async def test_reclaim_request_with_forefront(rq: RequestQueue) -> None:
assert next_request.url == 'https://example.com/first'


async def test_is_empty(rq: RequestQueue) -> None:
"""Test checking if a request queue is empty."""
# Initially the queue should be empty
async def test_is_empty_and_is_finished(rq: RequestQueue) -> None:
"""Test checking if a request queue is empty and finished."""
# Initially the queue should be empty and finished
assert await rq.is_empty() is True
assert await rq.is_finished() is True

# Add a request
await rq.add_request('https://example.com')
assert await rq.is_empty() is False
assert await rq.is_finished() is False

# Fetch and handle the request
# Fetch the request
request = await rq.fetch_next_request()

assert request is not None

# Queue is empty, because there is no request for fetching
assert await rq.is_empty() is True
# Queue is not finished, because there is a request being processed
assert await rq.is_finished() is False

# Mark the request as handled
await rq.mark_request_as_handled(request)

# Queue should be empty again
# Queue should be empty and finished again
assert await rq.is_empty() is True
assert await rq.is_finished() is True


@pytest.mark.parametrize(
Expand Down
Loading