Skip to content

Commit 1ac00b2

Browse files
vdusek and claude authored
test: Fix flaky is_finished checks in shared RQ integration tests (#813)
## Summary - Introduced a generic `call_with_exp_backoff` helper in `tests/integration/_utils.py` that retries any async callable with exponential backoff until it returns a truthy value, replacing the previous `fetch_next_request_with_exp_backoff` - Applied the helper to all `is_finished()` checks in shared request queue mode across 9 integration tests to handle API propagation delays - Renamed local variables that shadowed `request: pytest.FixtureRequest` to fix type checker errors ## Test plan - [x] Integration tests pass in CI (both `single` and `shared` RQ modes) - [x] No more flaky `is_finished` failures in shared mode ## Motivation One of the failed run: https://github.com/apify/apify-sdk-python/actions/runs/22375456542/job/64764491226?pr=812 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 082afeb commit 1ac00b2

File tree

2 files changed

+125
-51
lines changed

2 files changed

+125
-51
lines changed

tests/integration/_utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,39 @@
11
from __future__ import annotations
22

3+
import asyncio
4+
from typing import TYPE_CHECKING, TypeVar
5+
36
from crawlee._utils.crypto import crypto_random_object_id
47

8+
from apify import Actor
9+
10+
if TYPE_CHECKING:
11+
from collections.abc import Awaitable, Callable
12+
13+
T = TypeVar('T')
14+
15+
16+
async def call_with_exp_backoff(fn: Callable[[], Awaitable[T]], *, max_retries: int = 3) -> T | None:
    """Call an async callable with exponential backoff retries until it returns a truthy value.

    In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests
    become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with
    exponential backoff to handle that delay in integration tests.

    Args:
        fn: A zero-argument async callable to invoke.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The first truthy result returned by `fn`, or the last (falsy) result once all retries are exhausted.
    """
    result = None

    for attempt in range(max_retries):
        result = await fn()

        if result:
            return result

        # Do not sleep after the final attempt - there is no retry left, so the
        # backoff delay would only slow the test down for no benefit.
        if attempt == max_retries - 1:
            break

        delay = 2**attempt
        Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
        await asyncio.sleep(delay)

    return result
36+
537

638
def generate_unique_resource_name(label: str) -> str:
739
"""Generates a unique resource name, which will contain the given label."""

tests/integration/test_request_queue.py

Lines changed: 93 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from crawlee import service_locator
1313
from crawlee.crawlers import BasicCrawler
1414

15-
from ._utils import generate_unique_resource_name
15+
from ._utils import call_with_exp_backoff, generate_unique_resource_name
1616
from apify import Actor, Request
1717
from apify.storage_clients import ApifyStorageClient
1818
from apify.storage_clients._apify import ApifyRequestQueueClient
@@ -26,25 +26,12 @@
2626
from apify.storage_clients._apify._models import ApifyRequestQueueMetadata
2727

2828

29-
async def fetch_next_request_with_exp_backoff(rq: RequestQueue, max_retries: int = 5) -> Request | None:
30-
"""Fetch the next request with exponential backoff retries.
31-
32-
In shared request queue mode, there is a propagation delay before newly added or reclaimed requests become visible
33-
(see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle
34-
that delay in integration tests.
35-
"""
36-
for attempt in range(max_retries):
37-
result = await rq.fetch_next_request()
38-
if result is not None:
39-
return result
40-
delay = 2**attempt
41-
Actor.log.info(f'fetch_next_request returned None, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
42-
await asyncio.sleep(delay)
43-
return None
44-
45-
46-
async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None:
29+
async def test_add_and_fetch_requests(
30+
request_queue_apify: RequestQueue,
31+
request: pytest.FixtureRequest,
32+
) -> None:
4733
"""Test basic functionality of adding and fetching requests."""
34+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
4835

4936
desired_request_count = 100
5037
Actor.log.info('Opening request queue...')
@@ -70,12 +57,19 @@ async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None
7057
f'desired_request_count={desired_request_count}',
7158
)
7259
Actor.log.info('Waiting for queue to be finished...')
73-
is_finished = await rq.is_finished()
60+
if rq_access_mode == 'shared':
61+
is_finished = await call_with_exp_backoff(rq.is_finished)
62+
else:
63+
is_finished = await rq.is_finished()
7464
assert is_finished is True, f'is_finished={is_finished}'
7565

7666

77-
async def test_add_requests_in_batches(request_queue_apify: RequestQueue) -> None:
67+
async def test_add_requests_in_batches(
68+
request_queue_apify: RequestQueue,
69+
request: pytest.FixtureRequest,
70+
) -> None:
7871
"""Test adding multiple requests in a single batch operation."""
72+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
7973

8074
desired_request_count = 100
8175
rq = request_queue_apify
@@ -101,12 +95,19 @@ async def test_add_requests_in_batches(request_queue_apify: RequestQueue) -> Non
10195
f'handled_request_count={handled_request_count}',
10296
f'desired_request_count={desired_request_count}',
10397
)
104-
is_finished = await rq.is_finished()
98+
if rq_access_mode == 'shared':
99+
is_finished = await call_with_exp_backoff(rq.is_finished)
100+
else:
101+
is_finished = await rq.is_finished()
105102
assert is_finished is True, f'is_finished={is_finished}'
106103

107104

108-
async def test_add_non_unique_requests_in_batch(request_queue_apify: RequestQueue) -> None:
105+
async def test_add_non_unique_requests_in_batch(
106+
request_queue_apify: RequestQueue,
107+
request: pytest.FixtureRequest,
108+
) -> None:
109109
"""Test adding requests with duplicate unique keys in batch."""
110+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
110111

111112
desired_request_count = 100
112113
rq = request_queue_apify
@@ -137,7 +138,10 @@ async def test_add_non_unique_requests_in_batch(request_queue_apify: RequestQueu
137138
f'handled_request_count={handled_request_count}',
138139
f'expected_count={expected_count}',
139140
)
140-
is_finished = await rq.is_finished()
141+
if rq_access_mode == 'shared':
142+
is_finished = await call_with_exp_backoff(rq.is_finished)
143+
else:
144+
is_finished = await rq.is_finished()
141145
Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}')
142146
assert is_finished is True, f'is_finished={is_finished}'
143147

@@ -255,7 +259,7 @@ async def test_request_reclaim_functionality(
255259
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
256260
# (see https://github.com/apify/apify-sdk-python/issues/808).
257261
if rq_access_mode == 'shared':
258-
request2 = await fetch_next_request_with_exp_backoff(rq)
262+
request2 = await call_with_exp_backoff(rq.fetch_next_request)
259263
else:
260264
request2 = await rq.fetch_next_request()
261265

@@ -266,7 +270,10 @@ async def test_request_reclaim_functionality(
266270

267271
# Mark as handled this time
268272
await rq.mark_request_as_handled(request2)
269-
is_finished = await rq.is_finished()
273+
if rq_access_mode == 'shared':
274+
is_finished = await call_with_exp_backoff(rq.is_finished)
275+
else:
276+
is_finished = await rq.is_finished()
270277
assert is_finished is True
271278

272279

@@ -300,7 +307,7 @@ async def test_request_reclaim_with_forefront(
300307
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
301308
# (see https://github.com/apify/apify-sdk-python/issues/808).
302309
if rq_access_mode == 'shared':
303-
next_request = await fetch_next_request_with_exp_backoff(rq)
310+
next_request = await call_with_exp_backoff(rq.fetch_next_request)
304311
else:
305312
next_request = await rq.fetch_next_request()
306313

@@ -427,8 +434,12 @@ async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None:
427434
assert final_handled == 3, f'final_handled={final_handled}'
428435

429436

430-
async def test_batch_operations_performance(request_queue_apify: RequestQueue) -> None:
437+
async def test_batch_operations_performance(
438+
request_queue_apify: RequestQueue,
439+
request: pytest.FixtureRequest,
440+
) -> None:
431441
"""Test batch operations vs individual operations."""
442+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
432443

433444
rq = request_queue_apify
434445
Actor.log.info('Request queue opened')
@@ -459,12 +470,20 @@ async def test_batch_operations_performance(request_queue_apify: RequestQueue) -
459470
Actor.log.info(f'Processing completed. Total processed: {processed_count}')
460471
assert processed_count == 50, f'processed_count={processed_count}'
461472

462-
is_finished = await rq.is_finished()
473+
if rq_access_mode == 'shared':
474+
is_finished = await call_with_exp_backoff(rq.is_finished)
475+
else:
476+
is_finished = await rq.is_finished()
477+
463478
assert is_finished is True, f'is_finished={is_finished}'
464479

465480

466-
async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
481+
async def test_state_consistency(
482+
request_queue_apify: RequestQueue,
483+
request: pytest.FixtureRequest,
484+
) -> None:
467485
"""Test queue state consistency during concurrent operations."""
486+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
468487

469488
rq = request_queue_apify
470489
Actor.log.info('Request queue opened')
@@ -482,14 +501,14 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
482501
reclaimed_requests = []
483502

484503
for i in range(5):
485-
request = await rq.fetch_next_request()
486-
if request:
504+
next_request = await rq.fetch_next_request()
505+
if next_request:
487506
if i % 2 == 0: # Process even indices
488-
await rq.mark_request_as_handled(request)
489-
processed_requests.append(request)
507+
await rq.mark_request_as_handled(next_request)
508+
processed_requests.append(next_request)
490509
else: # Reclaim odd indices
491-
await rq.reclaim_request(request)
492-
reclaimed_requests.append(request)
510+
await rq.reclaim_request(next_request)
511+
reclaimed_requests.append(next_request)
493512

494513
Actor.log.info(f'Processed {len(processed_requests)} requests, reclaimed {len(reclaimed_requests)}')
495514

@@ -514,7 +533,10 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
514533
await rq.mark_request_as_handled(next_request)
515534

516535
Actor.log.info(f'Processed {remaining_count} remaining requests')
517-
is_finished = await rq.is_finished()
536+
if rq_access_mode == 'shared':
537+
is_finished = await call_with_exp_backoff(rq.is_finished)
538+
else:
539+
is_finished = await rq.is_finished()
518540
assert is_finished is True, f'is_finished={is_finished}'
519541

520542

@@ -549,8 +571,12 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue) -> None:
549571
assert metadata.pending_request_count == 0, f'metadata.pending_request_count={metadata.pending_request_count}'
550572

551573

552-
async def test_large_batch_operations(request_queue_apify: RequestQueue) -> None:
574+
async def test_large_batch_operations(
575+
request_queue_apify: RequestQueue,
576+
request: pytest.FixtureRequest,
577+
) -> None:
553578
"""Test handling large batches of requests."""
579+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
554580

555581
rq = request_queue_apify
556582
Actor.log.info('Request queue opened')
@@ -571,18 +597,21 @@ async def test_large_batch_operations(request_queue_apify: RequestQueue) -> None
571597
processed_count = 0
572598

573599
while not await rq.is_empty():
574-
request = await rq.fetch_next_request()
600+
next_request = await rq.fetch_next_request()
575601

576602
# The RQ is_empty should ensure we don't get None
577-
assert request is not None, f'request={request}'
603+
assert next_request is not None, f'next_request={next_request}'
578604

579-
await rq.mark_request_as_handled(request)
605+
await rq.mark_request_as_handled(next_request)
580606
processed_count += 1
581607

582608
Actor.log.info(f'Processing completed. Total processed: {processed_count}')
583609
assert processed_count == 500, f'processed_count={processed_count}'
584610

585-
is_finished = await rq.is_finished()
611+
if rq_access_mode == 'shared':
612+
is_finished = await call_with_exp_backoff(rq.is_finished)
613+
else:
614+
is_finished = await rq.is_finished()
586615
assert is_finished is True, f'is_finished={is_finished}'
587616

588617

@@ -993,26 +1022,33 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
9931022
assert apify_metadata.stats.write_count == add_request_count
9941023

9951024

996-
async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
1025+
async def test_rq_long_url(
1026+
request_queue_apify: RequestQueue,
1027+
request: pytest.FixtureRequest,
1028+
) -> None:
9971029
"""Test handling of requests with long URLs and extended unique keys."""
1030+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
9981031
rq = request_queue_apify
999-
request = Request.from_url(
1032+
long_url_request = Request.from_url(
10001033
'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
10011034
use_extended_unique_key=True,
10021035
always_enqueue=True,
10031036
)
10041037

1005-
request_id = unique_key_to_request_id(request.unique_key)
1038+
request_id = unique_key_to_request_id(long_url_request.unique_key)
10061039

1007-
processed_request = await rq.add_request(request)
1040+
processed_request = await rq.add_request(long_url_request)
10081041
assert processed_request.id == request_id
10091042

10101043
request_obtained = await rq.fetch_next_request()
10111044
assert request_obtained is not None
10121045

10131046
await rq.mark_request_as_handled(request_obtained)
10141047

1015-
is_finished = await rq.is_finished()
1048+
if rq_access_mode == 'shared':
1049+
is_finished = await call_with_exp_backoff(rq.is_finished)
1050+
else:
1051+
is_finished = await rq.is_finished()
10161052
assert is_finished
10171053

10181054

@@ -1061,18 +1097,24 @@ async def test_force_cloud(
10611097

10621098
async def test_request_queue_is_finished(
10631099
request_queue_apify: RequestQueue,
1100+
request: pytest.FixtureRequest,
10641101
) -> None:
1102+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
1103+
10651104
await request_queue_apify.add_request(Request.from_url('http://example.com'))
10661105
assert not await request_queue_apify.is_finished()
10671106

1068-
request = await request_queue_apify.fetch_next_request()
1069-
assert request is not None
1107+
fetched = await request_queue_apify.fetch_next_request()
1108+
assert fetched is not None
10701109
assert not await request_queue_apify.is_finished(), (
10711110
'RequestQueue should not be finished unless the request is marked as handled.'
10721111
)
10731112

1074-
await request_queue_apify.mark_request_as_handled(request)
1075-
assert await request_queue_apify.is_finished()
1113+
await request_queue_apify.mark_request_as_handled(fetched)
1114+
if rq_access_mode == 'shared':
1115+
assert await call_with_exp_backoff(request_queue_apify.is_finished)
1116+
else:
1117+
assert await request_queue_apify.is_finished()
10761118

10771119

10781120
async def test_request_queue_deduplication_unprocessed_requests(

0 commit comments

Comments
 (0)