Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ apify-client = false
apify_fingerprint_datapoints = false
crawlee = false

# NOTE(review): this resolves `crawlee` from a branch of a fork instead of a released
# version - presumably a temporary override until the change is available upstream;
# confirm and remove before release.
[tool.uv.sources]
crawlee = { git = "https://github.com/Mantisus/crawlee-python", branch = "queue-client-is-finished" }

# Run tasks with: uv run poe <task>
[tool.poe.tasks]
clean = "rm -rf .coverage .pytest_cache .ruff_cache .ty_cache build dist htmlcov"
Expand Down
4 changes: 4 additions & 0 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,7 @@ async def reclaim_request(
@override
async def is_empty(self) -> bool:
return await self._implementation.is_empty()

@override
async def is_finished(self) -> bool:
return await self._implementation.is_finished()
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,14 @@ async def is_empty(self) -> bool:
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
async with self._fetch_lock:
head = await self._list_head(limit=1)
return len(head.items) == 0 and not self._queue_has_locked_requests
return len(head.items) == 0

async def is_finished(self) -> bool:
    """Specific implementation of this method for the RQ shared access mode.

    Returns:
        False when `is_empty` reports that something is still available; otherwise the
        negation of `self._queue_has_locked_requests`.
    """
    # A queue that still has something to fetch cannot be finished.
    if not await self.is_empty():
        return False

    # Nothing is available for fetching, but the queue only counts as finished when no
    # request is flagged as locked (presumably still being processed by some client -
    # confirm against the code that sets the flag).
    return not self._queue_has_locked_requests
Comment thread
Mantisus marked this conversation as resolved.
Outdated

async def _get_metadata_estimate(self) -> RequestQueueMetadata:
"""Try to get cached metadata first. If multiple clients, fuse with global metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ async def is_empty(self) -> bool:
"""Specific implementation of this method for the RQ single access mode."""
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
Comment thread
vdusek marked this conversation as resolved.
Outdated
await self._ensure_head_is_non_empty()
return not self._head_requests and not self._requests_in_progress
return not self._head_requests

async def is_finished(self) -> bool:
    """Specific implementation of this method for the RQ single access mode.

    Returns:
        True only when `is_empty` is truthy and `self._requests_in_progress` is empty.
    """
    # `is_empty` is awaited first; the in-progress collection is consulted only when the
    # queue has nothing left to fetch.
    return await self.is_empty() and not self._requests_in_progress

async def _ensure_head_is_non_empty(self) -> None:
"""Ensure that the queue head has requests if they are available in the queue."""
Expand Down
38 changes: 38 additions & 0 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,44 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue, rq_poll_time
assert metadata.pending_request_count == 0, f'metadata.pending_request_count={metadata.pending_request_count}'


async def test_is_empty_and_is_finished(request_queue_apify: RequestQueue, rq_poll_timeout: int) -> None:
    """Test `is_empty` and `is_finished` across the queue lifecycle.

    The queue is observed in four phases: freshly opened, after a request is added, after
    that request is fetched but not yet handled, and after it is marked as handled.
    """

    rq = request_queue_apify
    Actor.log.info('Request queue opened')

    # Initially the queue is empty and finished.
    is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
    is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2)
    assert is_empty is True, f'is_empty={is_empty}'
    assert is_finished is True, f'is_finished={is_finished}'

    # After adding a request it is neither empty nor finished.
    await rq.add_request('https://example.com')
    is_empty = await poll_until_condition(rq.is_empty, condition=lambda e: e is False, timeout=rq_poll_timeout)
    is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout)
    assert is_empty is False, f'is_empty={is_empty}'
    assert is_finished is False, f'is_finished={is_finished}'

    # Fetch the request without handling it.
    request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2)
    assert request is not None, f'request={request}'

    # The queue is empty, because there is no request available for fetching.
    is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
    assert is_empty is True, f'is_empty={is_empty}'
    # The queue is not finished, because there is a request being processed.
    is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout)
    assert is_finished is False, f'is_finished={is_finished}'

    # After marking the request as handled the queue is empty and finished again.
    await rq.mark_request_as_handled(request)
    is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
    is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2)
    assert is_empty is True, f'is_empty={is_empty}'
    assert is_finished is True, f'is_finished={is_finished}'


async def test_large_batch_operations(
request_queue_apify: RequestQueue,
rq_poll_timeout: int,
Expand Down
12 changes: 4 additions & 8 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading