diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 3bbed7ac849f..d2bcac1c91e8 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -716,7 +716,16 @@ # HAProxy hard stop after timeout RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S = int( - os.environ.get("RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S", "120") + os.environ.get("RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S", "1800") +) + +# Idle keep-alive timeout for HAProxy's client-side connections. Distinct +# from `HTTPOptions.keep_alive_timeout_s` (which is the uvicorn keep-alive +# on the *replica* side). Lower values force idle clients to rotate off +# old HAProxy procs faster after a reload, reducing the chance that a +# long-running request lands on a near-hard-stop-deadline proc. +RAY_SERVE_HAPROXY_TIMEOUT_HTTP_KEEP_ALIVE_S = int( + os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_HTTP_KEEP_ALIVE_S", "60") ) # HAProxy metrics export port @@ -733,16 +742,12 @@ ) # HAProxy timeout configurations (in seconds, None = no timeout) -RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = ( - int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")) - if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S") - else None +RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = int( + os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S", "3600") ) -RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = ( - int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")) - if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S") - else None +RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = int( + os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S", "5") ) # When enabled, adds 'option http-no-delay' to the HAProxy config defaults, @@ -756,6 +761,90 @@ os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S", "3600") ) +# Time to wait for HAProxy to enter the running state during a graceful +# reload before giving up. The previous hardcoded 5s was too tight for +# clusters with many backends/servers under load: when reloads time out +# the new server list never takes effect, leaving HAProxy with stale +# routing (new replicas can't be added without a reload, since Ray's +# implementation regenerates the config rather than using the runtime API). +RAY_SERVE_HAPROXY_RELOAD_TIMEOUT_S = int( + os.environ.get("RAY_SERVE_HAPROXY_RELOAD_TIMEOUT_S", "5") +) + +# Number of connection-level retries per request. With option redispatch, +# each retry picks a different healthy server. The HAProxy compiled-in +# default is 3; raising this is useful in environments with high +# autoscaling churn where any given primary may be temporarily +# unreachable and a sibling can serve the request instead. +RAY_SERVE_HAPROXY_RETRIES = int(os.environ.get("RAY_SERVE_HAPROXY_RETRIES", "3")) + +# Window during which incoming controller broadcasts (target_groups, +# fallback_targets) are coalesced into a single backend update before being +# applied to HAProxy. Under autoscaling churn the controller can fire +# broadcasts tens of ms apart; without coalescing each one issues its own +# runtime-API command burst on the admin socket, which saturates HAProxy's +# CLI mux and causes timeouts (and `-x` socket-transfer failures during the +# fallback reload). 0.2s collapses typical burst clusters into one diff. +# Set to 0 to disable coalescing entirely (legacy behaviour). +RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S = float( + os.environ.get("RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S", "0.5") +) + +# Maximum number of HAProxy runtime-API commands (e.g. `add server`, +# `disable server`, `del server`, `enable server`) batched onto a single +# admin-socket connection. Each chunk is sent as a `;`-separated command +# string and shares one socket-level timeout. Smaller chunks reduce the +# risk of any one chunk exceeding the read timeout when HAProxy's CLI +# mux is queueing behind HTTP worker dispatch under load: the per-chunk +# overhead (~single-digit ms of socket setup) is negligible compared to +# the cost of a fallback full reload triggered by a timeout. 16 leaves +# substantial headroom even at high traffic load. +RAY_SERVE_HAPROXY_RUNTIME_CHUNK_SIZE = int( + os.environ.get("RAY_SERVE_HAPROXY_RUNTIME_CHUNK_SIZE", "16") +) + +# Connect/read timeout (seconds) for HAProxy admin-socket commands. The +# CLI mux serializes admin operations behind HTTP worker dispatch, so a +# batch of `add server` / `del server` / etc. commands can routinely take +# more than a few seconds while HAProxy is serving heavy traffic. A +# generous ceiling keeps the runtime-API path alive across slow windows +# instead of cascading into a fallback reload. +RAY_SERVE_HAPROXY_SOCKET_TIMEOUT_S = float( + os.environ.get("RAY_SERVE_HAPROXY_SOCKET_TIMEOUT_S", "60") +) + +# Total number of server slots to pre-allocate across all backends via +# HAProxy's `server-template` directive. Slots are partitioned across +# backends at config-generation time (see `_compute_slot_split`); each +# backend's share becomes the size of its `server-template` block and +# the upper bound on how many replicas it can hold before the runtime- +# API path returns False and a full reload re-computes the split. 4096 +# total comfortably covers Ray Serve clusters with hundreds of replicas +# across a few dozen backends; raise it for larger fleets, but note that +# every slot has a small HAProxy memory cost (~few KB per slot) and +# slightly inflates `show stat` output. +RAY_SERVE_HAPROXY_TOTAL_SLOTS = int( + os.environ.get("RAY_SERVE_HAPROXY_TOTAL_SLOTS", "4096") +) + +# Floor on per-backend slot allocation. Even backends with zero replicas +# get this many slots reserved so they can absorb some growth without +# requiring a reload to re-split. Set high enough to cover short bursts +# of churn but low enough that idle backends don't crowd out active ones. +# If N_backends * MIN_SLOTS exceeds TOTAL_SLOTS the split degrades to an +# equal share of (TOTAL_SLOTS // N_backends) per backend. +RAY_SERVE_HAPROXY_MIN_SLOTS_PER_BACKEND = int( + os.environ.get("RAY_SERVE_HAPROXY_MIN_SLOTS_PER_BACKEND", "32") +) + +# Headroom multiplier applied when allocating slots. A backend with N +# current replicas gets ~N * factor slots (subject to total/min limits); +# the extra slots absorb scale-up without triggering a reload. 2.0 means +# a backend can double in size between reloads before exhausting. +RAY_SERVE_HAPROXY_SLOT_HEADROOM_FACTOR = float( + os.environ.get("RAY_SERVE_HAPROXY_SLOT_HEADROOM_FACTOR", "2.0") +) + # Number of consecutive failed server health checks that must occur # before haproxy marks the server as down. RAY_SERVE_HAPROXY_HEALTH_CHECK_FALL = int( diff --git a/python/ray/serve/_private/haproxy.py b/python/ray/serve/_private/haproxy.py index 9e4b9bee371a..f7aa5d4be247 100644 --- a/python/ray/serve/_private/haproxy.py +++ b/python/ray/serve/_private/haproxy.py @@ -1,8 +1,10 @@ import asyncio import csv +import heapq import io import json import logging +import math import os import re import shutil @@ -31,6 +33,7 @@ RAY_SERVE_EXPERIMENTAL_PIP_HAPROXY, RAY_SERVE_HAPROXY_BALANCE_ALGORITHM, RAY_SERVE_HAPROXY_BINARY_PATH, + RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S, RAY_SERVE_HAPROXY_CONFIG_FILE_LOC, RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S, RAY_SERVE_HAPROXY_HEALTH_CHECK_DOWNINTER, @@ -40,16 +43,24 @@ RAY_SERVE_HAPROXY_HEALTH_CHECK_RISE, RAY_SERVE_HAPROXY_MAXCONN, RAY_SERVE_HAPROXY_METRICS_PORT, + RAY_SERVE_HAPROXY_MIN_SLOTS_PER_BACKEND, RAY_SERVE_HAPROXY_NBTHREAD, + RAY_SERVE_HAPROXY_RELOAD_TIMEOUT_S, + RAY_SERVE_HAPROXY_RETRIES, + RAY_SERVE_HAPROXY_RUNTIME_CHUNK_SIZE, RAY_SERVE_HAPROXY_SERVER_STATE_BASE, RAY_SERVE_HAPROXY_SERVER_STATE_FILE, + RAY_SERVE_HAPROXY_SLOT_HEADROOM_FACTOR, RAY_SERVE_HAPROXY_SOCKET_PATH, + RAY_SERVE_HAPROXY_SOCKET_TIMEOUT_S, RAY_SERVE_HAPROXY_STATS_PORT, RAY_SERVE_HAPROXY_SYSLOG_PORT, RAY_SERVE_HAPROXY_TCP_NODELAY, RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S, RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S, + RAY_SERVE_HAPROXY_TIMEOUT_HTTP_KEEP_ALIVE_S, RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S, + RAY_SERVE_HAPROXY_TOTAL_SLOTS, SERVE_CONTROLLER_NAME, SERVE_LOGGER_NAME, SERVE_NAMESPACE, @@ -318,6 +329,29 @@ class BackendConfig: # The app name for this backend. app_name: str = field(default_factory=str) + # Size of this backend's `server-template` slot pool. The rendered + # config emits `server-template srv 1-{slot_pool_size} ...` so the + # backend can hold up to `slot_pool_size` replicas before the runtime + # API has to fall back to a reload to re-split. In production this + # is set by HAProxyManager via `_compute_slot_split` on every config + # (re)generation. Defaults to a sensible non-zero value via + # `__post_init__` so direct-construction callers (typically tests) + # don't need to specify it -- a zero value would render an invalid + # `server-template srv 1-0` line. + slot_pool_size: int = 0 + + def __post_init__(self) -> None: + # If the caller didn't set a slot pool size, derive a sensible + # default from the current server count. Production sets this + # explicitly post-init via `_compute_slot_split`; this default is + # for direct-construction callers (tests) so an unset value + # doesn't produce an invalid `server-template 1-0` line. + if self.slot_pool_size <= 0: + self.slot_pool_size = max( + len(self.servers) * 2, + RAY_SERVE_HAPROXY_MIN_SLOTS_PER_BACKEND, + ) + def build_health_check_config(self, global_config: "HAProxyConfig") -> dict: """Build health check configuration for HAProxy backend. @@ -413,7 +447,17 @@ class HAProxyConfig: timeout_client_s: Optional[int] = RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S timeout_server_s: Optional[int] = RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S timeout_http_request_s: Optional[int] = None + # HAProxy's idle keep-alive timeout for client connections. Decoupled + # from `http_options.keep_alive_timeout_s`, which controls the + # *replica*'s uvicorn keep-alive on the other side of HAProxy. + timeout_http_keep_alive_s: Optional[ + int + ] = RAY_SERVE_HAPROXY_TIMEOUT_HTTP_KEEP_ALIVE_S hard_stop_after_s: Optional[int] = RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S + # Number of connection-level retries per request. Used in the `defaults` + # block; combined with `option redispatch` (set per-backend) each retry + # picks a different healthy server. + retries: int = RAY_SERVE_HAPROXY_RETRIES custom_global: Dict[str, str] = field(default_factory=dict) custom_defaults: Dict[str, str] = field(default_factory=dict) inject_process_id_header: bool = False @@ -452,6 +496,18 @@ class HAProxyConfig: is_head: bool = False + @property + def transfer_socket_path(self) -> str: + """Path of the dedicated FD-transfer admin socket. + + Separate from `socket_path` so that the `-x` socket transfer during + reload (which goes through HAProxy's CLI mux) doesn't have to + compete with our runtime-API command stream (`add server`, + `del server`, etc.). Derived deterministically from `socket_path` + so existing per-node path scoping carries over automatically. + """ + return self.socket_path + ".fd" + @property def frontend_host(self) -> str: if self.http_options.host is None or self.http_options.host == "0.0.0.0": @@ -463,10 +519,6 @@ def frontend_host(self) -> str: def frontend_port(self) -> int: return self.http_options.port - @property - def timeout_http_keep_alive_s(self) -> int: - return self.http_options.keep_alive_timeout_s - def build_health_route_info(self, backends: List[BackendConfig]) -> HealthRouteInfo: if not self.has_received_routes: router_ready_for_traffic = False @@ -547,6 +599,265 @@ async def reload(self) -> None: pass +@dataclass +class _BackendDiff: + """Diff between old and new BackendConfigs for incremental update path. + + When `is_structural` is True, a full reload is required (backends added/ + removed, ACLs/timeouts/health-check parameters changed, fallback changed). + + When `is_structural` is False, only the `servers` list within one or more + backends differs, and the change can be applied via HAProxy's runtime API + (`add server` / `del server` over the admin socket) without a reload. + """ + + is_structural: bool + servers_to_add: Dict[str, List[ServerConfig]] = field(default_factory=dict) + servers_to_remove: Dict[str, List[str]] = field(default_factory=dict) + + +def _compute_backend_diff( + old_configs: Dict[str, BackendConfig], + new_configs: Dict[str, BackendConfig], +) -> _BackendDiff: + """Determine whether a config change can be applied incrementally. + + Anything other than pure server list differences within an existing + backend is treated as a structural change requiring a reload. This keeps + the incremental path conservative: when in doubt, reload. + """ + # Different set of backends (added/removed) → reload + if set(old_configs.keys()) != set(new_configs.keys()): + return _BackendDiff(is_structural=True) + + servers_to_add: Dict[str, List[ServerConfig]] = {} + servers_to_remove: Dict[str, List[str]] = {} + + for name in new_configs: + old_be = old_configs[name] + new_be = new_configs[name] + + # Any backend-level config change → reload. Compare the fields that + # affect the rendered backend block (excluding the servers list). + # `name` is included defensively: callers in this file invariably + # use the dict key as the BackendConfig.name, but the helper itself + # shouldn't assume that invariant — a name change has to reload to + # update the frontend ACL labels and `use_backend` directives. + backend_level_fields = ( + "name", + "path_prefix", + "fallback_server", + "app_name", + # slot_pool_size is part of the rendered `server-template` line, + # so any change requires a reload to take effect. + "slot_pool_size", + "timeout_connect_s", + "timeout_server_s", + "timeout_client_s", + "timeout_http_request_s", + "timeout_queue_s", + "timeout_http_keep_alive_s", + "timeout_tunnel_s", + "health_check_fall", + "health_check_rise", + "health_check_inter", + "health_check_fastinter", + "health_check_downinter", + "health_check_path", + ) + for f in backend_level_fields: + if getattr(old_be, f) != getattr(new_be, f): + return _BackendDiff(is_structural=True) + + # Compare server lists by name. A name present in both with different + # host/port is treated as remove + add (HAProxy's runtime API can't + # update a server's address in-place across all versions). + old_servers = {s.name: s for s in old_be.servers} + new_servers = {s.name: s for s in new_be.servers} + + added: List[ServerConfig] = [] + removed: List[str] = [] + + for sname, server in new_servers.items(): + if sname not in old_servers: + added.append(server) + elif old_servers[sname] != server: + added.append(server) + removed.append(sname) + + for sname in old_servers: + if sname not in new_servers: + removed.append(sname) + + if added: + servers_to_add[name] = added + if removed: + servers_to_remove[name] = removed + + return _BackendDiff( + is_structural=False, + servers_to_add=servers_to_add, + servers_to_remove=servers_to_remove, + ) + + +class BackendSlotPool: + """Tracks slot ↔ replica mapping for one backend's `server-template` block. + + HAProxy's `server-template` directive pre-allocates a fixed pool of + server slots at config-generation time. The slots are named + `` (e.g. `srv1`, `srv2`, ...) and start disabled with a + placeholder address. Once HAProxy is running we repurpose slots + dynamically via the runtime API: + + # Assign slot N to a new replica: + set server /srvN addr port + set server /srvN state ready + + # Release slot N (replica removed): + set server /srvN state maint + + The pool tracks which slots are in use and which are free; allocation + is "first-free" via a min-heap so slot numbers stay compact under + churn. `assign` is idempotent — re-assigning an already-present + replica returns its existing slot, which lets the apply loop handle + same-name-different-address transitions without bookkeeping. + """ + + def __init__(self, backend_name: str, pool_size: int): + if pool_size <= 0: + raise ValueError(f"BackendSlotPool requires pool_size > 0, got {pool_size}") + self.backend_name = backend_name + self.pool_size = pool_size + self._replica_to_slot: Dict[str, int] = {} + # min-heap of free slot numbers (1-indexed to match HAProxy CLI) + self._free_slots: List[int] = list(range(1, pool_size + 1)) + heapq.heapify(self._free_slots) + + def assign(self, replica_name: str) -> Optional[int]: + """Allocate a slot for `replica_name`. Returns the slot number, + or None if the pool is full. Idempotent: re-assigning an + already-present replica returns its existing slot.""" + existing = self._replica_to_slot.get(replica_name) + if existing is not None: + return existing + if not self._free_slots: + return None + slot = heapq.heappop(self._free_slots) + self._replica_to_slot[replica_name] = slot + return slot + + def release(self, replica_name: str) -> Optional[int]: + """Free the slot held by `replica_name`. Returns the slot number, + or None if the replica wasn't holding a slot.""" + slot = self._replica_to_slot.pop(replica_name, None) + if slot is not None: + heapq.heappush(self._free_slots, slot) + return slot + + def slot_for(self, replica_name: str) -> Optional[int]: + """Return the slot held by `replica_name`, or None if absent.""" + return self._replica_to_slot.get(replica_name) + + def in_use(self) -> int: + return len(self._replica_to_slot) + + def free(self) -> int: + return len(self._free_slots) + + def is_exhausted(self) -> bool: + return not self._free_slots + + def mapping(self) -> Dict[str, int]: + """Return a copy of the replica → slot mapping (for diagnostics).""" + return dict(self._replica_to_slot) + + def assigned_slots(self) -> Set[int]: + """Return the set of slot numbers currently assigned to replicas.""" + return set(self._replica_to_slot.values()) + + +def _compute_slot_split( + backend_to_replica_count: Dict[str, int], + total: int = RAY_SERVE_HAPROXY_TOTAL_SLOTS, + min_per_backend: int = RAY_SERVE_HAPROXY_MIN_SLOTS_PER_BACKEND, + headroom_factor: float = RAY_SERVE_HAPROXY_SLOT_HEADROOM_FACTOR, +) -> Dict[str, int]: + """Partition `total` slots across backends. + + Allocation strategy: + 1. Reserve `min_per_backend` for every backend (floor). + 2. Compute each backend's "demand" as `replicas * headroom_factor`. + 3. Distribute the remaining slots proportionally to demand. Backends + with zero replicas only get their minimum (no extra share). + + Invariants when N := len(backend_to_replica_count): + - sum(result.values()) <= total + - For each b: result[b] >= min_per_backend (when N * min <= total) + - For each b: result[b] >= 1 (always) + + Degraded path: if `N * min_per_backend > total`, we can't honor the + minimum and fall back to equal split (floor(total / N) each, with the + remainder distributed to the first few backends alphabetically). This + is a pathological case that should not occur with default settings + (4096 / 16 = 256 backends). + """ + if not backend_to_replica_count: + return {} + + n = len(backend_to_replica_count) + names = sorted(backend_to_replica_count.keys()) + + if n * min_per_backend > total: + # Pathological: too many backends to give each the minimum. Equal split. + base = max(1, total // n) + remainder = max(0, total - base * n) + return { + name: base + (1 if i < remainder else 0) for i, name in enumerate(names) + } + + reserved = n * min_per_backend + remaining = total - reserved + + demand = { + name: max(0, int(math.ceil(backend_to_replica_count[name] * headroom_factor))) + for name in names + } + total_demand = sum(demand.values()) + + extra: Dict[str, int] = {name: 0 for name in names} + if remaining > 0: + if total_demand == 0: + # No replicas anywhere — split the remainder equally. + per_backend = remaining // n + leftover = remaining - per_backend * n + for i, name in enumerate(names): + extra[name] = per_backend + (1 if i < leftover else 0) + else: + # Proportional to demand. Float-then-floor, then distribute the + # rounding remainder to backends with the largest fractional parts + # so we always allocate exactly `remaining` slots. + shares_float = { + name: remaining * demand[name] / total_demand for name in names + } + shares = {name: int(shares_float[name]) for name in names} + allocated = sum(shares.values()) + leftover = remaining - allocated + # Sort by fractional part descending; ties broken by name asc. + order = sorted( + names, + key=lambda name: ( + -(shares_float[name] - shares[name]), + name, + ), + ) + for name in order[:leftover]: + shares[name] += 1 + extra = shares + + return {name: min_per_backend + extra[name] for name in names} + + class HAProxyApi(ProxyApi): """ProxyApi implementation for HAProxy.""" @@ -564,6 +875,10 @@ def __init__( self._proc = None # Track old processes from graceful reloads that may still be draining self._old_procs: List[asyncio.subprocess.Process] = [] + # Slot pools, one per backend, sized by the most recent rendered + # config's `slot_pool_size`. Populated/reset by `_rebind_slot_pools` + # which is called after every successful (re)load. + self._slot_pools: Dict[str, BackendSlotPool] = {} # Ensure required directories exist during initialization self._initialize_directories_and_error_files() @@ -598,12 +913,151 @@ def _initialize_directories_and_error_files(self) -> None: self.cfg.error_file_path = error_file_path + async def _rebind_slot_pools(self) -> None: + """Rebuild slot pools from backend_configs and populate slots. + + Called after every successful (re)load. The new HAProxy process + starts with all template slots disabled at the placeholder + address, so we: + 1. Recreate the in-memory pools (sized per BackendConfig's + `slot_pool_size`, set by `_compute_slot_split` at + config-generation time). + 2. Assign current `backend_configs[*].servers` to slots. + 3. Send a single batched runtime-API call to point those slots + at the right addresses and bring them up. + + This handles two cases under one path: + - Production: a reload was triggered because the incremental + apply gave up (e.g. structural change, pool exhausted). The + new desired state was already written to `backend_configs` + by `try_apply_servers_dynamically` before it returned False; + this method makes HAProxy actually serve from it. + - Tests: HAProxyApi is constructed with explicit + `backend_configs={...}` and `start()` is called; this + method binds those servers to slots without requiring the + caller to make an extra runtime-API call. + + On step-3 failure we wipe `_slot_pools` AND `backend_configs` + so the next broadcast goes through the first-time-setup path + (returns False, caller reloads, this method runs again with a + fresh HAProxy admin socket). Without this wipe, the actor's + cached state would say "slots are assigned" while HAProxy's + templated slots are still at the placeholder, and subsequent + diffs would find no work to do -- HAProxy would silently serve + nothing. Failures here are rare in practice since we just + restarted HAProxy. + """ + new_pools: Dict[str, BackendSlotPool] = {} + for name, bc in self.backend_configs.items(): + if bc.slot_pool_size <= 0: + continue + new_pools[name] = BackendSlotPool(name, bc.slot_pool_size) + self._slot_pools = new_pools + + if new_pools: + sizes = {n: p.pool_size for n, p in new_pools.items()} + logger.info( + "Slot pools reset: total=%d, split=%s", + sum(sizes.values()), + sizes, + extra={"log_to_stderr": True}, + ) + + if not self.backend_configs: + return + commands: List[str] = [] + placed_count = 0 + for backend_name, bc in self.backend_configs.items(): + pool = self._slot_pools.get(backend_name) + if pool is None or not bc.servers: + continue + for server in bc.servers: + slot = pool.assign(server.name) + if slot is None: + logger.warning( + "Slot population: backend %s pool exhausted " + "(pool_size=%d). Some servers will be unreachable " + "until the next reload re-splits.", + backend_name, + pool.pool_size, + ) + break + commands.append( + f"set server {backend_name}/srv{slot} " + f"addr {server.host} port {server.port}" + ) + commands.append(f"set server {backend_name}/srv{slot} state ready") + placed_count += 1 + if not commands: + return + try: + await self._send_runtime_commands(commands) + logger.info( + "Slot population OK: %d servers placed across %d backends", + placed_count, + len(self._slot_pools), + extra={"log_to_stderr": True}, + ) + except Exception as e: + # On failure, the in-memory pool has assignments that HAProxy + # doesn't know about: the templated slots are still disabled + # at the placeholder address. The next broadcast with + # unchanged servers would see an empty diff and skip the + # work, leaving HAProxy serving nothing. Force recovery by + # wiping our cached state so the next broadcast goes through + # the first-time-setup path (returns False, caller reloads, + # this method runs again with a fresh HAProxy admin socket). + logger.error( + "Slot population failed after reload: %s. Wiping pool and " + "backend_configs to force a clean recovery on the next " + "broadcast.", + e, + extra={"log_to_stderr": True}, + ) + self._slot_pools = {} + self.backend_configs = {} + return + # Best-effort snapshot for live debugging. + self._write_slot_mapping_for_debug() + + def _write_slot_mapping_for_debug(self) -> None: + """Best-effort dump of current slot mapping to a JSON file. + + Writes `/slot_mapping.json` so operators can `cat` it + to translate `srv1`/`srv2`/... back to replica IDs without + grepping logs. Best-effort: any failure is swallowed so a write + problem can't break an otherwise-successful apply. + """ + try: + socket_dir = os.path.dirname(self.cfg.socket_path) + mapping_path = os.path.join(socket_dir, "slot_mapping.json") + payload = { + name: { + "pool_size": pool.pool_size, + "in_use": pool.in_use(), + "free": pool.free(), + "slots": { + f"srv{slot}": replica + for replica, slot in pool.mapping().items() + }, + } + for name, pool in self._slot_pools.items() + } + tmp = mapping_path + ".tmp" + with open(tmp, "w") as f: + json.dump(payload, f, indent=2, sort_keys=True) + os.replace(tmp, mapping_path) + except Exception as e: + logger.debug(f"Failed to write slot_mapping.json: {e}") + def _is_running(self) -> bool: """Check if the HAProxy process is still running.""" return self._proc is not None and self._proc.returncode is None async def _start_and_wait_for_haproxy( - self, *extra_args: str, timeout_s: int = 5 + self, + *extra_args: str, + timeout_s: int = RAY_SERVE_HAPROXY_RELOAD_TIMEOUT_S, ) -> asyncio.subprocess.Process: # Build command args haproxy_bin = get_haproxy_binary() @@ -617,65 +1071,429 @@ async def _start_and_wait_for_haproxy( logger.debug(f"Starting HAProxy with args: {args}") + spawn_start = time.monotonic() proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) + spawn_elapsed = time.monotonic() - spawn_start + wait_start = time.monotonic() try: await self._wait_for_hap_availability(proc, timeout_s=timeout_s) except Exception: + wait_elapsed = time.monotonic() - wait_start + logger.warning( + "HAProxy start failed (PID=%s): spawn=%.2fs wait=%.2fs args=%s", + proc.pid, + spawn_elapsed, + wait_elapsed, + extra_args, + ) # If startup fails, ensure the process is killed to avoid orphaned processes if proc.returncode is None: proc.kill() await proc.wait() raise + wait_elapsed = time.monotonic() - wait_start + + # Drain HAProxy stderr in the background. `-db` debug mode + # emits lines per backend-state change, per health-check fail, + # etc., and with many backends a busy proxy can fill the OS + # pipe buffer (64KB default on Linux) within seconds. With no + # active reader, HAProxy then blocks on its next stderr write + # -- which freezes the admin socket (same proc) and presents + # to the actor as a 60s timeout on `show info`. Drain to a + # per-pid log file so the buffer never fills. + stderr_log_path = self._stderr_log_path_for(proc.pid) + asyncio.create_task(self._drain_proc_stderr(proc, stderr_log_path)) + + logger.info( + "HAProxy started (PID=%s): spawn=%.2fs wait=%.2fs args=%s " + "(stderr -> %s)", + proc.pid, + spawn_elapsed, + wait_elapsed, + extra_args, + stderr_log_path, + ) return proc - async def _save_server_state(self) -> None: - """Save the server state to the file.""" - server_state = await self._send_socket_command("show servers state") - with open(self.cfg.server_state_file, "w") as f: - f.write(server_state) + def _stderr_log_path_for(self, pid: int) -> str: + """Deterministic per-pid stderr log path. + + Co-located with the admin socket so callers (notably + `_wait_for_admin_socket_bound_by`) can reconstruct it from + the pid alone. + """ + return f"{self.cfg.socket_path}.stderr.{pid}.log" + + async def _drain_proc_stderr( + self, + proc: asyncio.subprocess.Process, + log_path: str, + max_bytes: int = 10_000_000, + ) -> None: + """Drain HAProxy stderr to a bounded per-pid log file.""" + try: + with open(log_path, "wb") as f: + f.write( + f"=== HAProxy stderr pid={proc.pid} " + f"started_at={time.time():.3f} ===\n".encode() + ) + f.flush() + while True: + try: + chunk = await proc.stderr.read(8192) + except Exception: + break + if not chunk: + break + if f.tell() + len(chunk) > max_bytes: + f.seek(0) + f.truncate() + f.write( + f"=== truncated at {time.time():.3f}, " + f"continuing pid={proc.pid} ===\n".encode() + ) + f.write(chunk) + f.flush() + except Exception as e: + logger.debug("HAProxy stderr drain for pid=%s ended: %s", proc.pid, e) + + async def _terminate_old_procs(self) -> None: + """SIGKILL any old HAProxy procs we're still tracking. + + Used during fresh-start recovery (when -sf/-x graceful reload isn't + possible) to free listener ports the orphans may still be holding. + Idempotent — already-dead procs are a no-op. + """ + for old_proc in self._old_procs: + if old_proc.returncode is not None: + continue + try: + old_proc.kill() + await old_proc.wait() + logger.info( + f"Killed orphaned HAProxy process (PID: {old_proc.pid}) " + "to free listener ports for fresh start." + ) + except Exception as e: + logger.warning( + f"Failed to kill orphaned HAProxy process " + f"(PID: {old_proc.pid}): {e}" + ) + self._old_procs.clear() async def _graceful_reload(self) -> None: """Perform a graceful reload of HAProxy by starting a new process with -sf.""" + reload_start = time.monotonic() + phases: List[str] = [] try: old_proc = self._proc + # If the previous HAProxy process has already exited (e.g., a prior + # reload's `-x admin.sock` socket transfer failed and the new + # process exited 1), there's nothing to graceful-reload from: no + # listening FDs to inherit and no live PID to signal. Without this + # branch every subsequent reload would re-raise "HAProxy crashed + # during startup: exit code 1" from `_wait_for_hap_availability` + # below and the actor would be stuck with no listener on 8000 + # until the controller kills it. Start a fresh process instead so + # we recover automatically; this drops in-flight connections + # briefly but is strictly better than staying down. + if old_proc is None or old_proc.returncode is not None: + logger.warning( + "Previous HAProxy process is not running " + "(returncode=%s); starting a fresh HAProxy instead of " + "attempting a graceful reload.", + old_proc.returncode if old_proc is not None else None, + ) + # Kill any tracked old procs that may still be holding the + # listener ports. Under -x failures the prior reload never + # delivered SIGUSR1 to its predecessor, so processes appended + # to `_old_procs` can stay fully bound to ports 8000 / 9101 / + # 8404 indefinitely. A fresh-start without -sf/-x doesn't + # inherit anything, so it would otherwise hit EADDRINUSE on + # those ports until the orphans finish their natural drain + # (tens of seconds at minimum). SIGKILL them up front so the + # fresh process can bind immediately. + t0 = time.monotonic() + await self._terminate_old_procs() + phases.append(f"terminate_old={time.monotonic() - t0:.2f}s") + + t0 = time.monotonic() + self._proc = await self._start_and_wait_for_haproxy() + phases.append(f"fresh_start={time.monotonic() - t0:.2f}s") + + # On fresh-start there is no old process competing for + # the admin socket, so this should resolve immediately; + # we still wait defensively in case the new proc takes + # a moment to bind. + t0 = time.monotonic() + await self._wait_for_admin_socket_bound_by(self._proc.pid) + phases.append(f"admin_wait={time.monotonic() - t0:.2f}s") + + # New process started with empty server-template slots; + # rebind pools from backend_configs and repopulate. + t0 = time.monotonic() + await self._rebind_slot_pools() + phases.append(f"rebind_slots={time.monotonic() - t0:.2f}s") + + logger.info( + "HAProxy fresh-start reload OK in %.2fs (%s)", + time.monotonic() - reload_start, + " ".join(phases), + ) + return + + t0 = time.monotonic() await self._wait_for_hap_availability(old_proc) + phases.append(f"old_proc_check={time.monotonic() - t0:.2f}s") - # Save server state if optimization is enabled - if self.cfg.enable_hap_optimization: - await self._save_server_state() + # Note: server-state file save/load is intentionally skipped. + # With `server-template`, every reload runs `_rebind_slot_pools` + # which re-assigns slots first-free from a fresh pool. Loading + # the state file in the new HAProxy would also restore the + # previous mapping; the two would collide and the same replica + # would end up bound to two slots (both routable, doubling + # traffic to it). We keep `-x` FD transfer for seamless + # listener migration since that's independent of slot state. # Start new HAProxy process with -sf flag to gracefully take over from old process # Use -x socket transfer for seamless reloads if optimization is enabled reload_args = ["-sf", str(old_proc.pid)] if self.cfg.enable_hap_optimization: - reload_args.extend(["-x", self.cfg.socket_path]) + # Use the dedicated FD-transfer socket so `_getsocks` doesn't + # queue behind in-flight runtime-API commands on the main + # admin socket. + reload_args.extend(["-x", self.cfg.transfer_socket_path]) + t0 = time.monotonic() self._proc = await self._start_and_wait_for_haproxy(*reload_args) + phases.append(f"new_proc_start={time.monotonic() - t0:.2f}s") # Track old process so we can ensure it's cleaned up during shutdown if old_proc is not None: self._old_procs.append(old_proc) + # `_start_and_wait_for_haproxy` returns as soon as the new + # proc's frontend listener accepts TCP -- which it does very + # early via `-x` FD transfer. The runtime-API admin socket + # binding lags behind that, and during the lag connections + # to the socket path still go to the OLD proc (which has + # the old config and rejects new-config commands with + # "No such backend"). Wait for the admin socket to be owned + # by the new proc before we issue any runtime commands. + t0 = time.monotonic() + await self._wait_for_admin_socket_bound_by(self._proc.pid) + phases.append(f"admin_wait={time.monotonic() - t0:.2f}s") + + # New process has empty server-template slots; rebind pools + # from backend_configs and repopulate. + t0 = time.monotonic() + await self._rebind_slot_pools() + phases.append(f"rebind_slots={time.monotonic() - t0:.2f}s") + logger.info( - "Successfully performed graceful HAProxy reload with process restart." + "HAProxy graceful reload OK in %.2fs (%s)", + time.monotonic() - reload_start, + " ".join(phases), ) except Exception as e: - logger.error(f"HAProxy graceful reload failed: {e}") + logger.error( + "HAProxy graceful reload failed after %.2fs (%s): %s", + time.monotonic() - reload_start, + " ".join(phases) if phases else "no_phases_completed", + e, + ) raise + async def _is_listener_bound(self) -> bool: + """Quick TCP-only check: is HAProxy accepting connections on its port? + + Used for readiness detection inside `_wait_for_hap_availability`. + We deliberately don't send an HTTP request here -- under startup + load (large config, many backends, server-state file replay) + HAProxy can be bound but momentarily slow to dispatch the first + HTTP requests to its worker threads, which would burn our entire + 5s startup budget on a single read timeout. TCP-bound is the + right signal for "the listener is up"; HTTP-level responsiveness + follows within milliseconds and is verified separately by the + controller's health check via `probe_http_listener`. + """ + try: + _, writer = await asyncio.wait_for( + asyncio.open_connection("127.0.0.1", self.cfg.frontend_port), + timeout=0.5, + ) + except Exception: + return False + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + return True + + async def _wait_for_admin_socket_bound_by( + self, target_pid: int, timeout_s: float = 10.0 + ) -> bool: + """Wait until the admin socket is bound by the proc with `target_pid`. + + After a graceful reload, the new HAProxy process binds the + frontend listener very early (often within a few ms via the `-x` + FD transfer) but the runtime-API admin socket binding can lag + behind by tens to hundreds of milliseconds. During that window + the old proc still owns the admin-socket path, so any + `_send_socket_command` (e.g. our `set server` / `show servers + state` calls) gets routed to the OLD process -- which has the + OLD config and returns "No such backend" for templated slots in + the new config. + + Polls `show info` and parses the `Pid:` line until it matches + the new proc, or until `timeout_s` elapses. Returns True if + confirmed, False on timeout. Failures here are logged but not + fatal: callers should still attempt their runtime commands, and + if those fail the regular reload-fallback path kicks in. + """ + deadline = time.monotonic() + timeout_s + pid_re = re.compile(r"^Pid:\s*(\d+)\s*$", re.MULTILINE) + last_seen_pid: Optional[int] = None + while time.monotonic() < deadline: + try: + info = await self._send_socket_command("show info") + match = pid_re.search(info) + if match: + seen = int(match.group(1)) + last_seen_pid = seen + if seen == target_pid: + return True + except Exception: + pass + await asyncio.sleep(0.05) + logger.warning( + "Admin socket did not bind to new HAProxy pid=%d within %.1fs " + "(last seen pid=%s); proceeding anyway -- runtime commands may " + "fail and trigger another reload.", + target_pid, + timeout_s, + last_seen_pid, + ) + self._dump_admin_socket_diagnostics(target_pid) + return False + + def _dump_admin_socket_diagnostics(self, target_pid: int) -> None: + """Best-effort snapshot when the admin-socket wait times out. + + Captures filesystem state of the socket path, the new proc's + /proc//status (Sleeping vs Running gives away a write + block from a deadlock), and a tail of recent HAProxy stderr. + All best-effort -- any sub-failure is swallowed so we never + replace a real warning with a traceback. + """ + lines: List[str] = [f"Admin-socket diagnostics for pid={target_pid}:"] + + sock_path = self.cfg.socket_path + try: + st = os.stat(sock_path) + lines.append( + f" socket path {sock_path}: exists, size={st.st_size}, " + f"mtime={st.st_mtime:.3f}" + ) + except FileNotFoundError: + lines.append(f" socket path {sock_path}: MISSING") + except Exception as e: + lines.append(f" socket path {sock_path}: stat failed: {e}") + + try: + with open(f"/proc/{target_pid}/status") as f: + for line in f: + if line.startswith(("State:", "Threads:", "VmRSS:")): + lines.append(f" /proc/{target_pid}: {line.rstrip()}") + except FileNotFoundError: + lines.append(f" /proc/{target_pid}: dead (no proc entry)") + except Exception as e: + lines.append(f" /proc/{target_pid}/status: read failed: {e}") + + stderr_log = self._stderr_log_path_for(target_pid) + try: + with open(stderr_log, "rb") as f: + f.seek(0, 2) + size = f.tell() + f.seek(max(0, size - 2048)) + tail = f.read().decode("utf-8", errors="replace") + lines.append(f" stderr tail ({stderr_log}, last 2KB of {size}B):") + for ln in tail.splitlines()[-20:]: + lines.append(f" {ln}") + except FileNotFoundError: + lines.append(f" stderr log {stderr_log}: not found") + except Exception as e: + lines.append(f" stderr log {stderr_log}: read failed: {e}") + + logger.warning("\n".join(lines)) + + async def probe_http_listener(self) -> bool: + """Probe the HAProxy HTTP listener via /-/healthz. + + Returns True if HAProxy is parsing HTTP on its frontend port, + False otherwise. Used by the controller's health check, which + runs on a stable proxy and needs to differentiate "alive and + serving" from "bound but stuck". Does NOT depend on the admin + socket -- the admin socket is shared with our runtime-API + commands and the `-x` reload FD transfer, and saturates under + autoscaling churn. The HTTP listener is served by HAProxy worker + threads independent of the CLI mux, so it stays responsive when + the admin socket is busy. + + During graceful reload (where the old process is still bound to + the same port) this probe may be answered by either the old or + the new process; that's acceptable for readiness purposes because + either way the port is serving traffic. Crash detection is + handled separately by polling `proc.returncode`. + """ + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection("127.0.0.1", self.cfg.frontend_port), + timeout=2.0, + ) + except Exception: + return False + try: + writer.write( + b"GET /-/healthz HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n" + ) + await writer.drain() + response = await asyncio.wait_for(reader.read(256), timeout=2.0) + # Accept any HTTP response, not just 200. The signal we want is + # "HAProxy is alive and parsing HTTP", not "HAProxy says traffic + # is healthy". /-/healthz legitimately returns 503 in two cases: + # - drain mode (disable() sets pass_health_checks=False to + # signal upstream LB to stop routing). + # - initial startup before has_received_routes is True. + # In both states HAProxy is functioning and admin-socket-based + # `is_running()` would return True; we match that behavior. + return response.startswith(b"HTTP/") + except Exception: + return False + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + async def _wait_for_hap_availability( - self, proc: asyncio.subprocess.Process, timeout_s: int = 5 + self, + proc: asyncio.subprocess.Process, + timeout_s: int = RAY_SERVE_HAPROXY_RELOAD_TIMEOUT_S, ) -> None: - start_time = time.time() + start_time = time.monotonic() + iterations = 0 - # TODO: update this to use health checks - while time.time() - start_time < timeout_s: + while time.monotonic() - start_time < timeout_s: + iterations += 1 if proc.returncode is not None: stdout = await proc.stdout.read() if proc.stdout else b"" stderr = await proc.stderr.read() if proc.stderr else b"" @@ -685,16 +1503,30 @@ async def _wait_for_hap_availability( ) raise RuntimeError( - f"HAProxy crashed during startup: {output or f'exit code {proc.returncode}'}" + f"HAProxy crashed during startup after " + f"{time.monotonic() - start_time:.2f}s, " + f"{iterations} probe attempts: " + f"{output or f'exit code {proc.returncode}'}" ) - if await self.is_running(): + # TCP-only check: is the listener accepting connections? We + # don't send an HTTP request here because the per-iteration + # cost would burn our startup budget. TCP-bound is the right + # signal for "HAProxy is up"; the controller's check_health + # uses the HTTP probe to verify ongoing serving capability. + if await self._is_listener_bound(): + logger.debug( + "HAProxy listener bound after %.2fs (%d probe attempts)", + time.monotonic() - start_time, + iterations, + ) return await asyncio.sleep(0.5) raise RuntimeError( - f"HAProxy did not enter running state within {timeout_s} seconds." + f"HAProxy did not enter running state within {timeout_s} seconds " + f"({iterations} probe attempts)." ) def _generate_config_file_internal(self) -> None: @@ -785,6 +1617,16 @@ async def start(self) -> None: logger.info("Successfully generated HAProxy config file.") self._proc = await self._start_and_wait_for_haproxy() + # No old process competes for the admin socket on initial + # start, but the new proc still needs a moment to bind it + # after the frontend listener comes up. Wait briefly so + # `_rebind_slot_pools`'s runtime-API calls don't race the + # bind. + await self._wait_for_admin_socket_bound_by(self._proc.pid) + # Rebuild in-memory pools and populate slots from any + # already-set backend_configs (test fixtures or production + # reloads that wrote backend_configs before calling start). + await self._rebind_slot_pools() logger.info("HAProxy started successfully.") except Exception as e: logger.error(f"Failed to initialize and start HAProxy configuration: {e}") @@ -796,23 +1638,52 @@ async def get_all_stats(self) -> Dict[str, Dict[str, ServerStats]]: Returns only application backends configured in self.backend_configs, excluding HAProxy internal components (frontends, default_backend, stats). Also excludes BACKEND aggregate entries, returning only individual servers. + + With server-template, `show stat` reports every templated slot + (e.g. srv1..srv4096) -- including placeholders at the disabled + address. Including those would inflate `total_servers` and skew + any caller that counts servers. We filter to: + - Slots actively assigned in our in-memory pool (i.e. bound + to a real replica). + - Slots with non-zero current sessions or queued requests + (recently released slots still draining; we want + `is_system_idle` to see those). + - The fallback server (named after the actor, not srv-prefixed). """ try: stats_output = await self._send_socket_command("show stat") all_stats = self._parse_haproxy_csv_stats(stats_output) - # Filter to only return application backends (ones in backend_configs) - # Exclude HAProxy internal components like frontends, default_backend, stats - # Also exclude BACKEND aggregate entries, keep only individual servers - return { - backend_name: { - server_name: stats - for server_name, stats in servers.items() - if server_name != "BACKEND" - } - for backend_name, servers in all_stats.items() - if backend_name in self.backend_configs - } + filtered: Dict[str, Dict[str, ServerStats]] = {} + for backend_name, servers in all_stats.items(): + if backend_name not in self.backend_configs: + continue + pool = self._slot_pools.get(backend_name) + active_slots: Set[int] = ( + pool.assigned_slots() if pool is not None else set() + ) + kept: Dict[str, ServerStats] = {} + for server_name, stats in servers.items(): + if server_name == "BACKEND": + continue + if not server_name.startswith("srv"): + # Non-template server (e.g. fallback). Keep unconditionally. + kept[server_name] = stats + continue + try: + slot_num = int(server_name[3:]) + except ValueError: + # Unexpected naming; keep defensively. + kept[server_name] = stats + continue + if slot_num in active_slots: + kept[server_name] = stats + elif stats.current_sessions > 0 or stats.queued > 0: + # Released slot still draining in-flight work. + kept[server_name] = stats + # else: placeholder slot; skip + filtered[backend_name] = kept + return filtered except Exception as e: logger.error(f"Failed to get HAProxy stats: {e}") return {} @@ -833,7 +1704,7 @@ async def _send_socket_command(self, command: str) -> str: try: reader, writer = await asyncio.wait_for( asyncio.open_unix_connection(self.cfg.socket_path), - timeout=5.0, + timeout=RAY_SERVE_HAPROXY_SOCKET_TIMEOUT_S, ) except asyncio.TimeoutError: raise RuntimeError( @@ -846,7 +1717,9 @@ async def _send_socket_command(self, command: str) -> str: # Read until EOF (HAProxy closes connection after response) try: - result_bytes = await asyncio.wait_for(reader.read(), timeout=5.0) + result_bytes = await asyncio.wait_for( + reader.read(), timeout=RAY_SERVE_HAPROXY_SOCKET_TIMEOUT_S + ) except asyncio.TimeoutError: raise RuntimeError( f"Timeout while sending command '{command}' to HAProxy socket" @@ -861,6 +1734,158 @@ async def _send_socket_command(self, command: str) -> str: except Exception as e: raise RuntimeError(f"Failed to send socket command '{command}': {e}") + async def _send_runtime_command(self, command: str) -> str: + """Send an admin-socket command and treat error responses as failures. + + HAProxy's admin socket conveys command failures by returning an + error string in the response body — not by closing the connection + or raising at the transport level. `_send_socket_command` returns + the body verbatim, so callers that only catch transport-level + exceptions (e.g. mutating commands like `add server` / `del server`) + can mistake a rejection for a success. + + This wrapper inspects the response for the well-known error markers + HAProxy emits and raises RuntimeError when present, so the caller's + `try/except` reliably catches both transport failures and + application-level rejections. + """ + result = await self._send_socket_command(command) + if not result: + return result + # HAProxy prefixes alert/warning/notice levels into the admin-socket + # response when a command is rejected (e.g., + # "[ALERT] (XYZ) : 'add server' : Backend X not found.") + # Plain string error messages without a prefix also occur — check + # for a small set of common error tokens. Any false-positive here + # is fine: the worst case is we trigger a fallback reload. + normalized = result.lower() + error_markers = ( + "[alert]", + "[warning]", + "[notice]", + "no such server", + "no such backend", + "doesn't exist", + "does not exist", + "not found", + "not allowed", + "syntax error", + "invalid argument", + "unknown command", + "must be disabled", + ) + for marker in error_markers: + if marker in normalized: + raise RuntimeError( + f"HAProxy rejected runtime command '{command}': " + f"{result.strip()}" + ) + return result + + async def _send_runtime_commands( + self, + commands: List[str], + chunk_size: int = RAY_SERVE_HAPROXY_RUNTIME_CHUNK_SIZE, + ) -> None: + """Send multiple admin-socket commands batched onto single connections. + + HAProxy's CLI accepts multiple commands per connection separated by + `;`. This lets us collapse what would otherwise be N separate Unix + socket round-trips (each contending with HTTP worker threads for + accept/dispatch) into a small number of connections, reducing the + wallclock cost of a large apply. + + chunk_size controls how many commands ride through each socket + connection. The socket-level timeout (RAY_SERVE_HAPROXY_SOCKET_TIMEOUT_S) + applies per chunk, so smaller chunks give each chunk its own budget + rather than forcing the whole batch to fit in one timeout window. + Under load HAProxy's CLI mux serializes admin operations behind HTTP + worker dispatch, so the dominant cost is HAProxy's processing time + per command, not per-chunk socket setup -- keeping chunks small + trades a small amount of extra wallclock for a much higher chance of + completing the runtime-API path instead of timing out into a full + reload. + + Error detection: substring-match the concatenated response for known + HAProxy error markers and raise on the first hit. Loses per-command + attribution (we don't know which command in the batch failed), but + the caller's response to any failure is the same fallback reload + regardless. + + On failure, logs per-chunk wallclock so we can distinguish "just over + the timeout budget" from "HAProxy genuinely wedged" -- the right fix + for the two is different (smaller chunks vs. investigating HAProxy + state). + + chunk_size stays comfortably under HAProxy's default 16 KB CLI + buffer (`tune.bufsize`) for typical command lengths (~80 bytes). + """ + if not commands: + return + error_markers = ( + "[alert]", + "[warning]", + "[notice]", + "no such server", + "no such backend", + "doesn't exist", + "does not exist", + "not found", + "not allowed", + "syntax error", + "invalid argument", + "unknown command", + "must be disabled", + ) + num_chunks = (len(commands) + chunk_size - 1) // chunk_size + chunk_times: List[float] = [] + batch_start = time.monotonic() + for i in range(0, len(commands), chunk_size): + chunk = commands[i : i + chunk_size] + batched = ";".join(chunk) + chunk_start = time.monotonic() + try: + result = await self._send_socket_command(batched) + except Exception: + # Per-chunk wallclock tells us whether we're at "just over + # budget" (e.g. 15.0s = hit the timeout exactly, smaller chunks + # or larger timeout would help) or "HAProxy is wedged" (much + # longer than that, something else is wrong). + logger.warning( + "Runtime API chunk %d/%d (size=%d) failed after %.2fs; " + "total_commands=%d, prior_chunk_times=%s, " + "batch_elapsed=%.2fs", + i // chunk_size + 1, + num_chunks, + len(chunk), + time.monotonic() - chunk_start, + len(commands), + [f"{t:.2f}s" for t in chunk_times], + time.monotonic() - batch_start, + extra={"log_to_stderr": False}, + ) + raise + chunk_times.append(time.monotonic() - chunk_start) + if not result: + continue + normalized = result.lower() + for marker in error_markers: + if marker in normalized: + raise RuntimeError( + f"HAProxy rejected one or more runtime commands in " + f"batch of {len(chunk)} (chunk starts at index {i}): " + f"{result.strip()[:500]}" + ) + logger.info( + "Runtime API batch OK: %d commands, %d chunks, chunk_times=%s, " + "total=%.2fs", + len(commands), + num_chunks, + [f"{t:.2f}s" for t in chunk_times], + time.monotonic() - batch_start, + extra={"log_to_stderr": True}, + ) + @staticmethod def _parse_haproxy_csv_stats( stats_output: str, @@ -976,6 +2001,156 @@ def set_backend_configs( len(bc.servers) > 0 for bc in backend_configs.values() ) + async def try_apply_servers_dynamically( + self, + new_backend_configs: Dict[str, BackendConfig], + ) -> bool: + """Try to apply server-list changes via HAProxy's runtime API. + + Compares ``new_backend_configs`` to the current ``self.backend_configs``. + If the change is purely server adds/removes within existing backends + with unchanged slot-pool sizes, applies the diff over the admin socket + using `set server /srvN addr ... port ...` plus + `set server ... state ready/maint` against the pre-allocated + `server-template` slots. Otherwise falls back to a full reload. + + The slot pool model trades up-front memory (one templated server per + slot, regardless of whether it's in use) for runtime-API efficiency: + we never need to `add server`/`del server` -- which mutate HAProxy's + per-backend server list under lock -- only repurpose pre-existing + slots. The result is fewer admin-socket commands per broadcast and + no server-list mutation contention with HTTP workers. + + Slot accounting (which slot holds which replica) lives in + ``self._slot_pools`` and is reset to "all free" after every + successful (re)load, since the new HAProxy process starts with all + template slots disabled at the placeholder address. + + Returns True if applied incrementally (no reload needed). Returns + False if a reload is required (caller is responsible for calling + ``reload()``). + """ + # First-time setup or no current state to diff against → reload. + if not self.backend_configs: + self.set_backend_configs(new_backend_configs) + return False + + # If HAProxy isn't running yet (initial startup), the runtime API + # isn't available. Fall back to reload, which will start HAProxy. + if not await self.is_running(): + self.set_backend_configs(new_backend_configs) + return False + + diff = _compute_backend_diff(self.backend_configs, new_backend_configs) + if diff.is_structural: + # Structural change (which includes slot_pool_size shifts) → reload. + self.set_backend_configs(new_backend_configs) + return False + + if not diff.servers_to_add and not diff.servers_to_remove: + # No-op: server lists are identical. + self.set_backend_configs(new_backend_configs) + return True + + # Build the command list. Process removals before adds so freed + # slots are available for new replicas in the same broadcast -- + # otherwise a 1:1 churn (5 in, 5 out) would temporarily need 2x + # the slots. + commands: List[str] = [] + added_count = 0 + removed_count = 0 + + for backend_name, server_names in diff.servers_to_remove.items(): + pool = self._slot_pools.get(backend_name) + if pool is None: + # No pool means the manager has produced a config we + # haven't loaded into HAProxy yet. Falling back to reload + # is the safe choice: it'll regenerate the config and + # rebuild pools. + logger.warning( + "No slot pool for backend %s; falling back to reload.", + backend_name, + ) + self.set_backend_configs(new_backend_configs) + return False + for sname in server_names: + slot = pool.release(sname) + if slot is None: + # Replica wasn't holding a slot. This can happen if a + # prior apply failed mid-way; benign, skip. + continue + commands.append(f"set server {backend_name}/srv{slot} state maint") + removed_count += 1 + + for backend_name, servers in diff.servers_to_add.items(): + pool = self._slot_pools.get(backend_name) + if pool is None: + logger.warning( + "No slot pool for backend %s; falling back to reload.", + backend_name, + ) + self.set_backend_configs(new_backend_configs) + return False + for server in servers: + slot = pool.assign(server.name) + if slot is None: + logger.warning( + "Backend %s slot pool exhausted " + "(pool_size=%d, in_use=%d). Falling back to reload " + "to re-split slots with bigger share for this " + "backend.", + backend_name, + pool.pool_size, + pool.in_use(), + ) + self.set_backend_configs(new_backend_configs) + return False + commands.append( + f"set server {backend_name}/srv{slot} " + f"addr {server.host} port {server.port}" + ) + commands.append(f"set server {backend_name}/srv{slot} state ready") + added_count += 1 + + if not commands: + # All adds/removes were no-ops (e.g. removes for replicas that + # never got slot-assigned). Update state and we're done. + self.set_backend_configs(new_backend_configs) + return True + + try: + await self._send_runtime_commands(commands) + except Exception as e: + logger.warning( + f"Failed to apply backend update incrementally via runtime " + f"API: {e}. Falling back to a full reload.", + extra={"log_to_stderr": False}, + ) + self.set_backend_configs(new_backend_configs) + return False + + # Update internal state and regenerate the config file so any + # subsequent reload starts from the current set of servers. + self.set_backend_configs(new_backend_configs) + try: + self._generate_config_file_internal() + except Exception as e: + # Non-fatal: runtime change already applied; next reload rebuilds. + logger.warning( + f"Applied backend update via runtime API but failed to " + f"refresh the config file: {e}", + extra={"log_to_stderr": False}, + ) + + logger.info( + f"Applied backend update via runtime API: " + f"{added_count} server(s) added, {removed_count} server(s) removed.", + extra={"log_to_stderr": False}, + ) + # Best-effort slot-mapping snapshot for live debugging. + self._write_slot_mapping_for_debug() + return True + async def is_running(self) -> bool: try: await self._send_socket_command("show info") @@ -1026,6 +2201,30 @@ def __init__( # which can cause race conditions with SO_REUSEPORT self._reload_lock = asyncio.Lock() + # Coalesce controller broadcasts: when a broadcast arrives we schedule + # a single sleeping flush task. Subsequent broadcasts during the sleep + # are absorbed implicitly because they overwrite `self._target_groups` + # / fallback fields in place; the flush picks up the latest snapshot + # when it wakes. This collapses bursts of replica-add/remove events + # (common during autoscaling churn) into one runtime-API command pile, + # which keeps the HAProxy admin socket from saturating. + # `_coalesce_pending` lets the flush task notice broadcasts that + # arrive during a long-running apply and trigger another iteration + # so the trailing broadcast in a flurry isn't dropped. + self._coalesce_window_s = RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S + self._coalesce_task: Optional[asyncio.Task] = None + self._coalesce_pending = False + + # Cached slot-pool split, keyed by backend name. Recomputed only + # when the set of backends changes or a backend's replica count + # exceeds its current allocation -- without this caching, the + # proportional split would shift on every broadcast (since + # changing any backend's replica count changes the ratios for + # all backends), every broadcast would see `slot_pool_size` + # differ in the diff, and every broadcast would trigger a + # reload, defeating the runtime-API path entirely. + self._current_split: Dict[str, int] = {} + self.long_poll_client = long_poll_client or LongPollClient( ray.get_actor(SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE), { @@ -1186,9 +2385,16 @@ async def check_health(self) -> bool: # If haproxy is already shutdown, return False. if not self._haproxy or not self._haproxy._proc: return False + if self._haproxy._proc.returncode is not None: + return False logger.debug("Received health check.", extra={"log_to_stderr": False}) - return await self._haproxy.is_running() + # Probe the HTTP listener instead of the admin socket. See + # HAProxyApi.probe_http_listener for the rationale -- briefly: the + # admin socket competes with runtime-API load, and a saturated + # admin socket would otherwise cause the controller to kill the + # actor and trigger a reload-orphan-port cascade. + return await self._haproxy.probe_http_listener() def pong(self) -> str: pass @@ -1260,7 +2466,47 @@ async def _reload_haproxy(self) -> None: await self._haproxy_start_task await self._haproxy.reload() - def _update_haproxy_backends(self) -> None: + async def _apply_backend_update( + self, name_to_backend_configs: Dict[str, BackendConfig] + ) -> None: + """Apply a backend config update incrementally if possible, else reload. + + At scale, each controller broadcast triggers a config update. Doing a + full HAProxy reload for every update blocks the actor's event loop + long enough to miss the controller's health-check window, which causes + the controller to kill and recreate the proxy actor — leading to a + cascading failure where every proxy in the cluster cycles continuously. + + The runtime API path applies pure server adds/removes in milliseconds + without a reload. For structural changes (new backend, ACL change, + timeout change, etc.) we still fall back to a full reload. + """ + async with self._reload_lock: + await self._haproxy_start_task + applied_incrementally = await self._haproxy.try_apply_servers_dynamically( + name_to_backend_configs + ) + if not applied_incrementally: + await self._haproxy.reload() + + def _build_name_to_backend_configs(self) -> Dict[str, BackendConfig]: + """Snapshot the current target groups + fallbacks into backend configs. + + Stamps each BackendConfig with its slot-pool size from the + cached split. The split is recomputed only when: + - The set of backends changes (a backend is added or removed), or + - Any backend's current replica count exceeds its allocated + slot pool size (i.e. it has overflowed and needs more). + + Without this caching, the proportional split would shift on + every replica-count change (since the ratios reallocate among + all backends). That would make `slot_pool_size` differ in the + diff on every broadcast, marking it as a structural change, + and forcing a reload on every broadcast -- defeating the entire + runtime-API path. With caching, the split is stable until a + backend overflows or the topology changes, both of which + already justify a reload. + """ backend_configs = [] for target_group in self._target_groups: fallback_target = None @@ -1272,17 +2518,77 @@ def _update_haproxy_backends(self) -> None: backend_config = self._create_backend_config(target_group, fallback_target) backend_configs.append(backend_config) - logger.info( - f"Got updated backend configs: {backend_configs}.", - extra={"log_to_stderr": True}, + replica_counts = {bc.name: len(bc.servers) for bc in backend_configs} + backend_set_changed = set(replica_counts) != set(self._current_split) + any_overflow = any( + replica_counts[name] > self._current_split.get(name, 0) + for name in replica_counts ) + if backend_set_changed or any_overflow: + old_split = self._current_split + self._current_split = _compute_slot_split(replica_counts) + if self._current_split != old_split: + logger.info( + "Slot split recomputed (set_changed=%s, overflow=%s): %s", + backend_set_changed, + any_overflow, + self._current_split, + extra={"log_to_stderr": True}, + ) - name_to_backend_configs = { - backend_config.name: backend_config for backend_config in backend_configs - } + for bc in backend_configs: + bc.slot_pool_size = self._current_split.get(bc.name, 0) - self._haproxy.set_backend_configs(name_to_backend_configs) - self.event_loop.create_task(self._reload_haproxy()) + return {bc.name: bc for bc in backend_configs} + + def _update_haproxy_backends(self) -> None: + # Schedule a coalesced flush. If a flush is already pending (sleeping + # or applying), set `_coalesce_pending` so the running task picks up + # our broadcast on the next loop iteration. With coalescing disabled + # (window=0), apply synchronously to preserve legacy behaviour. + if self._coalesce_window_s <= 0: + name_to_backend_configs = self._build_name_to_backend_configs() + logger.info( + f"Got updated backend configs: {list(name_to_backend_configs.values())}.", + extra={"log_to_stderr": True}, + ) + self.event_loop.create_task( + self._apply_backend_update(name_to_backend_configs) + ) + return + + self._coalesce_pending = True + if self._coalesce_task is None or self._coalesce_task.done(): + self._coalesce_task = self.event_loop.create_task( + self._coalesce_and_apply() + ) + + async def _coalesce_and_apply(self) -> None: + """Drain pending broadcasts, applying each batch after a short sleep. + + Broadcasts arriving during the sleep are absorbed because the + long-poll callbacks overwrite `self._target_groups` / fallback fields + in place — by the time we wake up, the snapshot already reflects them. + Broadcasts arriving during the apply are handled by the outer loop: + they re-set `_coalesce_pending`, and we iterate again. + """ + try: + while self._coalesce_pending: + # Clear before sleep so any broadcast during sleep+apply + # re-arms the flag and triggers another iteration. + self._coalesce_pending = False + await asyncio.sleep(self._coalesce_window_s) + name_to_backend_configs = self._build_name_to_backend_configs() + logger.info( + f"Got updated backend configs: {list(name_to_backend_configs.values())}.", + extra={"log_to_stderr": True}, + ) + await self._apply_backend_update(name_to_backend_configs) + except Exception as e: + # Don't let an apply failure kill the coalescer. The next broadcast + # will schedule a fresh flush task; the underlying error is already + # logged by `_apply_backend_update` / `try_apply_servers_dynamically`. + logger.error(f"Coalesced backend apply failed: {e}") def update_target_groups(self, target_groups: List[TargetGroup]) -> None: self._target_groups = target_groups diff --git a/python/ray/serve/_private/haproxy_templates.py b/python/ray/serve/_private/haproxy_templates.py index 252589019458..04c69e022f91 100644 --- a/python/ray/serve/_private/haproxy_templates.py +++ b/python/ray/serve/_private/haproxy_templates.py @@ -5,16 +5,17 @@ {%- if not health_info.healthy %} # Override: force health checks to fail (used by drain/disable) http-request return status {{ health_info.status }} content-type text/plain string "{{ health_info.health_message }}" if healthcheck -{%- elif backends %} - # 200 if any backend has at least one server UP -{%- for backend in backends %} - acl backend_{{ backend.name or 'unknown' }}_server_up nbsrv({{ backend.name or 'unknown' }}) ge 1 -{%- endfor %} - # Any backend with a server UP passes the health check (OR logic) -{%- for backend in backends %} - http-request return status {{ health_info.status }} content-type text/plain string "{{ health_info.health_message }}" if healthcheck backend_{{ backend.name or 'unknown' }}_server_up -{%- endfor %} - http-request return status 503 content-type text/plain string "Service Unavailable" if healthcheck +{%- else %} + # Always return success if HAProxy itself is up. Backend pool state is + # surfaced to clients per-request (HAProxy returns 503 when backends + # are missing) — not via this ingress healthcheck. Tying ALB target + # health to nbsrv() causes flapping cascades during autoscaling churn: + # initial-check windows after a reload briefly report nbsrv()=0 even + # though HAProxy can still serve traffic (via redispatch, fallback, + # and replicas that finish their initial probes within milliseconds). + # That brief 503 marks the target unhealthy at the ALB for minutes, + # making every client request route to a "no healthy targets" state. + http-request return status {{ health_info.status }} content-type text/plain string "{{ health_info.health_message }}" if healthcheck {%- endif %} """ @@ -22,7 +23,18 @@ # Log to the standard system log socket with debug level. log /dev/log local0 debug log 127.0.0.1:{{ config.syslog_port }} local0 debug - stats socket {{ config.socket_path }} mode 666 level admin expose-fd listeners + # Dedicated socket for the `-x admin.sock` FD-transfer during graceful + # reload. Carries `expose-fd listeners` so the new HAProxy can fetch + # listener FDs from it, but is NOT used for runtime-API commands. + # Keeping FD transfer on its own socket prevents the `_getsocks` + # exchange from queueing behind add/del/disable/enable server commands + # during a reload. + stats socket {{ config.transfer_socket_path }} mode 666 level admin expose-fd listeners + # Runtime-API + stats socket. Used by the proxy actor for + # `add server`, `del server`, `show stat`, etc. No `expose-fd + # listeners` here — FD transfer is reserved for the dedicated socket + # above so the two streams don't contend on the same connection slot. + stats socket {{ config.socket_path }} mode 666 level admin stats timeout 30s maxconn {{ config.maxconn }} nbthread {{ config.nbthread }} @@ -36,6 +48,16 @@ defaults mode http option log-health-checks + # Avoid blocking on libc DNS resolution at HAProxy startup. With many + # backends, the cumulative resolver wait can be seconds even when + # every server address is already an IP literal -- HAProxy still + # consults libc as part of `init-addr`'s default order. The order + # below tries the previous process's resolved address first (instant + # via the server-state file), falls back to libc only if needed, and + # accepts "no IP yet" rather than failing startup. Servers come up + # immediately and DNS resolution happens lazily in the background. + default-server init-addr last,libc,none + retries {{ config.retries }} {% if config.timeout_connect_s is not none %}timeout connect {{ config.timeout_connect_s }}s{% endif %} {% if config.timeout_client_s is not none %}timeout client {{ config.timeout_client_s }}s{% endif %} {% if config.timeout_server_s is not none %}timeout server {{ config.timeout_server_s }}s{% endif %} @@ -44,7 +66,6 @@ {% if config.timeout_queue_s is not none %}timeout queue {{ config.timeout_queue_s }}s{% endif %} log global option httplog - option abortonclose {%- if config.tcp_nodelay %} # Set TCP_NODELAY on all connections option http-no-delay @@ -57,9 +78,12 @@ errorfile 502 {{ config.error_file_path }} errorfile 504 {{ config.error_file_path }} {%- endif %} - {%- if config.enable_hap_optimization %} - load-server-state-from-file global - {%- endif %} + # `load-server-state-from-file` is intentionally not emitted: with + # `server-template`, the slot ↔ replica mapping is owned by the + # proxy actor's in-memory pool. Loading a saved state file would + # restore the previous mapping inside HAProxy while the actor + # re-assigns slots first-free from a fresh pool, producing + # duplicate slots both pointing at the same replica. balance {{ config.balance_algorithm }} frontend prometheus bind :{{ config.metrics_port }} @@ -91,6 +115,13 @@ {%- set hc = item.health_config %} backend {{ backend.name or 'unknown' }} log global + # On a server failure, retry on a different server (combined with + # `retries N` from the defaults block) instead of giving up to the + # backup or returning a synthesized 5xx. retry-on covers + # connection-establishment failures and "connected but no response yet" + # cases — both are body-stream-safe and don't risk double-execution. + option redispatch + retry-on conn-failure empty-response # Enable HTTP connection reuse for better performance http-reuse always # Set backend-specific timeouts, overriding defaults if specified @@ -123,10 +154,13 @@ http-check expect status 200 {%- endif %} {{ hc.default_server_directive }} - # Servers in this backend - {%- for server in backend.servers %} - server {{ server.name }} {{ server.host }}:{{ server.port }} check - {%- endfor %} + # Pre-allocated slot pool. Slots start disabled with a placeholder + # address; the proxy actor populates them via the runtime API + # (`set server /srvN addr port ` + `state ready`) + # as replicas register, and releases them (`state maint`) when + # replicas exit. Slot count is computed by `_compute_slot_split` at + # config-generation time from the current replica count + headroom. + server-template srv 1-{{ backend.slot_pool_size }} 0.0.0.0:1 check disabled {%- if backend.fallback_server %} # Fallback to head node's Serve proxy when no ingress replicas are available server {{ backend.fallback_server.name }} {{ backend.fallback_server.host }}:{{ backend.fallback_server.port }} check backup diff --git a/python/ray/serve/tests/test_haproxy_api.py b/python/ray/serve/tests/test_haproxy_api.py index ff1b22b1c8e9..dcab187a24b4 100644 --- a/python/ray/serve/tests/test_haproxy_api.py +++ b/python/ray/serve/tests/test_haproxy_api.py @@ -192,6 +192,7 @@ def test_generate_config_file_internal(haproxy_api_cleanup): timeout_server_s=30, timeout_http_request_s=10, timeout_queue_s=1, + timeout_http_keep_alive_s=55, stats_port=8080, stats_uri="/mystats", health_check_fall=3, @@ -201,7 +202,6 @@ def test_generate_config_file_internal(haproxy_api_cleanup): http_options=HTTPOptions( host="0.0.0.0", port=8000, - keep_alive_timeout_s=55, ), has_received_routes=True, has_received_servers=True, @@ -225,6 +225,7 @@ def test_generate_config_file_internal(haproxy_api_cleanup): fallback_server=ServerConfig( name="api_fallback_server", host="127.0.0.1", port=8500 ), + slot_pool_size=8, ), "web_backend": BackendConfig( name="web_backend", @@ -236,7 +237,8 @@ def test_generate_config_file_internal(haproxy_api_cleanup): timeout_tunnel_s=45, servers=[ ServerConfig(name="web_server1", host="127.0.0.1", port=8003), - ] + ], + slot_pool_size=4, # No health check overrides - should use global defaults ), } @@ -268,16 +270,37 @@ def test_generate_config_file_internal(haproxy_api_cleanup): # Log to the standard system log socket with debug level. log /dev/log local0 debug log 127.0.0.1:514 local0 debug - stats socket {socket_path} mode 666 level admin expose-fd listeners + # Dedicated socket for the `-x admin.sock` FD-transfer during graceful + # reload. Carries `expose-fd listeners` so the new HAProxy can fetch + # listener FDs from it, but is NOT used for runtime-API commands. + # Keeping FD transfer on its own socket prevents the `_getsocks` + # exchange from queueing behind add/del/disable/enable server commands + # during a reload. + stats socket {socket_path}.fd mode 666 level admin expose-fd listeners + # Runtime-API + stats socket. Used by the proxy actor for + # `add server`, `del server`, `show stat`, etc. No `expose-fd + # listeners` here — FD transfer is reserved for the dedicated socket + # above so the two streams don't contend on the same connection slot. + stats socket {socket_path} mode 666 level admin stats timeout 30s maxconn 1000 nbthread 2 server-state-base /tmp/haproxy-serve server-state-file /tmp/haproxy-serve/server-state - hard-stop-after 120s + hard-stop-after 600s defaults mode http option log-health-checks + # Avoid blocking on libc DNS resolution at HAProxy startup. With many + # backends, the cumulative resolver wait can be seconds even when + # every server address is already an IP literal -- HAProxy still + # consults libc as part of `init-addr`'s default order. The order + # below tries the previous process's resolved address first (instant + # via the server-state file), falls back to libc only if needed, and + # accepts "no IP yet" rather than failing startup. Servers come up + # immediately and DNS resolution happens lazily in the background. + default-server init-addr last,libc,none + retries 3 timeout connect 5s timeout client 30s timeout server 30s @@ -286,12 +309,16 @@ def test_generate_config_file_internal(haproxy_api_cleanup): timeout queue 1s log global option httplog - option abortonclose option idle-close-on-response # Normalize 502 and 504 errors to 500 per Serve's default behavior errorfile 502 {temp_dir}/500.http errorfile 504 {temp_dir}/500.http - load-server-state-from-file global + # `load-server-state-from-file` is intentionally not emitted: with + # `server-template`, the slot ↔ replica mapping is owned by the + # proxy actor's in-memory pool. Loading a saved state file would + # restore the previous mapping inside HAProxy while the actor + # re-assigns slots first-free from a fresh pool, producing + # duplicate slots both pointing at the same replica. balance random(2) frontend prometheus bind :9101 @@ -304,13 +331,16 @@ def test_generate_config_file_internal(haproxy_api_cleanup): acl healthcheck path -i /-/healthz # Suppress logging for health checks http-request set-log-level silent if healthcheck - # 200 if any backend has at least one server UP - acl backend_api_backend_server_up nbsrv(api_backend) ge 1 - acl backend_web_backend_server_up nbsrv(web_backend) ge 1 - # Any backend with a server UP passes the health check (OR logic) - http-request return status 200 content-type text/plain string "success" if healthcheck backend_api_backend_server_up - http-request return status 200 content-type text/plain string "success" if healthcheck backend_web_backend_server_up - http-request return status 503 content-type text/plain string "Service Unavailable" if healthcheck + # Always return success if HAProxy itself is up. Backend pool state is + # surfaced to clients per-request (HAProxy returns 503 when backends + # are missing) — not via this ingress healthcheck. Tying ALB target + # health to nbsrv() causes flapping cascades during autoscaling churn: + # initial-check windows after a reload briefly report nbsrv()=0 even + # though HAProxy can still serve traffic (via redispatch, fallback, + # and replicas that finish their initial probes within milliseconds). + # That brief 503 marks the target unhealthy at the ALB for minutes, + # making every client request route to a "no healthy targets" state. + http-request return status 200 content-type text/plain string "success" if healthcheck # Routes endpoint acl routes path -i /-/routes http-request return status 200 content-type application/json string "{routes}" if routes @@ -326,6 +356,13 @@ def test_generate_config_file_internal(haproxy_api_cleanup): http-request return status 404 content-type text/plain lf-string "Path \'%[path]\' not found. Ping http://.../-/routes for available routes." backend api_backend log global + # On a server failure, retry on a different server (combined with + # `retries N` from the defaults block) instead of giving up to the + # backup or returning a synthesized 5xx. retry-on covers + # connection-establishment failures and "connected but no response yet" + # cases — both are body-stream-safe and don't risk double-execution. + option redispatch + retry-on conn-failure empty-response # Enable HTTP connection reuse for better performance http-reuse always # Set backend-specific timeouts, overriding defaults if specified @@ -337,13 +374,24 @@ def test_generate_config_file_internal(haproxy_api_cleanup): option httpchk GET /api/health http-check expect status 200 default-server fastinter 250ms downinter 250ms fall 2 rise 3 inter 5s check - # Servers in this backend - server api_server1 127.0.0.1:8001 check - server api_server2 127.0.0.1:8002 check + # Pre-allocated slot pool. Slots start disabled with a placeholder + # address; the proxy actor populates them via the runtime API + # (`set server /srvN addr port ` + `state ready`) + # as replicas register, and releases them (`state maint`) when + # replicas exit. Slot count is computed by `_compute_slot_split` at + # config-generation time from the current replica count + headroom. + server-template srv 1-8 0.0.0.0:1 check disabled # Fallback to head node's Serve proxy when no ingress replicas are available server api_fallback_server 127.0.0.1:8500 check backup backend web_backend log global + # On a server failure, retry on a different server (combined with + # `retries N` from the defaults block) instead of giving up to the + # backup or returning a synthesized 5xx. retry-on covers + # connection-establishment failures and "connected but no response yet" + # cases — both are body-stream-safe and don't risk double-execution. + option redispatch + retry-on conn-failure empty-response # Enable HTTP connection reuse for better performance http-reuse always # Set backend-specific timeouts, overriding defaults if specified @@ -357,8 +405,13 @@ def test_generate_config_file_internal(haproxy_api_cleanup): option httpchk GET /-/healthz http-check expect status 200 default-server fastinter 250ms downinter 250ms fall 3 rise 2 inter 2s check - # Servers in this backend - server web_server1 127.0.0.1:8003 check + # Pre-allocated slot pool. Slots start disabled with a placeholder + # address; the proxy actor populates them via the runtime API + # (`set server /srvN addr port ` + `state ready`) + # as replicas register, and releases them (`state maint`) when + # replicas exit. Slot count is computed by `_compute_slot_split` at + # config-generation time from the current replica count + headroom. + server-template srv 1-4 0.0.0.0:1 check disabled listen stats bind *:8080 stats enable @@ -1078,7 +1131,11 @@ def health_check_condition(status_code: int): @pytest.mark.asyncio async def test_health_endpoint_or_logic_multiple_backends(haproxy_api_cleanup): - """Test that the health endpoint returns 200 if ANY backend has at least one server UP (OR logic).""" + """Test that the health endpoint returns 200 unconditionally when HAProxy + is up — regardless of backend server state. The endpoint is for ALB + target-health checks (is this proxy reachable?) and intentionally + decouples from backend pool state to avoid flapping cascades during + autoscaling churn.""" with tempfile.TemporaryDirectory() as temp_dir: config_file_path = os.path.join(temp_dir, "haproxy.cfg") socket_path = os.path.join(temp_dir, "admin.sock") @@ -1172,25 +1229,22 @@ def health_ok(): backend2_server.should_exit = True backend2_thread.join(timeout=5) - # Wait for health check to fail (both servers are DOWN) - def health_fails(): - resp = requests.get( - f"http://127.0.0.1:{config.frontend_port}{config.health_check_endpoint}", - timeout=5, - ) - return resp.status_code == 503 - - wait_for_condition(health_fails, timeout=10, retry_interval_ms=200) + # Give HAProxy time to detect both servers are down + await asyncio.sleep(2) - # Verify health check returns 503 when ALL servers are DOWN + # Verify health check STILL returns 200 even when ALL servers are + # DOWN. /-/healthz reflects HAProxy's own liveness, not backend + # pool state. Backend availability is surfaced to clients + # per-request (HAProxy returns 503 from the actual request path + # when no backend can serve), not via this ingress probe. health_response = requests.get( f"http://127.0.0.1:{config.frontend_port}{config.health_check_endpoint}", timeout=5, ) assert ( - health_response.status_code == 503 - ), "Health check should return 503 when all servers are DOWN" - assert b"Service Unavailable" in health_response.content + health_response.status_code == 200 + ), "Health check should return 200 (HAProxy is up) regardless of backend state" + assert b"success" in health_response.content await api.stop() finally: diff --git a/python/ray/serve/tests/test_haproxy_slots.py b/python/ray/serve/tests/test_haproxy_slots.py new file mode 100644 index 000000000000..4a1b8e6db92b --- /dev/null +++ b/python/ray/serve/tests/test_haproxy_slots.py @@ -0,0 +1,175 @@ +"""Unit tests for HAProxy slot pool + split helpers. + +These tests are pure-Python and do not require the HAProxy binary or the +RAY_SERVE_ENABLE_HA_PROXY env flag. +""" + +import pytest + +from ray.serve._private.haproxy import ( + BackendSlotPool, + _compute_slot_split, +) + + +class TestBackendSlotPool: + def test_init_rejects_nonpositive_size(self): + with pytest.raises(ValueError): + BackendSlotPool("be", 0) + with pytest.raises(ValueError): + BackendSlotPool("be", -1) + + def test_first_free_allocation_is_compact(self): + """Slots are handed out lowest-first; releases recycle the freed slot.""" + pool = BackendSlotPool("be", pool_size=4) + assert pool.assign("a") == 1 + assert pool.assign("b") == 2 + assert pool.assign("c") == 3 + # Release b -> slot 2 should be the next allocation. + assert pool.release("b") == 2 + assert pool.assign("d") == 2 + # Release a -> slot 1 reused next. + assert pool.release("a") == 1 + assert pool.assign("e") == 1 + + def test_assign_is_idempotent(self): + pool = BackendSlotPool("be", pool_size=4) + slot = pool.assign("a") + assert pool.assign("a") == slot + assert pool.in_use() == 1 + + def test_release_of_unknown_is_noop(self): + pool = BackendSlotPool("be", pool_size=2) + assert pool.release("ghost") is None + assert pool.in_use() == 0 + + def test_full_pool_returns_none(self): + pool = BackendSlotPool("be", pool_size=2) + pool.assign("a") + pool.assign("b") + assert pool.assign("c") is None + assert pool.is_exhausted() + # After a release, room opens up again. + pool.release("a") + assert not pool.is_exhausted() + assert pool.assign("c") == 1 + + def test_mapping_returns_copy(self): + pool = BackendSlotPool("be", pool_size=4) + pool.assign("a") + m1 = pool.mapping() + pool.assign("b") + m2 = pool.mapping() + # m1 should not be mutated by later assignments + assert m1 == {"a": 1} + assert m2 == {"a": 1, "b": 2} + + def test_in_use_and_free_counts(self): + pool = BackendSlotPool("be", pool_size=3) + assert pool.in_use() == 0 + assert pool.free() == 3 + pool.assign("a") + pool.assign("b") + assert pool.in_use() == 2 + assert pool.free() == 1 + pool.release("a") + assert pool.in_use() == 1 + assert pool.free() == 2 + + +class TestComputeSlotSplit: + def test_empty_returns_empty(self): + assert _compute_slot_split({}) == {} + + def test_single_backend_gets_full_budget(self): + result = _compute_slot_split({"only": 10}, total=128, min_per_backend=16) + assert result == {"only": 128} + + def test_no_replicas_equal_split_above_minimum(self): + result = _compute_slot_split( + {"a": 0, "b": 0, "c": 0}, + total=300, + min_per_backend=10, + headroom_factor=2.0, + ) + # Reserved: 30. Remaining: 270 split equally → 90 each. Sum = 300. + assert result == {"a": 100, "b": 100, "c": 100} + assert sum(result.values()) == 300 + + def test_proportional_split_by_replica_count(self): + result = _compute_slot_split( + {"big": 100, "small": 10}, + total=400, + min_per_backend=20, + headroom_factor=2.0, + ) + # Reserved: 40. Remaining: 360. + # Demand: big=200, small=20 (total 220). + # Extra: big = 360*200/220 ≈ 327, small = 360*20/220 ≈ 32. + # With rounding: 327+32=359; +1 to largest fractional → 327+33=360. + # Final: big=20+327=347 or 348, small=20+32=33 or 32. Sum=400. + assert sum(result.values()) == 400 + assert result["big"] >= result["small"] + assert result["big"] >= 200 # has at least replica count's worth + assert result["small"] >= 20 # has at least the minimum + + def test_minimum_floor_respected(self): + """Even an idle backend gets at least min_per_backend.""" + result = _compute_slot_split( + {"hot": 100, "idle": 0}, + total=200, + min_per_backend=16, + headroom_factor=2.0, + ) + assert result["idle"] == 16 + assert sum(result.values()) <= 200 + + def test_pathological_too_many_backends_degrades_to_equal(self): + # 10 backends * min=16 = 160 > total=100, so equal split. + result = _compute_slot_split( + {f"be{i}": 0 for i in range(10)}, + total=100, + min_per_backend=16, + headroom_factor=2.0, + ) + assert sum(result.values()) == 100 + # Each backend has at least 1, all roughly equal (10 each). + for size in result.values(): + assert size >= 1 + assert max(result.values()) - min(result.values()) <= 1 + + def test_sum_invariant_does_not_exceed_total(self): + """Property: sum(result) <= total for all reasonable inputs.""" + import random + + random.seed(42) + for _ in range(50): + n = random.randint(1, 30) + counts = {f"be{i}": random.randint(0, 50) for i in range(n)} + total = random.randint(64, 8192) + result = _compute_slot_split( + counts, total=total, min_per_backend=16, headroom_factor=2.0 + ) + assert sum(result.values()) <= total + assert set(result.keys()) == set(counts.keys()) + + def test_headroom_factor_scales_allocation(self): + """Higher headroom_factor gives the busier backend more slots.""" + low = _compute_slot_split( + {"busy": 50, "idle": 0}, + total=400, + min_per_backend=16, + headroom_factor=1.0, + ) + high = _compute_slot_split( + {"busy": 50, "idle": 0}, + total=400, + min_per_backend=16, + headroom_factor=4.0, + ) + # The "idle" backend's allocation can't grow (only min). But the + # "busy" backend's share grows with headroom_factor — except + # the total is fixed at 400 and the idle min is 16, so busy + # caps at 384 regardless. Just check monotonicity: + # busy(high_factor) >= busy(low_factor) + assert high["busy"] >= low["busy"] diff --git a/python/ray/serve/tests/test_http_routes.py b/python/ray/serve/tests/test_http_routes.py index 4fafc1360e73..18f18a9f5896 100644 --- a/python/ray/serve/tests/test_http_routes.py +++ b/python/ray/serve/tests/test_http_routes.py @@ -244,9 +244,16 @@ def h(): time.sleep(100) # Don't return here to leave time for actor exit. serve.run(h.bind()) - # Error is raised before the request reaches the deployed replica as the replica does not exist. + # The replica kills itself mid-request, so the deployment ends up with no + # live replicas. The exact status code depends on timing: + # - 500 if the controller hasn't yet broadcast unavailability to the proxy + # (the request fails as ActorDiedError → mapped to 500 in http_util.py). + # - 503 if the broadcast has happened (the proxy raises + # DeploymentUnavailableError → mapped to 503 in http_util.py). With + # HAProxy + redispatch, the backup proxy actor is reached after the + # primary fails, by which point the broadcast has typically landed. r = httpx.get("http://localhost:8000/h") - assert r.status_code == 500 + assert r.status_code in (500, 503) if __name__ == "__main__": diff --git a/python/ray/serve/tests/unit/test_haproxy_diff.py b/python/ray/serve/tests/unit/test_haproxy_diff.py new file mode 100644 index 000000000000..8eccfa147747 --- /dev/null +++ b/python/ray/serve/tests/unit/test_haproxy_diff.py @@ -0,0 +1,380 @@ +"""Unit tests for the incremental backend-update diff in HAProxyApi. + +The diff helper decides whether a backend-config change can be applied via +HAProxy's runtime API (`add server` / `del server`) instead of regenerating +the full config and triggering a graceful reload. At Ray Serve scales with +many replicas under autoscaling churn, reload-on-every-change blocks the +proxy actor's event loop long enough to fail the controller's health check, +which kills and recreates the actor — leading to cluster-wide proxy +flapping. The runtime-API path keeps the actor responsive for the common +case (replica added/removed, nothing else). + +These tests cover the diff classification in isolation. Whether a given +diff actually applies cleanly is HAProxy's responsibility at runtime; +HAProxyApi.try_apply_servers_dynamically falls back to a reload on any +socket-command error. +""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from ray.serve._private.haproxy import ( + BackendConfig, + HAProxyApi, + HAProxyConfig, + ServerConfig, + _compute_backend_diff, +) +from ray.serve.config import HTTPOptions + + +def _backend( + name: str, + servers: list, + *, + path_prefix: str = "/x", + fallback=None, + **overrides, +) -> BackendConfig: + """Make a BackendConfig with sensible defaults; override fields per test.""" + return BackendConfig( + name=name, + path_prefix=path_prefix, + app_name=name, + servers=servers, + fallback_server=fallback, + **overrides, + ) + + +def _server(name: str, host: str = "10.0.0.1", port: int = 8000) -> ServerConfig: + return ServerConfig(name=name, host=host, port=port) + + +class TestComputeBackendDiff: + """The diff is the safety boundary between fast-path and reload.""" + + def test_identical_configs_no_changes(self): + """No diff → no work, no reload.""" + configs = {"b1": _backend("b1", [_server("s1"), _server("s2")])} + diff = _compute_backend_diff(configs, configs) + assert not diff.is_structural + assert diff.servers_to_add == {} + assert diff.servers_to_remove == {} + + def test_added_server(self): + """A new server within an existing backend is the canonical fast-path case.""" + old = {"b1": _backend("b1", [_server("s1")])} + new = {"b1": _backend("b1", [_server("s1"), _server("s2", port=8001)])} + diff = _compute_backend_diff(old, new) + assert not diff.is_structural + assert "b1" in diff.servers_to_add + assert [s.name for s in diff.servers_to_add["b1"]] == ["s2"] + assert diff.servers_to_remove == {} + + def test_removed_server(self): + """A removed server within an existing backend is also fast-path.""" + old = {"b1": _backend("b1", [_server("s1"), _server("s2")])} + new = {"b1": _backend("b1", [_server("s1")])} + diff = _compute_backend_diff(old, new) + assert not diff.is_structural + assert diff.servers_to_remove == {"b1": ["s2"]} + assert diff.servers_to_add == {} + + def test_server_address_change_is_remove_plus_add(self): + """Same name, different host/port → remove old, add new. + + HAProxy's runtime API doesn't have a clean in-place address-change + primitive that works the same way across all 2.x releases, so we + treat this as remove + re-add. Conservative and predictable. + """ + old = {"b1": _backend("b1", [_server("s1", host="10.0.0.1", port=8000)])} + new = {"b1": _backend("b1", [_server("s1", host="10.0.0.2", port=8000)])} + diff = _compute_backend_diff(old, new) + assert not diff.is_structural + assert diff.servers_to_remove == {"b1": ["s1"]} + assert [s.host for s in diff.servers_to_add["b1"]] == ["10.0.0.2"] + + def test_added_backend_is_structural(self): + """A new backend requires a new ACL → reload.""" + old = {"b1": _backend("b1", [_server("s1")])} + new = { + "b1": _backend("b1", [_server("s1")]), + "b2": _backend("b2", [_server("s3")]), + } + diff = _compute_backend_diff(old, new) + assert diff.is_structural + + def test_removed_backend_is_structural(self): + """A removed backend → reload (frontend ACLs must be regenerated).""" + old = { + "b1": _backend("b1", [_server("s1")]), + "b2": _backend("b2", [_server("s3")]), + } + new = {"b1": _backend("b1", [_server("s1")])} + diff = _compute_backend_diff(old, new) + assert diff.is_structural + + def test_path_prefix_change_is_structural(self): + """Path-prefix changes affect the frontend ACLs → reload.""" + old = {"b1": _backend("b1", [_server("s1")], path_prefix="/x")} + new = {"b1": _backend("b1", [_server("s1")], path_prefix="/y")} + diff = _compute_backend_diff(old, new) + assert diff.is_structural + + def test_fallback_server_change_is_structural(self): + """Backup-server changes are rendered into the backend block → reload.""" + old = { + "b1": _backend( + "b1", + [_server("s1")], + fallback=_server("fb", host="127.0.0.1", port=8500), + ) + } + new = { + "b1": _backend( + "b1", + [_server("s1")], + fallback=_server("fb", host="127.0.0.1", port=8501), + ) + } + diff = _compute_backend_diff(old, new) + assert diff.is_structural + + @pytest.mark.parametrize( + "field,old_val,new_val", + [ + ("timeout_connect_s", None, 5), + ("timeout_server_s", 30, 60), + ("timeout_http_keep_alive_s", None, 60), + ("timeout_tunnel_s", None, 60), + ("health_check_fall", None, 3), + ("health_check_rise", None, 2), + ("health_check_inter", None, "1s"), + ("health_check_path", None, "/-/healthz"), + ], + ) + def test_backend_level_field_change_is_structural(self, field, old_val, new_val): + """Any non-server backend field change → reload. + + These all render into the HAProxy backend block (timeouts, health- + check directives, etc.), so they require regenerating the config. + """ + old = {"b1": _backend("b1", [_server("s1")], **{field: old_val})} + new = {"b1": _backend("b1", [_server("s1")], **{field: new_val})} + diff = _compute_backend_diff(old, new) + assert diff.is_structural, f"change to {field} should be structural" + + def test_concurrent_adds_and_removes_in_one_diff(self): + """A real-world case: scale-up adds N replicas while scale-down removes M.""" + old = {"b1": _backend("b1", [_server("s1"), _server("s2"), _server("s3")])} + new = {"b1": _backend("b1", [_server("s2"), _server("s4"), _server("s5")])} + diff = _compute_backend_diff(old, new) + assert not diff.is_structural + assert sorted(diff.servers_to_remove["b1"]) == ["s1", "s3"] + assert sorted(s.name for s in diff.servers_to_add["b1"]) == ["s4", "s5"] + + def test_changes_across_multiple_backends(self): + """Mixed updates across backends still take the fast path if all are server-only.""" + old = { + "b1": _backend("b1", [_server("s1")]), + "b2": _backend("b2", [_server("s2")], path_prefix="/y"), + } + new = { + "b1": _backend("b1", [_server("s1"), _server("s1b")]), + "b2": _backend("b2", [], path_prefix="/y"), + } + diff = _compute_backend_diff(old, new) + assert not diff.is_structural + assert [s.name for s in diff.servers_to_add["b1"]] == ["s1b"] + assert diff.servers_to_remove["b2"] == ["s2"] + + +class TestTryApplyServersDynamically: + """Behavior of HAProxyApi.try_apply_servers_dynamically. + + The contract: returns True only when the change was applied without a + reload. Otherwise the caller (HAProxyManager._apply_backend_update) + will perform a full reload. + """ + + def _make_api(self, backend_configs=None): + cfg = HAProxyConfig( + socket_path="/tmp/test.sock", + http_options=HTTPOptions(host="127.0.0.1", port=8000), + ) + return HAProxyApi(cfg=cfg, backend_configs=backend_configs or {}) + + @pytest.mark.asyncio + async def test_first_time_setup_falls_back_to_reload(self): + """Initial broadcast (no prior state) must reload — there's no diff base.""" + api = self._make_api(backend_configs={}) + new_configs = {"b1": _backend("b1", [_server("s1")])} + + applied = await api.try_apply_servers_dynamically(new_configs) + assert applied is False + # Internal state should have been updated so the subsequent reload + # uses the new configs. + assert api.backend_configs == new_configs + + @pytest.mark.asyncio + async def test_haproxy_not_running_falls_back_to_reload(self): + """If HAProxy isn't up yet, the runtime API isn't available.""" + api = self._make_api(backend_configs={"b1": _backend("b1", [_server("s1")])}) + new_configs = {"b1": _backend("b1", [_server("s1"), _server("s2")])} + + with patch.object(api, "is_running", AsyncMock(return_value=False)): + applied = await api.try_apply_servers_dynamically(new_configs) + assert applied is False + assert api.backend_configs == new_configs + + @pytest.mark.asyncio + async def test_structural_change_falls_back_to_reload(self): + """New backend → caller must reload; we still update internal state.""" + api = self._make_api(backend_configs={"b1": _backend("b1", [_server("s1")])}) + new_configs = { + "b1": _backend("b1", [_server("s1")]), + "b2": _backend("b2", [_server("s3")]), + } + + with patch.object(api, "is_running", AsyncMock(return_value=True)): + applied = await api.try_apply_servers_dynamically(new_configs) + assert applied is False + assert api.backend_configs == new_configs + + @pytest.mark.asyncio + async def test_server_add_uses_runtime_api(self): + """Pure add → `add server` + `enable server` over the socket, no reload.""" + api = self._make_api(backend_configs={"b1": _backend("b1", [_server("s1")])}) + new_configs = {"b1": _backend("b1", [_server("s1"), _server("s2", port=8001)])} + + socket_calls = [] + + async def fake_send(cmd): + socket_calls.append(cmd) + return "" + + with patch.object( + api, "is_running", AsyncMock(return_value=True) + ), patch.object( + api, "_send_socket_command", side_effect=fake_send + ), patch.object( + api, "_generate_config_file_internal" + ): + applied = await api.try_apply_servers_dynamically(new_configs) + + assert applied is True + # The exact commands matter — operators read these in HAProxy's logs. + assert any("add server b1/s2 10.0.0.1:8001" in c for c in socket_calls) + assert any("enable server b1/s2" in c for c in socket_calls) + assert not any("del server" in c for c in socket_calls) + + @pytest.mark.asyncio + async def test_server_remove_uses_runtime_api(self): + """Pure remove → `disable server` + `del server` over the socket, no reload.""" + api = self._make_api( + backend_configs={"b1": _backend("b1", [_server("s1"), _server("s2")])} + ) + new_configs = {"b1": _backend("b1", [_server("s1")])} + + socket_calls = [] + + async def fake_send(cmd): + socket_calls.append(cmd) + return "" + + with patch.object( + api, "is_running", AsyncMock(return_value=True) + ), patch.object( + api, "_send_socket_command", side_effect=fake_send + ), patch.object( + api, "_generate_config_file_internal" + ): + applied = await api.try_apply_servers_dynamically(new_configs) + + assert applied is True + # `disable` must precede `del` to handle in-flight connections cleanly. + disable_idx = next( + i for i, c in enumerate(socket_calls) if "disable server b1/s2" in c + ) + del_idx = next(i for i, c in enumerate(socket_calls) if "del server b1/s2" in c) + assert disable_idx < del_idx + + @pytest.mark.asyncio + async def test_socket_failure_falls_back_to_reload(self): + """If any runtime command raises, bail and let the caller reload. + + HAProxy can fail at the transport level (socket closed, connection + refused, timeout). The caller's reload converges live state back + to self.backend_configs. + """ + api = self._make_api(backend_configs={"b1": _backend("b1", [_server("s1")])}) + new_configs = {"b1": _backend("b1", [_server("s1"), _server("s2")])} + + with patch.object( + api, "is_running", AsyncMock(return_value=True) + ), patch.object( + api, + "_send_socket_command", + AsyncMock(side_effect=RuntimeError("socket closed")), + ): + applied = await api.try_apply_servers_dynamically(new_configs) + assert applied is False + # Internal state still updated so the fallback reload reflects truth. + assert api.backend_configs == new_configs + + @pytest.mark.asyncio + async def test_socket_returns_error_string_falls_back_to_reload(self): + """HAProxy can reject commands by returning an error string (not raising). + + The runtime API conveys most command-level failures as response + text (e.g. ``[ALERT] (123) : 'add server' : Backend X not found.``) + rather than transport-level exceptions. We must treat those + responses as failures or we'd silently skip a needed reload while + live HAProxy state diverges from our internal config. + """ + api = self._make_api(backend_configs={"b1": _backend("b1", [_server("s1")])}) + new_configs = {"b1": _backend("b1", [_server("s1"), _server("s2")])} + + async def fake_send(cmd): + if cmd.startswith("add server"): + return "[ALERT] (123) : 'add server' : Backend b1 not found.\n" + return "" + + with patch.object( + api, "is_running", AsyncMock(return_value=True) + ), patch.object(api, "_send_socket_command", side_effect=fake_send): + applied = await api.try_apply_servers_dynamically(new_configs) + assert applied is False + assert api.backend_configs == new_configs + + @pytest.mark.asyncio + async def test_send_runtime_command_raises_on_error_string(self): + """Direct test of the response validator that backs the apply path.""" + api = self._make_api() + + async def fake_send(cmd): + return "[ALERT] command rejected\n" + + with patch.object(api, "_send_socket_command", side_effect=fake_send): + with pytest.raises(RuntimeError, match="HAProxy rejected"): + await api._send_runtime_command("add server foo/bar 1.2.3.4:80") + + @pytest.mark.asyncio + async def test_send_runtime_command_passes_empty_response(self): + """Successful HAProxy runtime commands return empty/whitespace.""" + api = self._make_api() + + async def fake_send(cmd): + return "" + + with patch.object(api, "_send_socket_command", side_effect=fake_send): + result = await api._send_runtime_command("disable server foo/bar") + assert result == "" + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", "-s", __file__]))