Skip to content

Commit 86fa1cb

Browse files
committed
Remove deferred PG logic and ensure Serve replica actor runs on co-located TPU slice
Signed-off-by: ryanaoleary <ryanaoleary@google.com>
1 parent 2585698 commit 86fa1cb

8 files changed

Lines changed: 331 additions & 283 deletions

File tree

python/ray/llm/_internal/serve/core/configs/accelerators.py

Lines changed: 59 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,11 @@
88
import ray.util.accelerators.accelerators as accelerators
99
from ray.llm._internal.serve.observability.logging import get_logger
1010
from ray.util.placement_group import PlacementGroup, placement_group
11-
from ray.util.tpu import get_tpu_version_from_type, slice_placement_group
11+
from ray.util.tpu import (
12+
get_chips_per_host,
13+
get_tpu_version_from_type,
14+
slice_placement_group,
15+
)
1216

1317
logger = get_logger(__name__)
1418

@@ -91,14 +95,38 @@ def create_placement_group(
9195
) -> PlacementGroup:
9296
pass
9397

94-
@property
95-
def requires_deferred_placement_group(self) -> bool:
96-
"""
97-
If True, Ray Serve will not provision a placement group for the deployment.
98-
Instead, creation is deferred to the replica at runtime.
99-
Defaults to False.
100-
"""
101-
return False
98+
def get_placement_group_labels(
99+
self, accelerator_type_str: Optional[str] = None
100+
) -> Optional[Dict[str, str]]:
101+
"""Returns labels to be applied to the placement group bundles."""
102+
return None
103+
104+
def apply_placement_group_bundle_labels(
105+
self,
106+
deployment_options: Dict[str, Any],
107+
accelerator_type_str: Optional[str],
108+
num_bundles: int,
109+
) -> None:
110+
"""Safely applies hardware-specific labels to the deployment options."""
111+
accel_labels = self.get_placement_group_labels(accelerator_type_str)
112+
if not accel_labels:
113+
return
114+
115+
existing_selectors = (
116+
deployment_options.get("placement_group_bundle_label_selector") or []
117+
)
118+
merged_selectors = []
119+
120+
for i in range(num_bundles):
121+
labels = (
122+
existing_selectors[i].copy()
123+
if i < len(existing_selectors) and existing_selectors[i]
124+
else {}
125+
)
126+
labels.update(accel_labels)
127+
merged_selectors.append(labels)
128+
129+
deployment_options["placement_group_bundle_label_selector"] = merged_selectors
102130

103131
@property
104132
@abstractmethod
@@ -180,10 +208,21 @@ def __init__(self, config: TPUConfig):
180208
def default_bundles(
181209
self, *, num_devices: int, accelerator_type_str: Optional[str] = None
182210
):
183-
bundle = {"TPU": 1}
211+
if self._config.topology and accelerator_type_str:
212+
version = get_tpu_version_from_type(accelerator_type_str)
213+
chips_per_host = get_chips_per_host(self._config.topology, version)
214+
215+
num_bundles = max(1, num_devices // chips_per_host)
216+
bundle = {"TPU": chips_per_host}
217+
else:
218+
# Fallback to single-chip/single-host scheduling
219+
num_bundles = num_devices
220+
bundle = {"TPU": 1}
221+
184222
if accelerator_type_str:
185223
bundle[format_ray_accelerator_resource(accelerator_type_str)] = 0.001
186-
return [bundle.copy() for _ in range(num_devices)]
224+
225+
return [bundle.copy() for _ in range(num_bundles)]
187226

188227
def create_placement_group(
189228
self,
@@ -239,15 +278,15 @@ def create_placement_group(
239278
)
240279
return self._slice_pg_wrapper.placement_group
241280

242-
@property
243-
def requires_deferred_placement_group(self) -> bool:
244-
"""
245-
If a TPU topology is specified, we defer PG creation so the replica can
246-
provision a `SlicePlacementGroup` at runtime. This ensures multi-host
247-
TPU slices are gang-scheduled atomically according to their physical
248-
topology rather than fragmented across the cluster.
249-
"""
250-
return bool(self._config.topology)
281+
def get_placement_group_labels(
282+
self, accelerator_type_str: Optional[str] = None
283+
) -> Optional[Dict[str, str]]:
284+
if self._config.topology and accelerator_type_str:
285+
return {
286+
"ray.io/tpu-topology": self._config.topology,
287+
"ray.io/accelerator-type": accelerator_type_str,
288+
}
289+
return None
251290

252291
@property
253292
def requires_remote_initialization(self) -> bool:

python/ray/llm/_internal/serve/core/server/llm_server.py

Lines changed: 31 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -706,27 +706,40 @@ def get_deployment_options(cls, llm_config: "LLMConfig"):
706706
# deployment_options
707707
ray_actor_options = deployment_options.get("ray_actor_options", {})
708708

709-
if not engine_config.accelerator.requires_deferred_placement_group:
710-
replica_actor_resources = {
711-
"CPU": ray_actor_options.get("num_cpus", 1),
712-
"GPU": ray_actor_options.get("num_gpus", 0),
713-
**ray_actor_options.get("resources", {}),
714-
}
715-
if "memory" in ray_actor_options:
716-
replica_actor_resources["memory"] = ray_actor_options["memory"]
709+
replica_actor_resources = {
710+
"CPU": ray_actor_options.get("num_cpus", 1),
711+
"GPU": ray_actor_options.get("num_gpus", 0),
712+
**ray_actor_options.get("resources", {}),
713+
}
714+
if "memory" in ray_actor_options:
715+
replica_actor_resources["memory"] = ray_actor_options["memory"]
717716

718-
# TODO: Move this _merge_replica_actor_and_child_actor_bundles to a
719-
# more generic place.
720-
pg_bundles = _merge_replica_actor_and_child_actor_bundles(
721-
engine_config.placement_bundles, replica_actor_resources
722-
)
717+
# TODO: Move this _merge_replica_actor_and_child_actor_bundles to a
718+
# more generic place.
719+
pg_bundles = _merge_replica_actor_and_child_actor_bundles(
720+
engine_config.placement_bundles, replica_actor_resources
721+
)
722+
723+
deployment_options.update(
724+
{
725+
"placement_group_bundles": pg_bundles,
726+
"placement_group_strategy": engine_config.placement_strategy,
727+
}
728+
)
723729

724-
deployment_options.update(
725-
{
726-
"placement_group_bundles": pg_bundles,
727-
"placement_group_strategy": engine_config.placement_strategy,
728-
}
730+
# Append hardware-specific `bundle_label_selectors` to the deployment options if needed
731+
accelerator_type_str = (
732+
getattr(
733+
llm_config.accelerator_type, "value", str(llm_config.accelerator_type)
729734
)
735+
if llm_config.accelerator_type
736+
else None
737+
)
738+
engine_config.accelerator.apply_placement_group_bundle_labels(
739+
deployment_options=deployment_options,
740+
accelerator_type_str=accelerator_type_str,
741+
num_bundles=len(pg_bundles),
742+
)
730743

731744
# Handle env vars from runtime_env
732745
default_runtime_env = ray.get_runtime_context().runtime_env
@@ -735,7 +748,6 @@ def get_deployment_options(cls, llm_config: "LLMConfig"):
735748
"worker_process_setup_hook"
736749
] = "ray.llm._internal.serve._worker_process_setup_hook"
737750

738-
ray_actor_options = deployment_options.get("ray_actor_options", {})
739751
ray_actor_options["runtime_env"] = {
740752
**default_runtime_env,
741753
# Existing runtime_env should take precedence over the default.

python/ray/llm/tests/serve/cpu/configs/test_models.py

Lines changed: 9 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,6 @@
66

77
from ray.llm._internal.common.utils.download_utils import NodeModelDownloadable
88
from ray.llm._internal.serve.core.configs.accelerators import (
9-
CPUAccelerator,
10-
GPUAccelerator,
119
TPUAccelerator,
1210
TPUConfig,
1311
)
@@ -403,19 +401,16 @@ def test_engine_config_infers_tpu_from_accelerator_type_string(self):
403401
assert isinstance(engine_config.accelerator, TPUAccelerator)
404402
assert engine_config.accelerator_type == "TPU-V6E"
405403

406-
def test_requires_deferred_placement_group(self):
407-
"""Test that requires_deferred_placement_group correctly identifies deferred PG requirements."""
408-
cpu_accel = CPUAccelerator()
409-
assert cpu_accel.requires_deferred_placement_group is False
404+
def test_tpu_accelerator_get_placement_group_labels(self):
405+
"""Test that TPUAccelerator correctly generates topology labels for Serve."""
406+
tpu_accel_no_topology = TPUAccelerator(TPUConfig(kind="tpu"))
407+
assert tpu_accel_no_topology.get_placement_group_labels("TPU-V6E") is None
410408

411-
gpu_accel = GPUAccelerator()
412-
assert gpu_accel.requires_deferred_placement_group is False
413-
414-
tpu_accel_no_topo = TPUAccelerator(TPUConfig(kind="tpu"))
415-
assert tpu_accel_no_topo.requires_deferred_placement_group is False
416-
417-
tpu_accel_with_topo = TPUAccelerator(TPUConfig(kind="tpu", topology="4x4"))
418-
assert tpu_accel_with_topo.requires_deferred_placement_group is True
409+
tpu_accel_with_topology = TPUAccelerator(TPUConfig(kind="tpu", topology="4x4"))
410+
assert tpu_accel_with_topology.get_placement_group_labels("TPU-V6E") == {
411+
"ray.io/tpu-topology": "4x4",
412+
"ray.io/accelerator-type": "TPU-V6E",
413+
}
419414

420415
def test_tpu_accelerator_get_remote_options(self):
421416
"""Test that TPUAccelerator get_remote_options returns an empty resources dict and label selector."""
@@ -427,7 +422,6 @@ def test_tpu_accelerator_get_remote_options(self):
427422
options_with_type = tpu_accel.get_remote_options("TPU-V6E")
428423
assert options_with_type == {
429424
"resources": {},
430-
"accelerator_type": "TPU-V6E",
431425
"label_selector": {"ray.io/accelerator-type": "TPU-V6E"},
432426
}
433427

python/ray/llm/tests/serve/cpu/deployments/conftest.py

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
import pytest
22

33
import ray
4+
from ray import serve
45
from ray.tests.conftest import _ray_start_cluster
56

67

@@ -15,6 +16,19 @@ def llm_config_with_mock_engine(llm_config):
1516
yield llm_config
1617

1718

19+
@pytest.fixture(autouse=True)
20+
def isolated_serve_state(ray_tpu_cluster):
21+
"""
22+
Automatically runs before and after every test to ensure Serve
23+
starts on a random port and wipes its state, while reusing the
24+
module scoped cluster.
25+
"""
26+
serve.shutdown()
27+
serve.start(http_options={"port": 0})
28+
yield
29+
serve.shutdown()
30+
31+
1832
@pytest.fixture(scope="module")
1933
def ray_tpu_cluster():
2034
"""
@@ -36,6 +50,7 @@ def ray_tpu_cluster():
3650
"ray.io/tpu-slice-name": "test-slice",
3751
"ray.io/tpu-worker-id": str(i),
3852
"ray.io/tpu-pod-type": pod_type,
53+
"ray.io/accelerator-type": "TPU-V6E",
3954
}
4055
resources = {"TPU": 4, "accelerator_type:TPU-V6E": 4}
4156

0 commit comments

Comments
 (0)