Skip to content

[Serve][5/5] Enable direct streaming for Ray Serve LLM apps #63167

Merged
kouroshHakha merged 121 commits into
ray-project:masterfrom
eicherseiji:ingress-bypass/4-llm-route-v2
May 7, 2026
Merged

[Serve][5/5] Enable direct streaming for Ray Serve LLM apps #63167
kouroshHakha merged 121 commits into
ray-project:masterfrom
eicherseiji:ingress-bypass/4-llm-route-v2

Conversation

@eicherseiji
Copy link
Copy Markdown
Contributor

@eicherseiji eicherseiji commented May 6, 2026

Why this is needed

This PR introduces the Ray Serve LLM changes: an ingress request router that can pick a model replica, and a backend ASGI app that the chosen replica can serve directly. This PR is where the OpenAI/vLLM serving path plugs into those new Serve interfaces.

Summary

  • add LLMRouter as the LLM-side ingress request router that serves /internal/route and returns {"replica_id": ...} for a chosen backend replica
  • use the temporary DeploymentHandle._get_request_router() escape hatch so LLMRouter can reuse Serve's existing in-process replica picker instead of re-implementing replica selection logic
  • add a late-bound ASGI hook, __serve_build_asgi_app__, so LLMServer can build the vendored vLLM FastAPI app after async engine startup; Serve resolves it during replica ASGI initialization before lifespan startup
  • compose the direct-streaming app inside build_openai_app(...); this prototype validates direct streaming as single-model-only for now
# The direct-streaming app shape is selected internally when
# RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING=1.
app = build_openai_app({"llm_configs": [llm_config]})

Stack: #62667 -> #62680 -> #62668 -> #62669 -> this

Test plan

  • Added / updated LLM builder unit coverage for the composed direct-streaming app shape
  • python -m py_compile on the edited files
  • git diff --check
  • Repo commit hooks / linters on commit

Benchmark Results

Direct streaming, benchmarked at 17a635aa4f887a8afdf931e65a31a74bf507d400; current PR cleanup tip is 256599ac14ae7322b2222202e3c8d9a25923ae62. vLLM 0.19.0.

Workload: Qwen/Qwen3-0.6B-FP8, 8 H100 replicas, ISL=8000, OSL=50, 4096 requests, request-rate=inf.

Benchmark invocation:

cd /home/ray/default/llm-inference-stack/replica-scaling
python sweep_concurrency.py \
  --backend ray-serve-llm-direct-streaming \
  --label qwen-0.6b-fp8-h100-ray-direct-streaming-seiji-prhead-17a635-rr-8k50-c64-128-256 \
  --concurrency 64 128 256 \
  --isl-osl-pairs 8000:50

Equivalent vllm bench serve workload shape:

vllm bench serve \
  --backend openai \
  --base-url http://127.0.0.1:8000 \
  --model Qwen/Qwen3-0.6B-FP8 \
  --dataset-name random \
  --random-input-len 8000 \
  --random-output-len 50 \
  --num-prompts 4096 \
  --max-concurrency <64|128|256> \
  --request-rate inf \
  --percentile-metrics ttft,tpot,itl,e2el \
  --save-result

App deployment code:

import os

# For cluster runs, set this in the cluster/runtime env before workers start.
# In a local script, set it before importing ray.serve.llm.
os.environ["RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING"] = "1"

from ray import serve
from ray.serve.llm import LLMConfig, LLMServingArgs, ModelLoadingConfig, build_openai_app

llm_config = LLMConfig(
    model_loading_config=ModelLoadingConfig(
        model_id="Qwen/Qwen3-0.6B-FP8",
        model_source="Qwen/Qwen3-0.6B-FP8",
    ),
    engine_kwargs=dict(
        tensor_parallel_size=1,
        max_model_len=10000,
        gpu_memory_utilization=0.95,
        max_num_seqs=128,
        max_num_batched_tokens=16384,
        enable_prefix_caching=False,
        load_format="dummy",
        kv_cache_dtype="fp8",
    ),
    deployment_config=dict(
        autoscaling_config=dict(min_replicas=8, max_replicas=8),
        max_ongoing_requests=8192,
    )
)

app = build_openai_app(LLMServingArgs(llm_configs=[llm_config]))
serve.run(app)

Results:

Max concurrency Successful Failed Req/s Output tok/s Mean TTFT (ms) P99 TTFT (ms) Mean TPOT (ms) P99 TPOT (ms) Mean E2E (ms) P99 E2E (ms)
64 4096 0 122.20 6110.07 118.66 489.24 8.19 10.09 519.95 841.03
128 4096 0 149.83 7491.40 148.71 746.33 14.28 18.41 848.37 1389.91
256 4096 0 164.24 8211.87 231.20 1395.37 26.81 35.11 1544.79 2400.58

Supersedes #62670

eicherseiji added 30 commits May 1, 2026 09:35
Wire up the controller to support "ingress bypass" mode where a router
deployment (e.g. OpenAiIngress) serves /internal/route for Lua routing
decisions, while non-router deployments (e.g. LLMServer) serve data
plane traffic directly via their direct ingress ports.

Key changes:
- controller.py: _get_target_groups_for_app_with_router splits targets
  into router_targets (for Lua) and main targets (data plane)
- application_state.py: track router deployment name per app
- build_app.py, client.py, deploy_utils.py, deployment_info.py: plumb
  router flag through deployment args
- default_impl.py: factory plumbing for router deployment
- schema.py: router_targets field on TargetGroup

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
- Remove unused _summarize_target, _summarize_target_groups,
  _summarize_fallback_targets helpers
- Remove unused get_fallback_targets_for_proxy_manager method
- Revert unrelated RAY_SERVE_THROUGHPUT_OPTIMIZED env var plumbing
- Fix stale docstring referencing OpenAiIngress

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
This reverts commit 9600be7.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
…d json

The TargetGroup schema gained an ingress_request_router_targets field
earlier in this PR; update the json_serializable expected output to
include the empty default for both HTTP and gRPC target groups.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Extend HAProxy configuration to support Lua-based custom request routing
for the ingress bypass pattern. When a backend has router_targets configured,
HAProxy calls /internal/route on a router replica to get the target host:port,
then forwards the data plane request directly to that replica.

Key changes:
- haproxy.py: BackendConfig gains router_servers, custom_request_routing,
  max_server_conns, router_server fields; reload tracking; is_head
  readiness fix; summarize helpers for debugging
- haproxy_templates.py: Lua route template for /internal/route calls

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
- Remove streaming-only restriction: route ALL POST requests through
  Lua when ingress bypass is enabled, not just streaming ones
- Remove killswitch file check (/tmp/ray-serve-disable-ingress-bypass-routing)
- Reduce Lua socket timeout from 5s to 500ms for fast fallback
- Revert enable_hap_optimization splitting back to single flag
- Remove reload tracking (_reload_count, _last_reload_mode)
- Remove _hard_reload and _reload_with_config indirection
- Remove get_reload_debug_summary

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@eicherseiji eicherseiji force-pushed the ingress-bypass/4-llm-route-v2 branch from 272410f to e80211d Compare May 6, 2026 23:04
- VLLMEngine: expose public vllm_args / engine_client properties; LLMServer
  uses them instead of reaching into private attrs.
- LLMRouter: use frozenset of replica IDs as the cache signature so equal-length
  in-place swaps are detected (was id() + len()).
- Direct-streaming builder: use Application.name (public) instead of
  ._bound_deployment.name.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Copy link
Copy Markdown
Contributor

@kouroshHakha kouroshHakha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good shape for a feature-flagged prototype — the architecture is clean and the separation of concerns (LLMRouter handles replica selection, LLMServer serves traffic, builder composes them) makes sense. A few things worth discussing before the flag goes wider:

Direct ASGI path bypasses Serve lifecycle (replica.py:2538): The TODO correctly lists what's missing (backpressure, timeouts, cancellation, tracing, per-route metrics). Can you link this to a tracking issue? Missing cancellation propagation in particular means a client disconnect won't abort the vLLM generation, which can waste GPU time under load.

LLMRouter reimplements routing policy: _ready_replicas, _round_robin_counter, and _pick_replica duplicate logic that already lives in RequestRouter.choose_replicas(). The right shape is to call the existing policy via _get_request_router().choose_replicas(...) and extract backend_http_endpoint — routing policy stays in one place and becomes configurable. When #60865 lands with DeploymentHandle.choose_replica(), this collapses to a single call. Since this is a fully internal change with no API or contract impact, it can land as a follow-up once the flag is validated.

Naming: Consider renaming router.pyllm_ingress_request_router.py and LLMRouterLLMIngressRequestRouter. The current names are ambiguous alongside Serve's own ray/serve/_private/router.py.

Note

This review was co-written with AI assistance (Claude Code).

Comment thread python/ray/llm/_internal/serve/core/ingress/router.py Outdated
Comment thread python/ray/llm/_internal/serve/core/ingress/router.py Outdated
Comment thread python/ray/llm/_internal/serve/core/ingress/router.py Outdated
Comment thread python/ray/llm/_internal/serve/core/ingress/router.py
Comment thread python/ray/llm/_internal/serve/core/ingress/router.py Outdated
def _replica_id_sort_key(replica: RunningReplica) -> str:
return replica.replica_id.unique_id

def _ready_replicas(self, request_router) -> List[RunningReplica]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important (can land as follow-up): _ready_replicas, _cached_replica_signature, _round_robin_counter, _replica_id_sort_key, and _pick_replica reimplement replica selection that already exists in RequestRouter.choose_replicas(). The right shape is to call the existing policy via _get_request_router().choose_replicas(...) and extract backend_http_endpoint from the result — all the custom caching and counter logic goes away, and the routing policy becomes whatever Serve is already configured with rather than always being a bespoke round-robin. When #60865 lands with DeploymentHandle.choose_replica(), this becomes a single call with no internal API access at all. Since this is a fully internal change with no API or contract impact, it can land as a follow-up once the flag is validated.

Comment thread python/ray/llm/_internal/serve/core/ingress/builder.py Outdated
# into the router's recursive build.
return serve.deployment(
LLMRouter,
num_replicas=1,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important: num_replicas=1 and max_ongoing_requests=1000 are hardcoded with no user override path. Introduce an IngressRequestRouterConfig (small Pydantic model in LLMServingArgs) with these as fields and appropriate defaults. For now validate that only num_replicas is user-configurable, with an explicit error if autoscaling is set (since LLMRouter doesn't support it yet).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposing to defer this until the root cause of ingress request router scaling issue is better understood

Comment thread python/ray/llm/_internal/serve/core/server/llm_server.py
@kouroshHakha kouroshHakha marked this pull request as ready for review May 7, 2026 00:49
@kouroshHakha kouroshHakha requested review from a team as code owners May 7, 2026 00:49
Comment thread python/ray/llm/_internal/serve/core/ingress/router.py Outdated
Comment thread python/ray/llm/_internal/serve/core/ingress/router.py Outdated
Serve core
- Replace LATE_BOUND_ASGI_APP sentinel with no-arg serve.ingress(): a
  class supplies its own ASGI app via __serve_build_asgi_app__. Drops
  the placeholder FastAPI() and the dual _set_asgi_app() init.
- Hoist _DIRECT_INGRESS_ASGI_REQ_META to a module-level constant so
  /direct ingress dispatch skips a per-request RequestMetadata
  construction and two generate_request_id() UUIDs.

LLM engine
- Add LLMEngine.build_asgi_app() to the engine protocol; VLLMEngine
  implements it. LLMServer.__serve_build_asgi_app__ is a one-line
  delegate to self.engine.build_asgi_app(). Drops the public
  vllm_args/engine_client properties that leaked vLLM internals.

LLM router
- Bind via bind(server=llm_deployment) so the handle is injected as an
  init kwarg; drops the magic-string serve.get_deployment_handle().
- Replace the awaited llm_config.remote() priming RPC with a synchronous
  DeploymentHandle._init() + cached _get_request_router(). Removes the
  60s timeout, the dependency on a method being removed in ray-project#63065, and
  the engine-cold-start coupling.
- check_health() now asserts the request router is non-None as a
  liveness signal.
- Drop GET / (HAProxy never calls it; Serve uses check_health for
  readiness). Keep GET /health as a human-operator convenience.
- Cache the RequestRouter on self at init; precompute (host, port,
  full_id_str) tuples in _ready_endpoints to skip per-request
  formatting; two-tier cache (id(curr_replicas) fast path then
  frozenset(keys) fallback) avoids the frozenset allocation when
  curr_replicas is unchanged.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
@ray-gardener ray-gardener Bot added the serve Ray Serve Related Issue label May 7, 2026
The `if self._user_callable_asgi_app is not None` short-circuit added
in the previous commits matched every @serve.ingress deployment, not
just direct streaming, and bypassed RequestMetadata setup (route,
request_id, session_id, tracing context, backpressure). That broke:

- test_replica_request_context.py::TestHTTPRoute::test_matching_fastapi_route
  (request context route returned empty string)
- test_http_headers.py::TestUserProvidedRequestIDHeader::test_fastapi
  (request_id header not parsed)
- test_http_headers.py::test_reuse_request_id

The existing flow at line 2535+ already handles ASGI apps correctly via
_call_http_entrypoint (UserMethodInfo.is_asgi_app), so direct streaming
goes through the same path with no special-case code.

Buildkite failures: ray-project/premerge#65996.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
self._vllm_args,
supported_tasks=supported_tasks,
)
return app
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_asgi_app skips vLLM version compatibility checks

Medium Severity

build_asgi_app unconditionally passes supported_tasks to build_app and init_app_state, and omits the vllm_config parameter entirely. The existing start() method guards both with inspect.signature checks for cross-version compatibility and passes vllm_config when the parameter exists. The new method will crash on vLLM versions that don't accept supported_tasks, and will produce an incompletely initialized app state on versions that require vllm_config.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 363f0c3. Configure here.

Seed `_round_robin_counter` with a random value so multiple LLMRouter
replicas don't lockstep on the same replica sequence. Dormant today
because HAProxy expects a single router endpoint; preempts the issue
when multi-replica routers ship.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Drop the dormancy aside; first sentence is the only signal.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Fix All in Cursor

Reviewed by Cursor Bugbot for commit 120dab7. Configure here.

(*r.backend_http_endpoint, r.replica_id.to_full_id_str()) for r in ready
]
self._cached_dict_id = id(curr_replicas)
return self._cached_endpoints
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale cache when replica dies via in-place dict mutation

High Severity

_ready_endpoints uses id(curr_replicas) == self._cached_dict_id as a fast "nothing changed" shortcut. However, RequestRouter.on_replica_actor_died() mutates self._replicas in place via pop(), so the dict object identity remains unchanged after a replica is removed. The cache continues returning the stale endpoint list that includes the dead replica, routing requests to it until the next full controller broadcast replaces the dict wholesale.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 120dab7. Configure here.

Copy link
Copy Markdown
Contributor

@kouroshHakha kouroshHakha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving. We need to also follow up with a release test. @eicherseiji

@kouroshHakha kouroshHakha enabled auto-merge (squash) May 7, 2026 05:38
@eicherseiji
Copy link
Copy Markdown
Contributor Author

llm-gpu failures unrelated

 Test: //doc:.../working-with-llms/omni_audio_example                            
  Type: Ray Data batch inference (Qwen2-Omni audio)                            
  Time: TIMEOUT @ 911s                                                            
  ────────────────────────────────────────                                     
  Test: //doc:.../llm/doc_code/serve/transcription/transcription_example          
  Type: Ray Data batch inference (Whisper)                                        
  Time: TIMEOUT @ 905s                                                         
  ────────────────────────────────────────                                        
  Test: //doc:.../working-with-llms/vlm_image_example                             
  Type: Ray Data batch inference (VLM)                                            
  Time: TIMEOUT @ 911s               

"Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadParquet]. The job may hang forever unless the cluster scales up"
https://buildkite.com/ray-project/premerge/builds/66007#019e002b-23ef-4854-a61e-5e31bed60dad

@github-actions github-actions Bot disabled auto-merge May 7, 2026 07:29
@kouroshHakha kouroshHakha merged commit 89b5722 into ray-project:master May 7, 2026
6 checks passed
kouroshHakha pushed a commit to eicherseiji/ray that referenced this pull request May 7, 2026
…e hatch

Round-robin /internal/route ignores the request body, so we make body
forwarding opt-in via RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY (off
by default). Saves on every routed request:
- HAProxy `wait-for-body` round-trip and per-connection buffer cost
  (~2 * tune.bufsize * maxconn).
- Lua `txn.sf:req_body()` + body re-emit on the socket.
- LLMRouter `await request.body()`.

When the flag is on, the previous body-forwarding path is restored
unchanged for body-aware policies (prefix-cache, etc.):
- HAProxy frontend re-emits `wait-for-body` and `tune.bufsize`.
- Lua reads the body and forwards it (with X-Body-Truncated when truncated).
- LLMRouter reads the body and the truncation header.

The end-to-end test that asserted the body got forwarded now flips the
flag on via monkeypatch.

Stacked on ray-project#63167.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
chillCode404 pushed a commit to chillCode404/ray-contrib that referenced this pull request May 9, 2026
…ect#63167)

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
am-kinetica pushed a commit to kineticadb/ray that referenced this pull request May 14, 2026
…ect#63167)

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
…ect#63167)

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
alexandrplashchinsky pushed a commit to alexandrplashchinsky/ray-alex that referenced this pull request May 29, 2026
…ect#63167)

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Signed-off-by: Alexandr Plashchinsky <alexandr.plashchinsky@alexandrplashchinsky-H765G66H9V.local>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

go add ONLY when ready to merge, run all tests serve Ray Serve Related Issue

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants