Skip to content

Commit 3c44db2

Browse files
[serve] Coalesce HAProxy controller broadcasts into a single apply
Under autoscaling churn the Serve controller fires target-group / fallback-target broadcasts tens of ms apart; without coalescing, each one runs its own runtime-API command burst on the HAProxy admin socket. The CLI mux serializes everything (runtime API, stats, the `-x` socket transfer used by reload) so command-level timeouts and `-x` failures cluster together once admin-socket pressure exceeds HAProxy's processing rate. Coalesce broadcasts in `HAProxyManager` into a single sleeping flush task. Updates arriving during the sleep are absorbed implicitly via in-place writes to `self._target_groups` / fallback fields; updates during the apply re-arm a `_coalesce_pending` flag so the trailing broadcast in a flurry isn't dropped. Window is configurable via `RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S` (default 0.5s, set 0 to disable). Trade-off: each broadcast incurs up to one window of additional latency before reaching HAProxy, which is small relative to typical replica-start time and well below the rate at which admin-socket saturation begins. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 296bf23 commit 3c44db2

2 files changed

Lines changed: 77 additions & 9 deletions

File tree

python/ray/serve/_private/constants.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,18 @@
759759
os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S", "3600")
760760
)
761761

762+
# Window during which incoming controller broadcasts (target_groups,
763+
# fallback_targets) are coalesced into a single backend update before being
764+
# applied to HAProxy. Under autoscaling churn the controller can fire
765+
# broadcasts tens of ms apart; without coalescing each one issues its own
766+
# runtime-API command burst on the admin socket, which saturates HAProxy's
767+
# CLI mux and causes timeouts (and `-x` socket-transfer failures during the
768+
# fallback reload). 0.5s collapses typical burst clusters into one diff.
769+
# Set to 0 to disable coalescing entirely (legacy behaviour).
770+
RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S = float(
771+
os.environ.get("RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S", "0.5")
772+
)
773+
762774
# Number of consecutive failed server health checks that must occur
763775
# before haproxy marks the server as down.
764776
RAY_SERVE_HAPROXY_HEALTH_CHECK_FALL = int(

python/ray/serve/_private/haproxy.py

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
RAY_SERVE_EXPERIMENTAL_PIP_HAPROXY,
3636
RAY_SERVE_HAPROXY_BALANCE_ALGORITHM,
3737
RAY_SERVE_HAPROXY_BINARY_PATH,
38+
RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S,
3839
RAY_SERVE_HAPROXY_CONFIG_FILE_LOC,
3940
RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S,
4041
RAY_SERVE_HAPROXY_HEALTH_CHECK_DOWNINTER,
@@ -1412,6 +1413,20 @@ def __init__(
14121413
# which can cause race conditions with SO_REUSEPORT
14131414
self._reload_lock = asyncio.Lock()
14141415

1416+
# Coalesce controller broadcasts: when a broadcast arrives we schedule
1417+
# a single sleeping flush task. Subsequent broadcasts during the sleep
1418+
# are absorbed implicitly because they overwrite `self._target_groups`
1419+
# / fallback fields in place; the flush picks up the latest snapshot
1420+
# when it wakes. This collapses bursts of replica-add/remove events
1421+
# (common during autoscaling churn) into one runtime-API command pile,
1422+
# which keeps the HAProxy admin socket from saturating.
1423+
# `_coalesce_pending` lets the flush task notice broadcasts that
1424+
# arrive during a long-running apply and trigger another iteration
1425+
# so the trailing broadcast in a flurry isn't dropped.
1426+
self._coalesce_window_s = RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S
1427+
self._coalesce_task: Optional[asyncio.Task] = None
1428+
self._coalesce_pending = False
1429+
14151430
self.long_poll_client = long_poll_client or LongPollClient(
14161431
ray.get_actor(SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE),
14171432
{
@@ -1678,7 +1693,8 @@ async def _apply_backend_update(
16781693
if not applied_incrementally:
16791694
await self._haproxy.reload()
16801695

1681-
def _update_haproxy_backends(self) -> None:
1696+
def _build_name_to_backend_configs(self) -> Dict[str, BackendConfig]:
1697+
"""Snapshot the current target groups + fallbacks into backend configs."""
16821698
backend_configs = []
16831699
for target_group in self._target_groups:
16841700
fallback_target = None
@@ -1690,16 +1706,56 @@ def _update_haproxy_backends(self) -> None:
16901706
backend_config = self._create_backend_config(target_group, fallback_target)
16911707
backend_configs.append(backend_config)
16921708

1693-
logger.info(
1694-
f"Got updated backend configs: {backend_configs}.",
1695-
extra={"log_to_stderr": True},
1696-
)
1709+
return {bc.name: bc for bc in backend_configs}
16971710

1698-
name_to_backend_configs = {
1699-
backend_config.name: backend_config for backend_config in backend_configs
1700-
}
1711+
def _update_haproxy_backends(self) -> None:
    """Request that the current backend state be pushed to HAProxy.

    With a positive coalesce window, this only marks a broadcast as pending
    and makes sure exactly one flush task (`_coalesce_and_apply`) is alive to
    pick it up. With a window of zero or less, coalescing is off and an apply
    task is created right away for a freshly built snapshot.
    """
    coalescing_disabled = self._coalesce_window_s <= 0

    if not coalescing_disabled:
        # Raise the flag first: a flush task that is already sleeping or
        # applying will see it on its next loop check and iterate again.
        self._coalesce_pending = True
        flush_task = self._coalesce_task
        if flush_task is None or flush_task.done():
            # No live flush task, so start one to drain the pending flag.
            self._coalesce_task = self.event_loop.create_task(
                self._coalesce_and_apply()
            )
        return

    # Coalescing disabled: snapshot now and hand the apply straight to the
    # event loop, one task per broadcast.
    name_to_backend_configs = self._build_name_to_backend_configs()
    logger.info(
        f"Got updated backend configs: {list(name_to_backend_configs.values())}.",
        extra={"log_to_stderr": True},
    )
    apply_coro = self._apply_backend_update(name_to_backend_configs)
    self.event_loop.create_task(apply_coro)
1732+
1733+
async def _coalesce_and_apply(self) -> None:
    """Drain pending broadcasts, applying one batch per coalesce window.

    Each iteration clears `_coalesce_pending`, sleeps for
    `_coalesce_window_s`, builds a snapshot via
    `_build_name_to_backend_configs()` and awaits `_apply_backend_update()`
    on it. The snapshot is built only after the sleep, so state written
    while the task was sleeping is included in it.

    A broadcast that arrives while the task is sleeping or applying sets
    `_coalesce_pending` again, which makes the loop run one more iteration.

    A failure in one iteration is logged and does NOT end the loop. If a
    broadcast re-armed the flag before the failure, the loop iterates again
    and applies the latest state, instead of leaving it unapplied until some
    later broadcast happens to arrive. Cancellation is not swallowed:
    `asyncio.CancelledError` is not an `Exception` subclass on Python 3.8+.
    """
    while self._coalesce_pending:
        # Clear before the sleep so any broadcast during sleep+apply re-arms
        # the flag and triggers another iteration.
        # NOTE(review): a broadcast that lands during the sleep is already in
        # the snapshot below, yet it still causes one extra iteration.
        # Presumably that second apply is a cheap no-op diff; confirm.
        self._coalesce_pending = False
        try:
            await asyncio.sleep(self._coalesce_window_s)
            name_to_backend_configs = self._build_name_to_backend_configs()
            logger.info(
                f"Got updated backend configs: {list(name_to_backend_configs.values())}.",
                extra={"log_to_stderr": True},
            )
            await self._apply_backend_update(name_to_backend_configs)
        except Exception as e:
            # Keep the coalescer alive: the `while` condition is re-checked,
            # so a broadcast that arrived before the failure still gets its
            # own iteration. Each iteration sleeps first, so repeated
            # failures cannot spin. Details of the underlying error are
            # presumably logged by the apply path itself; verify.
            logger.error(f"Coalesced backend apply failed: {e}")
17031759

17041760
def update_target_groups(self, target_groups: List[TargetGroup]) -> None:
17051761
self._target_groups = target_groups

0 commit comments

Comments
 (0)