Skip to content

Commit d4d8bb3

Browse files
Merge branch 'master' into docs/symmetric-run-docker-init
2 parents 712d99e + 2fa47df commit d4d8bb3

4 files changed

Lines changed: 245 additions & 3 deletions

File tree

python/ray/serve/_private/haproxy_templates.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@
9999
acl has_ingress_request_router_app var(txn.ingress_request_router_app) -m found
100100
http-request wait-for-body time {{ ingress_request_router_timeout_s }}s if METH_POST has_ingress_request_router_app
101101
http-request lua.route_via_ingress_request_router if METH_POST has_ingress_request_router_app
102+
# Fail loudly when the Lua dispatch did not pick a replica. Must appear
103+
# before the use_backend rules below so the request never falls back to
104+
# the primary backend (which would be a silent bypass of the configured
105+
# router policy).
106+
http-request return status 503 content-type text/plain lf-string "Ingress request router failed: %[var(txn.ingress_request_router_failed)]" hdr X-Serve-Reason %[var(txn.ingress_request_router_failed)] if { var(txn.ingress_request_router_failed) -m found }
102107
{%- endif %}
103108
# Static routing based on path prefixes in decreasing length then alphabetical order
104109
{%- for backend in backends %}

python/ray/serve/_private/ingress_request_router.lua.tmpl

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ local function truncated_full_length(txn, body)
7676
end
7777
end
7878
79+
-- Failure semantics: every path that reaches a router decision but cannot pin
80+
-- a replica must arm txn.ingress_request_router_failed so the frontend's 503
81+
-- rule fires instead of letting the request silently fall through to the
82+
-- primary backend. The product invariant is: requests to a router-bearing app
83+
-- are served via the router or fail; there is no quiet alternative path.
84+
-- Two silent returns are correct: (1) the request didn't target a
85+
-- router-bearing app (no txn var set, no app entry in our maps), and
86+
-- (2) the controller hasn't pushed router/replica state for this app yet.
87+
-- Failure-mode bucketing belongs in observability (response header label,
88+
-- metric label), not in the data plane.
7989
core.register_action("route_via_ingress_request_router", {"http-req"}, function(txn)
8090
local app = txn:get_var("txn.ingress_request_router_app")
8191
if not app then
@@ -89,6 +99,7 @@ core.register_action("route_via_ingress_request_router", {"http-req"}, function(
8999
90100
local body = txn.sf:req_body()
91101
if not body or body == "" then
102+
txn:set_var("txn.ingress_request_router_failed", "empty_body")
92103
return
93104
end
94105
@@ -100,17 +111,24 @@ core.register_action("route_via_ingress_request_router", {"http-req"}, function(
100111
end
101112
102113
local response = call_router(router, body, truncated)
103-
if not response or not is_http_200(response) then
114+
if not response then
115+
txn:set_var("txn.ingress_request_router_failed", "router_unreachable")
116+
return
117+
end
118+
if not is_http_200(response) then
119+
txn:set_var("txn.ingress_request_router_failed", "router_non_200")
104120
return
105121
end
106122
107123
local replica_id = extract_replica_id(http_response_body(response))
108124
if not replica_id then
125+
txn:set_var("txn.ingress_request_router_failed", "unparseable_replica_id")
109126
return
110127
end
111128
112129
local server_name = replica_map[replica_id]
113130
if not server_name then
131+
txn:set_var("txn.ingress_request_router_failed", "unknown_replica_id")
114132
return
115133
end
116134

python/ray/serve/_private/replica.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2535,8 +2535,12 @@ async def _direct_ingress_asgi(
25352535
return
25362536

25372537
# If the HTTP path does not match the deployment route prefix,
2538-
# it is invalid and we should not serve it.
2539-
if not route.startswith(self._route_prefix):
2538+
# it is invalid and we should not serve it. Ingress request router
2539+
# peer deployments (e.g. LLMRouter) have no route prefix; fall
2540+
# back to "" so any path (including the empty-path ASGI edge case)
2541+
# matches and downstream user code dispatches.
2542+
route_prefix = self._route_prefix or ""
2543+
if not route.startswith(route_prefix):
25402544
for msg in convert_object_to_asgi_messages(
25412545
f"Path '{route}' not found. "
25422546
"Ping http://.../-/routes for available routes.",

python/ray/serve/tests/test_haproxy_api.py

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,68 @@ def test_ingress_request_router_does_not_leak_into_other_backends(
589589
assert "set-var(txn.ingress_request_router_app) str(api)" not in cfg
590590

591591

592+
def test_router_failure_503_rule_appears_before_use_backend(haproxy_api_cleanup):
    """Check where the router-failure 503 rule lands in the rendered config.

    The rule keyed on ``txn.ingress_request_router_failed`` has to be emitted
    ahead of the first ``use_backend`` directive. Otherwise a failed Lua
    dispatch would be routed to the primary backend and the configured router
    policy would be bypassed without any visible error.
    """
    replica_server = ServerConfig(
        name="r1",
        host="10.0.0.1",
        port=30001,
        replica_id="rid_1",
    )
    router_server = ServerConfig(name="router", host="10.0.0.10", port=9000)
    llm_backend = BackendConfig(
        name="llm",
        path_prefix="/",
        app_name="llm",
        servers=[replica_server],
        ingress_request_router_servers=[router_server],
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        api = _make_api(temp_dir, {"llm": llm_backend})
        with mock.patch(
            "ray.serve._private.constants.RAY_SERVE_HAPROXY_CONFIG_FILE_LOC",
            api.config_file_path,
        ):
            api._generate_config_file_internal()
        with open(api.config_file_path) as f:
            cfg = f.read()

    # Work on individual lines: a plain substring search over the whole file
    # could hit the word "use_backend" inside the explanatory comment that is
    # rendered above the rule.
    rendered = cfg.splitlines()

    sentinel_line = None
    for idx, text in enumerate(rendered):
        if "var(txn.ingress_request_router_failed) -m found" in text:
            sentinel_line = idx
            break

    first_use_backend_line = None
    for idx, text in enumerate(rendered):
        if text.strip().startswith("use_backend "):
            first_use_backend_line = idx
            break

    assert sentinel_line is not None, cfg
    assert first_use_backend_line is not None, cfg
    assert sentinel_line < first_use_backend_line, (
        "503-on-router-failure rule must precede every use_backend so a "
        "failed dispatch does not silently fall through to the primary "
        "backend.\n" + cfg
    )

    # Spot-check the shape of the rule itself.
    for fragment in ("status 503", "X-Serve-Reason"):
        assert fragment in cfg, cfg
652+
653+
592654
def _create_replica_server(port: int, replica_id_header: str):
593655
"""Fake data-plane replica that echoes its identity in a response header."""
594656
app = FastAPI()
@@ -764,6 +826,159 @@ async def test_ingress_request_router_end_to_end(haproxy_api_cleanup):
764826
pass
765827

766828

829+
def _create_broken_router_server(port: int, status_code: int = 500):
    """Start a stub ingress request router whose routing endpoint always fails.

    ``POST /internal/route`` answers with ``status_code`` (500 unless
    overridden) and a fixed error payload, while ``GET /-/healthz`` reports
    healthy. Tests use it to exercise the fail-loud path: a non-200 from the
    router must reach the client as a 5xx carrying ``X-Serve-Reason`` instead
    of being masked by a fallback to the primary backend.

    Args:
        port: TCP port the stub listens on.
        status_code: HTTP status returned by ``/internal/route``.

    Returns:
        Whatever ``_serve_fastapi_app`` returns for the stub app; callers in
        this file unpack it as a ``(server, thread)`` pair.
    """
    stub_app = FastAPI()

    @stub_app.get("/-/healthz")
    async def healthz():
        return {"status": "OK"}

    @stub_app.post("/internal/route")
    async def failing_route(response: Response):
        # Override the status on the injected response object; the body is
        # still serialized from the returned dict.
        response.status_code = status_code
        return {"error": "broken"}

    readiness_probe = _healthz_ready(port)
    return _serve_fastapi_app(stub_app, port, readiness_probe)
846+
847+
848+
def _backend_stot(stats_csv: str, backend_name: str) -> int:
849+
"""Pull ``stot`` (cumulative sessions) for the BACKEND aggregate row of
850+
``backend_name`` from HAProxy's CSV stats. Returns -1 if not found."""
851+
lines = stats_csv.splitlines()
852+
if not lines:
853+
return -1
854+
header = lines[0].lstrip("# ").split(",")
855+
pxname_idx = header.index("pxname")
856+
svname_idx = header.index("svname")
857+
stot_idx = header.index("stot")
858+
for row in lines[1:]:
859+
parts = row.split(",")
860+
if (
861+
len(parts) > stot_idx
862+
and parts[pxname_idx] == backend_name
863+
and parts[svname_idx] == "BACKEND"
864+
):
865+
return int(parts[stot_idx] or 0)
866+
return -1
867+
868+
869+
@pytest.mark.asyncio
async def test_router_failure_fails_loud_with_reason(haproxy_api_cleanup):
    """A failed router dispatch must surface as 503 with ``X-Serve-Reason``.

    A stub router that always answers non-200 is placed in front of a single
    replica. Each request must come back as 503 with the expected reason
    label, and neither the primary backend nor the router-pinned backend may
    have recorded a session, which shows there was no silent fallback.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        haproxy_port = find_free_port()
        stats_port = find_free_port()
        replica_port = find_free_port()
        router_port = find_free_port()

        actor_name = "SERVE_REPLICA::app#dep#aaa"

        replica, replica_thread = _create_replica_server(
            replica_port, replica_id_header="A"
        )
        broken_router, broken_router_thread = _create_broken_router_server(router_port)

        try:
            http_options = HTTPOptions(
                host="127.0.0.1",
                port=haproxy_port,
                keep_alive_timeout_s=58,
            )
            config = HAProxyConfig(
                http_options=http_options,
                stats_port=stats_port,
                socket_path=os.path.join(temp_dir, "admin.sock"),
                has_received_routes=True,
                has_received_servers=True,
                health_check_path="/-/healthz",
                health_check_inter="500ms",
                health_check_rise=1,
                health_check_fall=2,
            )

            replica_server_cfg = ServerConfig(
                name="A",
                host="127.0.0.1",
                port=replica_port,
                replica_id=actor_name,
            )
            router_server_cfg = ServerConfig(
                name="router", host="127.0.0.1", port=router_port
            )
            backend = BackendConfig(
                name="llm",
                path_prefix="/",
                app_name="llm",
                health_check_path="/-/healthz",
                servers=[replica_server_cfg],
                ingress_request_router_servers=[router_server_cfg],
            )

            api = HAProxyApi(
                cfg=config,
                backend_configs={"llm": backend},
                config_file_path=os.path.join(temp_dir, "haproxy.cfg"),
            )
            haproxy_api_cleanup(api)
            await api.start()

            def _proxy_healthz_ok():
                # True once the proxy answers its health endpoint with 200.
                probe = requests.get(
                    f"http://127.0.0.1:{haproxy_port}/-/healthz", timeout=2
                )
                return probe.status_code == 200

            wait_for_condition(lambda: check_haproxy_ready(stats_port), timeout=10)
            await async_wait_for_condition(_proxy_healthz_ok, timeout=10)

            # Each dispatch failure mode has to show up as a 5xx carrying a
            # reason label rather than as a quiet primary-backend fallback.
            #   router_non_200: the stub router replies with a status != 200.
            #   empty_body: wait-for-body finished but the request carried no
            #     body to forward to the router.
            failure_cases = [
                (dict(json={"prompt": "hi"}), "router_non_200"),
                (dict(data=""), "empty_body"),
            ]
            for request_kwargs, expected_reason in failure_cases:
                for _attempt in range(3):
                    resp = requests.post(
                        f"http://127.0.0.1:{haproxy_port}/predict",
                        timeout=5,
                        **request_kwargs,
                    )
                    assert resp.status_code == 503, (expected_reason, resp.text)
                    reason_header = resp.headers.get("X-Serve-Reason")
                    assert reason_header == expected_reason, (
                        expected_reason,
                        resp.headers,
                    )

            # Neither backend may have served a session.
            stats_csv = requests.get(
                f"http://127.0.0.1:{stats_port}/stats;csv", timeout=5
            ).text
            for backend_name in ("llm", "llm-via-ingress-request-router"):
                assert _backend_stot(stats_csv, backend_name) == 0, stats_csv
        finally:
            # Best-effort teardown: signal both stub servers first, then wait
            # for their threads, ignoring individual failures.
            for srv in (replica, broken_router):
                try:
                    srv.should_exit = True
                except Exception:
                    pass
            for thr in (replica_thread, broken_router_thread):
                try:
                    thr.join(timeout=5)
                except Exception:
                    pass
980+
981+
767982
@pytest.mark.asyncio
768983
async def test_graceful_reload(haproxy_api_cleanup):
769984
"""Test that graceful reload preserves long-running connections."""

0 commit comments

Comments
 (0)