Skip to content

Commit e4b95cb

Browse files
[Serve] expose choose_replica/dispatch on DeploymentHandle
Co-Authored-By: machichima <nary12321@gmail.com> Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
1 parent 82037f1 commit e4b95cb

7 files changed

Lines changed: 709 additions & 16 deletions

File tree

python/ray/serve/_private/local_testing_mode.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,19 @@
44
import logging
55
import queue
66
import time
7+
from contextlib import asynccontextmanager
78
from functools import wraps
8-
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Union
9+
from typing import (
10+
Any,
11+
AsyncIterator,
12+
Callable,
13+
Coroutine,
14+
Dict,
15+
List,
16+
Optional,
17+
Tuple,
18+
Union,
19+
)
920

1021
import ray
1122
from ray import cloudpickle
@@ -16,6 +27,7 @@
1627
)
1728
from ray.serve._private.replica import UserCallableWrapper
1829
from ray.serve._private.replica_result import ReplicaResult
30+
from ray.serve._private.request_router.replica_wrapper import ReplicaSelection
1931
from ray.serve._private.router import Router
2032
from ray.serve._private.utils import GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR
2133
from ray.serve.deployment import Deployment
@@ -341,6 +353,39 @@ def generator_result_callback(item: Any):
341353
)
342354
return noop_future
343355

356+
@asynccontextmanager
357+
async def choose_replica(
358+
self,
359+
request_meta: RequestMetadata,
360+
*request_args,
361+
**request_kwargs,
362+
) -> AsyncIterator[ReplicaSelection]:
363+
"""Choose replica is not supported in local testing mode.
364+
365+
This is a stub implementation to satisfy the Router ABC interface.
366+
"""
367+
raise NotImplementedError(
368+
"choose_replica is not supported in local testing mode. "
369+
"Use assign_request instead."
370+
)
371+
yield # Make this a generator for asynccontextmanager
372+
373+
def dispatch(
374+
self,
375+
selection: ReplicaSelection,
376+
request_meta: RequestMetadata,
377+
*request_args,
378+
**request_kwargs,
379+
) -> concurrent.futures.Future[ReplicaResult]:
380+
"""Dispatch is not supported in local testing mode.
381+
382+
This is a stub implementation to satisfy the Router ABC interface.
383+
"""
384+
raise NotImplementedError(
385+
"dispatch is not supported in local testing mode. "
386+
"Use assign_request instead."
387+
)
388+
344389
async def broadcast(
345390
self,
346391
request_meta: RequestMetadata,

python/ray/serve/_private/router.py

Lines changed: 142 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import concurrent.futures
33
import logging
4+
import sys
45
import threading
56
import time
67
import weakref
@@ -564,6 +565,26 @@ def assign_request(
564565
) -> concurrent.futures.Future[ReplicaResult]:
565566
pass
566567

568+
@abstractmethod
569+
@asynccontextmanager
570+
async def choose_replica(
571+
self,
572+
request_meta: RequestMetadata,
573+
*request_args,
574+
**request_kwargs,
575+
) -> AsyncIterator[ReplicaSelection]:
576+
pass
577+
578+
@abstractmethod
579+
def dispatch(
580+
self,
581+
selection: ReplicaSelection,
582+
request_meta: RequestMetadata,
583+
*request_args,
584+
**request_kwargs,
585+
) -> concurrent.futures.Future[ReplicaResult]:
586+
pass
587+
567588
@abstractmethod
568589
async def broadcast(
569590
self,
@@ -1643,6 +1664,87 @@ def assign_request(
16431664
A concurrent.futures.Future resolving to the ReplicaResult representing
16441665
the assigned request.
16451666
"""
1667+
return self._wrap_asyncio_call_in_future(
1668+
self._asyncio_router.assign_request(
1669+
request_meta, *request_args, **request_kwargs
1670+
)
1671+
)
1672+
1673+
@asynccontextmanager
1674+
async def choose_replica(
1675+
self,
1676+
request_meta: RequestMetadata,
1677+
*request_args,
1678+
**request_kwargs,
1679+
) -> AsyncIterator[ReplicaSelection]:
1680+
"""Bridge async context manager to router event loop.
1681+
1682+
This ensures choose_replica runs on the singleton router loop,
1683+
maintaining thread safety for all state modifications.
1684+
"""
1685+
# Enter context on router loop
1686+
async def enter_context():
1687+
cm = self._asyncio_router.choose_replica(
1688+
request_meta, *request_args, **request_kwargs
1689+
)
1690+
selection = await cm.__aenter__()
1691+
return selection, cm
1692+
1693+
future = asyncio.run_coroutine_threadsafe(enter_context(), self._asyncio_loop)
1694+
selection, context_manager = await asyncio.wrap_future(future)
1695+
1696+
try:
1697+
yield selection
1698+
finally:
1699+
# Exit context on router loop
1700+
async def exit_context(exc_type, exc_val, exc_tb):
1701+
return await context_manager.__aexit__(exc_type, exc_val, exc_tb)
1702+
1703+
exc_info = sys.exc_info()
1704+
future = asyncio.run_coroutine_threadsafe(
1705+
exit_context(*exc_info), self._asyncio_loop
1706+
)
1707+
await asyncio.wrap_future(future)
1708+
1709+
def dispatch(
1710+
self,
1711+
selection: ReplicaSelection,
1712+
request_meta: RequestMetadata,
1713+
*request_args,
1714+
**request_kwargs,
1715+
) -> concurrent.futures.Future[ReplicaResult]:
1716+
"""Dispatch request to a previously selected replica."""
1717+
try:
1718+
selection._mark_dispatched()
1719+
except Exception as exc:
1720+
future = concurrent.futures.Future()
1721+
future.set_exception(exc)
1722+
return future
1723+
1724+
return self._wrap_asyncio_call_in_future(
1725+
self._asyncio_router._dispatch_to_marked_selection(
1726+
selection, request_meta, *request_args, **request_kwargs
1727+
)
1728+
)
1729+
1730+
def _wrap_asyncio_call_in_future(
1731+
self,
1732+
coro: Coroutine,
1733+
) -> concurrent.futures.Future[ReplicaResult]:
1734+
"""Wrap an async call in a concurrent.futures.Future for cross-thread execution.
1735+
1736+
This is a helper method to execute AsyncioRouter's async methods on the dedicated asyncio event loop thread.
1737+
1738+
Args:
1739+
coro: The coroutine to execute (e.g., _asyncio_router.assign_request(...))
1740+
1741+
Returns:
1742+
A concurrent.futures.Future that resolves to the ReplicaResult.
1743+
"""
1744+
# Extract operation name from coroutine for logging
1745+
operation_name = (
1746+
coro.__name__ if hasattr(coro, "__name__") else "unknown operation"
1747+
)
16461748

16471749
def asyncio_future_callback(
16481750
asyncio_future: asyncio.Future, concurrent_future: concurrent.futures.Future
@@ -1663,19 +1765,15 @@ def asyncio_future_callback(
16631765
):
16641766
result: ReplicaResult = asyncio_future.result()
16651767
logger.info(
1666-
"Asyncio task completed despite cancellation attempt. "
1667-
"Attempting to cancel the request that was assigned to a replica."
1768+
f"Asyncio task completed despite cancellation attempt during {operation_name}. "
1769+
"Attempting to cancel the request."
16681770
)
16691771
result.cancel()
16701772

16711773
concurrent_future = concurrent.futures.Future()
16721774

16731775
def create_task_and_setup():
1674-
task = self._asyncio_loop.create_task(
1675-
self._asyncio_router.assign_request(
1676-
request_meta, *request_args, **request_kwargs
1677-
)
1678-
)
1776+
task = self._asyncio_loop.create_task(coro)
16791777

16801778
# Set up your cancellation callback
16811779
task.add_done_callback(
@@ -1827,6 +1925,43 @@ def assign_request(
18271925
),
18281926
)
18291927

1928+
@asynccontextmanager
1929+
async def choose_replica(
1930+
self,
1931+
request_meta: RequestMetadata,
1932+
*request_args,
1933+
**request_kwargs,
1934+
) -> AsyncIterator[ReplicaSelection]:
1935+
"""Delegate to AsyncioRouter's choose_replica."""
1936+
async with self._asyncio_router.choose_replica(
1937+
request_meta, *request_args, **request_kwargs
1938+
) as selection:
1939+
yield selection
1940+
1941+
def dispatch(
1942+
self,
1943+
selection: ReplicaSelection,
1944+
request_meta: RequestMetadata,
1945+
*request_args,
1946+
**request_kwargs,
1947+
) -> asyncio.Future[ReplicaResult]:
1948+
"""Dispatch request to a previously selected replica.
1949+
1950+
Returns an asyncio.Future wrapping the async dispatch call.
1951+
"""
1952+
try:
1953+
selection._mark_dispatched()
1954+
except Exception as exc:
1955+
future = self._asyncio_loop.create_future()
1956+
future.set_exception(exc)
1957+
return future
1958+
1959+
return self._asyncio_loop.create_task(
1960+
self._asyncio_router._dispatch_to_marked_selection(
1961+
selection, request_meta, *request_args, **request_kwargs
1962+
)
1963+
)
1964+
18301965
async def broadcast(
18311966
self,
18321967
request_meta: RequestMetadata,

0 commit comments

Comments
 (0)