[serve] Add metrics for decoupled routing primitives (Phase 2) (#62163)#62356
[serve] Add metrics for decoupled routing primitives (Phase 2) (#62163)#62356vasuag09 wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the Ray Serve router and metrics management, simplifying metric reporting to the controller and removing deprecated components like CurrentLoopRouter and EventLoopMonitor. It also includes a significant cleanup of test utilities, removing several unused mock classes and simplifying URL generation logic. A new unit test suite for decoupled routing metrics is introduced. Feedback focuses on a regression in _process_finished_request, where the removal of RayTaskError unwrapping and actor ID comparison logic prevents the router from correctly distinguishing between local replica failures and upstream dependency deaths, which could lead to healthy replicas being unnecessarily removed from rotation.
There was a problem hiding this comment.
Pull request overview
This PR aims to add Phase 2 observability metrics for Ray Serve’s decoupled routing primitives (selection→dispatch gap histogram and released-without-dispatch counter). However, the diff also includes substantial refactors in core router logic and test utilities that go well beyond the stated metrics-only scope.
Changes:
- Adds a new unit test module intended to validate decoupled routing metrics via fake metric primitives.
- Modifies core router autoscaling/metrics reporting and request assignment flow.
- Simplifies
test_utils.pyhelpers and alters theget_application_url*helper API.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
python/ray/serve/tests/unit/test_decoupled_routing_metrics.py |
New unit tests for the proposed decoupled-routing metrics (currently targets APIs not present in the router). |
python/ray/serve/_private/test_utils.py |
Large test utility cleanup; changes helper signatures (notably get_application_url) in a way that breaks existing call sites. |
python/ray/serve/_private/router.py |
Major router refactor affecting autoscaling metrics push, request lifecycle hooks, threading/loop behavior, and removal of decoupled routing primitives. |
python/ray/serve/_private/request_router/common.py |
Minor cleanups to time-related default factories and removal of an unused PendingRequest.resolved flag. |
Comments suppressed due to low confidence (3)
python/ray/serve/_private/router.py:387
AsyncioRouterin this change set no longer defines the decoupled routing primitives (choose_replica()/dispatch()) introduced in Phase 1, but this PR’s description and new unit tests depend on those APIs. This is a functional discrepancy: either the primitives need to be reintroduced onAsyncioRouteror the tests/description need to be updated to match the actual router API.
class AsyncioRouter:
def __init__(
self,
controller_handle: ActorHandle,
deployment_id: DeploymentID,
handle_id: str,
self_actor_id: str,
handle_source: DeploymentHandleSource,
event_loop: asyncio.BaseEventLoop,
enable_strict_max_ongoing_requests: bool,
node_id: str,
availability_zone: Optional[str],
prefer_local_node_routing: bool,
resolve_request_arg_func: Coroutine = resolve_deployment_response,
request_router_class: Optional[Callable] = None,
request_router: Optional[RequestRouter] = None,
_request_router_initialized_event: Optional[asyncio.Event] = None,
):
"""Used to assign requests to downstream replicas for a deployment.
The routing behavior is delegated to a RequestRouter; this is a thin
wrapper that adds metrics and logging.
"""
python/ray/serve/_private/router.py:856
CurrentLoopRouterwas removed fromray.serve._private.router, but other modules still import and reference it (e.g.,python/ray/serve/_private/default_impl.pyimportsCurrentLoopRouterand selects it when_run_router_in_separate_loop=False). This will cause an ImportError at import time. Either restoreCurrentLoopRouter(or an equivalent) in this module, or update all imports/call sites to use the new router wrapper.
def shutdown(self) -> concurrent.futures.Future:
return asyncio.run_coroutine_threadsafe(
self._asyncio_router.shutdown(), loop=self._asyncio_loop
)
class SharedRouterLongPollClient:
def __init__(self, controller_handle: ActorHandle, event_loop: AbstractEventLoop):
self.controller_handler = controller_handle
python/ray/serve/_private/router.py:544
update_deployment_config()callsdeployment_config.get_request_router_class(), butDeploymentConfigin this repo exposesrequest_router_config.get_request_router_class()(noget_request_router_classmethod onDeploymentConfig). This will raiseAttributeErrorwhen configs are updated via long poll. Usedeployment_config.request_router_config.get_request_router_class()(and any needed kwargs) instead.
def update_deployment_config(self, deployment_config: DeploymentConfig):
self._request_router_class = deployment_config.get_request_router_class()
self._metrics_manager.update_deployment_config(
deployment_config,
curr_num_replicas=len(self.request_router.curr_replicas),
)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
c9e61c9 to
24b95d4
Compare
778a30e to
3a3e7a7
Compare
3a3e7a7 to
455fe18
Compare
455fe18 to
4dce45b
Compare
|
Thanks for the automated reviews. I double-checked the current branch diff against master, and the flagged router.py removals (for example _process_finished_request, _get_actor_died_error, _handle_actor_died_error, CurrentLoopRouter, and EventLoopMonitor) are still present and unchanged in the actual PR diff. The only router.py changes are the intended Phase 2 metric additions (serve_selection_dispatch_gap_ms, serve_selections_released_without_dispatch) plus choose_replica and dispatch plumbing for those metrics. I also verified there is no duplicate contextmanager import and no broadcast call path in the current file state. This looks like a bot false positive or stale analysis target rather than a real regression. Could a human maintainer please re-review this PR based on the current diff? Thank you! |
10a2fe4 to
b203d2c
Compare
1d2cb3f to
9eb1a26
Compare
|
Hey @machichima , I've submitted a PR to address this issue: #62356 |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Select a replica using the request router | ||
| replica = await self.request_router._choose_replica_for_request(pending_request) | ||
|
|
||
| selection = ReplicaSelection(replica=replica) | ||
|
|
||
| try: | ||
| yield selection | ||
| finally: | ||
| if not selection._dispatched: | ||
| # Request was NOT dispatched - increment "released without dispatch" | ||
| self._selections_released_without_dispatch.inc() | ||
| selection._release_slot() | ||
|
|
||
| def dispatch( | ||
| self, | ||
| selection: ReplicaSelection, | ||
| pending_request: PendingRequest, | ||
| ) -> None: | ||
| """Dispatch a request to the selected replica. | ||
|
|
||
| Records the selection-to-dispatch gap metric and marks the selection | ||
| as dispatched. | ||
| """ | ||
| # Send the request first so that any exception here still triggers | ||
| # the "released without dispatch" counter and _release_slot() in | ||
| # choose_replica's finally block. | ||
| result = selection.replica.try_send_request( | ||
| pending_request, with_rejection=False | ||
| ) | ||
| pending_request.future.set_result(result) | ||
|
|
There was a problem hiding this comment.
choose_replica() awaits request_router._choose_replica_for_request(pending_request), which (in the real RequestRouter implementation) resolves pending_request.future with a replica. In dispatch(), calling pending_request.future.set_result(result) will then raise InvalidStateError because the future is already done. Consider decoupling the selection PendingRequest from the dispatch PendingRequest (or stop using pending_request.future here and instead return the ReplicaResult from dispatch()).
| result = selection.replica.try_send_request( | ||
| pending_request, with_rejection=False | ||
| ) | ||
| pending_request.future.set_result(result) | ||
|
|
||
| # Record metrics and mark dispatched only after both calls succeed, | ||
| # so the gap histogram and the "released without dispatch" counter | ||
| # can never both fire for the same request. | ||
| gap_ms = (time.monotonic() - selection.selection_start_time) * 1000 | ||
| self._selection_dispatch_gap_ms.observe(gap_ms) | ||
| selection._mark_dispatched() |
There was a problem hiding this comment.
dispatch() bypasses the normal route_and_send_request path: it doesn’t call request_router.on_send_request(...) and it doesn’t register done-callbacks to invoke _process_finished_request(...) / decrement_queue_len_cache(...). This can leave autoscaling/queue-length state and router bookkeeping out of sync for dispatched requests. Recommend reusing/refactoring the existing send+callback registration logic from _route_and_send_request_once() when dispatching to a pre-selected replica.
| selection = ReplicaSelection(replica=replica) | ||
|
|
||
| try: | ||
| yield selection | ||
| finally: | ||
| if not selection._dispatched: | ||
| # Request was NOT dispatched - increment "released without dispatch" | ||
| self._selections_released_without_dispatch.inc() | ||
| selection._release_slot() |
There was a problem hiding this comment.
choose_replica() increments serve_selections_released_without_dispatch and calls selection._release_slot() when _dispatched is false, but ReplicaSelection._release_callback is never set in this flow, so no slot/token is actually released. Either wire up a real reservation/release callback during selection (and ensure selection_start_time corresponds to that reservation), or adjust the metric/cleanup logic so it reflects what is actually being reserved and released.
| replica: "RunningReplica" # Forward reference to avoid circular import | ||
| """The selected replica to dispatch the request to.""" | ||
|
|
||
| selection_start_time: float = field(default_factory=time.monotonic) | ||
| """Monotonic timestamp (seconds) when this selection's slot was reserved. | ||
| Used internally to compute serve_selection_dispatch_gap_ms.""" | ||
|
|
||
| _dispatched: bool = field(default=False, repr=False) | ||
| """Internal flag tracking whether dispatch() was called.""" | ||
|
|
||
| _release_callback: Optional[Callable[[], None]] = field(default=None, repr=False) | ||
| """Optional callback to release the reserved slot.""" | ||
|
|
||
| def _mark_dispatched(self): | ||
| """Mark this selection as dispatched.""" | ||
| self._dispatched = True | ||
|
|
||
| def _release_slot(self): | ||
| """Release the reserved slot if a callback was provided.""" | ||
| if self._release_callback is not None: | ||
| self._release_callback() | ||
|
|
There was a problem hiding this comment.
ReplicaSelection documents selection_start_time as the time “when this selection's slot was reserved” and includes _release_callback for releasing the reserved slot, but the PR’s router code currently constructs ReplicaSelection(replica=replica) without establishing any reservation or setting _release_callback. This makes the timing/cleanup semantics unclear. Consider either (a) setting selection_start_time and _release_callback at the actual reservation point, or (b) updating the field docs to avoid implying a reservation that doesn’t exist.
| class FakeHistogram: | ||
| """Fake Histogram for unit tests without Ray.""" | ||
|
|
||
| def __init__(self, name: str = None, tag_keys: Tuple[str] = None): | ||
| self.name = name | ||
| self.observations = [] | ||
|
|
||
| self.tags = tag_keys or () | ||
| self.default_tags = dict() | ||
|
|
||
| def set_default_tags(self, tags: Dict[str, str]): | ||
| for key, tag in tags.items(): | ||
| assert key in self.tags | ||
| self.default_tags[key] = tag | ||
|
|
||
| def observe(self, value: Union[int, float], tags: Dict[str, str] = None): | ||
| merged_tags = self.default_tags.copy() | ||
| merged_tags.update(tags or {}) | ||
| assert set(merged_tags.keys()) == set(self.tags) | ||
| self.observations.append((value, merged_tags)) | ||
|
|
There was a problem hiding this comment.
FakeHistogram is added here but is not referenced anywhere in the repo (the new unit tests define their own FakeHistogram locally). To avoid dead code/duplication, either update tests to import and use ray.serve._private.test_utils.FakeHistogram, or drop this new class and keep fake metric helpers in one place.
| async def _choose_replica_for_request( | ||
| self, pr: PendingRequest, *, is_retry: bool = False | ||
| ) -> FakeReplica: | ||
| if self._block_requests: | ||
| event = asyncio.Event() | ||
| self._blocked_events.append(event) | ||
| await event.wait() | ||
| assert self._replica_to_return is not None, "Set a replica to return." | ||
| return self._replica_to_return | ||
|
|
There was a problem hiding this comment.
The FakeRequestRouter._choose_replica_for_request() used by these tests returns a replica directly and does not mimic the real RequestRouter._choose_replica_for_request() behavior of resolving pending_request.future with the chosen replica. This means the tests won’t catch the real-world interaction where dispatch() may try to reuse/overwrite an already-resolved future. Consider enhancing the fake to match production semantics (or adding a test that exercises the real RequestRouter behavior) so failures like InvalidStateError are covered.
|
Could you please rebase onto #60865 branch, duplicate that branch and set the duplicated branch as the target, or wait for that PR to land so that the diff for metric addition is obvious for review? |
|
Thanks for the detailed review, @jeffreywang-anyscale! Addressed all the feedback in the latest commit:
On the rebase question — happy to rebase onto #60865 once it lands to make the diff cleaner. Let me know if you'd prefer I wait for that or proceed with the current base. |
bf7ba47 to
b781fc1
Compare
Yeah we have to wait. |
|
Happy to wait for #60865 to land before rebasing @jeffreywang-anyscale — that'll make the diff much cleaner. Do you have a rough sense of when #60865 might merge? Just so I know whether to expect days or weeks. No rush, just want to plan accordingly. |
|
Hey @jeffreywang-anyscale , Any updates on the merging of #60865? |
|
This pull request has been automatically marked as stale because it has not had You can always ask for help on our discussion forum or Ray's public slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed. |
@vasuag09 #60865 was broken down into the following PRs and all of them are merged, please rebase this and adapt based on the latest implementation, thank you! |
Signed-off-by: Vasu Agrawal <vasuagrawal1040@gmail.com>
b781fc1 to
24fd0f9
Compare
…rics Ray CI requires all py_test files to include the `if __name__ == "__main__":` block. Signed-off-by: Vasu Agrawal <vasuagrawal1040@gmail.com>
…LoopRouter dispatch Both wrapper routers were calling _mark_dispatched() + _dispatch_to_marked_selection() directly, bypassing AsyncioRouter.dispatch() and its gap metric observation. Route both through AsyncioRouter.dispatch() so serve_selection_dispatch_gap_ms is recorded for all production dispatch paths including CurrentLoopRouter (the default for in-loop handles). Signed-off-by: Vasu Agrawal <vasuagrawal1040@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 4d91858. Configure here.
…rk_dispatched The previous fix routed SingletonThreadRouter and CurrentLoopRouter through AsyncioRouter.dispatch() via create_task/wrap_future. That deferred _mark_dispatched() into an unstarted coroutine, so choose_replica's finally block could see _dispatched=False and release the slot before the dispatch task ran. Extract _record_gap_and_mark_dispatched() as a synchronous method on AsyncioRouter that records the gap histogram and calls _mark_dispatched() atomically. Both wrapper routers call this synchronously (with the original try/except guard) before scheduling _dispatch_to_marked_selection, restoring the invariant that _dispatched is True before the context manager finally block executes. Signed-off-by: Vasu Agrawal <vasuagrawal1040@gmail.com>
|
Hi @jeffreywang-anyscale — done! Rebased onto upstream
Additional changes:
The diff should now be much smaller and focused purely on the metrics layer. Happy to make any further adjustments! |

Description
Adds two observability metrics to
AsyncioRouterfor the decoupled routing primitives introduced in Phase 1 (PR #60865), as specified in issue #62163.New Metrics
1.
serve_selection_dispatch_gap_ms(Histogram)Measures wall-clock time (in milliseconds) between when
choose_replica()acquires a replica slot and whendispatch()is called to consume it.High values indicate requests are being held in a "selected but not dispatched" state — a signal of:
Details:
[1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000]msdeployment,application2.
serve_selections_released_without_dispatch(Counter)Counts how many times a
choose_replica()context exits without a correspondingdispatch()call.This means:
Unexpected spikes indicate:
Details:
deployment,applicationImplementation
router.pyAsyncioRouter.__init__dispatch()finallyblock ofchoose_replica()when_dispatchedisFalserequest_router/common.pyselection_start_time: floatfield toReplicaSelectiontime.monotonic()at slot acquisitiontest_utils.pyFakeHistogramfor unit testing without a running Ray clustertests/unit/test_decoupled_routing_metrics.pyRelated Issues
Additional Information
-
pre-commitchecks (ruff,black) pass clean on modified files