Skip to content

Commit 76ce1d1

Browse files
[Serve] Gate ingress request router body forwarding behind escape hatch (#63183)
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
1 parent 4549a87 commit 76ce1d1

5 files changed

Lines changed: 138 additions & 39 deletions

File tree

python/ray/serve/_private/constants.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,10 +799,25 @@
799799
# active. Bodies longer than this are truncated; the Lua forwards what it has
800800
# with an `X-Body-Truncated: <bytes>/<content-length>` header so the router can
801801
# do best-effort prefix matching. Memory cost is ~2 * bufsize * maxconn.
802+
# Only consulted when RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY=1.
802803
RAY_SERVE_HAPROXY_INGRESS_REQUEST_ROUTER_BUFSIZE = get_env_int(
803804
"RAY_SERVE_HAPROXY_INGRESS_REQUEST_ROUTER_BUFSIZE", 262144
804805
)
805806

807+
# Escape hatch: when true, HAProxy forwards the (possibly truncated) request
808+
# body to /internal/route and the router reads it. Off by default because for
809+
# large payloads the body buffering / re-emit cost noticeably delays the
810+
# first response. Skipping the forward is fine for any policy whose decision
811+
# does not depend on the request body: round-robin and power-of-two ignore
812+
# the body entirely, and session-aware policies key on the ``x-session-id``
813+
# header (forwarded with the request line) rather than the body.
814+
#
815+
# Flip this to true if the configured request router needs the body for its
816+
# decision, e.g. prefix-aware / prefix-cache routing.
817+
RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY = get_env_bool(
818+
"RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY", False
819+
)
820+
806821
RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT = int(
807822
os.environ.get("RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT", "30000")
808823
)

python/ray/serve/_private/haproxy.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S,
5757
RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S,
5858
RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S,
59+
RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY,
5960
SERVE_CONTROLLER_NAME,
6061
SERVE_LOGGER_NAME,
6162
SERVE_NAMESPACE,
@@ -804,6 +805,7 @@ def _write_ingress_request_router_lua(
804805

805806
content = _load_lua_template().substitute(
806807
TIMEOUT_S=RAY_SERVE_HAPROXY_INGRESS_REQUEST_ROUTER_TIMEOUT_S,
808+
FORWARD_BODY=str(RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY).lower(),
807809
ROUTERS=_format_routers_lua(routers),
808810
REPLICA_TARGETS=_format_replica_targets_lua(targets),
809811
)
@@ -815,7 +817,7 @@ def _write_ingress_request_router_lua(
815817
logger.debug(f"Wrote Lua routing script to {lua_path}")
816818
return lua_path
817819

818-
def _generate_config_file_internal(self) -> None:
820+
def _generate_config_file_internal(self) -> bool:
819821
"""Internal config generation without locking (for use within locked sections)."""
820822
try:
821823
env = Environment()
@@ -872,6 +874,9 @@ def _generate_config_file_internal(self) -> None:
872874
"ingress_request_router_bufsize": (
873875
RAY_SERVE_HAPROXY_INGRESS_REQUEST_ROUTER_BUFSIZE
874876
),
877+
"ingress_request_router_forward_body": (
878+
RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY
879+
),
875880
}
876881
)
877882

python/ray/serve/_private/haproxy_templates.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
nbthread {{ config.nbthread }}
2929
{%- if has_ingress_request_router %}
3030
lua-load-per-thread {{ ingress_request_router_lua_path }}
31+
{%- endif %}
32+
{%- if has_ingress_request_router and ingress_request_router_forward_body %}
3133
tune.bufsize {{ ingress_request_router_bufsize }}
3234
{%- endif %}
3335
{%- if config.enable_hap_optimization %}
@@ -97,7 +99,9 @@
9799
{%- endif %}
98100
{%- endfor %}
99101
acl has_ingress_request_router_app var(txn.ingress_request_router_app) -m found
102+
{%- if ingress_request_router_forward_body %}
100103
http-request wait-for-body time {{ ingress_request_router_timeout_s }}s if METH_POST has_ingress_request_router_app
104+
{%- endif %}
101105
http-request lua.route_via_ingress_request_router if METH_POST has_ingress_request_router_app
102106
# Fail loudly when the Lua dispatch did not pick a replica. Must appear
103107
# before the use_backend rules below so the request never falls back to
@@ -163,6 +167,10 @@
163167
{%- if has_ingress_request_router and backend.ingress_request_router_servers %}
164168
backend {{ backend.name or 'unknown' }}-via-ingress-request-router
165169
log global
170+
# Keep the pinned data-plane path on the same connection policy as the
171+
# primary backend. For streamed responses, forcing server-close can leave
172+
# HAProxy holding unread server-side FINs under a burst while worker
173+
# threads are still routing other requests.
166174
http-reuse always
167175
# use-server falls through to LB if the pinned server is DOWN.
168176
option redispatch

python/ray/serve/_private/ingress_request_router.lua.tmpl

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
--
55
-- Bodies exceeding tune.bufsize are truncated; we forward what we have with
66
-- X-Body-Truncated since prefix-routing only needs the head of the body.
7+
-- See RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY in constants.py.
78

89
local ROUTER_REQUEST_TIMEOUT_S = ${TIMEOUT_S}
10+
local FORWARD_BODY = ${FORWARD_BODY}
911

1012
-- Per-app state. The frontend sets txn.ingress_request_router_app to the
1113
-- backend name whose path prefix matched, and we look up that app's
@@ -97,17 +99,24 @@ core.register_action("route_via_ingress_request_router", {"http-req"}, function(
9799
return
98100
end
99101
100-
local body = txn.sf:req_body()
101-
if not body or body == "" then
102-
txn:set_var("txn.ingress_request_router_failed", "empty_body")
103-
return
104-
end
105-
106-
local truncated = truncated_full_length(txn, body)
107-
if truncated then
108-
core.log(core.warning,
109-
"ingress_request_router: forwarding truncated body to router ("
110-
.. #body .. "/" .. truncated .. " bytes)")
102+
-- FORWARD_BODY=false: don't read or forward the body; the router is
103+
-- always called with body="" (a policy that needs the body must opt in
104+
-- via FORWARD_BODY=true).
105+
-- FORWARD_BODY=true: an empty body (e.g. a Content-Length: 0 POST) is
106+
-- forwarded the same way -- it is a legitimate input the router can
107+
-- accept or reject on its own terms, so it is not failed here.
108+
local body = ""
109+
local truncated = nil
110+
if FORWARD_BODY then
111+
body = txn.sf:req_body() or ""
112+
if body ~= "" then
113+
truncated = truncated_full_length(txn, body)
114+
if truncated then
115+
core.log(core.warning,
116+
"ingress_request_router: forwarding truncated body to router ("
117+
.. #body .. "/" .. truncated .. " bytes)")
118+
end
119+
end
111120
end
112121
113122
local response = call_router(router, body, truncated)

python/ray/serve/tests/test_haproxy_api.py

Lines changed: 89 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,11 @@ def test_ingress_request_router_does_not_leak_into_other_backends(
583583

584584
assert "backend llm-via-ingress-request-router" in cfg
585585
assert "backend api-via-ingress-request-router" not in cfg
586+
assert "option http-buffer-request" not in cfg
587+
direct_backend = cfg.split("backend llm-via-ingress-request-router", 1)[1]
588+
direct_backend = direct_backend.split("listen stats", 1)[0]
589+
assert "http-reuse always" in direct_backend
590+
assert "option http-server-close" not in direct_backend
586591
# Only router-bearing backends contribute a set-var directive that
587592
# arms the Lua dispatch; the plain `api` backend must not.
588593
assert "set-var(txn.ingress_request_router_app) str(llm)" in cfg
@@ -651,6 +656,64 @@ def test_router_failure_503_rule_appears_before_use_backend(haproxy_api_cleanup)
651656
assert "X-Serve-Reason" in cfg, cfg
652657

653658

659+
@pytest.mark.parametrize("forward_body", [True, False])
660+
def test_ingress_request_router_forward_body_gate_renders(
661+
haproxy_api_cleanup, monkeypatch, forward_body
662+
):
663+
"""The FORWARD_BODY escape hatch must drive both:
664+
- HAProxy ``wait-for-body`` + ``tune.bufsize`` directives (memory cost
665+
and the per-request body round-trip), and
666+
- the Lua ``FORWARD_BODY`` constant (whether the action reads the body
667+
and forwards it to ``/internal/route``).
668+
669+
Off by default: round-robin ignores the body, so neither cost is paid.
670+
"""
671+
monkeypatch.setattr(
672+
"ray.serve._private.haproxy.RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY",
673+
forward_body,
674+
)
675+
with tempfile.TemporaryDirectory() as temp_dir:
676+
api = _make_api(
677+
temp_dir,
678+
{
679+
"llm": BackendConfig(
680+
name="llm",
681+
path_prefix="/",
682+
app_name="llm",
683+
servers=[
684+
ServerConfig(
685+
name="r1",
686+
host="10.0.0.1",
687+
port=30001,
688+
replica_id="rid_1",
689+
)
690+
],
691+
ingress_request_router_servers=[
692+
ServerConfig(name="router", host="10.0.0.10", port=9000)
693+
],
694+
),
695+
},
696+
)
697+
with mock.patch(
698+
"ray.serve._private.constants.RAY_SERVE_HAPROXY_CONFIG_FILE_LOC",
699+
api.config_file_path,
700+
):
701+
api._generate_config_file_internal()
702+
with open(api.config_file_path) as f:
703+
cfg = f.read()
704+
with open(os.path.join(temp_dir, "ingress_request_router.lua")) as f:
705+
lua = f.read()
706+
707+
if forward_body:
708+
assert "tune.bufsize" in cfg, cfg
709+
assert "wait-for-body" in cfg, cfg
710+
assert "local FORWARD_BODY = true" in lua, lua
711+
else:
712+
assert "tune.bufsize" not in cfg, cfg
713+
assert "wait-for-body" not in cfg, cfg
714+
assert "local FORWARD_BODY = false" in lua, lua
715+
716+
654717
def _create_replica_server(port: int, replica_id_header: str):
655718
"""Fake data-plane replica that echoes its identity in a response header."""
656719
app = FastAPI()
@@ -670,7 +733,7 @@ async def root(path: str, req: Request, res: Response):
670733

671734
def _create_router_server(port: int, replica_id_to_return: str):
672735
"""Fake /internal/route. Captures bodies so tests can verify HAProxy
673-
actually buffered + forwarded the request."""
736+
forwards the buffered request body prefix to the router."""
674737
app = FastAPI()
675738
captured = {"bodies": []}
676739

@@ -689,14 +752,20 @@ def ready():
689752
)
690753

691754
server, thread = _serve_fastapi_app(app, port, ready)
755+
# Discard the readiness-probe body so callers see only client traffic.
756+
captured["bodies"].clear()
692757
return server, thread, captured
693758

694759

695760
@pytest.mark.asyncio
696-
async def test_ingress_request_router_end_to_end(haproxy_api_cleanup):
761+
async def test_ingress_request_router_end_to_end(haproxy_api_cleanup, monkeypatch):
697762
"""Run actual HAProxy against a fake router + two replicas; verify a POST
698763
is pinned to the replica the router selects, while a GET (which doesn't
699764
trigger the router-routed path) is not."""
765+
monkeypatch.setattr(
766+
"ray.serve._private.haproxy.RAY_SERVE_INGRESS_REQUEST_ROUTER_FORWARD_BODY",
767+
True,
768+
)
700769
with tempfile.TemporaryDirectory() as temp_dir:
701770
haproxy_port = find_free_port()
702771
stats_port = find_free_port()
@@ -787,12 +856,9 @@ async def test_ingress_request_router_end_to_end(haproxy_api_cleanup):
787856
assert resp.status_code == 200, resp.text
788857
assert resp.headers.get("x-replica-id") == "B"
789858

790-
# The router actually saw the original body (via wait-for-body +
791-
# txn.sf:req_body()). Just check the field made it through.
792-
assert any(
793-
'"prompt"' in body and "hello" in body
794-
for body in router_captured["bodies"]
795-
)
859+
# Direct streaming keeps a bounded request-body path for prefix-cache-aware
860+
# routing: with FORWARD_BODY on, the router sees exactly the client's body.
861+
assert router_captured["bodies"] == ['{"prompt": "hello"}']
796862

797863
# Repeat to confirm the pin holds across requests.
798864
for _ in range(3):
@@ -802,9 +868,10 @@ async def test_ingress_request_router_end_to_end(haproxy_api_cleanup):
802868
timeout=5,
803869
)
804870
assert resp.headers.get("x-replica-id") == "B"
871+
assert router_captured["bodies"] == ['{"prompt": "hello"}'] * 4
805872

806-
# GET is not POST, so wait-for-body / Lua never run; the router
807-
# should have seen exactly the four POSTs above and nothing more.
873+
# GET is not POST, so Lua routing never runs; the router should
874+
# have seen exactly the four POSTs above and nothing more.
808875
n_router_calls_before_get = len(router_captured["bodies"])
809876
requests.get(
810877
f"http://127.0.0.1:{haproxy_port}/health-passthrough", timeout=5
@@ -936,28 +1003,23 @@ async def test_router_failure_fails_loud_with_reason(haproxy_api_cleanup):
9361003
timeout=10,
9371004
)
9381005

939-
# Every dispatch failure mode must surface as a 5xx with a
940-
# reason label, never as a silent primary-backend fallback.
941-
# ``router_non_200``: router answered with status != 200 (forced
942-
# by the broken-router stub).
943-
# ``empty_body``: HAProxy's wait-for-body completed but the
944-
# request had no body to forward to the router.
945-
failure_cases = [
946-
(dict(json={"prompt": "hi"}), "router_non_200"),
947-
(dict(data=""), "empty_body"),
948-
]
949-
for kwargs, expected_reason in failure_cases:
1006+
# Every dispatch failure must surface as 5xx with a reason
1007+
# label, never as a silent primary-backend fallback. The broken
1008+
# router returns 500 for both empty and non-empty bodies, so
1009+
# both shapes surface the same ``router_non_200`` reason; the
1010+
# body shape is parametrized to pin that empty-body POSTs are
1011+
# routed through the router and not silently bypassed.
1012+
for body_kwargs in (dict(json={"prompt": "hi"}), dict(data="")):
9501013
for _ in range(3):
9511014
resp = requests.post(
9521015
f"http://127.0.0.1:{haproxy_port}/predict",
9531016
timeout=5,
954-
**kwargs,
955-
)
956-
assert resp.status_code == 503, (expected_reason, resp.text)
957-
assert resp.headers.get("X-Serve-Reason") == expected_reason, (
958-
expected_reason,
959-
resp.headers,
1017+
**body_kwargs,
9601018
)
1019+
assert resp.status_code == 503, resp.text
1020+
assert (
1021+
resp.headers.get("X-Serve-Reason") == "router_non_200"
1022+
), resp.headers
9611023

9621024
stats_csv = requests.get(
9631025
f"http://127.0.0.1:{stats_port}/stats;csv", timeout=5

0 commit comments

Comments
 (0)