Skip to content

Commit 0055c7d

Browse files
eicherseiji, claude, and kouroshHakha
authored and committed
[Serve][5/5] Enable direct streaming for Ray Serve LLM apps (ray-project#63167)
Signed-off-by: Seiji Eicher <seiji@anyscale.com> Signed-off-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com> Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
1 parent e5804b5 commit 0055c7d

12 files changed

Lines changed: 502 additions & 16 deletions

File tree

python/ray/llm/_internal/serve/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@
6262
"RAYLLM_GUIDED_DECODING_BACKEND", "xgrammar"
6363
)
6464

65+
# Opt-in flag for direct streaming in Ray Serve LLM apps. Only the exact
# value "1" turns it on; an unset variable defaults to "0" (disabled).
RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING = (
    os.getenv("RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING", "0") == "1"
)
68+
6569
MAX_NUM_STOPPING_SEQUENCES = int(os.getenv("RAYLLM_MAX_NUM_STOPPING_SEQUENCES", "8"))
6670
ENV_VARS_TO_PROPAGATE = {
6771
"HUGGING_FACE_HUB_TOKEN",

python/ray/llm/_internal/serve/core/engine/protocol.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,18 @@ async def check_health(self) -> None:
221221
"""
222222
return
223223

224+
async def build_asgi_app(self) -> Any:
    """Return an ASGI app served directly from this engine's frontend.

    Direct streaming serves traffic from the LLMServer replica's own ASGI
    ingress rather than from the OpenAiIngress deployment, and uses this
    method to obtain the app. This default implementation always raises;
    engines that support direct serving override it.

    Raises:
        NotImplementedError: Always, naming the concrete engine class.
    """
    engine_name = type(self).__name__
    message = f"{engine_name} does not support direct ASGI serving."
    raise NotImplementedError(message)
235+
224236
##############################################################
225237
# Optional methods
226238
# These methods will be implemented in the future to allow

python/ray/llm/_internal/serve/core/ingress/builder.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
maybe_apply_llm_deployment_config_defaults,
1111
)
1212
from ray.llm._internal.common.utils.import_utils import load_class
13+
from ray.llm._internal.serve.constants import RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING
1314
from ray.llm._internal.serve.core.configs.llm_config import LLMConfig
1415
from ray.llm._internal.serve.core.configs.openai_api_models import to_model_metadata
1516
from ray.llm._internal.serve.core.ingress.ingress import (
@@ -19,12 +20,46 @@
1920
from ray.llm._internal.serve.core.server.builder import (
2021
build_llm_deployment,
2122
)
23+
from ray.llm._internal.serve.core.server.llm_server import LLMServer
2224
from ray.llm._internal.serve.observability.logging import get_logger
2325
from ray.serve.deployment import Application
2426

2527
logger = get_logger(__name__)
2628

2729

30+
def _build_direct_streaming_llm_deployment(llm_config: LLMConfig) -> Application:
    """Create the LLMServer deployment that also acts as the ASGI ingress.

    The server class is wrapped with ``serve.ingress()`` without an app
    argument, so the real ASGI app (vLLM FastAPI) is built late, inside
    `LLMServer.__serve_build_asgi_app__`, after the engine has started.

    Args:
        llm_config: Config of the model to serve. Its ``server_cls`` is used
            when set; otherwise `LLMServer` is used.

    Returns:
        The result of `build_llm_deployment` for the ingress-wrapped class.
    """
    base_cls = llm_config.server_cls
    if not base_cls:
        base_cls = LLMServer
    ingress_cls = serve.ingress()(base_cls)
    return build_llm_deployment(llm_config, deployment_cls=ingress_cls)
41+
42+
43+
def _build_openai_ingress_request_router(*, server: Application) -> Application:
    """Create the ingress request router peer for OpenAI compatible LLM apps.

    The returned Application is meant to be attached to the ingress
    application via ``Application._with_ingress_request_router``.

    ``num_replicas`` is pinned to 1 because HAProxy's ingress request router
    backend currently expects a single endpoint. TODO(eicherseiji): expose
    these as a user-overridable IngressRequestRouterConfig once HAProxy
    supports multiple router replicas.

    Args:
        server: The LLMServer application the router picks replicas from;
            bound to the router as its ``server`` argument.

    Returns:
        The bound `LLMRouter` deployment.
    """
    from ray.llm._internal.serve.core.ingress.router import LLMRouter

    router_deployment = serve.deployment(
        LLMRouter,
        num_replicas=1,
        max_ongoing_requests=1000,
    )
    return router_deployment.bind(server=server)
61+
62+
2863
class IngressClsConfig(BaseModelExtended):
2964
ingress_cls: Union[str, Type[OpenAiIngress]] = Field(
3065
default=OpenAiIngress,
@@ -104,6 +139,37 @@ def _validate_model_ids(self):
104139
return self
105140

106141

142+
def _validate_direct_streaming_builder_config(
    builder_config: "LLMServingArgs",
) -> None:
    """Reject builder configs that direct streaming cannot honor.

    Direct streaming uses the LLMServer deployment itself as the ingress, so
    it needs exactly one LLM config and cannot apply any ingress-specific
    customization.

    Args:
        builder_config: The validated serving arguments to check.

    Raises:
        ValueError: If the number of LLM configs is not exactly one, if
            ``ingress_deployment_config`` is set, or if ``ingress_cls_config``
            deviates from the default (``OpenAiIngress`` with no extra kwargs).
    """
    # `!= 1` rather than `> 1`: the caller indexes `llm_configs[0]`, so an
    # empty list must fail here with a clear message instead of an IndexError.
    if len(builder_config.llm_configs) != 1:
        raise ValueError(
            "RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING currently supports exactly one "
            "LLM config. Multi-model direct streaming requires composing multiple "
            "LLMServer deployments into the main application graph, which is not "
            "supported yet."
        )

    if builder_config.ingress_deployment_config:
        raise ValueError(
            "RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING does not support "
            "ingress_deployment_config because LLMServer is used directly as "
            "the ingress deployment. Configure LLMServer through each "
            "LLMConfig.deployment_config instead."
        )

    ingress_cls_config = builder_config.ingress_cls_config
    if (
        ingress_cls_config.ingress_cls != OpenAiIngress
        or ingress_cls_config.ingress_extra_kwargs
    ):
        raise ValueError(
            "RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING does not support "
            "ingress_cls_config because LLMServer is used directly as the "
            "ingress deployment."
        )
171+
172+
107173
def build_openai_app(builder_config: dict) -> Application:
108174
"""Build an OpenAI compatible app with the llm deployment setup from
109175
the given builder configuration.
@@ -119,6 +185,20 @@ def build_openai_app(builder_config: dict) -> Application:
119185
builder_config = LLMServingArgs.model_validate(builder_config)
120186
llm_configs = builder_config.llm_configs
121187

188+
# Direct streaming attaches LLMRouter as the ingress request router and
189+
# uses the LLMServer deployment itself as the ingress app, so it returns
190+
# before the regular OpenAiIngress wiring.
191+
if RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING:
192+
_validate_direct_streaming_builder_config(builder_config)
193+
direct_deployment = _build_direct_streaming_llm_deployment(llm_configs[0])
194+
logger.info(
195+
"Direct streaming enabled: "
196+
"LLMServer=ingress, LLMRouter=ingress_request_router"
197+
)
198+
return direct_deployment._with_ingress_request_router(
199+
_build_openai_ingress_request_router(server=direct_deployment)
200+
)
201+
122202
llm_deployments = {c.model_id: build_llm_deployment(c) for c in llm_configs}
123203
model_cards = {c.model_id: to_model_metadata(c.model_id, c) for c in llm_configs}
124204
lora_paths = {
python/ray/llm/_internal/serve/core/ingress/router.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import random
2+
from typing import FrozenSet, List, Optional, Tuple
3+
4+
from fastapi import FastAPI, HTTPException, Request
5+
6+
from ray import serve
7+
from ray.serve._private.common import ReplicaID
8+
from ray.serve.handle import DeploymentHandle
9+
10+
# Header HAProxy must set when it forwards only a prefix of the request body.
_BODY_TRUNCATED_HEADER = "x-body-truncated"

# Key set of the replica dict, used to tell whether the replica set changed.
_ReplicaCacheSignature = FrozenSet[ReplicaID]

# FastAPI app whose routes are registered on LLMRouter below.
router_app = FastAPI()


@serve.ingress(router_app)
class LLMRouter:
    """Ingress request router for direct streaming.

    When direct streaming is enabled, HAProxy calls /internal/route on this
    deployment to get a data plane replica, then forwards traffic directly
    to the matching LLMServer replica's backend HTTP port.

    /internal/route HTTP contract
    -----------------------------
    Request:
        POST /internal/route
        Content-Type: application/json
        Body: the target ChatCompletions / Completions request payload.
            Today the router uses round-robin and ignores the body, but it
            is plumbed through so future routing policies (e.g. prefix
            cache aware) can score replicas against ``messages`` /
            ``prompt``. HAProxy should continue forwarding the payload
            (subject to truncation below).

    Truncated bodies:
        HAProxy may forward only a prefix of the request body for routing.
        When it does, it must set the ``x-body-truncated`` header. The
        router forwards both the body bytes and this signal to
        ``_pick_replica`` for future body-aware policies.

    Responses:
        200 ``{"host": str, "port": int, "replica_id": str}``: pick
            succeeded.
        4xx/5xx FastAPI ``{"detail": str}``: informational only; HAProxy
            treats any non-200 as a routing failure.

    Health:
        ``GET /health`` is exposed as a human-operator convenience.
        Serve uses ``check_health()`` for replica readiness, not HTTP.
    """

    async def __init__(self, server: DeploymentHandle):
        """Set up round-robin state and eagerly initialize the handle.

        Args:
            server: Handle to the LLMServer deployment whose replicas are
                picked by ``/internal/route``.

        Raises:
            RuntimeError: If the handle exposes no request router after
                ``_init()``.
        """
        # Randomized so multiple LLMRouter replicas don't lockstep on the
        # same replica sequence.
        self._round_robin_counter = random.randrange(2**31)
        self._cached_dict_id: Optional[int] = None
        # Strong reference to the dict whose id is stored in
        # `_cached_dict_id`. `id()` is only unique among live objects, so
        # the dict must stay alive for the id comparison to be sound.
        self._cached_dict_ref: Optional[dict] = None
        self._cached_replica_signature: Optional[_ReplicaCacheSignature] = None
        self._cached_endpoints: List[Tuple[str, int, str]] = []
        self._handle: DeploymentHandle = server

        # Force the handle's local router and request router to construct
        # synchronously so /internal/route can read them in the hot path.
        # `curr_replicas` is populated separately by controller broadcast;
        # /internal/route returns 503 (HAProxy retries) until then, which
        # decouples router liveness from LLMServer cold start.
        self._handle._init()
        self._request_router = self._handle._get_request_router()
        if self._request_router is None:
            raise RuntimeError(
                "DeploymentHandle._get_request_router() returned None after "
                "_init(); Serve internals may have changed."
            )

    async def check_health(self):
        """Fail the health check when the handle has no request router."""
        if self._handle._get_request_router() is None:
            raise RuntimeError("request router not initialized")

    @router_app.post("/internal/route")
    async def route(self, request: Request):
        """Pick a backend replica for the forwarded request.

        Returns:
            ``{"host": ..., "port": ..., "replica_id": ...}`` for the pick.

        Raises:
            HTTPException: 503 when ``_pick_replica`` raises ``RuntimeError``
                (e.g. no backend-http replicas are known yet).
        """
        body = await request.body()
        body_truncated = _BODY_TRUNCATED_HEADER in request.headers
        try:
            host, port, replica_id = self._pick_replica(
                request_body=body, body_truncated=body_truncated
            )
        except RuntimeError as e:
            # Chain the cause so logs show the original routing failure.
            raise HTTPException(status_code=503, detail=str(e)) from e
        return {"host": host, "port": port, "replica_id": replica_id}

    @router_app.get("/health")
    async def health(self):
        """Return a static OK payload for human operators."""
        return {"status": "ok"}

    def _ready_endpoints(self) -> List[Tuple[str, int, str]]:
        """Backend (host, port, full_id) tuples, cached on replica-set change.

        Only replicas with a truthy ``backend_http_endpoint`` are included,
        ordered by ``replica_id.unique_id`` so the round-robin order is stable.
        """
        curr_replicas = self._request_router.curr_replicas
        # RequestRouter swaps the dict wholesale on every controller broadcast,
        # so dict identity is a cheap "did anything change" check; the keyset
        # check then filters out broadcasts that didn't actually change the
        # replica set.
        if id(curr_replicas) == self._cached_dict_id:
            return self._cached_endpoints
        # NOTE(review): the signature is the key set only. If a replica's
        # backend_http_endpoint can change while its ReplicaID stays the same,
        # the cached endpoints would not refresh; confirm endpoints are fixed
        # per ReplicaID.
        signature = frozenset(curr_replicas.keys())
        if signature != self._cached_replica_signature:
            self._cached_replica_signature = signature
            ready = sorted(
                (r for r in curr_replicas.values() if r.backend_http_endpoint),
                key=lambda r: r.replica_id.unique_id,
            )
            self._cached_endpoints = [
                (*r.backend_http_endpoint, r.replica_id.to_full_id_str()) for r in ready
            ]
        # Pin the dict alongside its id. Without the reference the old dict
        # can be freed after a swap and a later dict may reuse its address,
        # making the id check above return stale endpoints.
        self._cached_dict_ref = curr_replicas
        self._cached_dict_id = id(curr_replicas)
        return self._cached_endpoints

    def _pick_replica(
        self,
        request_body: Optional[bytes] = None,
        body_truncated: bool = False,
    ) -> Tuple[str, int, str]:
        """Pick a backend HTTP replica.

        Today this is plain round-robin and ignores the payload. The
        ``request_body`` (possibly a HAProxy-truncated prefix, indicated by
        ``body_truncated``) is plumbed through so a future prefix cache aware
        policy can score replicas against the request's prompt / messages
        without changing the /internal/route contract or the call site.

        Raises:
            RuntimeError: If no replica with a backend HTTP endpoint is known.
        """
        del request_body, body_truncated
        candidates = self._ready_endpoints()
        if not candidates:
            raise RuntimeError("no backend-http replicas")

        index = self._round_robin_counter % len(candidates)
        self._round_robin_counter += 1
        return candidates[index]

python/ray/llm/_internal/serve/core/server/llm_server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,9 @@ async def start(self):
195195
self.engine = self._engine_cls(self._llm_config)
196196
await asyncio.wait_for(self._start_engine(), timeout=ENGINE_START_TIMEOUT_S)
197197

198+
async def __serve_build_asgi_app__(self):
    """Build this server's ASGI app by delegating to its engine.

    Returns:
        Whatever ``self.engine.build_asgi_app()`` returns.
    """
    asgi_app = await self.engine.build_asgi_app()
    return asgi_app
200+
198201
def _init_multiplex_loader(
199202
self, model_downloader_cls: Optional[Type[LoraModelLoader]] = None
200203
):

python/ray/llm/_internal/serve/engines/vllm/vllm_engine.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,22 @@ def __init__(
267267
self._oai_serving_scores: Optional["ServingScores"] = None
268268
self._oai_serving_tokenization: Optional["OpenAIServingTokenization"] = None
269269

270+
async def build_asgi_app(self):
    """Build a vLLM OpenAI API server app bound to this engine's client.

    Uses vLLM's ``build_app`` / ``init_app_state`` with the args namespace
    recorded by ``start()`` in ``self._vllm_args``. The supported tasks are
    queried from the engine client when it exposes ``get_supported_tasks``;
    otherwise only ``"generate"`` is assumed.

    Returns:
        The app returned by ``build_app`` with its state initialized.

    Raises:
        RuntimeError: If called before ``start()`` has recorded the vLLM
            args, i.e. ``self._vllm_args`` is missing or ``None``.
    """
    # `_vllm_args` is only assigned inside start(); fail with a clear message
    # instead of an AttributeError when the engine has not been started.
    vllm_args = getattr(self, "_vllm_args", None)
    if vllm_args is None:
        raise RuntimeError(
            f"{type(self).__name__}.build_asgi_app() requires a started "
            "engine; call start() first."
        )

    from vllm.entrypoints.openai.api_server import build_app, init_app_state

    supported_tasks = ("generate",)
    if hasattr(self._engine_client, "get_supported_tasks"):
        supported_tasks = await self._engine_client.get_supported_tasks()

    app = build_app(vllm_args, supported_tasks=supported_tasks)
    await init_app_state(
        self._engine_client,
        app.state,
        vllm_args,
        supported_tasks=supported_tasks,
    )
    return app
285+
270286
async def start(self) -> None:
271287
"""Start the vLLM engine.
272288
@@ -316,6 +332,7 @@ async def start(self) -> None:
316332
merged = _convert_config_dicts(merged)
317333

318334
args = _dict_to_namespace(merged)
335+
self._vllm_args = args
319336

320337
# Query supported tasks from the engine so init_app_state initializes the correct serving objects.
321338
# Without this, vLLM falls back to 'generate' only.

0 commit comments

Comments
 (0)