Skip to content

Commit 0ea727c

Browse files
authored
test: Fix flaky integration tests for RQ in shared mode (#803)
## Summary - Add retry logic to `fetch_next_request()` calls in `test_request_reclaim_functionality` to handle propagation delays in shared request queue access mode - The `[shared]` variant was flaky because newly added or reclaimed requests may not be immediately visible - Introduces a `_fetch_next_request_with_retry()` helper that polls up to 10 times with 1-second intervals 🤖 Generated with [Claude Code](https://claude.com/claude-code)
1 parent 82111b5 commit 0ea727c

File tree

1 file changed

+59
-26
lines changed

1 file changed

+59
-26
lines changed

tests/integration/test_request_queue.py

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,23 @@
2626
from apify.storage_clients._apify._models import ApifyRequestQueueMetadata
2727

2828

29+
async def fetch_next_request_with_exp_backoff(rq: RequestQueue, max_retries: int = 5) -> Request | None:
30+
"""Fetch the next request with exponential backoff retries.
31+
32+
In shared request queue mode, there is a propagation delay before newly added or reclaimed requests become visible
33+
(see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle
34+
that delay in integration tests.
35+
"""
36+
for attempt in range(max_retries):
37+
result = await rq.fetch_next_request()
38+
if result is not None:
39+
return result
40+
delay = 2**attempt
41+
Actor.log.info(f'fetch_next_request returned None, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
42+
await asyncio.sleep(delay)
43+
return None
44+
45+
2946
async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None:
3047
"""Test basic functionality of adding and fetching requests."""
3148

@@ -208,8 +225,12 @@ async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) ->
208225
)
209226

210227

211-
async def test_request_reclaim_functionality(request_queue_apify: RequestQueue) -> None:
228+
async def test_request_reclaim_functionality(
229+
request_queue_apify: RequestQueue,
230+
request: pytest.FixtureRequest,
231+
) -> None:
212232
"""Test request reclaiming for failed processing."""
233+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
213234

214235
rq = request_queue_apify
215236
Actor.log.info('Request queue opened')
@@ -219,36 +240,44 @@ async def test_request_reclaim_functionality(request_queue_apify: RequestQueue)
219240
Actor.log.info('Added test request')
220241

221242
# Fetch and reclaim the request
222-
request = await rq.fetch_next_request()
223-
assert request is not None, f'request={request}'
224-
Actor.log.info(f'Fetched request: {request.url}')
243+
fetched_request = await rq.fetch_next_request()
244+
assert fetched_request is not None
245+
Actor.log.info(f'Fetched request: {fetched_request.url}')
225246

226247
# Reclaim the request (simulate failed processing)
227-
reclaim_result = await rq.reclaim_request(request)
228-
assert reclaim_result is not None, f'reclaim_result={reclaim_result}'
229-
assert reclaim_result.was_already_handled is False, (
230-
f'reclaim_result.was_already_handled={reclaim_result.was_already_handled}'
231-
)
248+
reclaim_result = await rq.reclaim_request(fetched_request)
249+
assert reclaim_result is not None
250+
assert reclaim_result.was_already_handled is False
251+
232252
Actor.log.info('Request reclaimed successfully')
233253

234-
# Should be able to fetch the same request again
235-
request2 = await rq.fetch_next_request()
236-
assert request2 is not None, f'request2={request2}'
237-
assert request2.url == request.url, (
238-
f'request2.url={request2.url}',
239-
f'request.url={request.url}',
240-
)
254+
# Should be able to fetch the same request again.
255+
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
256+
# (see https://github.com/apify/apify-sdk-python/issues/808).
257+
if rq_access_mode == 'shared':
258+
request2 = await fetch_next_request_with_exp_backoff(rq)
259+
else:
260+
request2 = await rq.fetch_next_request()
261+
262+
assert request2 is not None
263+
assert request2.url == fetched_request.url
264+
241265
Actor.log.info(f'Successfully fetched reclaimed request: {request2.url}')
242266

243267
# Mark as handled this time
244268
await rq.mark_request_as_handled(request2)
245269
is_finished = await rq.is_finished()
246-
assert is_finished is True, f'is_finished={is_finished}'
270+
assert is_finished is True
247271

248272

249-
async def test_request_reclaim_with_forefront(request_queue_apify: RequestQueue) -> None:
273+
async def test_request_reclaim_with_forefront(
274+
request_queue_apify: RequestQueue,
275+
request: pytest.FixtureRequest,
276+
) -> None:
250277
"""Test reclaiming requests to the front of the queue."""
251278

279+
rq_access_mode = request.node.callspec.params.get('request_queue_apify')
280+
252281
rq = request_queue_apify
253282
Actor.log.info('Request queue opened')
254283

@@ -260,20 +289,24 @@ async def test_request_reclaim_with_forefront(request_queue_apify: RequestQueue)
260289

261290
# Fetch first request
262291
first_request = await rq.fetch_next_request()
263-
assert first_request is not None, f'first_request={first_request}'
292+
assert first_request is not None
264293
Actor.log.info(f'Fetched first request: {first_request.url}')
265294

266295
# Reclaim to forefront
267296
await rq.reclaim_request(first_request, forefront=True)
268297
Actor.log.info('Request reclaimed to forefront')
269298

270-
# The reclaimed request should be fetched first again
271-
next_request = await rq.fetch_next_request()
272-
assert next_request is not None, f'next_request={next_request}'
273-
assert next_request.url == first_request.url, (
274-
f'next_request.url={next_request.url}',
275-
f'first_request.url={first_request.url}',
276-
)
299+
# The reclaimed request should be fetched first again.
300+
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
301+
# (see https://github.com/apify/apify-sdk-python/issues/808).
302+
if rq_access_mode == 'shared':
303+
next_request = await fetch_next_request_with_exp_backoff(rq)
304+
else:
305+
next_request = await rq.fetch_next_request()
306+
307+
assert next_request is not None
308+
assert next_request.url == first_request.url
309+
277310
Actor.log.info(f'Confirmed reclaimed request came first: {next_request.url}')
278311

279312
# Clean up

0 commit comments

Comments
 (0)