@@ -420,18 +420,12 @@ async def test_metadata_tracking(
420420 assert handled_after_add == 0 , f'handled_after_add={ handled_after_add } '
421421
422422 # Process some requests.
423- next_request = await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = rq_access_mode )
424- handled = 0
425- if next_request :
426- await rq .mark_request_as_handled (next_request )
427- handled += 1
428- for _ in range (2 ):
429- next_request = await rq .fetch_next_request ()
423+ for _ in range (3 ):
424+ next_request = await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = rq_access_mode )
430425 if next_request :
431426 await rq .mark_request_as_handled (next_request )
432- handled += 1
433427
434- Actor .log .info (f 'Processed { handled } requests' )
428+ Actor .log .info ('Processed 3 requests' )
435429
436430 # Check counts after processing
437431 final_total = await rq .get_total_count ()
@@ -504,12 +498,8 @@ async def test_state_consistency(
504498 processed_requests = []
505499 reclaimed_requests = []
506500
507- next_request = await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = rq_access_mode )
508- if next_request :
509- await rq .mark_request_as_handled (next_request )
510- processed_requests .append (next_request )
511- for i in range (1 , 5 ):
512- next_request = await rq .fetch_next_request ()
501+ for i in range (5 ):
502+ next_request = await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = rq_access_mode )
513503 if next_request :
514504 if i % 2 == 0 : # Process even indices
515505 await rq .mark_request_as_handled (next_request )
@@ -685,12 +675,8 @@ async def test_persistence_across_operations(
685675
686676 # Process some requests.
687677 processed_count = 0
688- next_request = await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = rq_access_mode )
689- if next_request :
690- await rq .mark_request_as_handled (next_request )
691- processed_count += 1
692- for _ in range (4 ):
693- next_request = await rq .fetch_next_request ()
678+ for _ in range (5 ):
679+ next_request = await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = rq_access_mode )
694680 if next_request :
695681 await rq .mark_request_as_handled (next_request )
696682 processed_count += 1
@@ -1389,9 +1375,13 @@ async def worker() -> int:
13891375 assert total_after_workers == 20
13901376
13911377 remaining_count = 0
1392- while next_request := await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = 'shared' ):
1393- remaining_count += 1
1394- await rq .mark_request_as_handled (next_request )
1378+ while not await call_with_exp_backoff (rq .is_finished , rq_access_mode = 'shared' ):
1379+ request = await call_with_exp_backoff (rq .fetch_next_request , rq_access_mode = 'shared' )
1380+ if request :
1381+ remaining_count += 1
1382+ await rq .mark_request_as_handled (request )
1383+ else :
1384+ break
13951385
13961386 final_handled = await rq .get_handled_count ()
13971387 final_total = await rq .get_total_count ()
0 commit comments