Skip to content

Commit bd7a715

Browse files
vdusek and claude authored
test: replace fixed sleep with polling for RQ stats assertions (#864)
## Summary

- Added the `poll_until_condition()` helper to `tests/integration/_utils.py` — it polls an async callable at 5-second intervals until a condition is met or a 60-second timeout expires.
- Replaced the `await asyncio.sleep(10)` + single-fetch pattern in 7 request queue integration tests with `poll_until_condition`, fixing intermittent failures where 10 s was not enough for Apify's eventually-consistent stats API to propagate.

The following tests were intermittently failing in CI due to this timing issue:

- `test_request_queue_metadata_another_client[single]`
- `test_crawler_run_request_queue_variant_stats[Full rq client]`
- `test_request_queue_has_stats[shared]`
- `test_request_queue_deduplication_unprocessed_requests[shared]`
- `test_request_queue_deduplication_use_extended_unique_key[single]`

Failed CI runs that triggered this fix:

- https://github.com/apify/apify-sdk-python/actions/runs/24509008169/job/71634833798 (Python 3.10 — 4 failures)
- https://github.com/apify/apify-sdk-python/actions/runs/24509008169/job/71634833797 (Python 3.14 — 1 failure)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 76559af commit bd7a715

2 files changed

Lines changed: 85 additions & 30 deletions

File tree

tests/integration/_utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import time
45
from typing import TYPE_CHECKING, Literal, TypeVar
56

67
from crawlee._utils.crypto import crypto_random_object_id
@@ -48,6 +49,32 @@ async def call_with_exp_backoff(
4849
raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}')
4950

5051

52+
async def poll_until_condition(
    fn: Callable[[], Awaitable[T]],
    condition: Callable[[T], bool],
    *,
    timeout: float = 60,
    poll_interval: float = 5,
) -> T:
    """Poll `fn` until `condition(result)` is True or the timeout expires.

    `fn` is awaited once immediately, and then again after every `poll_interval`-second pause, until `condition`
    accepts the result or `timeout` seconds have elapsed. The last polled result is returned regardless of whether
    the condition was met, so callers should still assert on the returned value.

    Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent API state (e.g. request queue
    stats) that may take a variable amount of time to propagate.

    Args:
        fn: Zero-argument async callable that fetches the current state.
        condition: Predicate applied to each fetched result; polling stops as soon as it returns True.
        timeout: Maximum number of seconds to keep polling, measured from the start of the call.
        poll_interval: Number of seconds to wait between consecutive calls to `fn`. Must be positive.

    Returns:
        The result of the most recent call to `fn`.

    Raises:
        ValueError: If `poll_interval` is not positive.
    """
    # A non-positive interval would make every sleep return immediately and call `fn` back-to-back
    # until the timeout expires, so reject it before the first fetch.
    if poll_interval <= 0:
        raise ValueError(f'poll_interval must be positive, got {poll_interval}')

    # Use the monotonic clock so that wall-clock adjustments cannot shorten or extend the timeout.
    deadline = time.monotonic() + timeout
    result = await fn()
    while not condition(result):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the fetch after this sleep is the final attempt.
        await asyncio.sleep(min(poll_interval, remaining))
        result = await fn()
    return result
76+
77+
5178
def generate_unique_resource_name(label: str) -> str:
5279
"""Generates a unique resource name, which will contain the given label."""
5380
name_template = 'python-sdk-tests-{}-generated-{}'

tests/integration/test_request_queue.py

Lines changed: 58 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from crawlee import service_locator
1313
from crawlee.crawlers import BasicCrawler
1414

15-
from ._utils import call_with_exp_backoff, generate_unique_resource_name
15+
from ._utils import call_with_exp_backoff, generate_unique_resource_name, poll_until_condition
1616
from apify import Actor, Request
1717
from apify.storage_clients import ApifyStorageClient
1818
from apify.storage_clients._apify import ApifyRequestQueueClient
@@ -856,10 +856,9 @@ async def test_request_queue_metadata_another_client(
856856
api_client = apify_client_async.request_queue(request_queue_id=rq.id, client_key=None)
857857
await api_client.add_request(Request.from_url('http://example.com/1').model_dump(by_alias=True, exclude={'id'}))
858858

859-
# Wait to be sure that the API has updated the global metadata
860-
await asyncio.sleep(10)
861-
862-
assert (await rq.get_metadata()).total_request_count == 1
859+
# Poll until the API has propagated the metadata change.
860+
metadata = await poll_until_condition(rq.get_metadata, lambda m: m.total_request_count >= 1)
861+
assert metadata.total_request_count == 1
863862

864863

865864
async def test_request_queue_had_multiple_clients(
@@ -950,12 +949,18 @@ async def default_handler(context: BasicCrawlingContext) -> None:
950949
assert crawler.statistics.state.requests_finished == requests
951950

952951
try:
953-
# Check the request queue stats
954-
await asyncio.sleep(10) # Wait to be sure that metadata are updated
952+
# Poll until request queue stats are propagated by the API.
953+
expected_write_count = requests * expected_write_count_per_request
954+
955+
async def _get_rq_metadata() -> ApifyRequestQueueMetadata:
956+
return cast('ApifyRequestQueueMetadata', await rq.get_metadata())
955957

956-
metadata = cast('ApifyRequestQueueMetadata', await rq.get_metadata())
958+
metadata = await poll_until_condition(
959+
_get_rq_metadata,
960+
lambda m: m.stats.write_count >= expected_write_count,
961+
)
957962
Actor.log.info(f'{metadata.stats=}')
958-
assert metadata.stats.write_count == requests * expected_write_count_per_request
963+
assert metadata.stats.write_count == expected_write_count
959964

960965
finally:
961966
await rq.drop()
@@ -1009,13 +1014,16 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
10091014

10101015
await rq.add_requests([Request.from_url(f'http://example.com/{i}') for i in range(add_request_count)])
10111016

1012-
# Wait for stats to become stable
1013-
await asyncio.sleep(10)
1017+
# Poll until stats are propagated by the API.
1018+
async def _get_rq_metadata() -> ApifyRequestQueueMetadata:
1019+
return cast('ApifyRequestQueueMetadata', await rq.get_metadata())
10141020

1015-
metadata = await rq.get_metadata()
1021+
apify_metadata = await poll_until_condition(
1022+
_get_rq_metadata,
1023+
lambda m: m.stats.write_count >= add_request_count,
1024+
)
10161025

1017-
assert hasattr(metadata, 'stats')
1018-
apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
1026+
assert hasattr(apify_metadata, 'stats')
10191027
assert apify_metadata.stats.write_count == add_request_count
10201028

10211029

@@ -1153,10 +1161,15 @@ def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dic
11531161
# This will succeed.
11541162
await request_queue_apify.add_requests(['http://example.com/1'])
11551163

1156-
await asyncio.sleep(10) # Wait to be sure that metadata are updated
1157-
_rq = await rq_client.get()
1158-
assert _rq
1159-
stats_after = _rq.get('stats', {})
1164+
# Poll until stats reflect the successful write.
1165+
async def _get_rq_stats() -> dict:
1166+
result = await rq_client.get()
1167+
return (result or {}).get('stats', {})
1168+
1169+
stats_after = await poll_until_condition(
1170+
_get_rq_stats,
1171+
lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1,
1172+
)
11601173
Actor.log.info(stats_after)
11611174

11621175
assert (stats_after['writeCount'] - stats_before['writeCount']) == 1
@@ -1256,10 +1269,15 @@ async def test_request_queue_deduplication(
12561269
await rq.add_request(request1)
12571270
await rq.add_request(request2)
12581271

1259-
await asyncio.sleep(10) # Wait to be sure that metadata are updated
1260-
_rq = await rq_client.get()
1261-
assert _rq
1262-
stats_after = _rq.get('stats', {})
1272+
# Poll until stats reflect the write.
1273+
async def _get_rq_stats() -> dict:
1274+
result = await rq_client.get()
1275+
return (result or {}).get('stats', {})
1276+
1277+
stats_after = await poll_until_condition(
1278+
_get_rq_stats,
1279+
lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1,
1280+
)
12631281

12641282
assert (stats_after['writeCount'] - stats_before['writeCount']) == 1
12651283

@@ -1283,10 +1301,15 @@ async def test_request_queue_deduplication_use_extended_unique_key(
12831301
await rq.add_request(request1)
12841302
await rq.add_request(request2)
12851303

1286-
await asyncio.sleep(10) # Wait to be sure that metadata are updated
1287-
_rq = await rq_client.get()
1288-
assert _rq
1289-
stats_after = _rq.get('stats', {})
1304+
# Poll until stats reflect both writes.
1305+
async def _get_rq_stats() -> dict:
1306+
result = await rq_client.get()
1307+
return (result or {}).get('stats', {})
1308+
1309+
stats_after = await poll_until_condition(
1310+
_get_rq_stats,
1311+
lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 2,
1312+
)
12901313

12911314
assert (stats_after['writeCount'] - stats_before['writeCount']) == 2
12921315

@@ -1316,10 +1339,15 @@ async def add_requests_worker() -> None:
13161339
add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(worker_count)]
13171340
await asyncio.gather(*add_requests_workers)
13181341

1319-
await asyncio.sleep(10) # Wait to be sure that metadata are updated
1320-
_rq = await rq_client.get()
1321-
assert _rq
1322-
stats_after = _rq.get('stats', {})
1342+
# Poll until stats reflect all written requests.
1343+
async def _get_rq_stats() -> dict:
1344+
result = await rq_client.get()
1345+
return (result or {}).get('stats', {})
1346+
1347+
stats_after = await poll_until_condition(
1348+
_get_rq_stats,
1349+
lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= len(requests),
1350+
)
13231351

13241352
assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests)
13251353

0 commit comments

Comments
 (0)