Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
f98d5af
Change default bundles constructed for TPU in LLM to per-host and fix…
ryanaoleary May 8, 2026
6a1511d
Improve lifecycle handling of SlicePlacementGroup and support explici…
ryanaoleary May 6, 2026
5ec15c0
Add AcceleratorConfig to Serve and fix gang scheduling
ryanaoleary May 8, 2026
1c1dda8
fix tests, change discriminator to 'kind', and fix cleanup logic
ryanaoleary May 9, 2026
649229e
fix import and var name
ryanaoleary May 9, 2026
779d95d
add missing import
ryanaoleary May 9, 2026
3a1d724
lint and remove unused type alias
ryanaoleary May 11, 2026
da95aac
Merge branch 'master' into e-serve-accelerator-config
ryanaoleary May 11, 2026
d603123
add comment to inline import
ryanaoleary May 11, 2026
80162c2
Tighten typing for placement-group fields after PR restructure
ryanaoleary May 11, 2026
e45ee82
remove added whitespace
ryanaoleary May 11, 2026
f96ef1e
fix external placement group function override
ryanaoleary May 11, 2026
5d95c78
add resources_per_bundle and fix bundles defaulting logic, also add t…
ryanaoleary May 11, 2026
afb07a0
Safely unwrap ReplicaPlacementGroup for gangs and fix type alias
ryanaoleary May 12, 2026
70b6a6f
Fix placement group leakage on actor creation failure for custom over…
ryanaoleary May 12, 2026
89dc61f
Release TPU reservation holders in cross-language replica startup pat…
ryanaoleary May 12, 2026
f82eab9
Remove redundant replica_pg reassignment in deployment scheduler
ryanaoleary May 12, 2026
19c8aeb
Safeguard check_stopped placement group teardown with robust exceptio…
ryanaoleary May 12, 2026
8eab85a
fix gang pg cleanup to fix tests
ryanaoleary May 12, 2026
c3fdab9
Merge branch 'master' into e-serve-accelerator-config
ryanaoleary May 12, 2026
e84850a
add check in api for accelerator_config and gang at same time
ryanaoleary May 12, 2026
8540e43
Merge branch 'master' into e-serve-accelerator-config
ryanaoleary May 14, 2026
abf974b
Merge branch 'master' into e-serve-accelerator-config
ryanaoleary May 14, 2026
acefb06
Apply suggestions from code review
ryanaoleary May 21, 2026
2deba42
Merge branch 'master' into e-serve-accelerator-config
ryanaoleary May 21, 2026
deb9767
remove circular dependency / import, add constants for commonly used …
ryanaoleary May 21, 2026
2f8282d
run linter and remove empty type checking block
ryanaoleary May 21, 2026
a95f272
run lint again and remove unneeded comment
ryanaoleary May 21, 2026
26ae3e2
move constant to constants.py, remove release_reservation_holders and…
ryanaoleary May 21, 2026
234f19b
remove unused function
ryanaoleary May 21, 2026
3d1bc7c
fix missing import, resolve circular dependency
ryanaoleary May 21, 2026
fb157a7
Merge branch 'master' into e-serve-accelerator-config
ryanaoleary May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,17 +891,6 @@ class ReplicaQueueLengthInfo:
num_ongoing_requests: int


@dataclass(frozen=True)
class CreatePlacementGroupRequest:
bundles: List[Dict[str, float]]
strategy: str
target_node_id: str
name: str
runtime_env: Optional[str] = None
bundle_label_selector: Optional[List[Dict[str, str]]] = None
fallback_strategy: Optional[List[Dict[str, Any]]] = None


@dataclass
class GangPlacementGroupRequest:
"""Request to reserve gang placement groups for a deployment."""
Expand Down
12 changes: 12 additions & 0 deletions python/ray/serve/_private/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from ray.serve._private.utils import DEFAULT, DeploymentOptionUpdateType
from ray.serve.config import (
AcceleratorConfig,
AggregationFunction,
AutoscalingConfig,
DeploymentActorConfig,
Expand Down Expand Up @@ -190,6 +191,10 @@ class DeploymentConfig(BaseModel):
update_type=DeploymentOptionUpdateType.NeedsActorReconfigure,
)

accelerator_config: Optional[AcceleratorConfig] = Field(
default=None, update_type=DeploymentOptionUpdateType.HeavyWeight
Comment thread
ryanaoleary marked this conversation as resolved.
)
Comment thread
ryanaoleary marked this conversation as resolved.

# This flag is used to let replica know they are deployed from
# a different language.
is_cross_language: bool = False
Expand Down Expand Up @@ -322,6 +327,8 @@ def needs_pickle(self):

def to_proto(self):
data = self.model_dump()
if data.get("accelerator_config") is not None:
data["accelerator_config"] = cloudpickle.dumps(self.accelerator_config)
if data.get("user_config") is not None:
if self.needs_pickle():
data["user_config"] = cloudpickle.dumps(data["user_config"])
Expand Down Expand Up @@ -429,6 +436,11 @@ def from_proto(cls, proto: DeploymentConfigProto):
data["is_cross_language"] if "is_cross_language" in data else False
)
needs_pickle = _needs_pickle(deployment_language, is_cross_language)
if "accelerator_config" in data:
if data["accelerator_config"] != b"":
data["accelerator_config"] = cloudpickle.loads(proto.accelerator_config)
else:
data["accelerator_config"] = None
if "user_config" in data:
if data["user_config"] != b"":
if needs_pickle:
Expand Down
2 changes: 2 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
#: Ray namespace used for all Serve actors
SERVE_NAMESPACE = "serve"

ACCELERATOR_KIND_TPU = "tpu"

DEFAULT_HTTP_HOST = os.environ.get("RAY_SERVE_DEFAULT_HTTP_HOST")

#: HTTP Port
Expand Down
27 changes: 10 additions & 17 deletions python/ray/serve/_private/default_impl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Callable, Optional, Tuple
from typing import Callable, Optional, Tuple, Union

import ray
from ray._common.constants import HEAD_NODE_RESOURCE_NAME
Expand All @@ -9,7 +9,6 @@
DefaultClusterNodeInfoCache,
)
from ray.serve._private.common import (
CreatePlacementGroupRequest,
DeploymentHandleSource,
DeploymentID,
EndpointInfo,
Expand All @@ -33,6 +32,11 @@
from ray.serve._private.event_loop_monitoring import EventLoopMonitor
from ray.serve._private.grpc_util import gRPCGenericServer
from ray.serve._private.handle_options import DynamicHandleOptions, InitHandleOptions
from ray.serve._private.placement_group_utils import (
CreatePlacementGroupRequest,
ReplicaPlacementGroup,
_create_replica_placement_group,
)
from ray.serve._private.router import CurrentLoopRouter, Router, SingletonThreadRouter
from ray.serve._private.utils import (
asyncio_grpc_exception_handler,
Expand All @@ -56,20 +60,9 @@ def create_cluster_node_info_cache(gcs_client: GcsClient) -> ClusterNodeInfoCach
return DefaultClusterNodeInfoCache(gcs_client)


CreatePlacementGroupFn = Callable[[CreatePlacementGroupRequest], PlacementGroup]


def _default_create_placement_group(
request: CreatePlacementGroupRequest,
) -> PlacementGroup:
return ray.util.placement_group(
request.bundles,
request.strategy,
_soft_target_node_id=request.target_node_id,
name=request.name,
lifetime="detached",
bundle_label_selector=request.bundle_label_selector,
)
CreatePlacementGroupFn = Callable[
[CreatePlacementGroupRequest], Union[PlacementGroup, ReplicaPlacementGroup]
]


def create_deployment_scheduler(
Expand All @@ -82,7 +75,7 @@ def create_deployment_scheduler(
cluster_node_info_cache,
head_node_id,
create_placement_group_fn=create_placement_group_fn_override
or _default_create_placement_group,
or _create_replica_placement_group,
)


Expand Down
70 changes: 61 additions & 9 deletions python/ray/serve/_private/deployment_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@
from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
from ray.serve._private.common import (
GANG_PG_NAME_PREFIX,
CreatePlacementGroupRequest,
DeploymentID,
GangPlacementGroupRequest,
GangReservationResult,
ReplicaID,
)
from ray.serve._private.config import ReplicaConfig
from ray.serve._private.constants import (
ACCELERATOR_KIND_TPU,
RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES,
RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY,
RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY,
SERVE_LOGGER_NAME,
)
from ray.serve._private.placement_group_utils import (
CreatePlacementGroupRequest,
ReplicaPlacementGroup,
)
from ray.serve.config import AcceleratorConfig
from ray.util.placement_group import PlacementGroup
from ray.util.scheduling_strategies import (
LabelMatchExpressionsT,
Expand Down Expand Up @@ -198,6 +203,7 @@ class ReplicaSchedulingRequest:
placement_group_strategy: Optional[str] = None
placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None
placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None
accelerator_config: Optional[AcceleratorConfig] = None
max_replicas_per_node: Optional[int] = None
# Gang scheduling fields -- if set, replica should be scheduled on
# the reserved gang placement group at the specified bundle index.
Expand Down Expand Up @@ -636,12 +642,16 @@ def _schedule_replica(
replica_id = scheduling_request.replica_id
deployment_id = replica_id.deployment_id
placement_group = None
replica_pg = None

scheduling_strategy = default_scheduling_strategy

if scheduling_request.gang_placement_group is not None:
# Gang scheduling -- use the reserved gang placement group
# Gang scheduling -- use the reserved gang placement group.
# Gang PGs are always bare PlacementGroup objects; accelerator
# deployments bypass gang scheduling entirely (see deployment_state).
placement_group = scheduling_request.gang_placement_group

scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_bundle_index=scheduling_request.gang_pg_index,
Expand All @@ -650,22 +660,42 @@ def _schedule_replica(
# TODO (jeffreywang): Add support for target labels and node affinity
target_labels = None
target_node_id = None
elif scheduling_request.placement_group_bundles is not None:
elif (
scheduling_request.placement_group_bundles is not None
or scheduling_request.accelerator_config is not None
):
# Per-replica PG path. Entered when either:
# - The user provided explicit bundles (CPU/GPU deployments), or
# - The user provided an accelerator_config that derives its own
# bundles from structured fields (e.g. TPUAcceleratorConfig
# derives bundles from topology via slice_placement_group).
Comment thread
cursor[bot] marked this conversation as resolved.
placement_group_strategy = (
scheduling_request.placement_group_strategy
if scheduling_request.placement_group_strategy
else "PACK"
else (
"SPREAD"
if getattr(scheduling_request.accelerator_config, "kind", None)
== ACCELERATOR_KIND_TPU
else "PACK"
)
)
try:
pg = self._create_placement_group_fn(
pg_result = self._create_placement_group_fn(
CreatePlacementGroupRequest(
bundles=scheduling_request.placement_group_bundles,
strategy=placement_group_strategy,
target_node_id=target_node_id,
name=scheduling_request.actor_options["name"],
bundle_label_selector=scheduling_request.placement_group_bundle_label_selector,
)
accelerator_config=scheduling_request.accelerator_config,
),
Comment thread
ryanaoleary marked this conversation as resolved.
Comment thread
ryanaoleary marked this conversation as resolved.
)
if isinstance(pg_result, ReplicaPlacementGroup):
placement_group = pg_result.placement_group
replica_pg = pg_result
else:
placement_group = pg_result
replica_pg = None
except Exception:
# We add a defensive exception here, so the controller can
# make progress even if the placement group isn't created.
Expand All @@ -678,7 +708,7 @@ def _schedule_replica(
)
return False
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group=placement_group,
placement_group_capture_child_tasks=True,
)
target_labels = None
Expand Down Expand Up @@ -720,6 +750,14 @@ def _schedule_replica(
scheduling_request.status = (
ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED
)

# Only clean up single-replica PGs. Gang PGs are managed elsewhere.
if scheduling_request.gang_placement_group is None:
if replica_pg is not None:
replica_pg.shutdown()
elif placement_group is not None:
ray.util.remove_placement_group(placement_group)
Comment thread
ryanaoleary marked this conversation as resolved.

return False

del self._pending_replicas[deployment_id][replica_id]
Expand All @@ -731,7 +769,11 @@ def _schedule_replica(
placement_group = scheduling_strategy.placement_group

scheduling_request.status = ReplicaSchedulingRequestStatus.SUCCEEDED
scheduling_request.on_scheduled(actor_handle, placement_group=placement_group)
Comment thread
ryanaoleary marked this conversation as resolved.
scheduling_request.on_scheduled(
actor_handle,
placement_group=placement_group,
placement_group_manager=replica_pg,
)
Comment thread
cursor[bot] marked this conversation as resolved.
return True

@abstractmethod
Expand Down Expand Up @@ -859,7 +901,7 @@ def _prepare_gangs_for_deployment(
)

try:
pg = self._create_placement_group_fn(
pg_result = self._create_placement_group_fn(
CreatePlacementGroupRequest(
bundles=bundles,
strategy=request.gang_placement_strategy,
Expand All @@ -869,6 +911,16 @@ def _prepare_gangs_for_deployment(
fallback_strategy=fallback_strategy,
)
)

# Unwrap the ReplicaPlacementGroup to get the underyling PlacementGroup.
# Gang scheduling currently does not support accelerator_config (since it's
# handled by the specific accelerator backend), so we don't need the
# wrapper.
if isinstance(pg_result, ReplicaPlacementGroup):
pg = pg_result.placement_group
else:
pg = pg_result

gang_pgs.append(pg)
gang_ids.append(gang_id)
gang_pg_names.append(pg_name)
Expand Down
53 changes: 46 additions & 7 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
)
from ray.serve._private.exceptions import DeploymentIsBeingDeletedError
from ray.serve._private.long_poll import LongPollHost, LongPollNamespace
from ray.serve._private.placement_group_utils import ReplicaPlacementGroup
from ray.serve._private.storage.kv_store import KVStoreBase
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
Expand Down Expand Up @@ -779,6 +780,9 @@ def __init__(
self._last_record_routing_stats_time: float = 0.0
self._has_user_routing_stats_method: bool = False
self._ingress: bool = False
self._replica_pg: Optional["ReplicaPlacementGroup"] = None
self._gang_placement_group = None
self._gang_pg_index = None

# Outbound deployments polling state
self._outbound_deployments: Optional[List[DeploymentID]] = None
Expand Down Expand Up @@ -1152,6 +1156,7 @@ def start(
placement_group_fallback_strategy=(
deployment_info.replica_config.placement_group_fallback_strategy
),
accelerator_config=deployment_info.deployment_config.accelerator_config,
max_replicas_per_node=(
deployment_info.replica_config.max_replicas_per_node
),
Expand All @@ -1164,9 +1169,11 @@ def on_scheduled(
self,
actor_handle: ActorHandle,
placement_group: Optional[PlacementGroup] = None,
placement_group_manager: Optional["ReplicaPlacementGroup"] = None,
):
self._actor_handle = actor_handle
self._placement_group = placement_group
self._replica_pg = placement_group_manager

if self._is_cross_language:
self._actor_handle = JavaActorHandleProxy(self._actor_handle)
Expand Down Expand Up @@ -1519,15 +1526,42 @@ def check_stopped(self) -> bool:
finally:
# Remove the placement group both if the actor has already been deleted or
# it was just killed above.
if stopped and self._placement_group is not None:
if stopped:
try:
ray.util.remove_placement_group(self._placement_group)
except ValueError:
# ValueError thrown from ray.util.remove_placement_group means the
# placement group has already been removed.
logger.debug(
f"Placement group for {self._replica_id} was already removed."
# Teardown shared gang placement group. The first replica in the
# gang to stop deletes it. Subsequent replicas catch ValueError.
if self._gang_placement_group is not None:
try:
ray.util.remove_placement_group(self._gang_placement_group)
except ValueError:
# Already removed by another replica in this gang.
logger.debug(
f"Gang placement group for {self._replica_id} was already removed."
)

# Replicas with accelerator/wrapper PGs handle their own shutdown.
elif self._replica_pg is not None:
self._replica_pg.shutdown()

# Standard single-replica placement groups.
elif self._placement_group is not None:
try:
ray.util.remove_placement_group(self._placement_group)
except ValueError:
# ValueError thrown from ray.util.remove_placement_group means the
# placement group has already been removed.
logger.debug(
f"Placement group for {self._replica_id} was already removed."
)
except Exception:
logger.exception(
f"Unexpected error shutting down placement groups for {self._replica_id}."
)
finally:
# Clear references to prevent memory leaks and dangling state.
self._gang_placement_group = None
self._replica_pg = None
self._placement_group = None
Comment thread
ryanaoleary marked this conversation as resolved.

return stopped

Expand Down Expand Up @@ -3240,6 +3274,11 @@ def get_gang_config(self):
@property
def _is_gang_deployment(self) -> bool:
"""Returns True if this deployment uses gang scheduling."""
if (
self._target_state is not None
and self._target_state.info.deployment_config.accelerator_config is not None
):
return False
return self.get_gang_config() is not None

def _get_target_replica_delta(self) -> int:
Expand Down
Loading
Loading