[Serve] decouple routing primitives#60865
Conversation
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces new primitives to decouple replica selection from request dispatching in Ray Serve's routing logic. It adds a new ReplicaSelection data class and choose_replica and dispatch methods to the AsyncioRouter. This change enables a two-phase process where a replica can be chosen and a slot reserved before the request is actually sent.
While this is a good step towards more flexible routing, I've found a few critical issues in the current implementation that will cause runtime errors. Specifically, there are calls to attributes and methods that don't exist (_deployment_handle on AsyncioRouter, and reserve_slot, release_slot, send_request_with_slot on RunningReplica). Additionally, there's a logic error in checking for replica availability that would cause all dispatches to fail. Please see my detailed comments for suggestions on how to fix these issues.
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
…decouple-routing-primitives Signed-off-by: machichima <nary12321@gmail.com>
…called Signed-off-by: machichima <nary12321@gmail.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This method is consumed by the ingress routing logic in this PR, not by the base substrate PR. Renamed to private with TODO to migrate to DeploymentHandle.choose_replica() once ray-project#60865 lands. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
|
Going to break this PR into 2-3 PRs. |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit c9509df. Configure here.
| self.request_router.on_new_queue_len_info( | ||
| replica.replica_id, queue_info | ||
| ) | ||
| raise |
There was a problem hiding this comment.
Slot leak when CancelledError interrupts dispatch cleanup
Medium Severity
When _dispatch_to_marked_selection catches asyncio.CancelledError via except BaseException, the subsequent await selection._release_slot(force=True) can itself raise CancelledError (since the task is still cancelled). This prevents the slot from being released. The choose_replica finally block won't help because _dispatched is already True, causing _release_slot() without force to return None. The reserved semaphore slot on the replica leaks until the actor restarts.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit c9509df. Configure here.
| self._release_slot_if_still_reserved(selection) | ||
| ) | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Unnecessary remote call on every dispatch completion
Medium Severity
The _release_slot_if_still_reserved done callback fires on every completed dispatch, unconditionally calling _release_slot(force=True) which always issues a remote actor call to release_slot. In the normal happy path (the vast majority of requests), _start_request has already consumed the token, so this remote call is a guaranteed no-op that adds latency and actor mailbox pressure. For the PD disaggregation use case (latency-sensitive), this adds overhead proportional to request throughput.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit c9509df. Configure here.
|
Closing this as the following PRs are merged:
Thanks again @machichima! |


Description
Based on the RFC #59792, to better support Parallel PD pattern for prefill-decode disaggregation, we can add two new primitives to Ray Serve's DeploymentHandle API:
choose_replica()(an async context manager) anddispatch()Main changes
Public API
python/ray/serve/handle.pyDeploymentHandle.choose_replica(): Returns async context manager for replica selectionDeploymentHandle.dispatch(): Dispatch request to selected replicapython/ray/serve/exceptions.pyCore Router Logic
python/ray/serve/_private/router.pyAsyncioRouter.choose_replica(): Context manager to select and reserve a replica slotAsyncioRouter.dispatch(): Send request to a previously selected replicachoose_replica()anddispatch()toSingletonThreadRouterandCurrentLoopRouterpython/ray/serve/_private/request_router/request_router.pyRequestRouter.on_replica_result_finished(): Decrement queue length cache when reserved slot is releasedpython/ray/serve/_private/request_router/replica_wrapper.pyreserve_slot()andrelease_slot()to reserve and release a slot and return unique tokenTests
python/ray/serve/tests/test_handle_1.py(integration test based on the examples showed in #59792)test_choose_replica_and_dispatch_single(): Tests basic single selection patterntest_choose_replica_and_dispatch_parallel(): Tests parallel selection pattern (PD proxy use case) using AsyncExitStackpython/ray/serve/tests/unit/test_router.py(unit test)TestChooseReplicafor checkingAsyncioRouter'schoose_replica()anddispatch()behaviorRelated issues
Related to #59792
Additional information