diff --git a/CHANGELOG.md b/CHANGELOG.md index fda070f506..4db0bf8b64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Documenting **breaking** configuration changes — renamed, removed, or moved fields that require users to update existing configs. +- **`client.dp_rank_count` removed**: With external-LB data parallelism each DP rank is its own endpoint (one `client.base_url` per rank), so the client no longer pins a rollout to an internal DP shard via the `X-data-parallel-rank` header. The `[orchestrator.student.client] dp_rank_count` field (and its per-rank client expansion) is gone — the router load-balances across the per-rank endpoints. Existing configs setting `dp_rank_count` must drop it (`extra="forbid"` rejects it); there is nothing to migrate. (2026-06-03) - **`trainer.experimental.token_export` removed → `trainer.enable_token_export`**: `TrainerExperimentalConfig` and the `[trainer.experimental] token_export = true` setting have been removed with no migration alias. Existing trainer configs must delete the `[trainer.experimental]` block and set the new top-level trainer flag instead: `enable_token_export = true` in a trainer config file, or `trainer.enable_token_export = true` when nested under `[trainer]` in an RL config. The trainer entrypoint CLI flag is `--enable-token-export`. (2026-06-10) - **`inference.kv_cache_offload.cpu_bytes` removed → discriminated `type` config**: The flat `[inference.kv_cache_offload]` block with a single `cpu_bytes` field is replaced by a backend-discriminated union with composable `cpu`/`disk` tiers. Migrate native CPU offload from `[inference.kv_cache_offload]\ncpu_bytes = N` to `[inference.kv_cache_offload]\ntype = "native"` plus `[inference.kv_cache_offload.cpu]\nnum_bytes = N`. A `type = "mooncake"` backend (per-node distributed store; multi-node/SLURM only) and an optional `[inference.kv_cache_offload.disk]\npath = "..."` tier (layered behind cpu) are also available. `extra="forbid"` rejects the old `cpu_bytes` key, so existing configs must migrate. (2026-06-02) - **Orchestrator async-pipeline rewrite** (collection of removals/renames). The orchestrator was rewritten to overlap train/eval rollouts on a shared concurrency limiter; several config fields were removed or renamed. diff --git a/docs/inference.md b/docs/inference.md index d9b1e9947c..6a3652f203 100644 --- a/docs/inference.md +++ b/docs/inference.md @@ -30,7 +30,7 @@ We support 3 distinct deployment shapes: Most of the features are supported for all deployment shapes, with few exceptions. These exceptions are rejected on validation. -You can select the deployment shape with `InferenceDeploymentConfig` in your config file. This is a config-field that allows you to set the deployment shape, deployment-specific knobs such as `num_nodes`, `num_replicas`, `router_port`, `backend_port`, etc. +You can select the deployment shape with `InferenceDeploymentConfig` in your config file. This is a config-field that allows you to set the deployment shape, deployment-specific knobs such as `num_nodes`, `num_replicas`, `backend_port`, and a `[...deployment.router]` block, etc. ```toml [inference.deployment] @@ -102,7 +102,7 @@ tp = 2 dp = 4 ``` -This configuration will run 2 independent vLLM replicas, each with `tp=2` and `dp=4`. Routing will be handled by the `vllm-router` instance running on the same node as the 1st replica. We aim to support more advanced routing options, such as `llm-d` or `dynamo` in the future. You can read more about the supported routing options in the [router](#router) section. +This configuration will run 2 independent vLLM replicas, each with `tp=2` and `dp=4`. Routing is handled by a router instance running on the same node as the 1st replica — either `vllm-router` (default) or the upstream `llm-d` EPP+Envoy, selected via the `[...deployment.router]` block. You can read more about the supported routing options in the [router](#router) section. ### Wide-EP @@ -167,9 +167,22 @@ This will run 3 inference replicas, each running on 6 nodes. Each replica will r ## Router -We use our own fork of [vllm-router](https://github.com/PrimeIntellect-ai/router) as the request handler. We plan to support more advanced proxy options in the future. +Multi-node and disaggregated deployments front their vLLM backends with a router, configured via a discriminated `[...deployment.router]` block (`type = "vllm-router" | "llm-d"`): -Right now, router handles 2 most important things: +```toml +[inference.deployment.router] # or [deployment.router] for the standalone inference entrypoint +type = "llm-d" # "vllm-router" (default) or "llm-d" +# llm-d-only knobs (all optional): +scorers = { "prefix-cache-scorer" = 3.0, "active-request-scorer" = 2.0 } # base, applied to every profile +prefill_scorer_overrides = { "queue-scorer" = 2.0, "kv-cache-utilization-scorer" = 2.0 } # merged onto the P/D prefill profile +decode_scorer_overrides = {} # merged onto the P/D decode profile +non_cached_tokens = 16 # below this many non-cached prompt tokens, skip remote prefill (P/D) +``` + +- **`vllm-router`** (default) — our fork of [vllm-router](https://github.com/PrimeIntellect-ai/router). Knob: `policy`. +- **`llm-d`** — the upstream [llm-d](https://llm-d.ai) Endpoint Picker (EPP) + Envoy proxy. Routing combines **prefix-cache affinity** (grouped rollouts reuse a cached prefix and skip prefill) with the **`active-request-scorer`** — an in-flight load balancer that spreads requests across ranks immediately, unlike the metrics-scraped `queue-scorer` / `kv-cache-utilization-scorer` / `load-aware-scorer` (which lag and concentrate bursts of same-prefix requests). The scorer weights follow the upstream llm-d P/D guide; tune via `scorers` (base) + `prefill_scorer_overrides` / `decode_scorer_overrides` (per-profile, P/D). Does not support `enable_return_routed_experts` (router replay). + +Both backends support the 2 most important things: - Request routing - KV cache re-use and balanced routing - P/D disaggregation - handling the prefill and decode stages separately diff --git a/packages/prime-rl-configs/src/prime_rl/configs/inference.py b/packages/prime-rl-configs/src/prime_rl/configs/inference.py index 9579259366..91b48117fe 100644 --- a/packages/prime-rl-configs/src/prime_rl/configs/inference.py +++ b/packages/prime-rl-configs/src/prime_rl/configs/inference.py @@ -167,8 +167,26 @@ def to_connector_dict(self) -> dict[str, Any]: ] +# Known llm-d EPP scorer plugins (used to guard the ``scorers`` map against typos). +KNOWN_SCORERS = frozenset( + { + "prefix-cache-scorer", + "precise-prefix-cache-scorer", + "queue-scorer", + "kv-cache-utilization-scorer", + "active-request-scorer", + "load-aware-scorer", + "running-requests-size-scorer", + "token-load-scorer", + "latency-scorer", + "session-affinity-scorer", + "lora-affinity-scorer", + } +) + + class VllmRouterConfig(BaseConfig): - """PrimeIntellect vllm-router fronting the per-rank (external-LB) endpoints.""" + """PrimeIntellect vllm-router.""" type: Literal["vllm-router"] = "vllm-router" @@ -179,9 +197,57 @@ class VllmRouterConfig(BaseConfig): """Routing policy, e.g. ``consistent_hash`` or ``round_robin``.""" -# Discriminated on ``type`` so additional router backends can be added to the -# union (a single member needs no discriminator yet). -RouterConfig: TypeAlias = VllmRouterConfig +class LlmdRouterConfig(BaseConfig): + """llm-d router backend (EPP + Envoy).""" + + type: Literal["llm-d"] = "llm-d" + + port: int = 8000 + """Port the Envoy gateway listens on — becomes the client-facing router URL.""" + + scorers: dict[str, float] = { + "prefix-cache-scorer": 3.0, + "active-request-scorer": 2.0, + } + """EPP scorer name → weight, applied to every routing profile (before the per-profile P/D overrides). Defaults to prefix-cache affinity plus in-flight (active-request) load balancing. Unknown scorer names are rejected.""" + + prefill_scorer_overrides: dict[str, float] = { + "queue-scorer": 2.0, + "kv-cache-utilization-scorer": 2.0, + } + """P/D only: scorer → weight merged onto ``scorers`` for the prefill profile (a per-profile weight overrides the base).""" + + decode_scorer_overrides: dict[str, float] = {} + """P/D only: scorer → weight merged onto ``scorers`` for the decode profile (a per-profile weight overrides the base); empty by default.""" + + non_cached_tokens: int = 16 + """P/D only: requests with fewer than this many non-cached prompt tokens skip remote prefill and run decode-only.""" + + decode_sidecar_port: int = 8300 + """P/D only: port the decode-side llm-d sidecar listens on.""" + + @property + def prefill_scorers(self) -> dict[str, float]: + """Effective prefill-profile scorers: ``scorers`` merged with ``prefill_scorer_overrides``.""" + return {**self.scorers, **self.prefill_scorer_overrides} + + @property + def decode_scorers(self) -> dict[str, float]: + """Effective decode-profile scorers: ``scorers`` merged with ``decode_scorer_overrides``.""" + return {**self.scorers, **self.decode_scorer_overrides} + + @model_validator(mode="after") + def validate_scorers(self): + unknown = ( + set(self.scorers) | set(self.prefill_scorer_overrides) | set(self.decode_scorer_overrides) + ) - KNOWN_SCORERS + if unknown: + raise ValueError(f"Unknown llm-d scorer(s): {sorted(unknown)}. Known scorers: {sorted(KNOWN_SCORERS)}.") + return self + + +# Discriminated on ``type`` so the launch path can pick the router backend. +RouterConfig: TypeAlias = Annotated[VllmRouterConfig | LlmdRouterConfig, Field(discriminator="type")] class BaseInferenceDeploymentConfig(BaseConfig): @@ -369,6 +435,18 @@ def validate_multi_node_requires_slurm(self): raise ValueError("Must use SLURM for multi-node / disaggregated deployment.") return self + @model_validator(mode="after") + def validate_llmd_no_routed_experts(self): + """Reject routed-expert return with the llm-d router (breaks P/D, unverified for multi-node).""" + router = getattr(self.deployment, "router", None) + if router is not None and router.type == "llm-d" and self.enable_return_routed_experts: + raise ValueError( + "The llm-d router backend does not support routed-expert return " + "(enable_return_routed_experts): it breaks P/D and is unverified for multi-node. " + "Use router type 'vllm-router' for routed-expert runs." + ) + return self + @model_validator(mode="after") def auto_setup_kv_cache_offload(self): if self.kv_cache_offload is not None: diff --git a/packages/prime-rl-configs/src/prime_rl/configs/rl.py b/packages/prime-rl-configs/src/prime_rl/configs/rl.py index 832234eea9..f2f246e1c8 100644 --- a/packages/prime-rl-configs/src/prime_rl/configs/rl.py +++ b/packages/prime-rl-configs/src/prime_rl/configs/rl.py @@ -438,6 +438,25 @@ def auto_setup_router_replay(self): ) return self + @model_validator(mode="after") + def validate_llmd_no_routed_experts(self): + """Reject routed-expert return with the llm-d router (breaks P/D, unverified for multi-node). + + Runs after ``auto_setup_router_replay`` so it also catches the + ``trainer.enable_router_replay`` path, which sets the inference flag here + (after InferenceConfig's own validators, which therefore miss it). + """ + if self.inference is not None and self.inference.enable_return_routed_experts: + router = getattr(self.inference.deployment, "router", None) + if router is not None and router.type == "llm-d": + raise ValueError( + "The llm-d router backend does not support routed-expert return " + "(inference.enable_return_routed_experts / trainer.enable_router_replay): it " + "breaks P/D and is unverified for multi-node. Use router type 'vllm-router' " + "for router-replay runs." + ) + return self + @model_validator(mode="after") def validate_router_replay_without_kv_offload(self): if ( @@ -606,16 +625,13 @@ def auto_setup_disaggregated_inference(self): def auto_setup_inference_client(self): """Auto-configure orchestrator student client from the inference server config. - For all modes, sets dp_rank_count from inference DP size. For SFT mode, - also sets base_url - rl/opd rely on the ClientConfig default + For SFT mode, sets base_url - rl/opd rely on the ClientConfig default (``["http://localhost:8000/v1"]``) which already matches the auto-launched student vLLM at inference.server.port = 8000. """ if self.inference is None: return self client = self.orchestrator.student.client - if "dp_rank_count" not in client.model_fields_set: - client.dp_rank_count = self.inference.data_parallel_size_local or self.inference.parallel.dp if self.orchestrator.training_mode == "sft" and "base_url" not in client.model_fields_set: host = self.inference.server.host or "localhost" port = self.inference.server.port diff --git a/packages/prime-rl-configs/src/prime_rl/configs/shared.py b/packages/prime-rl-configs/src/prime_rl/configs/shared.py index ff311f145d..b510a26f75 100644 --- a/packages/prime-rl-configs/src/prime_rl/configs/shared.py +++ b/packages/prime-rl-configs/src/prime_rl/configs/shared.py @@ -127,9 +127,6 @@ class ClientConfig(BaseConfig): skip_model_check: bool = False """Skip checking that the model is available in the inference pool. Useful for external APIs or keys that do not expose ``/models``.""" - dp_rank_count: int = Field(1, ge=1) - """Number of data-parallel ranks behind each base URL. When > 1, each URL is expanded into ``dp_rank_count`` logical clients pinned via the ``X-data-parallel-rank`` header, so every request within a rollout hits the same DP engine and reuses KV cache. Auto-set from the inference config when using the RL entrypoint.""" - admin_base_url: list[str] | None = None """Separate base URLs for admin operations (weight updates, health checks). When set, admin clients bypass routers and hit each server directly — used in disaggregated P/D deployments where the router must not handle admin traffic.""" diff --git a/scripts/install_llmd.sh b/scripts/install_llmd.sh new file mode 100755 index 0000000000..730619f774 --- /dev/null +++ b/scripts/install_llmd.sh @@ -0,0 +1,86 @@ +#!/usr/bin/env bash +# Install the llm-d standalone (no-Kubernetes) routing binaries into +# third_party/llmd/bin: +# - epp : llm-d-router Endpoint Picker (the routing brain) +# - pd-sidecar : decode-side proxy for P/D disaggregation +# - envoy : the data-plane proxy that calls the EPP via ext_proc +# +# epp/pd-sidecar are built from a pinned llm-d-router commit. We currently build +# from a small fork that adds P/D disaggregation for vLLM's token-in +# /inference/v1/generate endpoint (prime-rl's renderer / TITO rollout path) — +# upstream's pd-sidecar only disaggregates the OpenAI endpoints, so token-in P/D +# silently runs decode-only. The fork is pending upstream PR +# llm-d/llm-d-router#1458; switch LLMD_ROUTER_REPO back to the upstream repo and +# bump LLMD_ROUTER_REF once it merges. The fork is branched off the upstream +# commit that added the EPP vllmhttp parser (PR #1248), which the renderer path +# also needs. +# +# System Go is not required: a Go toolchain is vendored under third_party/llmd/go +# and used to bootstrap; GOTOOLCHAIN=auto fetches the exact version the module +# pins. Envoy is extracted as a static binary from its release container image +# WITHOUT docker: we build `crane` (pure-Go, talks straight to the OCI registry) +# with the same vendored toolchain and use it to export the image filesystem. +# This is the only step that previously required a docker daemon, which cluster +# compute nodes don't have. +set -euo pipefail + +LLMD_ROUTER_REPO="${LLMD_ROUTER_REPO:-https://github.com/S1ro1/llm-d-router.git}" +LLMD_ROUTER_REF="${LLMD_ROUTER_REF:-1ca4243ec84c657b4a5f507a1776d6c15a618d5b}" +GO_BOOTSTRAP_VERSION="${GO_BOOTSTRAP_VERSION:-1.23.4}" +ENVOY_VERSION="${ENVOY_VERSION:-1.36.0}" +ENVOY_IMAGE="${ENVOY_IMAGE:-envoyproxy/envoy:v${ENVOY_VERSION}}" + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +PROJECT_DIR=$(cd "$SCRIPT_DIR/.." && pwd) +LLMD_DIR="$PROJECT_DIR/third_party/llmd" +BIN_DIR="$LLMD_DIR/bin" +GO_ROOT="$LLMD_DIR/go" +SRC_DIR="$LLMD_DIR/src" +GO_TOOLS_BIN="$LLMD_DIR/gotools/bin" +mkdir -p "$BIN_DIR" "$GO_TOOLS_BIN" + +# --- Go toolchain (vendored bootstrap; auto-upgrades to the module's version) --- +if [ ! -x "$GO_ROOT/bin/go" ]; then + echo "[install_llmd] downloading bootstrap Go $GO_BOOTSTRAP_VERSION" + rm -rf "$GO_ROOT" + curl -fsSL "https://go.dev/dl/go${GO_BOOTSTRAP_VERSION}.linux-amd64.tar.gz" | tar -xz -C "$LLMD_DIR" +fi +export GOROOT="$GO_ROOT" +export PATH="$GO_ROOT/bin:$PATH" +export GOTOOLCHAIN=auto +echo "[install_llmd] bootstrap $(go version)" + +# --- Fetch llm-d-router source at the pinned ref --- +echo "[install_llmd] fetching ${LLMD_ROUTER_REPO}@${LLMD_ROUTER_REF}" +if [ ! -d "$SRC_DIR/.git" ]; then + rm -rf "$SRC_DIR" + git clone --quiet "$LLMD_ROUTER_REPO" "$SRC_DIR" +fi +git -C "$SRC_DIR" remote set-url origin "$LLMD_ROUTER_REPO" +git -C "$SRC_DIR" fetch --quiet origin +git -C "$SRC_DIR" checkout --quiet "$LLMD_ROUTER_REF" + +# --- Build epp + pd-sidecar --- +echo "[install_llmd] building epp + pd-sidecar" +( cd "$SRC_DIR" && go build -o "$BIN_DIR/epp" ./cmd/epp && go build -o "$BIN_DIR/pd-sidecar" ./cmd/pd-sidecar ) + +# --- Envoy static binary (extract from the release image; keep if present) --- +# No docker: build crane with the vendored toolchain and export the image's +# filesystem straight from the registry, then pull out the static envoy binary. +if [ ! -x "$BIN_DIR/envoy" ]; then + if [ ! -x "$GO_TOOLS_BIN/crane" ]; then + echo "[install_llmd] building crane (OCI registry client, no docker needed)" + GOBIN="$GO_TOOLS_BIN" GOFLAGS=-mod=mod \ + go install github.com/google/go-containerregistry/cmd/crane@latest + fi + echo "[install_llmd] extracting Envoy ${ENVOY_VERSION} from ${ENVOY_IMAGE}" + "$GO_TOOLS_BIN/crane" export "$ENVOY_IMAGE" - \ + | tar -xf - -O usr/local/bin/envoy > "$BIN_DIR/envoy" + chmod +x "$BIN_DIR/envoy" +fi + +echo "[install_llmd] installed binaries in $BIN_DIR:" +"$BIN_DIR/epp" --version 2>&1 | head -1 || true +"$BIN_DIR/envoy" --version 2>&1 | head -1 || true +"$BIN_DIR/pd-sidecar" --help >/dev/null 2>&1 && echo " pd-sidecar OK" || true +echo "[install_llmd] done" diff --git a/skills/install/SKILL.md b/skills/install/SKILL.md index aa3dad0661..459e1a4728 100644 --- a/skills/install/SKILL.md +++ b/skills/install/SKILL.md @@ -59,6 +59,16 @@ Flags: `--workspace DIR`, `--deepep-ref REF` (default `73b6ea4`), `--nvshmem-ver Verify: `uv run python -c 'import deep_ep; print(deep_ep.__file__)'`. +### llm-d router backend + +Multi-node / disaggregated deployments can route through the upstream llm-d Endpoint Picker instead of `vllm-router` (set `[...deployment.router] type = "llm-d"`). It needs three native binaries — install once: + +```bash +bash scripts/install_llmd.sh # builds epp + pd-sidecar from a pinned llm-d-router commit (vendored Go), fetches envoy +``` + +Binaries land in `third_party/llmd/bin/{epp,envoy,pd-sidecar}` (a shared path, so SLURM nodes see them). `epp` is pinned to the commit that includes the `vllmhttp-parser` (PR #1248) so prime-rl's renderer/TITO `/inference/v1/generate` path routes correctly. Override the pin with `LLMD_ROUTER_REF=`. The EPP + Envoy + endpoints configs are rendered from `templates/llmd/*.yaml.j2` (included into the SLURM script); only the per-node IPv4 addresses are filled in inline at launch time. + ## Key files - `pyproject.toml` — dependencies, extras, dependency groups diff --git a/src/prime_rl/entrypoints/inference.py b/src/prime_rl/entrypoints/inference.py index 56bf19c655..c770c76fce 100644 --- a/src/prime_rl/entrypoints/inference.py +++ b/src/prime_rl/entrypoints/inference.py @@ -47,7 +47,7 @@ def write_slurm_script(config: InferenceConfig, config_path: Path, script_path: dp_per_node=dp_per_node, num_nodes=getattr(config.deployment, "num_nodes", 1), port=config.server.port, - disaggregated=is_disaggregated, + is_disaggregated=is_disaggregated, kv_offload=offload is not None, kv_offload_mooncake=is_mooncake, kv_offload_cpu_bytes=int(offload.cpu.num_bytes) if is_mooncake else 0, @@ -65,8 +65,7 @@ def write_slurm_script(config: InferenceConfig, config_path: Path, script_path: num_decode_replicas=config.deployment.num_decode_replicas, prefill_port=config.deployment.prefill_port, decode_port=config.deployment.decode_port, - router_port=config.deployment.router.port, - router_policy=config.deployment.router.policy, + router=config.deployment.router, data_parallel_rpc_port=config.data_parallel_rpc_port, use_deep_gemm=config.use_deep_gemm, prefill_env_overrides=config.deployment.prefill_env_overrides, @@ -74,11 +73,11 @@ def write_slurm_script(config: InferenceConfig, config_path: Path, script_path: ) elif is_multi_node: template_vars.update( - router_port=config.deployment.router.port, + router=config.deployment.router, backend_port=config.deployment.backend_port, - router_policy=config.deployment.router.policy, data_parallel_rpc_port=config.data_parallel_rpc_port, enable_expert_parallel=config.enable_expert_parallel, + infer_nodes_per_replica=config.deployment.num_nodes, ) script = template.render(**template_vars) diff --git a/src/prime_rl/entrypoints/rl.py b/src/prime_rl/entrypoints/rl.py index db1e27b995..6f6c2fd8da 100644 --- a/src/prime_rl/entrypoints/rl.py +++ b/src/prime_rl/entrypoints/rl.py @@ -12,6 +12,7 @@ import pynvml import tomli_w +from prime_rl.configs.inference import VllmRouterConfig from prime_rl.configs.rl import RLConfig from prime_rl.utils.config import cli from prime_rl.utils.logger import get_logger, setup_logger @@ -371,7 +372,7 @@ def write_slurm_script(config: RLConfig, config_dir: Path, script_path: Path) -> num_prefill_replicas=infer_deploy.num_prefill_replicas, num_decode_replicas=infer_deploy.num_decode_replicas, gpus_per_node=config.deployment.gpus_per_node, - router_port=infer_deploy.router.port, + router=infer_deploy.router, prefill_port=infer_deploy.prefill_port, decode_port=infer_deploy.decode_port, inference_tp=config.inference.parallel.tp, @@ -397,7 +398,8 @@ def write_slurm_script(config: RLConfig, config_dir: Path, script_path: Path) -> nodes_per_infer_replica=config.deployment.num_infer_nodes, num_infer_replicas=config.deployment.num_infer_replicas, gpus_per_node=config.deployment.gpus_per_node, - router_port=config.inference.deployment.router.port if config.inference else 8000, + router=config.inference.deployment.router if config.inference else VllmRouterConfig(), + infer_nodes_per_replica=config.deployment.num_infer_nodes, backend_port=config.inference.deployment.backend_port if config.inference else 8100, inference_tp=config.inference.parallel.tp if config.inference else 1, inference_enable_expert_parallel=config.inference.enable_expert_parallel if config.inference else False, diff --git a/src/prime_rl/templates/_launch_rank.sh.j2 b/src/prime_rl/templates/_launch_rank.sh.j2 index f9d35fa935..c6026d0c26 100644 --- a/src/prime_rl/templates/_launch_rank.sh.j2 +++ b/src/prime_rl/templates/_launch_rank.sh.j2 @@ -15,14 +15,20 @@ rank_log() { # launch_inference_rank launch_inference_rank() { local port="$1" gpus="$2" dp="$3" rpc="$4" extra="$5" nixl="$6" log="$7" + # Per-rank dir for the mooncake-store lookup ipc socket. It must be unique per process: + # external-LB runs one engine per process, all with data_parallel_rank_local 0, so the + # lookup socket name collides on bind if the dir is shared node-wide. $port is unique per + # rank on the node. Node-local /tmp also avoids inheriting an absent submit-node TMPDIR. + local rpcbase="/tmp/vllm-rpc-$USER-$SLURM_JOB_ID-$port" + mkdir -p "$rpcbase" local -a cmd=(uv run inference @ "$CONFIG_PATH" --server.host 0.0.0.0 --server.port "$port" --parallel.dp "$dp" --data-parallel-size-local 1 --api-server-count 1) [ -n "$rpc" ] && cmd+=(--data-parallel-rpc-port "$rpc") [ -n "$extra" ] && cmd+=(--vllm-extra "$extra") if [ -n "$nixl" ]; then - CUDA_VISIBLE_DEVICES="$gpus" VLLM_NIXL_SIDE_CHANNEL_PORT="$nixl" "${cmd[@]}" 2>&1 | tee -a "$log" & + CUDA_VISIBLE_DEVICES="$gpus" VLLM_NIXL_SIDE_CHANNEL_PORT="$nixl" VLLM_RPC_BASE_PATH="$rpcbase" "${cmd[@]}" 2>&1 | tee -a "$log" & else - CUDA_VISIBLE_DEVICES="$gpus" "${cmd[@]}" 2>&1 | tee -a "$log" & + CUDA_VISIBLE_DEVICES="$gpus" VLLM_RPC_BASE_PATH="$rpcbase" "${cmd[@]}" 2>&1 | tee -a "$log" & fi } diff --git a/src/prime_rl/templates/_launch_router.sh.j2 b/src/prime_rl/templates/_launch_router.sh.j2 index f9f2ef926f..03f02052ae 100644 --- a/src/prime_rl/templates/_launch_router.sh.j2 +++ b/src/prime_rl/templates/_launch_router.sh.j2 @@ -1,9 +1,59 @@ {# Shared router launch helper, included once per template and called by the - replica-head rank. vllm-router today; an llm-d (EPP+Envoy) branch can slot in - here so the call sites stay router-agnostic. #} -# launch_router [replica_idx] + replica-head rank. The router backend (vllm-router vs llm-d EPP+Envoy) is + chosen here via router.type, so the call sites stay router-agnostic. + node_base/node_count locate this replica's nodes in HOSTNAMES for llm-d + endpoint discovery (disagg uses NUM_PREFILL_NODES/NUM_DECODE_NODES). #} +# launch_router [node_count] launch_router() { - local mode="$1" worker_args="$2" port="$3" policy="$4" log="$5" replica="${6:-}" + local mode="$1" worker_args="$2" port="$3" policy="$4" log="$5" replica="${6:-0}" node_base="${7:-0}" node_count="${8:-0}" +{%- if router.type == "llm-d" %} + echo "Starting llm-d router (EPP+Envoy) on $LOCAL_IP:$port for replica $replica" | tee "$log" + export PATH="$PROJECT_DIR/third_party/llmd/bin:$PATH" + local LLMD_DIR="$OUTPUT_DIR/logs/inference/llmd_${replica}" + mkdir -p "$LLMD_DIR" + read -ra HOSTNAMES <<< "$HOSTNAMES_STR" +{%- if is_disaggregated %} +{%- set ep_disaggregated = true %} +{%- set ep_num_prefill = num_prefill_nodes %} +{%- set ep_num_decode = num_decode_nodes %} +{%- set ep_dp = dp_per_node %} +{%- set ep_prefill_port = prefill_port %} +{%- set ep_sidecar_port = router.decode_sidecar_port %} + cat > "$LLMD_DIR/epp.yaml" < "$LLMD_DIR/endpoints.yaml" < "$LLMD_DIR/epp.yaml" < "$LLMD_DIR/endpoints.yaml" < "$LLMD_DIR/envoy.yaml" <> "$log" 2>&1 & + envoy -c "$LLMD_DIR/envoy.yaml" >> "$log" 2>&1 & +{%- else %} local tag="vllm-router"; [ "$mode" = pd ] && tag="vllm-router (PD)" echo "Starting $tag on $LOCAL_IP:$port${replica:+ for replica $replica}" | tee "$log" local -a cmd=(vllm-router --policy "$policy" --host 0.0.0.0 --port "$port" @@ -14,4 +64,5 @@ launch_router() { cmd+=(--worker-urls $worker_args) fi "${cmd[@]}" >> "$log" 2>&1 & +{%- endif %} } diff --git a/src/prime_rl/templates/_mooncake_store.sh.j2 b/src/prime_rl/templates/_mooncake_store.sh.j2 index 6068fd06d1..bff560a609 100644 --- a/src/prime_rl/templates/_mooncake_store.sh.j2 +++ b/src/prime_rl/templates/_mooncake_store.sh.j2 @@ -5,7 +5,8 @@ SLURM templates inside the per-node `bash -c` body, so NO single quotes / apostrophes. Requires shell vars: LOCAL_IP, INFER_NODE_RANK, OUTPUT_DIR (and SLURM env). Requires jinja vars: kv_offload_cpu_bytes, kv_offload_disk_path, kv_offload_device_name. - Exports PYTHONHASHSEED + MOONCAKE_CONFIG_PATH for the vLLM process on this node. -#} + Exports PYTHONHASHSEED + MOONCAKE_CONFIG_PATH for the vLLM process. (VLLM_RPC_BASE_PATH, + used by the mooncake-store lookup ipc socket, is set per-rank in _launch_rank.sh.j2.) -#} # ---- Mooncake distributed store (shared pool across inference nodes) ---- MC_DIR="$OUTPUT_DIR/mooncake/node_${INFER_NODE_RANK}" mkdir -p "$MC_DIR" @@ -23,9 +24,11 @@ {%- endif %} if [ "$SLURMD_NODENAME" = "$MC_HEAD" ]; then {%- if kv_offload_disk_path %} - mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=0.0.0.0 -http_metadata_server_port=8080 -enable_offload=true -root_fs_dir={{ kv_offload_disk_path }} > "$MC_DIR/master.log" 2>&1 & + mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=0.0.0.0 -http_metadata_server_port=8080 -default_kv_lease_ttl=3600000 -enable_offload=true -root_fs_dir={{ kv_offload_disk_path }} > "$MC_DIR/master.log" 2>&1 & {%- else %} - mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=0.0.0.0 -http_metadata_server_port=8080 > "$MC_DIR/master.log" 2>&1 & + # default_kv_lease_ttl raised from 5s: KV blocks were expiring before the decode side + # reused them ("invalid KV blocks ... may have expired"), the real non-NIC config issue. + mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=0.0.0.0 -http_metadata_server_port=8080 -default_kv_lease_ttl=3600000 > "$MC_DIR/master.log" 2>&1 & {%- endif %} fi # Every node waits for the shared master + metadata, then contributes its segment. diff --git a/src/prime_rl/templates/inference.sbatch.j2 b/src/prime_rl/templates/inference.sbatch.j2 index e7d7f784bc..836486ad40 100755 --- a/src/prime_rl/templates/inference.sbatch.j2 +++ b/src/prime_rl/templates/inference.sbatch.j2 @@ -28,7 +28,7 @@ set -e # Configs export NUM_NODES={{ num_nodes }} export GPUS_PER_NODE={{ gpus_per_node }} -{% if disaggregated -%} +{% if is_disaggregated -%} export NUM_PREFILL_NODES={{ num_prefill_nodes }} export NUM_DECODE_NODES={{ num_decode_nodes }} export NUM_PREFILL_REPLICAS={{ num_prefill_replicas }} @@ -38,10 +38,13 @@ export NODES_PER_PREFILL_REPLICA=$((NUM_PREFILL_NODES / NUM_PREFILL_REPLICAS)) export NODES_PER_DECODE_REPLICA=$((NUM_DECODE_NODES / NUM_DECODE_REPLICAS)) export PREFILL_PORT={{ prefill_port }} export DECODE_PORT={{ decode_port }} -export ROUTER_PORT={{ router_port }} +export ROUTER_PORT={{ router.port }} export RPC_PORT={{ data_parallel_rpc_port }} +{%- if router.type == "llm-d" %} +export DECODE_SIDECAR_PORT={{ router.decode_sidecar_port }} +{%- endif %} {%- elif num_nodes > 1 %} -export ROUTER_PORT={{ router_port }} +export ROUTER_PORT={{ router.port }} export BACKEND_PORT={{ backend_port }} export RPC_PORT={{ data_parallel_rpc_port }} {%- endif %} @@ -70,7 +73,7 @@ uv sync --all-extras # Print inference URLs export HOSTNAMES=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) ) export HOSTNAMES_STR="${HOSTNAMES[*]}" -{% if disaggregated -%} +{% if is_disaggregated -%} # Build per-replica router URLs INFER_URLS="" ALL_ROUTER_ARGS="" @@ -132,6 +135,10 @@ srun bash -s <<'CLEANUP_SH' pkill -9 -f "[p]ython.*prime_rl" 2>/dev/null pkill -9 -f "[t]orchrun" 2>/dev/null pkill -9 -f "[v]llm-router" 2>/dev/null + # llm-d router procs (EPP + Envoy + decode pd-sidecar) left over from a prior run. + pkill -9 -f "[e]pp.*pool-name" 2>/dev/null + pkill -9 -f "[e]nvoy.*envoy.yaml" 2>/dev/null + pkill -9 -f "[p]d-sidecar" 2>/dev/null pkill -9 -f "[v]llm" 2>/dev/null pkill -9 -f "[p]rime_rl" 2>/dev/null # Also match prctl-set process names (kernel comm) like "vllm::router" @@ -144,7 +151,7 @@ srun bash -s <<'CLEANUP_SH' pkill -9 -x mooncake_client 2>/dev/null sleep 2 rm -rf /dev/shm/vllm-* /dev/shm/vllm_* /tmp/vllm-* /tmp/vllm_* /tmp/torch-* /tmp/torchelastic_* 2>/dev/null - procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::|mooncake_" | grep -v grep | wc -l) + procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::|mooncake_|[e]pp |envoy|pd-sidecar" | grep -v grep | wc -l) gpu=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits | awk "{s+=\$1} END {print s}") echo "[node-cleanup] $(hostname) procs=$procs gpu_mem=${gpu}MiB" CLEANUP_SH @@ -179,7 +186,7 @@ srun --kill-on-bad-exit=1 bash -c ' {% include "_mooncake_store.sh.j2" %} {%- endif %} -{% if disaggregated -%} +{% if is_disaggregated -%} # NVSHMEM libs for DeepEP — ensure the rpath target exists on all nodes if [ ! -d /tmp/deepep_build/nvshmem/lib ] && [ -d "$OUTPUT_DIR/nvshmem/lib" ]; then mkdir -p /tmp/deepep_build/nvshmem @@ -271,8 +278,28 @@ srun --kill-on-bad-exit=1 bash -c ' # Start the router on the first prefill node of each replica (external LB across per-rank endpoints) if [ "$RANK_IN_REPLICA" -eq 0 ]; then REPLICA_ROUTER_ARGS=$(echo "$ALL_ROUTER_ARGS" | cut -d"|" -f$((REPLICA_IDX + 1))) - launch_router pd "$REPLICA_ROUTER_ARGS" "$ROUTER_PORT" "{{ router_policy }}" "$OUTPUT_DIR/logs/inference/router_${REPLICA_IDX}.log" "$REPLICA_IDX" + launch_router pd "$REPLICA_ROUTER_ARGS" "$ROUTER_PORT" "{{ router.policy }}" "$OUTPUT_DIR/logs/inference/router_${REPLICA_IDX}.log" "$REPLICA_IDX" "$REPLICA_BASE" + fi +{%- if router.type == "llm-d" %} + # pd-sidecar on each decode node: EPP routes decode -> sidecar (DECODE_SIDECAR_PORT + # + rank) -> vLLM decode (DECODE_PORT + rank) after orchestrating remote prefill. + # Every decode node fronts its own local ranks, so launch on each (not just the + # replica head), or decode nodes beyond the head get advertised but unserved endpoints. + if [ "$ROLE" = "decode" ]; then + SIDECAR_LOG="$OUTPUT_DIR/logs/inference/sidecar_${INFER_NODE_RANK}.log" + echo "Starting pd-sidecar on $LOCAL_IP:$DECODE_SIDECAR_PORT(+rank) -> vLLM $DECODE_PORT(+rank)" | tee $SIDECAR_LOG + export PATH="$PROJECT_DIR/third_party/llmd/bin:$PATH" + pd-sidecar \ + --port $DECODE_SIDECAR_PORT \ + --vllm-port $DECODE_PORT \ + --kv-connector nixlv2 \ + --secure-proxy=false \ + --enable-ssrf-protection=false \ + --inference-pool prime-rl \ + --data-parallel-size {{ dp_per_node }} \ + >> $SIDECAR_LOG 2>&1 & fi +{%- endif %} # External-LB: one vLLM API server per local DP rank (PORT + k), each pinned to its TP # slice of GPUs with a unique NIXL side-channel port. Each role is its own external-LB DP @@ -296,7 +323,7 @@ srun --kill-on-bad-exit=1 bash -c ' # Start the router on node 0 (balances across per-rank endpoints — no intra-node DP header) if [ "$INFER_NODE_RANK" -eq 0 ]; then - launch_router regular "$ROUTER_ARGS" "$ROUTER_PORT" "{{ router_policy }}" "$OUTPUT_DIR/logs/inference/router.log" + launch_router regular "$ROUTER_ARGS" "$ROUTER_PORT" "{{ router.policy }}" "$OUTPUT_DIR/logs/inference/router.log" 0 0 "$NUM_NODES" fi # External-LB: one vLLM API server per DP rank on this node (BACKEND_PORT + dp_local), diff --git a/src/prime_rl/templates/llmd/endpoints.yaml.j2 b/src/prime_rl/templates/llmd/endpoints.yaml.j2 new file mode 100644 index 0000000000..1f3bec6214 --- /dev/null +++ b/src/prime_rl/templates/llmd/endpoints.yaml.j2 @@ -0,0 +1,41 @@ +{# llm-d file-discovery endpoints: one entry per DP rank. Rendered (via include) + into the sbatch; the per-node address is the __LLMD_ADDR___ placeholder, + filled with each SLURM host's IPv4 inline in the sbatch (file-discovery rejects + hostnames). Node ordering matches the sbatch's host loop: prefill then decode. + The including sbatch sets ep_* via {% set %}. #} +endpoints: +{%- if ep_disaggregated %} +{%- for node in range(ep_num_prefill) %} +{%- for rank in range(ep_dp) %} + - name: prefill-{{ node }}-rank-{{ rank }} + address: __LLMD_ADDR_{{ node }}__ + port: "{{ ep_prefill_port + rank }}" + namespace: default + labels: + llm-d.ai/pool: prime-rl + llm-d.ai/role: prefill +{%- endfor %} +{%- endfor %} +{%- for node in range(ep_num_decode) %} +{%- for rank in range(ep_dp) %} + - name: decode-{{ node }}-rank-{{ rank }} + address: __LLMD_ADDR_{{ ep_num_prefill + node }}__ + port: "{{ ep_sidecar_port + rank }}" + namespace: default + labels: + llm-d.ai/pool: prime-rl + llm-d.ai/role: decode +{%- endfor %} +{%- endfor %} +{%- else %} +{%- for node in range(ep_num_nodes) %} +{%- for rank in range(ep_dp) %} + - name: backend-{{ node }}-rank-{{ rank }} + address: __LLMD_ADDR_{{ node }}__ + port: "{{ ep_backend_port + rank }}" + namespace: default + labels: + llm-d.ai/pool: prime-rl +{%- endfor %} +{%- endfor %} +{%- endif %} diff --git a/src/prime_rl/templates/llmd/envoy.yaml.j2 b/src/prime_rl/templates/llmd/envoy.yaml.j2 new file mode 100644 index 0000000000..ca0d8d1569 --- /dev/null +++ b/src/prime_rl/templates/llmd/envoy.yaml.j2 @@ -0,0 +1,97 @@ +{# Envoy data plane: EPP picks the backend (ext_proc) and sets + x-gateway-destination-endpoint; the ORIGINAL_DST cluster routes there. Both + /v1/ (MITO) and /inference/v1/ (TITO) go through the picker. EPP gRPC is on + loopback 9002; the gateway listens on the configured port. #} +admin: + address: { socket_address: { address: 127.0.0.1, port_value: 9901 } } +static_resources: + listeners: + - name: prime_rl + address: { socket_address: { address: 0.0.0.0, port_value: {{ router.port }} } } + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: prime_rl + codec_type: AUTO + stream_idle_timeout: 0s + request_timeout: 0s + route_config: + virtual_hosts: + - name: prime_rl + domains: ["*"] + routes: + - match: { prefix: "/v1/" } + route: { cluster: backend_cluster, timeout: 0s, idle_timeout: 86400s } + - match: { prefix: "/inference/v1/" } + route: { cluster: backend_cluster, timeout: 0s, idle_timeout: 86400s } + http_filters: + - name: envoy.filters.http.ext_proc + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + grpc_service: + envoy_grpc: { cluster_name: epp_cluster } + timeout: 10s + processing_mode: + request_header_mode: SEND + response_header_mode: SEND + request_body_mode: FULL_DUPLEX_STREAMED + response_body_mode: FULL_DUPLEX_STREAMED + request_trailer_mode: SEND + response_trailer_mode: SEND + message_timeout: 1000s + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + suppress_envoy_headers: true + clusters: + - name: backend_cluster + type: ORIGINAL_DST + lb_policy: CLUSTER_PROVIDED + original_dst_lb_config: + use_http_header: true + http_header_name: x-gateway-destination-endpoint + connect_timeout: 5s + # Default circuit breakers (~1024) overflow at high inflight concurrency (>=1536), + # causing "reset reason: overflow" 500s. Raise far above the max concurrent rollouts. + circuit_breakers: + thresholds: + - priority: DEFAULT + max_connections: 100000 + max_pending_requests: 100000 + max_requests: 100000 + max_retries: 1000 + - priority: HIGH + max_connections: 100000 + max_pending_requests: 100000 + max_requests: 100000 + max_retries: 1000 + - name: epp_cluster + type: STRICT_DNS + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + load_assignment: + cluster_name: epp_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: { socket_address: { address: 127.0.0.1, port_value: 9002 } } + connect_timeout: 5s + # ext_proc (EPP) connection pool overflows at high concurrency without this — the + # gRPC ext_proc calls are 1:1 with inflight requests, so cap well above max rollouts. + circuit_breakers: + thresholds: + - priority: DEFAULT + max_connections: 100000 + max_pending_requests: 100000 + max_requests: 100000 + max_retries: 1000 + - priority: HIGH + max_connections: 100000 + max_pending_requests: 100000 + max_requests: 100000 + max_retries: 1000 diff --git a/src/prime_rl/templates/llmd/epp_estimate.yaml.j2 b/src/prime_rl/templates/llmd/epp_estimate.yaml.j2 new file mode 100644 index 0000000000..27f041ff1b --- /dev/null +++ b/src/prime_rl/templates/llmd/epp_estimate.yaml.j2 @@ -0,0 +1,34 @@ +{# llm-d EPP config: single-pool (non-P/D) routing with approximate prefix-cache + scoring. The vllmhttp parser embeds the OpenAI parser and also handles + /inference/v1/generate (token_ids), so MITO + TITO both get prefix-cache + scoring. Included into the sbatch as a heredoc; $LLMD_DIR is the per-replica + config dir, expanded at launch time. #} +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: + - name: file-disc + type: file-discovery + parameters: + path: $LLMD_DIR/endpoints.yaml + watchFile: true + - name: vllmhttp-parser + type: vllmhttp-parser +{%- for scorer in router.scorers %} + - type: {{ scorer }} +{%- endfor %} + - type: max-score-picker + - type: single-profile-handler +schedulingProfiles: + - name: default + plugins: +{%- for scorer, weight in router.scorers.items() %} + - pluginRef: {{ scorer }} + weight: {{ weight }} +{%- endfor %} + - pluginRef: max-score-picker +requestHandler: + parser: + pluginRef: vllmhttp-parser +dataLayer: + discovery: + pluginRef: file-disc diff --git a/src/prime_rl/templates/llmd/epp_pd.yaml.j2 b/src/prime_rl/templates/llmd/epp_pd.yaml.j2 new file mode 100644 index 0000000000..909b531fcd --- /dev/null +++ b/src/prime_rl/templates/llmd/epp_pd.yaml.j2 @@ -0,0 +1,56 @@ +{# llm-d EPP config: prefill/decode disaggregation. disagg-profile-handler runs + the prefill profile when prefix-based-pd-decider sees >= nonCachedTokens + non-cached prompt tokens, else decode-only. Mirrors the upstream llm-d PD + guide: the prefill profile uses prefill_scorers (prefix + queue/kv) and the + decode profile uses scorers (prefix + active-request); prefill-filter / + decode-filter select endpoints by llm-d.ai/role label. Included into the + sbatch as a heredoc; $LLMD_DIR is the per-replica config dir, expanded at + launch time. #} +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: + - name: file-disc + type: file-discovery + parameters: + path: $LLMD_DIR/endpoints.yaml + watchFile: true + - name: vllmhttp-parser + type: vllmhttp-parser + - name: pd-decider + type: prefix-based-pd-decider + parameters: + nonCachedTokens: {{ router.non_cached_tokens }} + - name: profile-handler + type: disagg-profile-handler + parameters: + deciders: + prefill: pd-decider + - type: prefill-filter + - type: decode-filter +{%- for scorer in ((router.prefill_scorers.keys() | list) + (router.decode_scorers.keys() | list)) | unique %} + - type: {{ scorer }} +{%- endfor %} + - type: max-score-picker +schedulingProfiles: + - name: prefill + plugins: + - pluginRef: prefill-filter +{%- for scorer, weight in router.prefill_scorers.items() %} + - pluginRef: {{ scorer }} + weight: {{ weight }} +{%- endfor %} + - pluginRef: max-score-picker + - name: decode + plugins: + - pluginRef: decode-filter +{%- for scorer, weight in router.decode_scorers.items() %} + - pluginRef: {{ scorer }} + weight: {{ weight }} +{%- endfor %} + - pluginRef: max-score-picker +requestHandler: + parser: + pluginRef: vllmhttp-parser +dataLayer: + discovery: + pluginRef: file-disc diff --git a/src/prime_rl/templates/multi_node_rl.sbatch.j2 b/src/prime_rl/templates/multi_node_rl.sbatch.j2 index c462610778..735bb64662 100755 --- a/src/prime_rl/templates/multi_node_rl.sbatch.j2 +++ b/src/prime_rl/templates/multi_node_rl.sbatch.j2 @@ -32,7 +32,7 @@ export NODES_PER_INFER_REPLICA={{ nodes_per_infer_replica }} export NUM_INFER_REPLICAS={{ num_infer_replicas }} export GPUS_PER_NODE={{ gpus_per_node }} {% if num_infer_nodes > 0 -%} -export ROUTER_PORT={{ router_port }} +export ROUTER_PORT={{ router.port }} export INFERENCE_TP={{ inference_tp }} export INFERENCE_DP_LOCAL=$((GPUS_PER_NODE / INFERENCE_TP)) export INFERENCE_DATA_PARALLEL_RPC_PORT={{ inference_data_parallel_rpc_port }} @@ -45,6 +45,9 @@ export NODES_PER_PREFILL_REPLICA=$((NUM_PREFILL_NODES / NUM_PREFILL_REPLICAS)) export NODES_PER_DECODE_REPLICA=$((NUM_DECODE_NODES / NUM_DECODE_REPLICAS)) export PREFILL_PORT={{ prefill_port }} export DECODE_PORT={{ decode_port }} +{%- if router.type == "llm-d" %} +export DECODE_SIDECAR_PORT={{ router.decode_sidecar_port }} +{%- endif %} {%- else -%} export BACKEND_PORT={{ backend_port }} export INFERENCE_ENABLE_EXPERT_PARALLEL={{ "1" if inference_enable_expert_parallel else "0" }} @@ -166,6 +169,10 @@ srun bash -s <<'CLEANUP_SH' pkill -9 -f "[p]ython.*prime_rl" 2>/dev/null pkill -9 -f "[t]orchrun" 2>/dev/null pkill -9 -f "[v]llm-router" 2>/dev/null + # llm-d router procs (EPP + Envoy + decode pd-sidecar) left over from a prior run. + pkill -9 -f "[e]pp.*pool-name" 2>/dev/null + pkill -9 -f "[e]nvoy.*envoy.yaml" 2>/dev/null + pkill -9 -f "[p]d-sidecar" 2>/dev/null pkill -9 -f "[v]llm" 2>/dev/null pkill -9 -f "[p]rime_rl" 2>/dev/null # Also match prctl-set process names (kernel comm) like "vllm::router" @@ -178,7 +185,7 @@ srun bash -s <<'CLEANUP_SH' pkill -9 -x mooncake_client 2>/dev/null sleep 2 rm -rf /dev/shm/vllm-* /dev/shm/vllm_* /tmp/vllm-* /tmp/vllm_* /tmp/torch-* /tmp/torchelastic_* 2>/dev/null - procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::|mooncake_" | grep -v grep | wc -l) + procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::|mooncake_|[e]pp |envoy|pd-sidecar" | grep -v grep | wc -l) gpu=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits | awk "{s+=\$1} END {print s}") echo "[node-cleanup] $(hostname) procs=$procs gpu_mem=${gpu}MiB" CLEANUP_SH @@ -307,8 +314,28 @@ if [ "$SLURM_PROCID" -lt "$NUM_INFER_NODES" ]; then # Start the router on the first node of each replica (PD mode, external LB across per-rank endpoints) if [ "$RANK_IN_REPLICA" -eq 0 ]; then REPLICA_ROUTER_ARGS=$(echo "$ALL_ROUTER_ARGS" | cut -d"|" -f$((REPLICA_IDX + 1))) - launch_router pd "$REPLICA_ROUTER_ARGS" "$ROUTER_PORT" consistent_hash "$OUTPUT_DIR/logs/inference/router_${REPLICA_IDX}.log" "$REPLICA_IDX" + launch_router pd "$REPLICA_ROUTER_ARGS" "$ROUTER_PORT" "{{ router.policy }}" "$OUTPUT_DIR/logs/inference/router_${REPLICA_IDX}.log" "$REPLICA_IDX" "$REPLICA_BASE" + fi +{%- if router.type == "llm-d" %} + # pd-sidecar on each decode node: EPP routes decode -> sidecar (DECODE_SIDECAR_PORT + # + rank) -> vLLM decode (DECODE_PORT + rank) after orchestrating remote prefill. + # Every decode node fronts its own local ranks, so launch on each (not just the + # replica head), or decode nodes beyond the head get advertised but unserved endpoints. + if [ "$ROLE" = "decode" ]; then + SIDECAR_LOG="$OUTPUT_DIR/logs/inference/sidecar_${INFER_NODE_RANK}.log" + echo "Starting pd-sidecar on $LOCAL_IP:$DECODE_SIDECAR_PORT(+rank) -> vLLM $DECODE_PORT(+rank)" | tee $SIDECAR_LOG + export PATH="$PROJECT_DIR/third_party/llmd/bin:$PATH" + pd-sidecar \ + --port $DECODE_SIDECAR_PORT \ + --vllm-port $DECODE_PORT \ + --kv-connector nixlv2 \ + --secure-proxy=false \ + --enable-ssrf-protection=false \ + --inference-pool prime-rl \ + --data-parallel-size $INFERENCE_DP_LOCAL \ + >> $SIDECAR_LOG 2>&1 & fi +{%- endif %} # External-LB: one vLLM API server per local DP rank (PORT + k), each pinned to its TP # slice of GPUs with a unique NIXL side-channel port. Each role is its own external-LB DP @@ -325,7 +352,7 @@ if [ "$SLURM_PROCID" -lt "$NUM_INFER_NODES" ]; then # Start the router on the first node of each replica if [ "$RANK_IN_REPLICA" -eq 0 ]; then REPLICA_ROUTER_ARGS=$(echo "$ALL_ROUTER_ARGS" | cut -d"|" -f$((REPLICA_IDX + 1))) - launch_router regular "$REPLICA_ROUTER_ARGS" "$ROUTER_PORT" consistent_hash "$OUTPUT_DIR/logs/inference/router_${REPLICA_IDX}.log" "$REPLICA_IDX" + launch_router regular "$REPLICA_ROUTER_ARGS" "$ROUTER_PORT" "{{ router.policy }}" "$OUTPUT_DIR/logs/inference/router_${REPLICA_IDX}.log" "$REPLICA_IDX" "$((REPLICA_IDX * NODES_PER_INFER_REPLICA))" "$NODES_PER_INFER_REPLICA" fi # External-LB: one vLLM API server per DP rank on this node (BACKEND_PORT + d), diff --git a/src/prime_rl/utils/client.py b/src/prime_rl/utils/client.py index 533f6e2711..8e009b9b31 100644 --- a/src/prime_rl/utils/client.py +++ b/src/prime_rl/utils/client.py @@ -17,15 +17,15 @@ from prime_rl.configs.shared import ClientConfig from prime_rl.utils.logger import get_logger -# Identity tuple used by ``select_train_client`` to key load counts. ``api_base_url`` -# distinguishes servers; ``X-data-parallel-rank`` distinguishes DP shards within a -# server, since the router uses that header to route to specific GPU ranks. -ClientIdentity = tuple[str, str | None] +# Identity used by ``select_train_client`` to key load counts. With external-LB +# data parallelism each DP rank is its own endpoint, so ``api_base_url`` alone +# uniquely identifies an inference target. +ClientIdentity = str def client_identity(client: vf.ClientConfig) -> ClientIdentity: """Stable identity for load balancing across inference clients.""" - return (client.api_base_url, client.extra_headers.get("X-data-parallel-rank")) + return client.api_base_url @runtime_checkable @@ -200,27 +200,24 @@ def setup_clients( k: v for k, v in ((k, os.getenv(v)) for k, v in client_config.headers_from_env.items()) if v is not None } for base_url in client_config.base_url: - for dp_rank in range(client_config.dp_rank_count): - headers = {**client_config.headers, **env_headers} - if client_config.dp_rank_count > 1: - headers["X-data-parallel-rank"] = str(dp_rank) - clients.append( - vf.ClientConfig( - client_idx=client_idx, - client_type=client_type, - api_base_url=base_url, - api_key_var=client_config.api_key_var, - timeout=client_config.timeout, - connect_timeout=client_config.connect_timeout, - max_connections=8192, - max_keepalive_connections=8192, - max_retries=10, - extra_headers=headers, - extra_headers_from_state=client_config.extra_headers_from_state, - **renderer_extra, - ) + headers = {**client_config.headers, **env_headers} + clients.append( + vf.ClientConfig( + client_idx=client_idx, + client_type=client_type, + api_base_url=base_url, + api_key_var=client_config.api_key_var, + timeout=client_config.timeout, + connect_timeout=client_config.connect_timeout, + max_connections=8192, + max_keepalive_connections=8192, + max_retries=10, + extra_headers=headers, + extra_headers_from_state=client_config.extra_headers_from_state, + **renderer_extra, ) - client_idx += 1 + ) + client_idx += 1 return clients diff --git a/src/prime_rl/utils/elastic.py b/src/prime_rl/utils/elastic.py index 951b3673c1..b221046078 100644 --- a/src/prime_rl/utils/elastic.py +++ b/src/prime_rl/utils/elastic.py @@ -197,7 +197,6 @@ def _rebuild_clients(self) -> None: api_key_var=self.client_config.api_key_var, headers=self.client_config.headers, headers_from_env=self.client_config.headers_from_env, - dp_rank_count=self.client_config.dp_rank_count, extra_headers_from_state=self.client_config.extra_headers_from_state, ) self._train_clients = ( diff --git a/tests/unit/utils/test_client.py b/tests/unit/utils/test_client.py index 69325e6c4a..8e7f79c3a1 100644 --- a/tests/unit/utils/test_client.py +++ b/tests/unit/utils/test_client.py @@ -49,14 +49,15 @@ def test_load_lora_adapter_succeeds_on_first_attempt(): ) -def test_setup_clients_assigns_renderer_and_dp_rank_headers(): +def test_setup_clients_creates_one_renderer_client_per_url(): from renderers import Qwen3VLRendererConfig + # External-LB: base_url is the list of per-rank endpoints (the URL is the rank + # selector), so each URL maps to exactly one client with no rank header. client_config = ClientConfig( - base_url=["http://worker-a:8000/v1"], + base_url=["http://worker-a:8000/v1", "http://worker-a:8001/v1"], api_key_var="PRIME_API_KEY", headers={"X-Test": "test"}, - dp_rank_count=2, extra_headers_from_state={"X-Session-ID": "session_id"}, ) @@ -70,8 +71,11 @@ def test_setup_clients_assigns_renderer_and_dp_rank_headers(): assert [client.client_type for client in clients] == ["renderer", "renderer"] assert [client.renderer_config for client in clients] == [renderer_settings, renderer_settings] assert [client.renderer_model_name for client in clients] == [None, None] - assert [client.api_base_url for client in clients] == ["http://worker-a:8000/v1"] * 2 - assert [client.extra_headers["X-data-parallel-rank"] for client in clients] == ["0", "1"] + assert [client.api_base_url for client in clients] == [ + "http://worker-a:8000/v1", + "http://worker-a:8001/v1", + ] + assert all("X-data-parallel-rank" not in client.extra_headers for client in clients) assert clients[0].extra_headers["X-Test"] == "test" assert clients[0].extra_headers_from_state == {"X-Session-ID": "session_id"}