@@ -574,5 +574,49 @@ async def handle_request(self, request: str):
574574 )
575575
576576
@pytest.mark.asyncio
@pytest.mark.skipif(
    RAY_SERVE_FORCE_LOCAL_TESTING_MODE,
    reason="local_testing_mode doesn't support choose_replica/dispatch",
)
async def test_choose_replica_cancel_releases_slot_across_loop_boundary(
    serve_instance,
):
    """Cancelling a task holding a selection must release the slot, even when
    the router lives on a different thread (SingletonThreadRouter).

    The deployment runs one replica with ``max_ongoing_requests=1``, so a slot
    leaked by the cancelled task leaves nothing for the follow-up selection.
    """

    @serve.deployment(num_replicas=1, max_ongoing_requests=1)
    class Backend:
        def process(self, msg: str):
            return msg

    handle = serve.run(Backend.bind())
    # Warm up so the cancellation isn't racing initialization.
    assert await handle.process.remote("warmup") == "warmup"

    ready = asyncio.Event()
    never_set = asyncio.Event()

    async def hold():
        # Take the only slot and park forever; only cancellation ends this.
        async with handle.process.choose_replica("first"):
            ready.set()
            await never_set.wait()

    task = asyncio.create_task(hold())
    try:
        await asyncio.wait_for(ready.wait(), timeout=5)
    finally:
        # Cancel even when the wait above timed out, so the holder task is
        # never left pending behind a failing test.
        task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task

    async def select_and_dispatch():
        async with handle.process.choose_replica("second") as selection:
            return await handle.process.dispatch(selection, "second")

    # Bound selection *and* dispatch together: if the cancelled task leaked
    # the only slot, the second selection may never complete, and the test
    # should fail with a timeout rather than hang the whole run.
    result = await asyncio.wait_for(select_and_dispatch(), timeout=10)
    assert result == "second"
619+
620+
if __name__ == "__main__":
    # Run only this file's tests, verbosely and without output capture, and
    # propagate pytest's exit status to the shell.
    exit_status = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_status)
0 commit comments