Skip to content

Commit e0fa94d

Browse files
jeffreywang-anyscalemachichima
authored andcommitted
[serve][3/3] Expose choose_replica/dispatch on deployment handles (ray-project#63255)
Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com> Co-authored-by: machichima <nary12321@gmail.com>
1 parent ba9b455 commit e0fa94d

6 files changed

Lines changed: 1052 additions & 18 deletions

File tree

python/ray/serve/_private/local_testing_mode.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,19 @@
44
import logging
55
import queue
66
import time
7+
from contextlib import asynccontextmanager
78
from functools import wraps
8-
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Union
9+
from typing import (
10+
Any,
11+
AsyncIterator,
12+
Callable,
13+
Coroutine,
14+
Dict,
15+
List,
16+
Optional,
17+
Tuple,
18+
Union,
19+
)
920

1021
import ray
1122
from ray import cloudpickle
@@ -16,6 +27,7 @@
1627
)
1728
from ray.serve._private.replica import UserCallableWrapper
1829
from ray.serve._private.replica_result import ReplicaResult
30+
from ray.serve._private.request_router.replica_wrapper import ReplicaSelection
1931
from ray.serve._private.router import Router
2032
from ray.serve._private.utils import GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR
2133
from ray.serve.deployment import Deployment
@@ -341,6 +353,39 @@ def generator_result_callback(item: Any):
341353
)
342354
return noop_future
343355

356+
@asynccontextmanager
357+
async def choose_replica(
358+
self,
359+
request_meta: RequestMetadata,
360+
*request_args,
361+
**request_kwargs,
362+
) -> AsyncIterator[ReplicaSelection]:
363+
"""Choose replica is not supported in local testing mode.
364+
365+
This is a stub implementation to satisfy the Router ABC interface.
366+
"""
367+
raise NotImplementedError(
368+
"choose_replica is not supported in local testing mode. "
369+
"Use assign_request instead."
370+
)
371+
yield # Make this a generator for asynccontextmanager
372+
373+
def dispatch(
374+
self,
375+
selection: ReplicaSelection,
376+
request_meta: RequestMetadata,
377+
*request_args,
378+
**request_kwargs,
379+
) -> concurrent.futures.Future[ReplicaResult]:
380+
"""Dispatch is not supported in local testing mode.
381+
382+
This is a stub implementation to satisfy the Router ABC interface.
383+
"""
384+
raise NotImplementedError(
385+
"dispatch is not supported in local testing mode. "
386+
"Use assign_request instead."
387+
)
388+
344389
async def broadcast(
345390
self,
346391
request_meta: RequestMetadata,

python/ray/serve/_private/router.py

Lines changed: 153 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import concurrent.futures
33
import logging
4+
import sys
45
import threading
56
import time
67
import weakref
@@ -571,6 +572,26 @@ def assign_request(
571572
) -> concurrent.futures.Future[ReplicaResult]:
572573
pass
573574

575+
@abstractmethod
576+
@asynccontextmanager
577+
async def choose_replica(
578+
self,
579+
request_meta: RequestMetadata,
580+
*request_args,
581+
**request_kwargs,
582+
) -> AsyncIterator[ReplicaSelection]:
583+
yield
584+
585+
@abstractmethod
586+
def dispatch(
587+
self,
588+
selection: ReplicaSelection,
589+
request_meta: RequestMetadata,
590+
*request_args,
591+
**request_kwargs,
592+
) -> concurrent.futures.Future[ReplicaResult]:
593+
pass
594+
574595
@abstractmethod
575596
async def broadcast(
576597
self,
@@ -1681,6 +1702,98 @@ def assign_request(
16811702
A concurrent.futures.Future resolving to the ReplicaResult representing
16821703
the assigned request.
16831704
"""
1705+
return self._wrap_asyncio_call_in_future(
1706+
self._asyncio_router.assign_request(
1707+
request_meta, *request_args, **request_kwargs
1708+
)
1709+
)
1710+
1711+
@asynccontextmanager
1712+
async def choose_replica(
1713+
self,
1714+
request_meta: RequestMetadata,
1715+
*request_args,
1716+
**request_kwargs,
1717+
) -> AsyncIterator[ReplicaSelection]:
1718+
"""Bridge async context manager to router event loop.
1719+
1720+
This ensures choose_replica runs on the singleton router loop,
1721+
maintaining thread safety for all state modifications.
1722+
"""
1723+
# Enter context on router loop
1724+
async def enter_context():
1725+
cm = self._asyncio_router.choose_replica(
1726+
request_meta, *request_args, **request_kwargs
1727+
)
1728+
selection = await cm.__aenter__()
1729+
return selection, cm
1730+
1731+
async def exit_context(cm, exc_type, exc_val, exc_tb):
1732+
return await cm.__aexit__(exc_type, exc_val, exc_tb)
1733+
1734+
future = asyncio.run_coroutine_threadsafe(enter_context(), self._asyncio_loop)
1735+
try:
1736+
selection, context_manager = await asyncio.wrap_future(future)
1737+
except BaseException:
1738+
# Cancelled after __aenter__ reserved the slot but before we
1739+
# observed the result: exit the entered CM to release the slot.
1740+
if future.done() and not future.cancelled() and future.exception() is None:
1741+
entered_cm = future.result()[1]
1742+
exc_info = sys.exc_info()
1743+
cleanup = asyncio.run_coroutine_threadsafe(
1744+
exit_context(entered_cm, *exc_info), self._asyncio_loop
1745+
)
1746+
await asyncio.shield(asyncio.wrap_future(cleanup))
1747+
raise
1748+
1749+
try:
1750+
yield selection
1751+
finally:
1752+
exc_info = sys.exc_info()
1753+
future = asyncio.run_coroutine_threadsafe(
1754+
exit_context(context_manager, *exc_info), self._asyncio_loop
1755+
)
1756+
# Shielded so a cancel landing during cleanup doesn't propagate
1757+
# through wrap_future and abort __aexit__ on the router loop.
1758+
await asyncio.shield(asyncio.wrap_future(future))
1759+
1760+
def dispatch(
1761+
self,
1762+
selection: ReplicaSelection,
1763+
request_meta: RequestMetadata,
1764+
*request_args,
1765+
**request_kwargs,
1766+
) -> concurrent.futures.Future[ReplicaResult]:
1767+
"""Dispatch request to a previously selected replica."""
1768+
try:
1769+
selection._mark_dispatched()
1770+
except Exception as exc:
1771+
future = concurrent.futures.Future()
1772+
future.set_exception(exc)
1773+
return future
1774+
1775+
return self._wrap_asyncio_call_in_future(
1776+
self._asyncio_router._dispatch_to_marked_selection(
1777+
selection, request_meta, *request_args, **request_kwargs
1778+
)
1779+
)
1780+
1781+
def _wrap_asyncio_call_in_future(
1782+
self,
1783+
coro: Coroutine,
1784+
) -> concurrent.futures.Future[ReplicaResult]:
1785+
"""Wrap an async call in a concurrent.futures.Future for cross-thread execution.
1786+
1787+
This is a helper method to execute AsyncioRouter's async methods on the dedicated asyncio event loop thread.
1788+
1789+
Args:
1790+
coro: The coroutine to execute (e.g., _asyncio_router.assign_request(...))
1791+
1792+
Returns:
1793+
A concurrent.futures.Future that resolves to the ReplicaResult.
1794+
"""
1795+
# Extract operation name from coroutine for logging
1796+
operation_name = coro.__name__
16841797

16851798
def asyncio_future_callback(
16861799
asyncio_future: asyncio.Future, concurrent_future: concurrent.futures.Future
@@ -1701,19 +1814,15 @@ def asyncio_future_callback(
17011814
):
17021815
result: ReplicaResult = asyncio_future.result()
17031816
logger.info(
1704-
"Asyncio task completed despite cancellation attempt. "
1705-
"Attempting to cancel the request that was assigned to a replica."
1817+
f"Asyncio task completed despite cancellation attempt during {operation_name}. "
1818+
"Attempting to cancel the request."
17061819
)
17071820
result.cancel()
17081821

17091822
concurrent_future = concurrent.futures.Future()
17101823

17111824
def create_task_and_setup():
1712-
task = self._asyncio_loop.create_task(
1713-
self._asyncio_router.assign_request(
1714-
request_meta, *request_args, **request_kwargs
1715-
)
1716-
)
1825+
task = self._asyncio_loop.create_task(coro)
17171826

17181827
# Set up your cancellation callback
17191828
task.add_done_callback(
@@ -1865,6 +1974,43 @@ def assign_request(
18651974
),
18661975
)
18671976

1977+
@asynccontextmanager
1978+
async def choose_replica(
1979+
self,
1980+
request_meta: RequestMetadata,
1981+
*request_args,
1982+
**request_kwargs,
1983+
) -> AsyncIterator[ReplicaSelection]:
1984+
"""Delegate to AsyncioRouter's choose_replica."""
1985+
async with self._asyncio_router.choose_replica(
1986+
request_meta, *request_args, **request_kwargs
1987+
) as selection:
1988+
yield selection
1989+
1990+
def dispatch(
1991+
self,
1992+
selection: ReplicaSelection,
1993+
request_meta: RequestMetadata,
1994+
*request_args,
1995+
**request_kwargs,
1996+
) -> asyncio.Future[ReplicaResult]:
1997+
"""Dispatch request to a previously selected replica.
1998+
1999+
Returns an asyncio.Future wrapping the async dispatch call.
2000+
"""
2001+
try:
2002+
selection._mark_dispatched()
2003+
except Exception as exc:
2004+
future = self._asyncio_loop.create_future()
2005+
future.set_exception(exc)
2006+
return future
2007+
2008+
return self._asyncio_loop.create_task(
2009+
self._asyncio_router._dispatch_to_marked_selection(
2010+
selection, request_meta, *request_args, **request_kwargs
2011+
)
2012+
)
2013+
18682014
async def broadcast(
18692015
self,
18702016
request_meta: RequestMetadata,

0 commit comments

Comments
 (0)