1212from crawlee import service_locator
1313from crawlee .crawlers import BasicCrawler
1414
15- from ._utils import generate_unique_resource_name
15+ from ._utils import call_with_exp_backoff , generate_unique_resource_name
1616from apify import Actor , Request
1717from apify .storage_clients import ApifyStorageClient
1818from apify .storage_clients ._apify import ApifyRequestQueueClient
2626 from apify .storage_clients ._apify ._models import ApifyRequestQueueMetadata
2727
2828
async def fetch_next_request_with_exp_backoff(rq: RequestQueue, max_retries: int = 5) -> Request | None:
    """Fetch the next request, retrying with exponential backoff between attempts.

    In shared request queue mode, there is a propagation delay before newly added or reclaimed requests become visible
    (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle
    that delay in integration tests.

    Returns the fetched request, or None if every attempt came back empty.
    """
    attempt = 0
    while attempt < max_retries:
        fetched = await rq.fetch_next_request()
        if fetched is not None:
            return fetched
        # Exponential backoff: 1s, 2s, 4s, ... between consecutive empty fetches.
        delay = 2 ** attempt
        Actor.log.info(f'fetch_next_request returned None, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
        await asyncio.sleep(delay)
        attempt += 1
    return None
44-
45-
46- async def test_add_and_fetch_requests (request_queue_apify : RequestQueue ) -> None :
29+ async def test_add_and_fetch_requests (
30+ request_queue_apify : RequestQueue ,
31+ request : pytest .FixtureRequest ,
32+ ) -> None :
4733 """Test basic functionality of adding and fetching requests."""
34+ rq_access_mode = request .node .callspec .params .get ('request_queue_apify' )
4835
4936 desired_request_count = 100
5037 Actor .log .info ('Opening request queue...' )
@@ -70,12 +57,19 @@ async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None
7057 f'desired_request_count={ desired_request_count } ' ,
7158 )
7259 Actor .log .info ('Waiting for queue to be finished...' )
73- is_finished = await rq .is_finished ()
60+ if rq_access_mode == 'shared' :
61+ is_finished = await call_with_exp_backoff (rq .is_finished )
62+ else :
63+ is_finished = await rq .is_finished ()
7464 assert is_finished is True , f'is_finished={ is_finished } '
7565
7666
77- async def test_add_requests_in_batches (request_queue_apify : RequestQueue ) -> None :
67+ async def test_add_requests_in_batches (
68+ request_queue_apify : RequestQueue ,
69+ request : pytest .FixtureRequest ,
70+ ) -> None :
7871 """Test adding multiple requests in a single batch operation."""
72+ rq_access_mode = request .node .callspec .params .get ('request_queue_apify' )
7973
8074 desired_request_count = 100
8175 rq = request_queue_apify
@@ -101,12 +95,19 @@ async def test_add_requests_in_batches(request_queue_apify: RequestQueue) -> Non
10195 f'handled_request_count={ handled_request_count } ' ,
10296 f'desired_request_count={ desired_request_count } ' ,
10397 )
104- is_finished = await rq .is_finished ()
98+ if rq_access_mode == 'shared' :
99+ is_finished = await call_with_exp_backoff (rq .is_finished )
100+ else :
101+ is_finished = await rq .is_finished ()
105102 assert is_finished is True , f'is_finished={ is_finished } '
106103
107104
108- async def test_add_non_unique_requests_in_batch (request_queue_apify : RequestQueue ) -> None :
105+ async def test_add_non_unique_requests_in_batch (
106+ request_queue_apify : RequestQueue ,
107+ request : pytest .FixtureRequest ,
108+ ) -> None :
109109 """Test adding requests with duplicate unique keys in batch."""
110+ rq_access_mode = request .node .callspec .params .get ('request_queue_apify' )
110111
111112 desired_request_count = 100
112113 rq = request_queue_apify
@@ -137,7 +138,10 @@ async def test_add_non_unique_requests_in_batch(request_queue_apify: RequestQueu
137138 f'handled_request_count={ handled_request_count } ' ,
138139 f'expected_count={ expected_count } ' ,
139140 )
140- is_finished = await rq .is_finished ()
141+ if rq_access_mode == 'shared' :
142+ is_finished = await call_with_exp_backoff (rq .is_finished )
143+ else :
144+ is_finished = await rq .is_finished ()
141145 Actor .log .info (f'Processed { handled_request_count } /{ expected_count } requests, finished: { is_finished } ' )
142146 assert is_finished is True , f'is_finished={ is_finished } '
143147
@@ -255,7 +259,7 @@ async def test_request_reclaim_functionality(
255259 # In shared mode, there is a propagation delay before the reclaimed request becomes visible
256260 # (see https://github.com/apify/apify-sdk-python/issues/808).
257261 if rq_access_mode == 'shared' :
258- request2 = await fetch_next_request_with_exp_backoff (rq )
262+ request2 = await call_with_exp_backoff (rq . fetch_next_request )
259263 else :
260264 request2 = await rq .fetch_next_request ()
261265
@@ -266,7 +270,10 @@ async def test_request_reclaim_functionality(
266270
267271 # Mark as handled this time
268272 await rq .mark_request_as_handled (request2 )
269- is_finished = await rq .is_finished ()
273+ if rq_access_mode == 'shared' :
274+ is_finished = await call_with_exp_backoff (rq .is_finished )
275+ else :
276+ is_finished = await rq .is_finished ()
270277 assert is_finished is True
271278
272279
@@ -300,7 +307,7 @@ async def test_request_reclaim_with_forefront(
300307 # In shared mode, there is a propagation delay before the reclaimed request becomes visible
301308 # (see https://github.com/apify/apify-sdk-python/issues/808).
302309 if rq_access_mode == 'shared' :
303- next_request = await fetch_next_request_with_exp_backoff (rq )
310+ next_request = await call_with_exp_backoff (rq . fetch_next_request )
304311 else :
305312 next_request = await rq .fetch_next_request ()
306313
@@ -427,8 +434,12 @@ async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None:
427434 assert final_handled == 3 , f'final_handled={ final_handled } '
428435
429436
430- async def test_batch_operations_performance (request_queue_apify : RequestQueue ) -> None :
437+ async def test_batch_operations_performance (
438+ request_queue_apify : RequestQueue ,
439+ request : pytest .FixtureRequest ,
440+ ) -> None :
431441 """Test batch operations vs individual operations."""
442+ rq_access_mode = request .node .callspec .params .get ('request_queue_apify' )
432443
433444 rq = request_queue_apify
434445 Actor .log .info ('Request queue opened' )
@@ -459,12 +470,20 @@ async def test_batch_operations_performance(request_queue_apify: RequestQueue) -
459470 Actor .log .info (f'Processing completed. Total processed: { processed_count } ' )
460471 assert processed_count == 50 , f'processed_count={ processed_count } '
461472
462- is_finished = await rq .is_finished ()
473+ if rq_access_mode == 'shared' :
474+ is_finished = await call_with_exp_backoff (rq .is_finished )
475+ else :
476+ is_finished = await rq .is_finished ()
477+
463478 assert is_finished is True , f'is_finished={ is_finished } '
464479
465480
466- async def test_state_consistency (request_queue_apify : RequestQueue ) -> None :
481+ async def test_state_consistency (
482+ request_queue_apify : RequestQueue ,
483+ request : pytest .FixtureRequest ,
484+ ) -> None :
467485 """Test queue state consistency during concurrent operations."""
486+ rq_access_mode = request .node .callspec .params .get ('request_queue_apify' )
468487
469488 rq = request_queue_apify
470489 Actor .log .info ('Request queue opened' )
@@ -482,14 +501,14 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
482501 reclaimed_requests = []
483502
484503 for i in range (5 ):
485- request = await rq .fetch_next_request ()
486- if request :
504+ next_request = await rq .fetch_next_request ()
505+ if next_request :
487506 if i % 2 == 0 : # Process even indices
488- await rq .mark_request_as_handled (request )
489- processed_requests .append (request )
507+ await rq .mark_request_as_handled (next_request )
508+ processed_requests .append (next_request )
490509 else : # Reclaim odd indices
491- await rq .reclaim_request (request )
492- reclaimed_requests .append (request )
510+ await rq .reclaim_request (next_request )
511+ reclaimed_requests .append (next_request )
493512
494513 Actor .log .info (f'Processed { len (processed_requests )} requests, reclaimed { len (reclaimed_requests )} ' )
495514
@@ -514,7 +533,10 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None:
514533 await rq .mark_request_as_handled (next_request )
515534
516535 Actor .log .info (f'Processed { remaining_count } remaining requests' )
517- is_finished = await rq .is_finished ()
536+ if rq_access_mode == 'shared' :
537+ is_finished = await call_with_exp_backoff (rq .is_finished )
538+ else :
539+ is_finished = await rq .is_finished ()
518540 assert is_finished is True , f'is_finished={ is_finished } '
519541
520542
@@ -549,8 +571,12 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue) -> None:
549571 assert metadata .pending_request_count == 0 , f'metadata.pending_request_count={ metadata .pending_request_count } '
550572
551573
552- async def test_large_batch_operations (request_queue_apify : RequestQueue ) -> None :
574+ async def test_large_batch_operations (
575+ request_queue_apify : RequestQueue ,
576+ request : pytest .FixtureRequest ,
577+ ) -> None :
553578 """Test handling large batches of requests."""
579+ rq_access_mode = request .node .callspec .params .get ('request_queue_apify' )
554580
555581 rq = request_queue_apify
556582 Actor .log .info ('Request queue opened' )
@@ -571,18 +597,21 @@ async def test_large_batch_operations(request_queue_apify: RequestQueue) -> None
571597 processed_count = 0
572598
573599 while not await rq .is_empty ():
574- request = await rq .fetch_next_request ()
600+ next_request = await rq .fetch_next_request ()
575601
576602 # The RQ is_empty should ensure we don't get None
577- assert request is not None , f'request= { request } '
603+ assert next_request is not None , f'next_request= { next_request } '
578604
579- await rq .mark_request_as_handled (request )
605+ await rq .mark_request_as_handled (next_request )
580606 processed_count += 1
581607
582608 Actor .log .info (f'Processing completed. Total processed: { processed_count } ' )
583609 assert processed_count == 500 , f'processed_count={ processed_count } '
584610
585- is_finished = await rq .is_finished ()
611+ if rq_access_mode == 'shared' :
612+ is_finished = await call_with_exp_backoff (rq .is_finished )
613+ else :
614+ is_finished = await rq .is_finished ()
586615 assert is_finished is True , f'is_finished={ is_finished } '
587616
588617
@@ -993,26 +1022,33 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
9931022 assert apify_metadata .stats .write_count == add_request_count
9941023
9951024
996- async def test_rq_long_url (request_queue_apify : RequestQueue ) -> None :
1025+ async def test_rq_long_url (
1026+ request_queue_apify : RequestQueue ,
1027+ request : pytest .FixtureRequest ,
1028+ ) -> None :
9971029 """Test handling of requests with long URLs and extended unique keys."""
1030+ rq_access_mode = request .node .callspec .params .get ('request_queue_apify' )
9981031 rq = request_queue_apify
999- request = Request .from_url (
1032+ long_url_request = Request .from_url (
10001033 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1' ,
10011034 use_extended_unique_key = True ,
10021035 always_enqueue = True ,
10031036 )
10041037
1005- request_id = unique_key_to_request_id (request .unique_key )
1038+ request_id = unique_key_to_request_id (long_url_request .unique_key )
10061039
1007- processed_request = await rq .add_request (request )
1040+ processed_request = await rq .add_request (long_url_request )
10081041 assert processed_request .id == request_id
10091042
10101043 request_obtained = await rq .fetch_next_request ()
10111044 assert request_obtained is not None
10121045
10131046 await rq .mark_request_as_handled (request_obtained )
10141047
1015- is_finished = await rq .is_finished ()
1048+ if rq_access_mode == 'shared' :
1049+ is_finished = await call_with_exp_backoff (rq .is_finished )
1050+ else :
1051+ is_finished = await rq .is_finished ()
10161052 assert is_finished
10171053
10181054
@@ -1061,18 +1097,24 @@ async def test_force_cloud(
10611097
async def test_request_queue_is_finished(
    request_queue_apify: RequestQueue,
    request: pytest.FixtureRequest,
) -> None:
    """Check that is_finished becomes True only after the fetched request is marked as handled."""
    rq = request_queue_apify
    rq_access_mode = request.node.callspec.params.get('request_queue_apify')

    await rq.add_request(Request.from_url('http://example.com'))
    assert not await rq.is_finished()

    pending = await rq.fetch_next_request()
    assert pending is not None
    assert not await rq.is_finished(), (
        'RequestQueue should not be finished unless the request is marked as handled.'
    )

    await rq.mark_request_as_handled(pending)
    # Shared queues propagate the handled state with a delay, so poll with backoff there
    # (see https://github.com/apify/apify-sdk-python/issues/808).
    if rq_access_mode == 'shared':
        assert await call_with_exp_backoff(rq.is_finished)
    else:
        assert await rq.is_finished()
10761118
10771119
10781120async def test_request_queue_deduplication_unprocessed_requests (
0 commit comments