Skip to content

Commit b0b2dac

Browse files
vdusekclaude
andauthored
test: fix flaky request queue integration tests (#665)
## Summary - Replace fixed 1-second sleeps with retry polling (up to 5 attempts) for eventual consistency in request queue integration tests - [Failing CI run](https://github.com/apify/apify-client-python/actions/runs/22895735069/job/66429223983?pr=664) Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c0d3932 commit b0b2dac

File tree

1 file changed

+91
-48
lines changed

1 file changed

+91
-48
lines changed

tests/integration/test_request_queue.py

Lines changed: 91 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async def test_request_queue_collection_get_or_create(client: ApifyClient | Apif
6666
await maybe_await(client.request_queue(rq.id).delete())
6767

6868

69-
async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync) -> None:
69+
async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None:
7070
result = await maybe_await(client.request_queues().get_or_create(name=get_random_resource_name('queue')))
7171
created_rq = cast('RequestQueue', result)
7272
rq = client.request_queue(created_rq.id, client_key=get_random_string(10))
@@ -78,8 +78,17 @@ async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync) -> Non
7878
rq.add_request({'url': f'http://test-lock.com/{i}', 'uniqueKey': f'http://test-lock.com/{i}'})
7979
)
8080

81-
result = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10)))
82-
get_head_and_lock_response = cast('LockedRequestQueueHead', result)
81+
# Poll until all requests are available for locking (eventual consistency)
82+
get_head_and_lock_response: LockedRequestQueueHead | None = None
83+
for _ in range(5):
84+
await maybe_sleep(1, is_async=is_async)
85+
result = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10)))
86+
get_head_and_lock_response = cast('LockedRequestQueueHead', result)
87+
if len(get_head_and_lock_response.items) == 10:
88+
break
89+
90+
assert get_head_and_lock_response is not None
91+
assert len(get_head_and_lock_response.items) == 10
8392

8493
for locked_request in get_head_and_lock_response.items:
8594
assert locked_request.lock_expires_at is not None
@@ -219,12 +228,15 @@ async def test_request_queue_list_head(client: ApifyClient | ApifyClientAsync, *
219228
)
220229
)
221230

222-
# Wait briefly for eventual consistency
223-
await maybe_sleep(1, is_async=is_async)
231+
# Poll until requests are available (eventual consistency)
232+
head_response: RequestQueueHead | None = None
233+
for _ in range(5):
234+
await maybe_sleep(1, is_async=is_async)
235+
result = await maybe_await(rq_client.list_head(limit=3))
236+
head_response = cast('RequestQueueHead', result)
237+
if len(head_response.items) == 3:
238+
break
224239

225-
# List head
226-
result = await maybe_await(rq_client.list_head(limit=3))
227-
head_response = cast('RequestQueueHead', result)
228240
assert head_response is not None
229241
assert len(head_response.items) == 3
230242
finally:
@@ -251,12 +263,15 @@ async def test_request_queue_list_requests(client: ApifyClient | ApifyClientAsyn
251263
)
252264
)
253265

254-
# Wait briefly for eventual consistency
255-
await maybe_sleep(1, is_async=is_async)
266+
# Poll until all requests are available (eventual consistency)
267+
list_response: ListOfRequests | None = None
268+
for _ in range(5):
269+
await maybe_sleep(1, is_async=is_async)
270+
result = await maybe_await(rq_client.list_requests())
271+
list_response = cast('ListOfRequests', result)
272+
if len(list_response.items) == 5:
273+
break
256274

257-
# List all requests
258-
result = await maybe_await(rq_client.list_requests())
259-
list_response = cast('ListOfRequests', result)
260275
assert list_response is not None
261276
assert len(list_response.items) == 5
262277
finally:
@@ -320,12 +335,16 @@ async def test_request_queue_batch_add_requests(client: ApifyClient | ApifyClien
320335
assert len(batch_response.processed_requests) == 10
321336
assert len(batch_response.unprocessed_requests) == 0
322337

323-
# Wait briefly for eventual consistency
324-
await maybe_sleep(1, is_async=is_async)
338+
# Poll until all requests are available (eventual consistency)
339+
list_response: ListOfRequests | None = None
340+
for _ in range(5):
341+
await maybe_sleep(1, is_async=is_async)
342+
result = await maybe_await(rq_client.list_requests())
343+
list_response = cast('ListOfRequests', result)
344+
if len(list_response.items) == 10:
345+
break
325346

326-
# Verify requests were added
327-
result = await maybe_await(rq_client.list_requests())
328-
list_response = cast('ListOfRequests', result)
347+
assert list_response is not None
329348
assert len(list_response.items) == 10
330349
finally:
331350
await maybe_await(rq_client.delete())
@@ -351,12 +370,17 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl
351370
)
352371
)
353372

354-
# Wait briefly for eventual consistency
355-
await maybe_sleep(1, is_async=is_async)
373+
# Poll until all requests are available (eventual consistency)
374+
list_response: ListOfRequests | None = None
375+
for _ in range(5):
376+
await maybe_sleep(1, is_async=is_async)
377+
result = await maybe_await(rq_client.list_requests())
378+
list_response = cast('ListOfRequests', result)
379+
if len(list_response.items) == 10:
380+
break
356381

357-
# List requests to get IDs
358-
result = await maybe_await(rq_client.list_requests())
359-
list_response = cast('ListOfRequests', result)
382+
assert list_response is not None
383+
assert len(list_response.items) == 10
360384
requests_to_delete = [{'uniqueKey': item.unique_key} for item in list_response.items[:5]]
361385

362386
# Batch delete
@@ -365,12 +389,16 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl
365389
assert delete_response is not None
366390
assert len(delete_response.processed_requests) == 5
367391

368-
# Wait briefly
369-
await maybe_sleep(1, is_async=is_async)
392+
# Poll until deletions are reflected (eventual consistency)
393+
remaining: ListOfRequests | None = None
394+
for _ in range(5):
395+
await maybe_sleep(1, is_async=is_async)
396+
result = await maybe_await(rq_client.list_requests())
397+
remaining = cast('ListOfRequests', result)
398+
if len(remaining.items) == 5:
399+
break
370400

371-
# Verify remaining requests
372-
result = await maybe_await(rq_client.list_requests())
373-
remaining = cast('ListOfRequests', result)
401+
assert remaining is not None
374402
assert len(remaining.items) == 5
375403
finally:
376404
await maybe_await(rq_client.delete())
@@ -405,12 +433,15 @@ async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClien
405433
for i in range(5):
406434
await maybe_await(rq_client.add_request({'url': f'https://example.com/lock-{i}', 'uniqueKey': f'lock-{i}'}))
407435

408-
# Wait briefly for eventual consistency
409-
await maybe_sleep(1, is_async=is_async)
436+
# Poll until requests are available for locking (eventual consistency)
437+
lock_response: LockedRequestQueueHead | None = None
438+
for _ in range(5):
439+
await maybe_sleep(1, is_async=is_async)
440+
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
441+
lock_response = cast('LockedRequestQueueHead', result)
442+
if len(lock_response.items) == 3:
443+
break
410444

411-
# Lock head requests
412-
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
413-
lock_response = cast('LockedRequestQueueHead', result)
414445
assert lock_response is not None
415446
assert len(lock_response.items) == 3
416447

@@ -434,12 +465,16 @@ async def test_request_queue_prolong_request_lock(client: ApifyClient | ApifyCli
434465
# Add a request
435466
await maybe_await(rq_client.add_request({'url': 'https://example.com/prolong', 'uniqueKey': 'prolong-test'}))
436467

437-
# Wait briefly for eventual consistency
438-
await maybe_sleep(1, is_async=is_async)
468+
# Poll until the request is available for locking (eventual consistency)
469+
lock_response: LockedRequestQueueHead | None = None
470+
for _ in range(5):
471+
await maybe_sleep(1, is_async=is_async)
472+
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
473+
lock_response = cast('LockedRequestQueueHead', result)
474+
if len(lock_response.items) == 1:
475+
break
439476

440-
# Lock the request
441-
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
442-
lock_response = cast('LockedRequestQueueHead', result)
477+
assert lock_response is not None
443478
assert len(lock_response.items) == 1
444479
locked_request = lock_response.items[0]
445480
original_lock_expires = locked_request.lock_expires_at
@@ -468,12 +503,16 @@ async def test_request_queue_delete_request_lock(client: ApifyClient | ApifyClie
468503
# Add a request
469504
await maybe_await(rq_client.add_request({'url': 'https://example.com/unlock', 'uniqueKey': 'unlock-test'}))
470505

471-
# Wait briefly for eventual consistency
472-
await maybe_sleep(1, is_async=is_async)
506+
# Poll until the request is available for locking (eventual consistency)
507+
lock_response: LockedRequestQueueHead | None = None
508+
for _ in range(5):
509+
await maybe_sleep(1, is_async=is_async)
510+
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
511+
lock_response = cast('LockedRequestQueueHead', result)
512+
if len(lock_response.items) == 1:
513+
break
473514

474-
# Lock the request
475-
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
476-
lock_response = cast('LockedRequestQueueHead', result)
515+
assert lock_response is not None
477516
assert len(lock_response.items) == 1
478517
locked_request = lock_response.items[0]
479518

@@ -503,12 +542,16 @@ async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAs
503542
rq_client.add_request({'url': f'https://example.com/unlock-{i}', 'uniqueKey': f'unlock-{i}'})
504543
)
505544

506-
# Wait briefly for eventual consistency
507-
await maybe_sleep(1, is_async=is_async)
545+
# Poll until requests are available for locking (eventual consistency)
546+
lock_response: LockedRequestQueueHead | None = None
547+
for _ in range(5):
548+
await maybe_sleep(1, is_async=is_async)
549+
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
550+
lock_response = cast('LockedRequestQueueHead', result)
551+
if len(lock_response.items) == 3:
552+
break
508553

509-
# Lock some requests
510-
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
511-
lock_response = cast('LockedRequestQueueHead', result)
554+
assert lock_response is not None
512555
assert len(lock_response.items) == 3
513556

514557
# Unlock all requests

0 commit comments

Comments
 (0)