Closed
63 commits
4dc1d4e
feat: add DeploymentHandle skeleton
machichima Feb 8, 2026
d85b8eb
feat: add AsyncioRouter skeleton
machichima Feb 8, 2026
8140942
fix: type and undefined err
machichima Feb 14, 2026
1e909ca
test: update FakeReplica and add TestChooseReplica
machichima Feb 14, 2026
3df3795
feat+test: choose_replica + ensure slot reserved
machichima Feb 16, 2026
07a8962
feat: enable calling from handle layer
machichima Feb 16, 2026
3faa9a8
feat: dispatch try_send_request
machichima Feb 16, 2026
470958c
test: ensure dispatch send request to the correct replica
machichima Feb 16, 2026
b0c792c
fix: args/kwargs type + verify same deployment id
machichima Feb 16, 2026
008b190
test: ensure single/stack pattern works
machichima Feb 16, 2026
1deff34
fix: account for reserved slots when checking avail
machichima Feb 17, 2026
88db9dd
refactor: lint and format
machichima Feb 17, 2026
c6f9b45
refactor: move reserved slots metrics to RouterMetricsManager
machichima Feb 17, 2026
f7b30bb
refactor: lint and docstring
machichima Feb 17, 2026
0090c5c
refactor: remove unused code
machichima Feb 17, 2026
504bb8a
refactor: clean-up test comments
machichima Feb 17, 2026
6171af9
Merge branch 'master' into 59792-serve-decouple-routing-primitives
machichima Feb 17, 2026
f529d68
test: add placeholder method to LocalRouter
machichima Feb 18, 2026
c460e57
fix: also release slot after dispatch
machichima Feb 18, 2026
a3048a4
test: fix param type + release slot after dispatch
machichima Feb 18, 2026
56a6a89
fix: choose_replica run on router thread loop
machichima Feb 18, 2026
732215f
fix: check if already dispatched
machichima Feb 18, 2026
251f9e7
fix: dispatch inc metrics + add completion callback
machichima Feb 18, 2026
e1ef1fa
fix: add wrap_queued_request in choose_replica
machichima Feb 18, 2026
4fe9ebb
fix: skip choose_replica and dispatch test in local mode
machichima Mar 6, 2026
4009306
fix: manual decrease cache only when not dispatched
machichima Mar 6, 2026
642f05f
fix: call request_counter.inc in DeploymentHandle._choose_replica
machichima Mar 6, 2026
a074356
refactor: precommit
machichima Mar 6, 2026
2b5ce83
Merge branch 'master' into 59792-serve-decouple-routing-primitives
machichima Mar 6, 2026
1d4d735
fix: mark dispatch after check replica available
machichima Mar 6, 2026
3ee929a
fix: prevent double count in cache mode
machichima Mar 6, 2026
3327986
fix: router test error
machichima Mar 7, 2026
f7fc4c0
fix: api docs ReplicaUnavailableError issue
machichima Mar 8, 2026
1683ff4
fix: decrease queue len when finished or no callback
machichima Mar 8, 2026
edd7ccf
refactor: _deployment_handle to optional
machichima Mar 8, 2026
afef7b3
refactor: add ReplicaUnavailableError to serve public API
machichima Mar 9, 2026
21547d6
Merge branch 'master' of github.com:ray-project/ray into 59792-serve-…
machichima Mar 10, 2026
2852468
fix: fix circular deps by setting deployment_id instead
machichima Mar 23, 2026
bc58182
refactor: extract common logic to utility function
machichima Mar 23, 2026
de7e6e3
fix: move import to the top of the file
machichima Mar 23, 2026
58f42f7
refactor: choose_replica to async
machichima Mar 23, 2026
174f7bd
refactor: fix docstrings
machichima Mar 23, 2026
fe842d0
refactor: better document on_replica_result_finished
machichima Mar 23, 2026
c01822b
fix: remove redundant code as we always in cache mode
machichima Mar 23, 2026
7fe198b
refactor: make reserved_slots comment accurate
machichima Mar 23, 2026
446e7d3
fix: reuse the RequestMetadata created from choose_replica
machichima Mar 23, 2026
1c5c9dc
refactor: fix docstring
machichima Mar 23, 2026
62ea65e
test: multiple dispatch calls fail
machichima Mar 26, 2026
9cae538
test: dispatch won't be rejected even when reach the threshold
machichima Mar 26, 2026
80b3c07
test: ensure specific error raised in choose_replica
machichima Mar 27, 2026
ad7c561
test: streaming support integration test
machichima Mar 27, 2026
f32576a
test: exception or early return release slot
machichima Mar 27, 2026
04b0c19
Merge branch 'master' of github.com:ray-project/ray into 59792-serve-…
machichima Mar 27, 2026
1af2c2e
fix: add missing callback param
machichima Mar 29, 2026
9959551
fix: prevent duplicate decrease queue length cache
machichima Apr 10, 2026
779123c
fix: use call_soon_threadsafe
machichima Apr 20, 2026
f978363
fix: move resolve_request_arguments into try/except block
machichima Apr 20, 2026
5d2154a
refactor: ReplicaSelection to replica_wrapper
machichima Apr 21, 2026
b7e376c
test: remove redundant test
machichima Apr 21, 2026
65da32e
Merge branch 'master' of github.com:ray-project/ray into 59792-serve-…
machichima Apr 21, 2026
d65ad18
fix: calling on_replica_result_finished only when on_send_request is …
machichima Apr 26, 2026
5684558
Merge branch 'master' into 59792-serve-decouple-routing-primitives
jeffreywang-anyscale May 8, 2026
c9509df
Reserve replica capacity
jeffreywang-anyscale May 9, 2026
4 changes: 0 additions & 4 deletions ci/lint/pydoclint-baseline.txt
@@ -1453,10 +1453,6 @@ python/ray/serve/_private/proxy_response_generator.py
python/ray/serve/_private/proxy_state.py
DOC201: Method `ProxyStateManager.get_targets` does not have a return section in docstring
--------------------
python/ray/serve/_private/router.py
DOC101: Method `SingletonThreadRouter.assign_request`: Docstring contains fewer arguments than in function signature.
DOC103: Method `SingletonThreadRouter.assign_request`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**request_kwargs: , *request_args: , request_meta: RequestMetadata].
--------------------
python/ray/serve/_private/storage/kv_store.py
DOC201: Method `RayInternalKVStore.put` does not have a return section in docstring
DOC201: Method `RayInternalKVStore.delete` does not have a return section in docstring
3 changes: 2 additions & 1 deletion doc/source/serve/api/index.md
@@ -171,6 +171,7 @@ See the [model composition guide](serve-model-composition) for how to update cod
serve.exceptions.RequestCancelledError
serve.exceptions.gRPCStatusError
serve.exceptions.DeploymentUnavailableError
serve.exceptions.ReplicaUnavailableError
```


@@ -524,4 +525,4 @@ Content-Type: application/json

serve.llm.LLMServer
serve.llm.LLMRouter
```
```
3 changes: 3 additions & 0 deletions python/ray/serve/_private/common.py
@@ -820,6 +820,9 @@ class RequestMetadata:
request_serialization: str = "cloudpickle"
response_serialization: str = "cloudpickle"

# Token for a replica-side slot reserved by choose_replica().
_reserved_slot_token: Optional[str] = None

@property
def is_http_request(self) -> bool:
return self._request_protocol == RequestProtocol.HTTP
47 changes: 46 additions & 1 deletion python/ray/serve/_private/local_testing_mode.py
@@ -4,8 +4,19 @@
import logging
import queue
import time
from contextlib import asynccontextmanager
from functools import wraps
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Union
from typing import (
Any,
AsyncIterator,
Callable,
Coroutine,
Dict,
List,
Optional,
Tuple,
Union,
)

import ray
from ray import cloudpickle
@@ -16,6 +27,7 @@
)
from ray.serve._private.replica import UserCallableWrapper
from ray.serve._private.replica_result import ReplicaResult
from ray.serve._private.request_router.replica_wrapper import ReplicaSelection
from ray.serve._private.router import Router
from ray.serve._private.utils import GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR
from ray.serve.deployment import Deployment
@@ -341,6 +353,39 @@ def generator_result_callback(item: Any):
)
return noop_future

    @asynccontextmanager
    async def choose_replica(
        self,
        request_meta: RequestMetadata,
        *request_args,
        **request_kwargs,
    ) -> AsyncIterator[ReplicaSelection]:
        """Choose replica is not supported in local testing mode.

        This is a stub implementation to satisfy the Router ABC interface.
        """
        raise NotImplementedError(
            "choose_replica is not supported in local testing mode. "
            "Use assign_request instead."
        )
        yield  # Make this a generator for asynccontextmanager

    def dispatch(
        self,
        selection: ReplicaSelection,
        request_meta: RequestMetadata,
        *request_args,
        **request_kwargs,
    ) -> concurrent.futures.Future[ReplicaResult]:
        """Dispatch is not supported in local testing mode.

        This is a stub implementation to satisfy the Router ABC interface.
        """
        raise NotImplementedError(
            "dispatch is not supported in local testing mode. "
            "Use assign_request instead."
        )
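
The unreachable `yield` in the `choose_replica` stub above is easy to misread as dead code. The following is a minimal sketch of why it matters, using only the standard library; `unsupported_choose_replica` and `try_enter` are hypothetical names for illustration and are not part of Ray Serve. The `yield` turns the function into an async generator, which is what `asynccontextmanager` wraps, so entering the context surfaces the `NotImplementedError` directly.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


@asynccontextmanager
async def unsupported_choose_replica() -> AsyncIterator[str]:
    """Hypothetical stand-in for a stubbed-out async context manager."""
    raise NotImplementedError("choose_replica is not supported here.")
    # Unreachable, but its presence makes this function an async generator
    # rather than a plain coroutine function.
    yield ""


async def try_enter() -> str:
    """Enter the stubbed context manager and report what happened."""
    try:
        async with unsupported_choose_replica():
            return "entered"
    except NotImplementedError as e:
        return f"rejected: {e}"


result = asyncio.run(try_enter())
```

Here `result` holds the rejection message, so callers of the stub see a clear "not supported" error at the `async with` line rather than a less obvious failure from the decorator.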

async def broadcast(
self,
request_meta: RequestMetadata,
50 changes: 48 additions & 2 deletions python/ray/serve/_private/replica.py
@@ -1182,13 +1182,36 @@ def __init__(
self._direct_ingress_grpc_server_task: Optional[asyncio.Task] = None

self._num_queued_requests = 0
self._reserved_slots: Set[str] = set()

@property
def max_ongoing_requests(self) -> int:
return self._deployment_config.max_ongoing_requests

def get_num_ongoing_requests(self) -> int:
return self._metrics_manager.get_num_ongoing_requests()
return self._metrics_manager.get_num_ongoing_requests() + len(
self._reserved_slots
)

    async def reserve_slot(
        self, request_metadata: RequestMetadata, slot_token: str
    ) -> Tuple[bool, int]:
        """Reserve replica capacity for a future dispatch call."""
        if not self._can_accept_request(request_metadata):
            return False, self.get_num_ongoing_requests()

        await self._semaphore.acquire()
        self._reserved_slots.add(slot_token)
        return True, self.get_num_ongoing_requests()

    def release_slot(self, slot_token: str) -> Tuple[bool, int]:
        """Release replica capacity reserved by choose_replica()."""
        if slot_token not in self._reserved_slots:
            return False, self.get_num_ongoing_requests()

        self._reserved_slots.remove(slot_token)
        self._semaphore.release()
        return True, self.get_num_ongoing_requests()

def get_metadata(self) -> ReplicaMetadata:
current_rank = ray.serve.context._get_internal_replica_context().rank
@@ -1865,12 +1888,25 @@ def _on_request_failed(self, request_metadata: RequestMetadata, e: Exception):

@asynccontextmanager
async def _start_request(self, request_metadata: RequestMetadata):
async with self._semaphore:
reserved_slot_token = request_metadata._reserved_slot_token
if reserved_slot_token:
if reserved_slot_token not in self._reserved_slots:
raise RuntimeError(
"Request tried to consume an unknown reserved slot "
f"{reserved_slot_token}."
)
self._reserved_slots.remove(reserved_slot_token)
else:
await self._semaphore.acquire()

try:
try:
self._metrics_manager.inc_num_ongoing_requests(request_metadata)
yield
finally:
self._metrics_manager.dec_num_ongoing_requests(request_metadata)
finally:
self._semaphore.release()
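
The interplay between `reserve_slot`, `release_slot`, and `_start_request` in this file can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the replica implementation: `SlotPool`, its `_running` counter, and the capacity check in `reserve` are hypothetical stand-ins (the real check lives in `_can_accept_request`, whose body is not shown in this diff). The point it illustrates is that a reservation holds a semaphore permit, and a request that arrives with a known token consumes that permit instead of acquiring a new one.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Optional, Set


class SlotPool:
    """Hypothetical, simplified model of token-based slot reservation."""

    def __init__(self, max_ongoing: int):
        self._max_ongoing = max_ongoing
        self._semaphore = asyncio.Semaphore(max_ongoing)
        self._reserved: Set[str] = set()
        self._running = 0

    def num_ongoing(self) -> int:
        # Reserved-but-not-yet-dispatched slots count toward the load.
        return self._running + len(self._reserved)

    async def reserve(self, token: str) -> bool:
        # Assumed capacity check; stands in for _can_accept_request().
        if self.num_ongoing() >= self._max_ongoing:
            return False
        await self._semaphore.acquire()
        self._reserved.add(token)
        return True

    def release(self, token: str) -> bool:
        # Unknown tokens are ignored, so a repeated release is harmless.
        if token not in self._reserved:
            return False
        self._reserved.remove(token)
        self._semaphore.release()
        return True

    @asynccontextmanager
    async def start_request(self, token: Optional[str] = None):
        if token:
            if token not in self._reserved:
                raise RuntimeError(f"Unknown reserved slot {token}.")
            # The reservation already holds a permit; consume the token only.
            self._reserved.remove(token)
        else:
            await self._semaphore.acquire()
        try:
            self._running += 1
            try:
                yield
            finally:
                self._running -= 1
        finally:
            self._semaphore.release()


async def demo():
    pool = SlotPool(max_ongoing=1)
    first = await pool.reserve("a")
    second = await pool.reserve("b")  # pool is full, so this is refused
    async with pool.start_request("a"):
        during = pool.num_ongoing()
    after = pool.num_ongoing()
    released_again = pool.release("a")  # token already consumed
    return first, second, during, after, released_again


outcome = asyncio.run(demo())
```

In this sketch the load count never double-counts a request: the token leaves the reserved set at the same moment the request starts running, and the permit is returned exactly once when the request finishes.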

async def _drain_ongoing_requests(self):
"""Wait for any ongoing requests to finish.
@@ -2759,6 +2795,16 @@ def get_num_ongoing_requests(self) -> int:
"""
return self._replica_impl.get_num_ongoing_requests()

    async def reserve_slot(
        self, request_metadata: RequestMetadata, slot_token: str
    ) -> Tuple[bool, int]:
        """Reserve capacity for a future choose_replica/dispatch request."""
        return await self._replica_impl.reserve_slot(request_metadata, slot_token)

    def release_slot(self, slot_token: str) -> Tuple[bool, int]:
        """Release capacity reserved by choose_replica()."""
        return self._replica_impl.release_slot(slot_token)

async def is_allocated(self) -> str:
"""poke the replica to check whether it's alive.

1 change: 1 addition & 0 deletions python/ray/serve/_private/request_router/__init__.py
@@ -3,6 +3,7 @@
PowerOfTwoChoicesRequestRouter,
)
from ray.serve._private.request_router.replica_wrapper import ( # noqa: F401
ReplicaSelection,
RunningReplica,
)
from ray.serve._private.request_router.request_router import ( # noqa: F401
143 changes: 136 additions & 7 deletions python/ray/serve/_private/request_router/replica_wrapper.py
@@ -1,21 +1,22 @@
import asyncio
import logging
import pickle
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Set, Tuple

import grpc

import ray
from ray.actor import ActorHandle
from ray.serve._private.common import (
DeploymentID,
ReplicaID,
ReplicaQueueLengthInfo,
RequestMetadata,
RunningReplicaInfo,
)
from ray.serve._private.constants import (
RAY_SERVE_REPLICA_GRPC_MAX_MESSAGE_LENGTH,
SERVE_LOGGER_NAME,
)
from ray.serve._private.constants import RAY_SERVE_REPLICA_GRPC_MAX_MESSAGE_LENGTH
from ray.serve._private.replica_result import (
ActorReplicaResult,
ReplicaResult,
@@ -35,8 +36,6 @@
_is_tracing_enabled,
)

logger = logging.getLogger(SERVE_LOGGER_NAME)


class ReplicaWrapper(ABC):
"""This is used to abstract away details of the transport layer
@@ -201,6 +200,10 @@ def __init__(self, replica_info: RunningReplicaInfo):
self._actor_replica_wrapper = ActorReplicaWrapper(self._actor_handle)
self._grpc_replica_wrapper = None

# Active local slot reservation tokens for Java replicas. Python replicas
# reserve capacity on the actor-side semaphore.
self._reserved_slots: Set[str] = set()

def update_replica_info(self, replica_info: RunningReplicaInfo) -> None:
"""Update mutable fields from a new RunningReplicaInfo.

@@ -324,3 +327,129 @@ def try_send_request(
return wrapper.send_request_java(pr)

return wrapper.send_request_python(pr, with_rejection=with_rejection)

    async def reserve_slot(
        self, request_metadata: RequestMetadata
    ) -> Tuple[str, ReplicaQueueLengthInfo]:
        """Reserve a slot on this replica for an upcoming request.

        Returns a unique token that can be used to release the slot later.
        This is used in the choose_replica/dispatch pattern to track
        reservations that haven't been dispatched yet.
        """
        if self._replica_info.is_cross_language:
            slot_token = str(uuid.uuid4())
            self._reserved_slots.add(slot_token)
            return slot_token, ReplicaQueueLengthInfo(
                accepted=True,
                num_ongoing_requests=len(self._reserved_slots),
            )

        slot_token = str(uuid.uuid4())
        obj_ref = self._actor_handle.reserve_slot.remote(request_metadata, slot_token)
        try:
            accepted, num_ongoing_requests = await obj_ref
        except asyncio.CancelledError:
            ray.cancel(obj_ref)
            self._actor_handle.release_slot.remote(slot_token)
            raise

        return slot_token, ReplicaQueueLengthInfo(
            accepted=accepted,
            num_ongoing_requests=num_ongoing_requests,
        )

    async def release_slot(self, slot_token: str) -> ReplicaQueueLengthInfo:
        """Release a previously reserved slot.

        This should be called if a request is not dispatched after
        reserving a slot (e.g., due to an error or cancellation).
        """
        if self._replica_info.is_cross_language:
            self._reserved_slots.discard(slot_token)
            return ReplicaQueueLengthInfo(
                accepted=True,
                num_ongoing_requests=len(self._reserved_slots),
            )

        _, num_ongoing_requests = await self._actor_handle.release_slot.remote(
            slot_token
        )
        return ReplicaQueueLengthInfo(
            accepted=True,
            num_ongoing_requests=num_ongoing_requests,
        )
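
The `except asyncio.CancelledError` branch in `reserve_slot` above follows a common cleanup pattern: if the caller is cancelled while the reservation is in flight, undo the reservation and re-raise. Below is a sketch of that pattern with plain `asyncio` and no Ray calls; `reserve_with_cleanup`, `slow_reserve`, and the token value are hypothetical names chosen for illustration. It assumes, as the replica-side `release_slot` in this PR does, that releasing an unknown token is a no-op.

```python
import asyncio
from typing import List


async def reserve_with_cleanup(remote_reserve, release, token: str) -> bool:
    """Await a reservation; if cancelled while waiting, undo it and re-raise."""
    try:
        return await remote_reserve(token)
    except asyncio.CancelledError:
        # The reservation may or may not have landed. Releasing is assumed
        # to be a no-op for unknown tokens, so calling it is safe either way.
        release(token)
        raise


async def demo() -> List[str]:
    released: List[str] = []

    async def slow_reserve(token: str) -> bool:
        # Simulates a remote call that has not returned yet.
        await asyncio.sleep(10)
        return True

    task = asyncio.create_task(
        reserve_with_cleanup(slow_reserve, released.append, "tok-1")
    )
    await asyncio.sleep(0)  # let the task start and block on the reservation
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return released


released_tokens = asyncio.run(demo())
```

Re-raising after the cleanup keeps cancellation visible to the caller, which matters because swallowing `CancelledError` would make the cancelled task look as if it had completed normally.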


@dataclass
class ReplicaSelection:
    """Represents a selected replica, holding information for dispatch or coordination.

    This class is returned by the choose_replica() context manager.
    The slot reservation lifecycle is managed by the context manager.
    """

    # Public, user-accessible fields
    replica_id: str
    """Unique identifier for the selected replica."""

    node_ip: str
    """IP address of the node running this replica."""

    port: Optional[int]
    """Port number for direct communication (if configured)."""

    node_id: str
    """Ray node ID where the replica is running."""

    availability_zone: Optional[str]
    """Cloud availability zone of the replica's node."""

    # Internal fields (not part of public API)
    _replica: RunningReplica
    _deployment_id: Optional[DeploymentID]
    _request_metadata: RequestMetadata
    _method_name: str
    _slot_token: str  # Token for reserved slot
    _dispatched: bool = field(
        default=False, init=False
    )  # Tracks if dispatch was called

    @property
    def address(self) -> str:
        """Returns the replica address in host:port format."""
        if self.port:
            return f"{self.node_ip}:{self.port}"
        return self.node_ip

    def to_dict(self) -> Dict[str, Any]:
        """Serialize public fields to a dictionary."""
        return {
            "replica_id": self.replica_id,
            "node_ip": self.node_ip,
            "port": self.port,
            "node_id": self.node_id,
            "availability_zone": self.availability_zone,
        }

    def _mark_dispatched(self) -> None:
        """Internal: Mark this selection as dispatched (slot consumed).

        Raises:
            RuntimeError: If the selection has already been dispatched.
        """
        if self._dispatched:
            raise RuntimeError(
                f"ReplicaSelection for {self.replica_id} has already been dispatched. "
                "Each selection can only be dispatched once."
            )
        self._dispatched = True

    async def _release_slot(
        self, *, force: bool = False
    ) -> Optional[ReplicaQueueLengthInfo]:
        """Internal: Release the reserved slot."""
        if self._dispatched and not force:
            return None

        return await self._replica.release_slot(self._slot_token)
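
The one-shot behaviour of `ReplicaSelection` (dispatch at most once, release only if not dispatched unless forced) can be illustrated with a pared-down dataclass. This is a sketch, not the class from the diff: `Selection`, `mark_dispatched`, and `should_release` are hypothetical names, and the release decision is reduced to a boolean so the example needs no replica object.

```python
from dataclasses import dataclass, field


@dataclass
class Selection:
    """Hypothetical, pared-down stand-in for a one-shot selection object."""

    replica_id: str
    # init=False keeps the flag out of the constructor, so callers cannot
    # create a selection that already claims to be dispatched.
    _dispatched: bool = field(default=False, init=False)

    def mark_dispatched(self) -> None:
        if self._dispatched:
            raise RuntimeError(
                f"Selection for {self.replica_id} has already been dispatched."
            )
        self._dispatched = True

    def should_release(self, *, force: bool = False) -> bool:
        # A dispatched slot is consumed by the request itself, so it is only
        # released when the caller explicitly forces it.
        return force or not self._dispatched


sel = Selection(replica_id="replica-1")
release_before = sel.should_release()
sel.mark_dispatched()
release_after = sel.should_release()
release_forced = sel.should_release(force=True)

try:
    sel.mark_dispatched()
    second_dispatch_error = None
except RuntimeError as e:
    second_dispatch_error = str(e)
```

Keeping the flag on the selection object, rather than in the router, means the guard travels with the object that the caller holds, so a second dispatch attempt fails at the call site.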
19 changes: 19 additions & 0 deletions python/ray/serve/_private/request_router/request_router.py
@@ -850,6 +850,25 @@ def on_send_request(self, replica_id: ReplicaID):
self._replica_queue_len_cache.update(replica_id, new_queue_len)
self._update_router_queue_len_gauge(replica_id, new_queue_len)

    def on_replica_result_finished(self, replica_id: ReplicaID):
        """Decrement queue length cache when a request finishes or is cancelled.

        This is used when a reserved slot is released without being dispatched
        (e.g., in choose_replica context manager cleanup).

        We cannot rely on on_new_queue_len_info() to correct the cache in this
        path. The queue length cache is incremented optimistically when a slot is
        reserved, before dispatch happens. If dispatch is never called or fails
        before the request reaches the replica, no queue_len_info response is
        produced, so the cache would otherwise remain inflated.
        """
        if self._use_replica_queue_len_cache:
            num_ongoing_requests = self._replica_queue_len_cache.get(replica_id) or 0
            if num_ongoing_requests > 0:
                new_queue_len = num_ongoing_requests - 1
                self._replica_queue_len_cache.update(replica_id, new_queue_len)
                self._update_router_queue_len_gauge(replica_id, new_queue_len)
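
The docstring above describes an optimistic increment at reservation time that has to be undone by hand when no replica response arrives. A minimal sketch of that bookkeeping follows; `QueueLenCache`, `on_slot_reserved`, and `on_slot_released` are hypothetical names, and the cache is a plain dictionary rather than the router's real cache class. The decrement is clamped at zero, mirroring the `num_ongoing_requests > 0` guard in the method.

```python
from typing import Dict, Optional


class QueueLenCache:
    """Hypothetical minimal cache mapping replica IDs to queue lengths."""

    def __init__(self) -> None:
        self._lens: Dict[str, int] = {}

    def get(self, replica_id: str) -> Optional[int]:
        return self._lens.get(replica_id)

    def update(self, replica_id: str, queue_len: int) -> None:
        self._lens[replica_id] = queue_len


def on_slot_reserved(cache: QueueLenCache, replica_id: str) -> int:
    """Optimistically count the reservation before any dispatch happens."""
    new_len = (cache.get(replica_id) or 0) + 1
    cache.update(replica_id, new_len)
    return new_len


def on_slot_released(cache: QueueLenCache, replica_id: str) -> int:
    """Undo the optimistic increment, never going below zero."""
    current = cache.get(replica_id) or 0
    if current > 0:
        current -= 1
        cache.update(replica_id, current)
    return current


cache = QueueLenCache()
after_reserve = on_slot_reserved(cache, "replica-1")
after_release = on_slot_released(cache, "replica-1")
after_extra_release = on_slot_released(cache, "replica-1")
unknown_release = on_slot_released(cache, "replica-2")
unknown_entry = cache.get("replica-2")
```

The clamp means a stray extra release leaves the count at zero instead of driving it negative, and a release for a replica the cache has never seen does not create an entry.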

def decrement_queue_len_cache(self, replica_id: ReplicaID):
"""Decrement the queue length cache for a replica.
