Skip to content
105 changes: 74 additions & 31 deletions python/ray/llm/_internal/serve/core/configs/accelerators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import ray.util.accelerators.accelerators as accelerators
from ray.llm._internal.serve.observability.logging import get_logger
from ray.util.placement_group import PlacementGroup, placement_group
from ray.util.tpu import get_tpu_version_from_type, slice_placement_group
from ray.util.tpu import (
get_chips_per_host,
get_tpu_version_from_type,
slice_placement_group,
)

logger = get_logger(__name__)

Expand Down Expand Up @@ -91,14 +95,40 @@ def create_placement_group(
) -> PlacementGroup:
pass

@property
def requires_deferred_placement_group(self) -> bool:
"""
If True, Ray Serve will not provision a placement group for the deployment.
Instead, creation is deferred to the replica at runtime.
Defaults to False.
"""
return False
def get_placement_group_bundle_label_selector(
    self, accelerator_type_str: Optional[str] = None
) -> Optional[Dict[str, str]]:
    """Returns label selectors to apply to the placement group bundles.

    This default implementation contributes no selectors and ignores
    ``accelerator_type_str``. A falsy result makes
    ``apply_placement_group_bundle_label_selector`` leave the deployment
    options untouched.

    Args:
        accelerator_type_str: Accelerator type name; unused here.

    Returns:
        Always ``None``.
    """
    return None

def apply_placement_group_bundle_label_selector(
    self,
    deployment_options: Dict[str, Any],
    accelerator_type_str: Optional[str],
    num_bundles: int,
) -> None:
    """Merge accelerator label selectors into every bundle's selector, in place.

    Asks ``get_placement_group_bundle_label_selector`` for the labels that
    belong to ``accelerator_type_str``. If it returns nothing,
    ``deployment_options`` is left exactly as it was. Otherwise the
    ``"placement_group_bundle_label_selector"`` entry is replaced by a list of
    ``num_bundles`` dicts, each holding any selector already present at that
    index overlaid with the accelerator labels.

    Args:
        deployment_options: Deployment options dict; mutated in place.
        accelerator_type_str: Accelerator type name passed to the selector hook.
        num_bundles: Number of per-bundle selectors to produce.
    """
    hardware_labels = self.get_placement_group_bundle_label_selector(
        accelerator_type_str
    )
    if not hardware_labels:
        return

    user_selectors = (
        deployment_options.get("placement_group_bundle_label_selector") or []
    )

    def _merged(index: int) -> Dict[str, str]:
        # Missing or empty entries start from a blank selector; existing
        # entries are copied so the caller's dicts are never modified.
        base = user_selectors[index] if index < len(user_selectors) else None
        # Accelerator labels win on key collisions.
        return {**(base or {}), **hardware_labels}

    deployment_options["placement_group_bundle_label_selector"] = [
        _merged(index) for index in range(num_bundles)
    ]

@property
@abstractmethod
Expand Down Expand Up @@ -180,10 +210,21 @@ def __init__(self, config: TPUConfig):
def default_bundles(
    self, *, num_devices: int, accelerator_type_str: Optional[str] = None
):
    """Build the default placement-group bundles for ``num_devices`` TPU chips.

    When both a topology is configured and an accelerator type is given, one
    bundle is produced per host, each requesting all chips of that host.
    Otherwise one single-chip bundle is produced per device.

    Args:
        num_devices: Total number of TPU chips requested.
        accelerator_type_str: Accelerator type name; when set, each bundle
            also requests a small fraction of the matching accelerator
            resource.

    Returns:
        A list of independent bundle dicts (each element is its own copy).
    """
    if self._config.topology and accelerator_type_str:
        version = get_tpu_version_from_type(accelerator_type_str)
        chips_per_host = get_chips_per_host(self._config.topology, version)

        # Integer division drops any partial host; never go below one bundle.
        num_bundles = max(1, num_devices // chips_per_host)
        bundle = {"TPU": chips_per_host}
    else:
        # Fallback to single-chip/single-host scheduling
        num_bundles = num_devices
        bundle = {"TPU": 1}

    if accelerator_type_str:
        # NOTE(review): the 0.001 fraction presumably only steers the bundle
        # onto nodes advertising this accelerator type — confirm.
        bundle[format_ray_accelerator_resource(accelerator_type_str)] = 0.001

    # Size the result by num_bundles (hosts), not num_devices (chips).
    return [bundle.copy() for _ in range(num_bundles)]

def create_placement_group(
self,
Expand Down Expand Up @@ -211,7 +252,8 @@ def create_placement_group(
if not tpu_bundles:
worker_bundle = {"TPU": 1}
else:
worker_bundle = tpu_bundles[0]
# Use the last bundle to avoid picking up merged CPU driver resources
worker_bundle = tpu_bundles[-1]

# Ensure all TPU bundles are homogeneous
if any(b != worker_bundle for b in tpu_bundles):
Expand Down Expand Up @@ -239,38 +281,39 @@ def create_placement_group(
)
return self._slice_pg_wrapper.placement_group

@property
def requires_deferred_placement_group(self) -> bool:
"""
If a TPU topology is specified, we defer PG creation so the replica can
provision a `SlicePlacementGroup` at runtime. This ensures multi-host
TPU slices are gang-scheduled atomically according to their physical
topology rather than fragmented across the cluster.
"""
return bool(self._config.topology)
def get_placement_group_bundle_label_selector(
    self, accelerator_type_str: Optional[str] = None
) -> Optional[Dict[str, str]]:
    """Return TPU topology and accelerator-type labels for the bundles.

    Args:
        accelerator_type_str: Accelerator type name to place in the selector.

    Returns:
        A dict with the ``ray.io/tpu-topology`` and
        ``ray.io/accelerator-type`` labels when both a topology is configured
        and an accelerator type is given; ``None`` otherwise.
    """
    topology = self._config.topology
    # Both pieces are required; with either missing no selector is emitted.
    if not (topology and accelerator_type_str):
        return None

    selector = {"ray.io/tpu-topology": topology}
    selector["ray.io/accelerator-type"] = accelerator_type_str
    return selector

@property
def requires_remote_initialization(self) -> bool:
    """Always True for this accelerator backend."""
    # NOTE(review): presumably read by the engine start-up path to decide
    # where initialization runs — confirm against callers.
    return True

def get_remote_options(self, accelerator_type_str: Optional[str] = None):
    """Build the Ray remote options for a task that must land on TPU hardware.

    Args:
        accelerator_type_str: Accelerator type name. When given, the task is
            pinned to nodes labelled with that accelerator type.

    Returns:
        A dict with an empty ``"resources"`` mapping and, when an accelerator
        type is given, a ``"label_selector"`` on ``ray.io/accelerator-type``.
    """
    # The PlacementGroupSchedulingStrategy natively handles routing the task to
    # the correct hardware. We omit TPU resource requests to avoid consuming
    # chips that the model engine workers must use.
    options: Dict[str, Any] = {"resources": {}}
    if accelerator_type_str:
        # Pin the task to the TPU accelerator to avoid scheduling on a CPU bundle.
        options["label_selector"] = {
            "ray.io/accelerator-type": accelerator_type_str
        }
    return options

def shutdown(self):
    """Shut down the TPU slice placement group held by this backend, if any.

    Safe to call repeatedly and on a partially constructed instance. A
    missing or ``None`` ``_slice_pg_wrapper`` is a no-op. Failures while
    shutting the wrapper down are logged as warnings and not re-raised, and
    the reference is always cleared afterwards.
    """
    # getattr rather than direct access: the attribute may not exist yet if
    # construction failed before it was assigned.
    slice_pg_wrapper = getattr(self, "_slice_pg_wrapper", None)
    if slice_pg_wrapper is None:
        return

    try:
        logger.info("Shutting down TPU slice PG for server replica.")
        slice_pg_wrapper.shutdown()
    except Exception as e:
        # Best-effort cleanup: report the failure but never raise to the caller.
        logger.warning(f"Failed to shut down TPU slice PG: {e}")
    finally:
        self._slice_pg_wrapper = None

def __del__(self):
    """Ensure placement groups are cleaned up when this backend is garbage collected."""
    # Delegates to shutdown(), which logs rather than raises if the slice
    # placement group fails to shut down.
    self.shutdown()
59 changes: 39 additions & 20 deletions python/ray/llm/_internal/serve/core/server/llm_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,14 @@ async def _maybe_add_request_id_to_request(
"""Add the request id to the request."""
request_id = get_serve_request_id()
if request_id:
request.request_id = request_id
if hasattr(request, "request_id"):
return
try:
request.request_id = request_id
except ValueError:
# Pydantic v2 strict schemas reject unknown fields.
# Safely ignore as request_id is only used for internal Ray Serve logging.
pass

async def _maybe_resolve_lora_from_multiplex(self) -> None:
"""Handle the lora model for the request."""
Expand Down Expand Up @@ -699,27 +706,40 @@ def get_deployment_options(cls, llm_config: "LLMConfig"):
# deployment_options
ray_actor_options = deployment_options.get("ray_actor_options", {})

if not engine_config.accelerator.requires_deferred_placement_group:
replica_actor_resources = {
"CPU": ray_actor_options.get("num_cpus", 1),
"GPU": ray_actor_options.get("num_gpus", 0),
**ray_actor_options.get("resources", {}),
}
if "memory" in ray_actor_options:
replica_actor_resources["memory"] = ray_actor_options["memory"]
replica_actor_resources = {
"CPU": ray_actor_options.get("num_cpus", 1),
"GPU": ray_actor_options.get("num_gpus", 0),
**ray_actor_options.get("resources", {}),
}
if "memory" in ray_actor_options:
replica_actor_resources["memory"] = ray_actor_options["memory"]

# TODO: Move this _merge_replica_actor_and_child_actor_bundles to a
# more generic place.
pg_bundles = _merge_replica_actor_and_child_actor_bundles(
engine_config.placement_bundles, replica_actor_resources
)
# TODO: Move this _merge_replica_actor_and_child_actor_bundles to a
# more generic place.
pg_bundles = _merge_replica_actor_and_child_actor_bundles(
engine_config.placement_bundles, replica_actor_resources
)

deployment_options.update(
{
"placement_group_bundles": pg_bundles,
"placement_group_strategy": engine_config.placement_strategy,
}
)

deployment_options.update(
{
"placement_group_bundles": pg_bundles,
"placement_group_strategy": engine_config.placement_strategy,
}
# Append hardware-specific `bundle_label_selectors` to the deployment options if needed
accelerator_type_str = (
getattr(
llm_config.accelerator_type, "value", str(llm_config.accelerator_type)
)
if llm_config.accelerator_type
else None
)
engine_config.accelerator.apply_placement_group_bundle_label_selector(
deployment_options=deployment_options,
accelerator_type_str=accelerator_type_str,
num_bundles=len(pg_bundles),
)

# Handle env vars from runtime_env
default_runtime_env = ray.get_runtime_context().runtime_env
Expand All @@ -728,7 +748,6 @@ def get_deployment_options(cls, llm_config: "LLMConfig"):
"worker_process_setup_hook"
] = "ray.llm._internal.serve._worker_process_setup_hook"

ray_actor_options = deployment_options.get("ray_actor_options", {})
ray_actor_options["runtime_env"] = {
**default_runtime_env,
# Existing runtime_env should take precedence over the default.
Expand Down
4 changes: 4 additions & 0 deletions python/ray/llm/_internal/serve/engines/vllm/vllm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ async def start(self) -> None:
from vllm.entrypoints.openai.api_server import init_app_state

callback = self.llm_config.get_or_create_callback()
if callback.ctx.placement_group:
# Ensure existing PG for the Serve deployment is scheduled
# before attempting to initialize the engine.
await callback.ctx.placement_group.ready()
await callback.run_callback("on_before_node_init")
if callback.ctx.run_init_node:
await initialize_node(self.llm_config)
Expand Down
37 changes: 25 additions & 12 deletions python/ray/llm/tests/serve/cpu/configs/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

from ray.llm._internal.common.utils.download_utils import NodeModelDownloadable
from ray.llm._internal.serve.core.configs.accelerators import (
CPUAccelerator,
GPUAccelerator,
TPUAccelerator,
TPUConfig,
)
Expand Down Expand Up @@ -403,19 +401,34 @@ def test_engine_config_infers_tpu_from_accelerator_type_string(self):
assert isinstance(engine_config.accelerator, TPUAccelerator)
assert engine_config.accelerator_type == "TPU-V6E"

def test_requires_deferred_placement_group(self):
"""Test that requires_deferred_placement_group correctly identifies deferred PG requirements."""
cpu_accel = CPUAccelerator()
assert cpu_accel.requires_deferred_placement_group is False
def test_tpu_accelerator_get_placement_group_bundle_label_selector(self):
"""Test that TPUAccelerator correctly generates topology labels for Serve."""
tpu_accel_no_topology = TPUAccelerator(TPUConfig(kind="tpu"))
assert (
tpu_accel_no_topology.get_placement_group_bundle_label_selector("TPU-V6E")
is None
)

gpu_accel = GPUAccelerator()
assert gpu_accel.requires_deferred_placement_group is False
tpu_accel_with_topology = TPUAccelerator(TPUConfig(kind="tpu", topology="4x4"))
assert tpu_accel_with_topology.get_placement_group_bundle_label_selector(
"TPU-V6E"
) == {
"ray.io/tpu-topology": "4x4",
"ray.io/accelerator-type": "TPU-V6E",
}

tpu_accel_no_topo = TPUAccelerator(TPUConfig(kind="tpu"))
assert tpu_accel_no_topo.requires_deferred_placement_group is False
def test_tpu_accelerator_get_remote_options(self):
"""Test that TPUAccelerator get_remote_options returns an empty resources dict and label selector."""
tpu_accel = TPUAccelerator(TPUConfig(kind="tpu"))

tpu_accel_with_topo = TPUAccelerator(TPUConfig(kind="tpu", topology="4x4"))
assert tpu_accel_with_topo.requires_deferred_placement_group is True
options_no_type = tpu_accel.get_remote_options()
assert options_no_type == {"resources": {}}

options_with_type = tpu_accel.get_remote_options("TPU-V6E")
assert options_with_type == {
"resources": {},
"label_selector": {"ray.io/accelerator-type": "TPU-V6E"},
}
Comment thread
ryanaoleary marked this conversation as resolved.


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion python/ray/llm/tests/serve/cpu/deployments/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def ray_tpu_cluster():
"ray.io/tpu-slice-name": "test-slice",
"ray.io/tpu-worker-id": str(i),
"ray.io/tpu-pod-type": pod_type,
"ray.io/accelerator-type": "TPU-V6E",
}
resources = {"TPU": 4, "accelerator_type:TPU-V6E": 4}

Expand All @@ -50,6 +51,6 @@ def ray_tpu_cluster():
env_vars=env_vars,
)

ray.init(address=cluster.address)
ray.init(address=cluster.address, ignore_reinit_error=True)
yield cluster
ray.shutdown()
Loading
Loading