diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index 29cd797791a3..9b18e1bca057 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -891,17 +891,6 @@ class ReplicaQueueLengthInfo: num_ongoing_requests: int -@dataclass(frozen=True) -class CreatePlacementGroupRequest: - bundles: List[Dict[str, float]] - strategy: str - target_node_id: str - name: str - runtime_env: Optional[str] = None - bundle_label_selector: Optional[List[Dict[str, str]]] = None - fallback_strategy: Optional[List[Dict[str, Any]]] = None - - @dataclass class GangPlacementGroupRequest: """Request to reserve gang placement groups for a deployment.""" diff --git a/python/ray/serve/_private/config.py b/python/ray/serve/_private/config.py index f226809b539b..7e38f81e13cc 100644 --- a/python/ray/serve/_private/config.py +++ b/python/ray/serve/_private/config.py @@ -32,6 +32,7 @@ ) from ray.serve._private.utils import DEFAULT, DeploymentOptionUpdateType from ray.serve.config import ( + AcceleratorConfig, AggregationFunction, AutoscalingConfig, DeploymentActorConfig, @@ -190,6 +191,10 @@ class DeploymentConfig(BaseModel): update_type=DeploymentOptionUpdateType.NeedsActorReconfigure, ) + accelerator_config: Optional[AcceleratorConfig] = Field( + default=None, update_type=DeploymentOptionUpdateType.HeavyWeight + ) + # This flag is used to let replica know they are deployed from # a different language. is_cross_language: bool = False @@ -322,6 +327,8 @@ def needs_pickle(self): def to_proto(self): data = self.model_dump() + if data.get("accelerator_config") is not None: + data["accelerator_config"] = cloudpickle.dumps(self.accelerator_config) if data.get("user_config") is not None: if self.needs_pickle(): data["user_config"] = cloudpickle.dumps(data["user_config"]) @@ -429,6 +436,11 @@ def from_proto(cls, proto: DeploymentConfigProto): data["is_cross_language"] if "is_cross_language" in data else False ) needs_pickle = _needs_pickle(deployment_language, is_cross_language) + if "accelerator_config" in data: + if data["accelerator_config"] != b"": + data["accelerator_config"] = cloudpickle.loads(proto.accelerator_config) + else: + data["accelerator_config"] = None if "user_config" in data: if data["user_config"] != b"": if needs_pickle: diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index a89326f06bf3..3c5dbe816643 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -46,6 +46,8 @@ #: Ray namespace used for all Serve actors SERVE_NAMESPACE = "serve" +ACCELERATOR_KIND_TPU = "tpu" + DEFAULT_HTTP_HOST = os.environ.get("RAY_SERVE_DEFAULT_HTTP_HOST") #: HTTP Port diff --git a/python/ray/serve/_private/default_impl.py b/python/ray/serve/_private/default_impl.py index a0710553a0b4..b741e8187481 100644 --- a/python/ray/serve/_private/default_impl.py +++ b/python/ray/serve/_private/default_impl.py @@ -1,5 +1,5 @@ import asyncio -from typing import Callable, Optional, Tuple +from typing import Callable, Optional, Tuple, Union import ray from ray._common.constants import HEAD_NODE_RESOURCE_NAME @@ -9,7 +9,6 @@ DefaultClusterNodeInfoCache, ) from ray.serve._private.common import ( - CreatePlacementGroupRequest, DeploymentHandleSource, DeploymentID, EndpointInfo, @@ -33,6 +32,11 @@ from ray.serve._private.event_loop_monitoring import EventLoopMonitor from ray.serve._private.grpc_util import gRPCGenericServer from ray.serve._private.handle_options import DynamicHandleOptions, InitHandleOptions +from ray.serve._private.placement_group_utils import ( + CreatePlacementGroupRequest, + ReplicaPlacementGroup, + _create_replica_placement_group, +) from ray.serve._private.router import CurrentLoopRouter, Router, SingletonThreadRouter from ray.serve._private.utils import ( asyncio_grpc_exception_handler, @@ -56,20 +60,9 @@ def create_cluster_node_info_cache(gcs_client: GcsClient) -> ClusterNodeInfoCach return DefaultClusterNodeInfoCache(gcs_client) -CreatePlacementGroupFn = Callable[[CreatePlacementGroupRequest], PlacementGroup] - - -def _default_create_placement_group( - request: CreatePlacementGroupRequest, -) -> PlacementGroup: - return ray.util.placement_group( - request.bundles, - request.strategy, - _soft_target_node_id=request.target_node_id, - name=request.name, - lifetime="detached", - bundle_label_selector=request.bundle_label_selector, - ) +CreatePlacementGroupFn = Callable[ + [CreatePlacementGroupRequest], Union[PlacementGroup, ReplicaPlacementGroup] +] def create_deployment_scheduler( @@ -82,7 +75,7 @@ def create_deployment_scheduler( cluster_node_info_cache, head_node_id, create_placement_group_fn=create_placement_group_fn_override - or _default_create_placement_group, + or _create_replica_placement_group, ) diff --git a/python/ray/serve/_private/deployment_scheduler.py b/python/ray/serve/_private/deployment_scheduler.py index 8cd17ef5d175..2106be1cfdc9 100644 --- a/python/ray/serve/_private/deployment_scheduler.py +++ b/python/ray/serve/_private/deployment_scheduler.py @@ -14,7 +14,6 @@ from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache from ray.serve._private.common import ( GANG_PG_NAME_PREFIX, - CreatePlacementGroupRequest, DeploymentID, GangPlacementGroupRequest, GangReservationResult, @@ -22,11 +21,17 @@ ) from ray.serve._private.config import ReplicaConfig from ray.serve._private.constants import ( + ACCELERATOR_KIND_TPU, RAY_SERVE_HIGH_PRIORITY_CUSTOM_RESOURCES, RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY, RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY, SERVE_LOGGER_NAME, ) +from ray.serve._private.placement_group_utils import ( + CreatePlacementGroupRequest, + ReplicaPlacementGroup, +) +from ray.serve.config import AcceleratorConfig from ray.util.placement_group import PlacementGroup from ray.util.scheduling_strategies import ( LabelMatchExpressionsT, @@ -198,6 +203,7 @@ class ReplicaSchedulingRequest: placement_group_strategy: Optional[str] = None placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None + accelerator_config: Optional[AcceleratorConfig] = None max_replicas_per_node: Optional[int] = None # Gang scheduling fields -- if set, replica should be scheduled on # the reserved gang placement group at the specified bundle index. @@ -636,12 +642,16 @@ def _schedule_replica( replica_id = scheduling_request.replica_id deployment_id = replica_id.deployment_id placement_group = None + replica_pg = None scheduling_strategy = default_scheduling_strategy if scheduling_request.gang_placement_group is not None: - # Gang scheduling -- use the reserved gang placement group + # Gang scheduling -- use the reserved gang placement group. + # Gang PGs are always bare PlacementGroup objects; accelerator + # deployments bypass gang scheduling entirely (see deployment_state). placement_group = scheduling_request.gang_placement_group + scheduling_strategy = PlacementGroupSchedulingStrategy( placement_group=placement_group, placement_group_bundle_index=scheduling_request.gang_pg_index, @@ -650,22 +660,42 @@ def _schedule_replica( # TODO (jeffreywang): Add support for target labels and node affinity target_labels = None target_node_id = None - elif scheduling_request.placement_group_bundles is not None: + elif ( + scheduling_request.placement_group_bundles is not None + or scheduling_request.accelerator_config is not None + ): + # Per-replica PG path. Entered when either: + # - The user provided explicit bundles (CPU/GPU deployments), or + # - The user provided an accelerator_config that derives its own + # bundles from structured fields (e.g. TPUAcceleratorConfig + # derives bundles from topology via slice_placement_group). placement_group_strategy = ( scheduling_request.placement_group_strategy if scheduling_request.placement_group_strategy - else "PACK" + else ( + "SPREAD" + if getattr(scheduling_request.accelerator_config, "kind", None) + == ACCELERATOR_KIND_TPU + else "PACK" + ) ) try: - pg = self._create_placement_group_fn( + pg_result = self._create_placement_group_fn( CreatePlacementGroupRequest( bundles=scheduling_request.placement_group_bundles, strategy=placement_group_strategy, target_node_id=target_node_id, name=scheduling_request.actor_options["name"], bundle_label_selector=scheduling_request.placement_group_bundle_label_selector, - ) + accelerator_config=scheduling_request.accelerator_config, + ), ) + if isinstance(pg_result, ReplicaPlacementGroup): + placement_group = pg_result.placement_group + replica_pg = pg_result + else: + placement_group = pg_result + replica_pg = None except Exception: # We add a defensive exception here, so the controller can # make progress even if the placement group isn't created. @@ -678,7 +708,7 @@ def _schedule_replica( ) return False scheduling_strategy = PlacementGroupSchedulingStrategy( - placement_group=pg, + placement_group=placement_group, placement_group_capture_child_tasks=True, ) target_labels = None @@ -720,6 +750,14 @@ def _schedule_replica( scheduling_request.status = ( ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED ) + + # Only clean up single-replica PGs. Gang PGs are managed elsewhere. + if scheduling_request.gang_placement_group is None: + if replica_pg is not None: + replica_pg.shutdown() + elif placement_group is not None: + ray.util.remove_placement_group(placement_group) + return False del self._pending_replicas[deployment_id][replica_id] @@ -731,7 +769,11 @@ def _schedule_replica( placement_group = scheduling_strategy.placement_group scheduling_request.status = ReplicaSchedulingRequestStatus.SUCCEEDED - scheduling_request.on_scheduled(actor_handle, placement_group=placement_group) + scheduling_request.on_scheduled( + actor_handle, + placement_group=placement_group, + placement_group_manager=replica_pg, + ) return True @abstractmethod @@ -859,7 +901,7 @@ def _prepare_gangs_for_deployment( ) try: - pg = self._create_placement_group_fn( + pg_result = self._create_placement_group_fn( CreatePlacementGroupRequest( bundles=bundles, strategy=request.gang_placement_strategy, @@ -869,6 +911,16 @@ def _prepare_gangs_for_deployment( fallback_strategy=fallback_strategy, ) ) + + # Unwrap the ReplicaPlacementGroup to get the underyling PlacementGroup. + # Gang scheduling currently does not support accelerator_config (since it's + # handled by the specific accelerator backend), so we don't need the + # wrapper. + if isinstance(pg_result, ReplicaPlacementGroup): + pg = pg_result.placement_group + else: + pg = pg_result + gang_pgs.append(pg) gang_ids.append(gang_id) gang_pg_names.append(pg_name) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 856e8183ebe0..262524da94b3 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -76,6 +76,7 @@ ) from ray.serve._private.exceptions import DeploymentIsBeingDeletedError from ray.serve._private.long_poll import LongPollHost, LongPollNamespace +from ray.serve._private.placement_group_utils import ReplicaPlacementGroup from ray.serve._private.storage.kv_store import KVStoreBase from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( @@ -779,6 +780,9 @@ def __init__( self._last_record_routing_stats_time: float = 0.0 self._has_user_routing_stats_method: bool = False self._ingress: bool = False + self._replica_pg: Optional["ReplicaPlacementGroup"] = None + self._gang_placement_group = None + self._gang_pg_index = None # Outbound deployments polling state self._outbound_deployments: Optional[List[DeploymentID]] = None @@ -1152,6 +1156,7 @@ def start( placement_group_fallback_strategy=( deployment_info.replica_config.placement_group_fallback_strategy ), + accelerator_config=deployment_info.deployment_config.accelerator_config, max_replicas_per_node=( deployment_info.replica_config.max_replicas_per_node ), @@ -1164,9 +1169,11 @@ def on_scheduled( self, actor_handle: ActorHandle, placement_group: Optional[PlacementGroup] = None, + placement_group_manager: Optional["ReplicaPlacementGroup"] = None, ): self._actor_handle = actor_handle self._placement_group = placement_group + self._replica_pg = placement_group_manager if self._is_cross_language: self._actor_handle = JavaActorHandleProxy(self._actor_handle) @@ -1519,15 +1526,42 @@ def check_stopped(self) -> bool: finally: # Remove the placement group both if the actor has already been deleted or # it was just killed above. - if stopped and self._placement_group is not None: + if stopped: try: - ray.util.remove_placement_group(self._placement_group) - except ValueError: - # ValueError thrown from ray.util.remove_placement_group means the - # placement group has already been removed. - logger.debug( - f"Placement group for {self._replica_id} was already removed." + # Teardown shared gang placement group. The first replica in the + # gang to stop deletes it. Subsequent replicas catch ValueError. + if self._gang_placement_group is not None: + try: + ray.util.remove_placement_group(self._gang_placement_group) + except ValueError: + # Already removed by another replica in this gang. + logger.debug( + f"Gang placement group for {self._replica_id} was already removed." + ) + + # Replicas with accelerator/wrapper PGs handle their own shutdown. + elif self._replica_pg is not None: + self._replica_pg.shutdown() + + # Standard single-replica placement groups. + elif self._placement_group is not None: + try: + ray.util.remove_placement_group(self._placement_group) + except ValueError: + # ValueError thrown from ray.util.remove_placement_group means the + # placement group has already been removed. + logger.debug( + f"Placement group for {self._replica_id} was already removed." + ) + except Exception: + logger.exception( + f"Unexpected error shutting down placement groups for {self._replica_id}." ) + finally: + # Clear references to prevent memory leaks and dangling state. + self._gang_placement_group = None + self._replica_pg = None + self._placement_group = None return stopped @@ -3240,6 +3274,11 @@ def get_gang_config(self): @property def _is_gang_deployment(self) -> bool: """Returns True if this deployment uses gang scheduling.""" + if ( + self._target_state is not None + and self._target_state.info.deployment_config.accelerator_config is not None + ): + return False return self.get_gang_config() is not None def _get_target_replica_delta(self) -> int: diff --git a/python/ray/serve/_private/placement_group_utils.py b/python/ray/serve/_private/placement_group_utils.py new file mode 100644 index 000000000000..dbfd86342c26 --- /dev/null +++ b/python/ray/serve/_private/placement_group_utils.py @@ -0,0 +1,123 @@ +import logging +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +import ray +from ray.serve._private.constants import ACCELERATOR_KIND_TPU, SERVE_LOGGER_NAME +from ray.util.placement_group import PlacementGroup, remove_placement_group +from ray.util.tpu import SlicePlacementGroup, slice_placement_group + +logger = logging.getLogger(SERVE_LOGGER_NAME) + +# NOTE: Please read carefully before changing! +# +# Similar to `default_impl.py`, methods like `_default_create_placement_group` are +# common extension points and should be treated as a Developer API. + + +@dataclass(frozen=True) +class CreatePlacementGroupRequest: + """Internal request for creating a per-replica placement group. + + Either ``bundles`` or ``accelerator_config`` must be provided: + - For plain CPU/GPU deployments, the caller provides ``bundles`` and the + default path creates a standard PlacementGroup. + - For accelerator deployments (e.g. TPU), the caller provides + ``accelerator_config`` and the dispatch derives bundles from the + structured config (e.g. TPU topology -> per-host bundles). + """ + + bundles: Optional[List[Dict[str, float]]] = None + strategy: str = "PACK" + target_node_id: Optional[str] = None + name: str = "" + runtime_env: Optional[str] = None + bundle_label_selector: Optional[List[Dict[str, str]]] = None + fallback_strategy: Optional[List[Dict[str, Any]]] = None + accelerator_config: Optional[Any] = None + + +@dataclass +class ReplicaPlacementGroup: + """Internal Serve handle for a replica's placement group(s). + + Wraps the worker PG and any accelerator-specific cleanup hooks so the + controller doesn't need to know whether the underlying request was a + plain CPU/GPU PG or a TPU slice reservation. + """ + + placement_group: Optional[PlacementGroup] + _slice_pg: Optional[SlicePlacementGroup] = None + + def shutdown(self) -> None: + """Tear down the replica's PG(s). Idempotent.""" + if self._slice_pg is not None: + self._slice_pg.shutdown() + self._slice_pg = None + self.placement_group = None + elif self.placement_group is not None: + try: + remove_placement_group(self.placement_group) + except Exception: + logger.exception("Failed to remove placement group.") + finally: + self.placement_group = None + + +def _default_create_placement_group( + request: CreatePlacementGroupRequest, +) -> PlacementGroup: + return ray.util.placement_group( + request.bundles, + request.strategy, + _soft_target_node_id=request.target_node_id, + name=request.name, + lifetime="detached", + bundle_label_selector=request.bundle_label_selector, + ) + + +def _create_replica_placement_group( + request: CreatePlacementGroupRequest, +) -> ReplicaPlacementGroup: + """Internal entry point that supports accelerator-specific dispatch. + + Dispatches on ``request.accelerator_config``: + - TPUAcceleratorConfig: derive bundles from topology via + slice_placement_group; ``request.bundles`` is ignored. + - None: use ``request.bundles`` to create a standard PlacementGroup. + + Raises ValueError if neither bundles nor a recognized accelerator + config is provided - this catches users setting an unrecognized + accelerator_config type without explicit bundles, which would + otherwise schedule with no PG at all. + """ + accelerator_config = request.accelerator_config + + if getattr(accelerator_config, "kind", None) == ACCELERATOR_KIND_TPU: + slice_pg = slice_placement_group( + topology=accelerator_config.topology, + accelerator_version=accelerator_config.accelerator_version, + num_slices=accelerator_config.num_slices, + chips_per_vm=accelerator_config.chips_per_vm, + resources_per_bundle=accelerator_config.resources_per_bundle, + strategy=request.strategy, + name=request.name, + lifetime="detached", + bundle_label_selector=request.bundle_label_selector, + ) + return ReplicaPlacementGroup( + placement_group=slice_pg.placement_group, + _slice_pg=slice_pg, + ) + + if request.bundles is None: + raise ValueError( + "CreatePlacementGroupRequest requires either non-None bundles " + "or a recognized accelerator_config. Got accelerator_config=" + f"{type(accelerator_config).__name__ if accelerator_config else None}, " + "bundles=None." + ) + + pg = _default_create_placement_group(request) + return ReplicaPlacementGroup(placement_group=pg) diff --git a/python/ray/serve/_private/test_utils.py b/python/ray/serve/_private/test_utils.py index 3f2686ad7112..56f5f9322020 100644 --- a/python/ray/serve/_private/test_utils.py +++ b/python/ray/serve/_private/test_utils.py @@ -26,7 +26,6 @@ from ray.actor import ActorHandle from ray.serve._private.client import ServeControllerClient from ray.serve._private.common import ( - CreatePlacementGroupRequest, DeploymentID, DeploymentStatus, ReplicaID, @@ -45,6 +44,7 @@ ReplicaStartupStatus, ReplicaState, ) +from ray.serve._private.placement_group_utils import CreatePlacementGroupRequest from ray.serve._private.proxy import DRAINING_MESSAGE from ray.serve._private.replica_result import ReplicaResult from ray.serve._private.request_router import ( diff --git a/python/ray/serve/_private/version.py b/python/ray/serve/_private/version.py index a0aea7a2a63d..2171006c4778 100644 --- a/python/ray/serve/_private/version.py +++ b/python/ray/serve/_private/version.py @@ -79,6 +79,7 @@ def requires_actor_restart(self, new_version): or self.max_replicas_per_node != new_version.max_replicas_per_node or self.gang_scheduling_config_hash != new_version.gang_scheduling_config_hash + or self.accelerator_config_hash != new_version.accelerator_config_hash ) def requires_actor_reconfigure(self, new_version): @@ -124,6 +125,12 @@ def compute_hashes(self): else {} ) self.gang_scheduling_config_hash = crc32(serialized_gang_scheduling_config) + serialized_accelerator_config = ( + self.deployment_config.accelerator_config.model_dump_json().encode("utf-8") + if self.deployment_config.accelerator_config is not None + else b"" + ) + self.accelerator_config_hash = crc32(serialized_accelerator_config) # Include app-level route prefix in the version hashes so changing # it triggers an in-place reconfigure of running replicas. serialized_route_prefix = _serialize(self.route_prefix) @@ -152,6 +159,7 @@ def compute_hashes(self): ] ) + serialized_gang_scheduling_config + + serialized_accelerator_config ) def to_proto(self) -> bytes: diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index aa701df51164..bdf93b40839d 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -40,6 +40,7 @@ wait_for_interrupt, ) from ray.serve.config import ( + AcceleratorConfig, AutoscalingConfig, ControllerOptions, DeploymentActorConfig, @@ -47,6 +48,7 @@ HTTPOptions, ProxyLocation, RequestRouterConfig, + _resolve_accelerator_config, gRPCOptions, ) from ray.serve.context import ( @@ -484,6 +486,7 @@ def deployment( user_config: Default[Optional[Any]] = DEFAULT.VALUE, max_ongoing_requests: Default[int] = DEFAULT.VALUE, max_queued_requests: Default[int] = DEFAULT.VALUE, + accelerator_config: Default[Union[Dict, AcceleratorConfig, None]] = DEFAULT.VALUE, autoscaling_config: Default[Union[Dict, AutoscalingConfig, None]] = DEFAULT.VALUE, graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE, graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE, @@ -554,6 +557,9 @@ class MyDeployment: Once this limit is reached, subsequent requests will raise a BackPressureError (for handles) or return an HTTP 503 status code (for HTTP requests). Defaults to -1 (no limit). + accelerator_config: Configuration for hardware accelerators, such as TPUs. + Can be passed as an unstructured dictionary or a structured `AcceleratorConfig` + subclass (e.g. `TPUAcceleratorConfig`). See `AcceleratorConfig` for options. autoscaling_config: Parameters to configure autoscaling behavior. If this is set, `num_replicas` should be "auto" or not set. graceful_shutdown_wait_loop_s: Duration that replicas wait until there is @@ -654,11 +660,32 @@ class MyDeployment: if isinstance(logging_config, LoggingConfig): logging_config = logging_config.model_dump() + if accelerator_config is not DEFAULT.VALUE and accelerator_config is not None: + accelerator_config = _resolve_accelerator_config(accelerator_config) + + if ( + gang_scheduling_config is not DEFAULT.VALUE + and gang_scheduling_config is not None + ): + # TODO(ryanaoleary@): Revisit this mutual exclusivity restriction once + # Data Parallel (DP) attention or more complex multi-slice gang + # scheduling is supported for TPUs. + # + # The only supported accelerator_config currently is for TPU, which utilizes + # SlicePlacementGroup internally for atomic scheduling of SPMD workers. This + # check can be loosened if additional accelerator configs are added in the + # future that don't manage their own gang scheduling. + raise ValueError( + "Cannot specify both `accelerator_config` and `gang_scheduling_config`. " + "Accelerator configurations automatically manage their own gang scheduling." + ) + deployment_config = DeploymentConfig.from_default( num_replicas=num_replicas if num_replicas is not None else 1, user_config=user_config, max_ongoing_requests=max_ongoing_requests, max_queued_requests=max_queued_requests, + accelerator_config=accelerator_config, autoscaling_config=autoscaling_config, graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s, graceful_shutdown_timeout_s=graceful_shutdown_timeout_s, diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py index 75609c4757be..e61261d8acdb 100644 --- a/python/ray/serve/config.py +++ b/python/ray/serve/config.py @@ -4,7 +4,7 @@ import warnings from enum import Enum from functools import cached_property -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union from pydantic import ( BaseModel, @@ -27,6 +27,7 @@ # Import types needed for AutoscalingContext from ray.serve._private.common import DeploymentID, ReplicaID, TimeSeries from ray.serve._private.constants import ( + ACCELERATOR_KIND_TPU, DEFAULT_AUTOSCALING_POLICY_NAME, DEFAULT_GRPC_PORT, DEFAULT_HTTP_HOST, @@ -709,6 +710,68 @@ def get_target_ongoing_requests(self) -> PositiveFloat: return self.target_ongoing_requests +@PublicAPI(stability="alpha") +class AcceleratorConfig(BaseModel): + """Base class for structured accelerator configurations. + + Use a concrete subclass — e.g. :class:`TPUAcceleratorConfig` — when + declaring a deployment's accelerator requirements via + ``serve.deployment(accelerator_config=...)``. + """ + + kind: str = Field( + ..., description="Discriminator identifying the accelerator config type." + ) + + model_config = {"frozen": True, "extra": "forbid"} + + +@PublicAPI(stability="alpha") +class TPUAcceleratorConfig(AcceleratorConfig): + """TPU slice specification for a Serve deployment. + + Mirrors the parameters of :func:`ray.util.tpu.slice_placement_group`. + Ray Serve uses this config to provision a TPU slice placement group + per replica and to manage its lifecycle through the controller. + + When set on a deployment, this config drives placement-group creation + entirely. The deployment's ``placement_group_bundles`` and + ``placement_group_strategy`` fields are ignored - the bundles are + derived from ``topology`` (or optionally ``resources_per_bundle``), + and the strategy is chosen internally to honor slice gang scheduling. + + Example: + >>> from ray.serve.config import TPUAcceleratorConfig + >>> config = TPUAcceleratorConfig(topology="4x4", accelerator_version="v6e") + """ + + kind: Literal["tpu"] = ACCELERATOR_KIND_TPU + + topology: str = Field( + ..., description="TPU pod topology, e.g. '2x2', '4x4', '2x2x2'." + ) + accelerator_version: str = Field( + ..., description="TPU accelerator version, e.g. 'v4', 'v5p', 'v6e'." + ) + num_slices: int = Field(default=1, ge=1, description="Number of slices to reserve.") + chips_per_vm: Optional[int] = Field( + default=None, + description=( + "Override for chips per host. Defaults to the canonical value " + "for the given accelerator_version." + ), + ) + resources_per_bundle: Optional[Dict[str, float]] = Field( + default=None, + description=( + "Resources to include in every worker bundle. When unspecified, " + "SlicePlacementGroup defaults to one bundle per TPU host with " + "the bundle resources set to the number of chips on that host. " + "See ray.util.tpu.slice_placement_group for details." + ), + ) + + @PublicAPI(stability="stable") class ProxyLocation(str, Enum): """Config for where to run proxies to receive ingress traffic to the cluster. @@ -1165,3 +1228,19 @@ def _validate_runtime_failure_policy(cls, v): "RESTART_REPLICA policy is not yet implemented. File a GitHub issue if you need this feature." ) return v + + +def _resolve_accelerator_config( + value: Union[Dict, AcceleratorConfig, None], +) -> Optional[AcceleratorConfig]: + + if value is None or isinstance(value, AcceleratorConfig): + return value + if isinstance(value, dict): + kind = value.get("kind") + if kind == ACCELERATOR_KIND_TPU: + return TPUAcceleratorConfig(**value) + raise ValueError(f"Unknown accelerator kind {kind!r}. Supported types: 'tpu'.") + raise TypeError( + f"accelerator_config must be a dict or AcceleratorConfig, got {type(value)}." + ) diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index 783b413d1b3c..4b55aa71a42f 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -13,9 +13,11 @@ from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import DEFAULT, Default from ray.serve.config import ( + AcceleratorConfig, AutoscalingConfig, DeploymentActorConfig, GangSchedulingConfig, + _resolve_accelerator_config, ) from ray.serve.schema import DeploymentSchema, LoggingConfig, RayActorOptionsSchema from ray.util.annotations import PublicAPI @@ -257,6 +259,9 @@ def options( deployment_actors: Default[ Optional[List[Union[Dict, DeploymentActorConfig]]] ] = DEFAULT.VALUE, + accelerator_config: Default[ + Union[Dict, AcceleratorConfig, None] + ] = DEFAULT.VALUE, ) -> "Deployment": """Return a copy of this deployment with updated options. @@ -408,6 +413,18 @@ def options( if gang_scheduling_config is not DEFAULT.VALUE: new_deployment_config.gang_scheduling_config = gang_scheduling_config + if accelerator_config is not DEFAULT.VALUE: + if accelerator_config is not None: + accelerator_config = _resolve_accelerator_config(accelerator_config) + new_deployment_config.accelerator_config = accelerator_config + + ac = new_deployment_config.accelerator_config + gc = new_deployment_config.gang_scheduling_config + if ac is not None and gc is not None: + raise ValueError( + "Cannot specify both `accelerator_config` and `gang_scheduling_config`." + ) + if deployment_actors is not DEFAULT.VALUE: new_deployment_config.deployment_actors = deployment_actors @@ -513,6 +530,7 @@ def deployment_to_schema(d: Deployment) -> DeploymentSchema: "gang_scheduling_config": d._deployment_config.gang_scheduling_config, "deployment_actors": d._deployment_config.deployment_actors, "rolling_update_percentage": d._deployment_config.rolling_update_percentage, + "accelerator_config": d._deployment_config.accelerator_config, } # Let non-user-configured options be set to defaults. If the schema @@ -577,6 +595,7 @@ def schema_to_deployment(s: DeploymentSchema) -> Deployment: gang_scheduling_config=s.gang_scheduling_config, deployment_actors=s.deployment_actors, rolling_update_percentage=s.rolling_update_percentage, + accelerator_config=s.accelerator_config, ) deployment_config.user_configured_option_names = ( s._get_user_configured_option_names() diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index 2b9e417550e5..3f008ae2871c 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -39,6 +39,7 @@ from ray.serve._private.deployment_info import DeploymentInfo from ray.serve._private.utils import DEFAULT, validate_ssl_config from ray.serve.config import ( + AcceleratorConfig, AutoscalingConfig, AutoscalingPolicy, ControllerOptions, @@ -46,6 +47,7 @@ GangSchedulingConfig, ProxyLocation, RequestRouterConfig, + _resolve_accelerator_config, ) from ray.util.annotations import PublicAPI @@ -479,6 +481,10 @@ class DeploymentSchema(BaseModel): gt=0.0, le=1.0, ) + accelerator_config: Optional[Union[Dict, AcceleratorConfig]] = Field( + default=DEFAULT.VALUE, + description="Structured accelerator configuration for the deployment replicas.", + ) @model_validator(mode="before") @classmethod @@ -503,6 +509,20 @@ def validate_num_replicas_and_autoscaling_config(cls, values): return values + @model_validator(mode="before") + @classmethod + def validate_accelerator_config(cls, values): + accelerator_config = values.get("accelerator_config", None) + if accelerator_config in [None, DEFAULT.VALUE]: + return values + + if isinstance(accelerator_config, dict): + values["accelerator_config"] = _resolve_accelerator_config( + accelerator_config + ) + + return values + @model_validator(mode="before") @classmethod def validate_gang_scheduling_config(cls, values): @@ -627,6 +647,21 @@ def validate_placement_group_strategy_and_gang_scheduling_config(self): return self + @model_validator(mode="after") + def validate_accelerator_config_and_gang_scheduling_config(self): + accelerator_config = self.accelerator_config + gang_scheduling_config = self.gang_scheduling_config + + if accelerator_config not in [ + DEFAULT.VALUE, + None, + ] and gang_scheduling_config not in [DEFAULT.VALUE, None]: + raise ValueError( + "Cannot specify both `accelerator_config` and `gang_scheduling_config`." + ) + + return self + @model_validator(mode="after") def validate_max_queued_requests(self): max_queued_requests = self.max_queued_requests @@ -690,6 +725,9 @@ def _deployment_info_to_schema(name: str, info: DeploymentInfo) -> DeploymentSch info.deployment_config.gang_scheduling_config.model_dump() ) + if info.deployment_config.accelerator_config is not None: + schema.accelerator_config = info.deployment_config.accelerator_config + if info.deployment_config.deployment_actors is not None: deployment_actors = [] for cfg in info.deployment_config.deployment_actors: diff --git a/python/ray/serve/tests/BUILD.bazel b/python/ray/serve/tests/BUILD.bazel index 3a5ac195b2d7..28d258f76174 100644 --- a/python/ray/serve/tests/BUILD.bazel +++ b/python/ray/serve/tests/BUILD.bazel @@ -101,6 +101,7 @@ py_test_module_list( py_test_module_list( size = "medium", files = [ + "test_accelerator_config.py", "test_actor_replica_wrapper.py", "test_backpressure.py", "test_backpressure_grpc.py", diff --git a/python/ray/serve/tests/test_accelerator_config.py b/python/ray/serve/tests/test_accelerator_config.py new file mode 100644 index 000000000000..7ef34d276dfd --- /dev/null +++ b/python/ray/serve/tests/test_accelerator_config.py @@ -0,0 +1,126 @@ +import sys +from unittest.mock import MagicMock, patch + +import pytest + +import ray +from ray import serve +from ray.cluster_utils import Cluster +from ray.serve._private.placement_group_utils import ( + CreatePlacementGroupRequest, + ReplicaPlacementGroup, + _create_replica_placement_group, +) +from ray.serve.config import TPUAcceleratorConfig + + +@pytest.fixture(scope="module") +def mock_tpu_cluster(): + # Simulates a Ray cluster with a multi-host TPU v6e-16 slice (4x4 topology). + pod_type = "v6e-16" + topology = "4x4" + cluster = Cluster() + # Head node + cluster.add_node(num_cpus=4) + + # TPU nodes: A 4x4 v6e slice has 16 chips. We simulate 4 hosts with 4 chips each. + for i in range(4): + env_vars = { + "TPU_NAME": "test-slice", + "TPU_WORKER_ID": str(i), + "TPU_ACCELERATOR_TYPE": pod_type, + "TPU_TOPOLOGY": topology, + } + labels = { + "ray.io/tpu-slice-name": "test-slice", + "ray.io/tpu-worker-id": str(i), + "ray.io/tpu-pod-type": pod_type, + } + resources = {"TPU": 4, "accelerator_type:TPU-V6E": 4} + + # The first node is the "head" of the slice + if i == 0: + resources[f"TPU-{pod_type}-head"] = 1 + + cluster.add_node( + num_cpus=8, + resources=resources, + labels=labels, + env_vars=env_vars, + ) + + cluster.wait_for_nodes() + ray.init(address=cluster.address, ignore_reinit_error=True) + serve.start() + yield cluster + serve.shutdown() + ray.shutdown() + cluster.shutdown() + + +def test_tpu_accelerator_config_integration(mock_tpu_cluster): + """Test that AcceleratorConfig correctly creates SlicePlacementGroup in a mock cluster.""" + + tpu_config = TPUAcceleratorConfig(topology="4x4", accelerator_version="v6e") + + request = CreatePlacementGroupRequest( + bundles=[{"CPU": 1}], + strategy="SPREAD", + target_node_id=None, + name="test-tpu-pg", + accelerator_config=tpu_config, + ) + + # This should call _create_tpu_placement_group and return a wrapper + replica_pg = _create_replica_placement_group(request) + + assert isinstance(replica_pg, ReplicaPlacementGroup) + assert replica_pg._slice_pg is not None + + # Verify the placement group is ready + ray.get(replica_pg.placement_group.ready(), timeout=20) + + # Verify cleanup + replica_pg.shutdown() + assert replica_pg._slice_pg is None + + # Verify idempotency of shutdown logic + replica_pg.shutdown() + assert replica_pg._slice_pg is None + + +def test_tpu_accelerator_config_partial_failure_cleanup(mock_tpu_cluster): + """Test that SlicePlacementGroup cleans up head PGs if a multi-slice reservation fails.""" + + # Request 2 slices to test partial failure cleanup + tpu_config = TPUAcceleratorConfig( + topology="4x4", accelerator_version="v6e", num_slices=2 + ) + + request = CreatePlacementGroupRequest( + bundles=[{"CPU": 1}], + strategy="SPREAD", + target_node_id=None, + name="test-tpu-timeout-pg", + accelerator_config=tpu_config, + ) + + # Patch remove_placement_group where it is USED (ray.util.tpu) + with patch("ray.util.tpu.remove_placement_group") as mock_remove: + with patch("ray.util.tpu.reserve_tpu_slice") as mock_reserve: + # Succeed for first slice, fail for second + mock_head_pg = MagicMock() + mock_reserve.side_effect = [ + ("slice-1", mock_head_pg), + TimeoutError("Failed to reserve TPU head"), + ] + + with pytest.raises(TimeoutError, match="Failed to reserve TPU head"): + _create_replica_placement_group(request) + + # Verify that the first slice's head PG was cleanly rolled back + mock_remove.assert_called_once_with(mock_head_pg) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_deployment_scheduler.py b/python/ray/serve/tests/test_deployment_scheduler.py index b3f8688fecc0..7c5ac5bbf2fb 100644 --- a/python/ray/serve/tests/test_deployment_scheduler.py +++ b/python/ray/serve/tests/test_deployment_scheduler.py @@ -68,7 +68,9 @@ def test_spread_deployment_scheduling_policy_upscale( replica_actor_handles = [] replica_placement_groups = [] - def on_scheduled(actor_handle, placement_group): + def on_scheduled( + actor_handle, placement_group=None, placement_group_manager=None + ): replica_actor_handles.append(actor_handle) replica_placement_groups.append(placement_group) diff --git a/python/ray/serve/tests/unit/BUILD.bazel b/python/ray/serve/tests/unit/BUILD.bazel index 9c024f4edc8e..66dd7de1050e 100644 --- a/python/ray/serve/tests/unit/BUILD.bazel +++ b/python/ray/serve/tests/unit/BUILD.bazel @@ -55,6 +55,7 @@ py_test_module_list( "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, files = [ + "test_accelerator_config.py", "test_deployment_scheduler.py", "test_deployment_state.py", ], diff --git a/python/ray/serve/tests/unit/test_accelerator_config.py b/python/ray/serve/tests/unit/test_accelerator_config.py new file mode 100644 index 000000000000..5f3756c5b769 --- /dev/null +++ b/python/ray/serve/tests/unit/test_accelerator_config.py @@ -0,0 +1,297 @@ +import sys +from unittest.mock import MagicMock, patch + +import pytest +from pydantic import ValidationError + +from ray.serve._private.placement_group_utils import ( + CreatePlacementGroupRequest, + ReplicaPlacementGroup, + _create_replica_placement_group, +) +from ray.serve.api import deployment +from ray.serve.config import GangSchedulingConfig, TPUAcceleratorConfig +from ray.util.placement_group import PlacementGroup +from ray.util.tpu import SlicePlacementGroup + + +def test_tpu_accelerator_config_construction(): + config = TPUAcceleratorConfig(topology="4x4", accelerator_version="v6e") + assert config.kind == "tpu" + assert config.topology == "4x4" + assert config.num_slices == 1 # default + + +def test_tpu_accelerator_config_immutable(): + config = TPUAcceleratorConfig(topology="4x4", accelerator_version="v6e") + with pytest.raises(ValidationError): + config.topology = "2x2" + + +def test_tpu_accelerator_config_extra_forbid(): + with pytest.raises(ValidationError): + TPUAcceleratorConfig(topology="4x4", accelerator_version="v6e", bogus_field=1) + + +def test_deployment_options_accept_tpu_config_instance(): + config = TPUAcceleratorConfig(topology="4x4", accelerator_version="v6e") + + @deployment(accelerator_config=config) + class D: + pass + + assert isinstance(D._deployment_config.accelerator_config, TPUAcceleratorConfig) + + +def test_deployment_options_accept_dict_form(): + @deployment( + accelerator_config={ + "kind": "tpu", + "topology": "4x4", + "accelerator_version": "v6e", + } + ) + class D: + pass + + cfg = D._deployment_config.accelerator_config + assert isinstance(cfg, TPUAcceleratorConfig) + assert cfg.topology == "4x4" + + +def test_deployment_options_dict_unknown_accelerator_type_raises(): + with pytest.raises(ValueError, match="Unknown accelerator kind"): + + @deployment(accelerator_config={"kind": "xpu"}) + class D: + pass + + +@pytest.mark.parametrize( + "invalid_kwargs", + [ + {"topology": "4x4"}, # missing accelerator_version + {"accelerator_version": "v6e"}, # missing topology + {"topology": 123, "accelerator_version": "v6e"}, # topology should be str + { + "topology": "4x4", + "accelerator_version": "v6e", + "num_slices": "two", + }, # num_slices should be int + { + "topology": "4x4", + "accelerator_version": "v6e", + "num_slices": 0, + }, # num_slices must be >= 1 + ], +) +def test_tpu_accelerator_config_validation(invalid_kwargs): + with pytest.raises(ValidationError): + TPUAcceleratorConfig(**invalid_kwargs) + + +@pytest.mark.parametrize("with_accelerator", [False, True]) +def test_placement_group_creation_types(with_accelerator): + """Verify that _create_replica_placement_group always returns wrappers.""" + + accelerator_config = None + if with_accelerator: + accelerator_config = TPUAcceleratorConfig( + topology="4x4", accelerator_version="v6e" + ) + + request = CreatePlacementGroupRequest( + bundles=[{"CPU": 1.0}], + strategy="SPREAD", + target_node_id="", + name="test", + accelerator_config=accelerator_config, + ) + + mock_pg = MagicMock(spec=PlacementGroup) + + # Accelerator path. Returns a wrapper holding a SlicePlacementGroup. + if with_accelerator: + mock_slice_pg = MagicMock() + mock_slice_pg.placement_group = mock_pg + with patch( + "ray.serve._private.placement_group_utils.slice_placement_group", + return_value=mock_slice_pg, + ): + result = _create_replica_placement_group(request) + # Non-accelerator path. Returns a wrapper holding a regular PG. + else: + with patch("ray.util.placement_group", return_value=mock_pg): + result = _create_replica_placement_group(request) + + assert isinstance(result, ReplicaPlacementGroup), ( + "_create_replica_placement_group must always return a ReplicaPlacementGroup, " + "regardless of whether accelerator_config is set." + ) + assert result.placement_group == mock_pg + + if with_accelerator: + assert ( + result._slice_pg is not None + ), "Accelerator path must set _slice_pg for cleanup tracking." + else: + assert result._slice_pg is None, "Non-accelerator path must not set _slice_pg." + + +@pytest.mark.parametrize("with_accelerator", [False, True]) +def test_replica_pg_shutdown_idempotent(with_accelerator): + """Test that ReplicaPlacementGroup shutdown is idempotent.""" + mock_pg = MagicMock() + + if with_accelerator: + mock_slice_pg = MagicMock() + adapter = ReplicaPlacementGroup( + placement_group=mock_pg, _slice_pg=mock_slice_pg + ) + + adapter.shutdown() + mock_slice_pg.shutdown.assert_called_once() + assert adapter._slice_pg is None + + adapter.shutdown() + assert mock_slice_pg.shutdown.call_count == 1 + else: + adapter = ReplicaPlacementGroup(placement_group=mock_pg) + + with patch( + "ray.serve._private.placement_group_utils.remove_placement_group" + ) as mock_remove: + adapter.shutdown() + mock_remove.assert_called_once_with(mock_pg) + + adapter.shutdown() + assert mock_remove.call_count == 1 + + +def test_create_replica_placement_group_rejects_no_bundles_no_config(): + """Without bundles or a recognized accelerator_config, raises ValueError. + + Catches future accelerator types added to AcceleratorConfig but not + wired into _create_replica_placement_group. + """ + request = CreatePlacementGroupRequest( + bundles=None, + strategy="PACK", + target_node_id="", + name="test", + accelerator_config=None, + ) + with pytest.raises(ValueError, match="requires either non-None bundles"): + _create_replica_placement_group(request) + + +def test_create_replica_placement_group_tpu_ignores_bundles(): + """TPU dispatch ignores request.bundles -- they're derived from topology.""" + request = CreatePlacementGroupRequest( + bundles=[{"CPU": 1}], + strategy="PACK", + target_node_id="", + name="test", + accelerator_config=TPUAcceleratorConfig( + topology="2x2", accelerator_version="v6e" + ), + ) + + mock_slice_pg = MagicMock() + mock_slice_pg.placement_group = MagicMock(spec=PlacementGroup) + + with patch( + "ray.serve._private.placement_group_utils.slice_placement_group", + return_value=mock_slice_pg, + ) as mock_slice_pg_func: + result = _create_replica_placement_group(request) + + mock_slice_pg_func.assert_called_once() + + assert result.placement_group == mock_slice_pg.placement_group + + +def test_tpu_config_resources_per_bundle_forwarded_to_slice_pg(monkeypatch): + """The resources_per_bundle field is forwarded to slice_placement_group.""" + captured = {} + + # Mock slice_placement_group to capture the arguments it receives. + def mock_slice_pg(**kwargs): + captured.update(kwargs) + mock = MagicMock(spec=SlicePlacementGroup) + mock.placement_group = MagicMock() + return mock + + monkeypatch.setattr( + "ray.serve._private.placement_group_utils.slice_placement_group", + mock_slice_pg, + ) + + # Create a config with custom resources per bundle. + config = TPUAcceleratorConfig( + topology="4x4", + accelerator_version="v6e", + resources_per_bundle={"TPU": 1, "memory": 1_000_000}, + ) + request = CreatePlacementGroupRequest( + accelerator_config=config, + name="test", + ) + + # Call the dispatch function. + _create_replica_placement_group(request) + + # Verify that resources_per_bundle and other fields were forwarded correctly. + assert captured["resources_per_bundle"] == {"TPU": 1, "memory": 1_000_000} + assert captured["topology"] == "4x4" + assert captured["accelerator_version"] == "v6e" + + +@pytest.mark.parametrize( + "options, should_raise", + [ + ({}, False), + ( + { + "accelerator_config": TPUAcceleratorConfig( + topology="2x2", accelerator_version="v6e" + ) + }, + False, + ), + ( + { + "gang_scheduling_config": GangSchedulingConfig(gang_size=2), + "num_replicas": 2, + }, + False, + ), + ( + { + "accelerator_config": TPUAcceleratorConfig( + topology="2x2", accelerator_version="v6e" + ), + "gang_scheduling_config": GangSchedulingConfig(gang_size=2), + "num_replicas": 2, + }, + True, + ), + ], +) +def test_deployment_config_mutual_exclusivity(options, should_raise): + """accelerator_config and gang_scheduling_config validation matrix.""" + + def create_deployment(): + @deployment(**options) + class D: + pass + + if should_raise: + with pytest.raises(ValueError, match="Cannot specify both"): + create_deployment() + else: + create_deployment() + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/unit/test_deployment_scheduler.py b/python/ray/serve/tests/unit/test_deployment_scheduler.py index 0aebe93c5fa2..287cf8420099 100644 --- a/python/ray/serve/tests/unit/test_deployment_scheduler.py +++ b/python/ray/serve/tests/unit/test_deployment_scheduler.py @@ -12,7 +12,6 @@ from ray.serve._private import default_impl from ray.serve._private.common import ( GANG_PG_NAME_PREFIX, - CreatePlacementGroupRequest, DeploymentID, GangPlacementGroupRequest, ReplicaID, @@ -32,11 +31,13 @@ SpreadDeploymentSchedulingPolicy, ) from ray.serve._private.deployment_state import DeploymentStateManager +from ray.serve._private.placement_group_utils import CreatePlacementGroupRequest from ray.serve._private.test_utils import ( MockActorClass, MockClusterNodeInfoCache, MockPlacementGroup, ) +from ray.serve.config import TPUAcceleratorConfig from ray.tests.conftest import * # noqa from ray.util.scheduling_strategies import ( In, @@ -570,7 +571,9 @@ def test_schedule_replica(): scheduler = default_impl.create_deployment_scheduler( cluster_node_info_cache, head_node_id_override="fake-head-node-id", - create_placement_group_fn_override=lambda request: MockPlacementGroup(request), + create_placement_group_fn_override=lambda request: default_impl.ReplicaPlacementGroup( + placement_group=MockPlacementGroup(request) + ), ) scheduler.on_deployment_created(d_id, SpreadDeploymentSchedulingPolicy()) @@ -578,7 +581,9 @@ def test_schedule_replica(): scheduling_strategy = None - def set_scheduling_strategy(actor_handle, placement_group): + def set_scheduling_strategy( + actor_handle, placement_group=None, placement_group_manager=None + ): nonlocal scheduling_strategy scheduling_strategy = actor_handle._options["scheduling_strategy"] @@ -708,6 +713,182 @@ def set_scheduling_strategy(actor_handle, placement_group): } +@pytest.mark.parametrize( + "bundles, acc_config_present, expect_pg_created", + [ + # Accelerator config only -> enters PG path + (None, True, True), + # Bundles only -> enters PG path + ([{"CPU": 1}], False, True), + # Both set -> enters PG path + ([{"CPU": 1}], True, True), + # Neither set -> falls through to default scheduling + (None, False, False), + ], +) +def test_schedule_replica_dispatch(bundles, acc_config_present, expect_pg_created): + """Validate that scheduler routes to PG path correctly based on bundles and config.""" + # Setup deployment IDs and cache. + d_id = DeploymentID("deployment_test", "app1") + cluster_node_info_cache = MockClusterNodeInfoCache() + captured_requests = [] + + # Mock create_placement_group_fn to record what it received. + def mock_create_pg(request): + captured_requests.append(request) + return default_impl.ReplicaPlacementGroup( + placement_group=MockPlacementGroup(request) + ) + + # Initialize scheduler with mock PG creator. + scheduler = default_impl.create_deployment_scheduler( + cluster_node_info_cache, + head_node_id_override="fake-head-node-id", + create_placement_group_fn_override=mock_create_pg, + ) + scheduler.on_deployment_created(d_id, SpreadDeploymentSchedulingPolicy()) + scheduler.on_deployment_deployed(d_id, rconfig(ray_actor_options={"num_cpus": 1})) + + r0_id = ReplicaID(unique_id="r0", deployment_id=d_id) + + acc_config = None + if acc_config_present: + acc_config = TPUAcceleratorConfig(topology="2x2", accelerator_version="v6e") + + scheduling_strategy = None + + def set_scheduling_strategy(actor_handle, *args, **kwargs): + nonlocal scheduling_strategy + scheduling_strategy = actor_handle._options["scheduling_strategy"] + + # Construct the scheduling request. + scheduling_request = ReplicaSchedulingRequest( + replica_id=r0_id, + actor_def=MockActorClass(), + actor_resources={"CPU": 1}, + placement_group_bundles=bundles, + accelerator_config=acc_config, + actor_options={"name": "r0"}, + actor_init_args=(), + on_scheduled=set_scheduling_strategy, + ) + + scheduler._pending_replicas[d_id][r0_id] = scheduling_request + + # Call _schedule_replica. + scheduler._schedule_replica( + scheduling_request=scheduling_request, + default_scheduling_strategy="some_default", + target_node_id=None, + target_labels=None, + ) + + # Verify scheduling params are as expected. + if expect_pg_created: + assert len(captured_requests) == 1 + assert captured_requests[0].accelerator_config == acc_config + assert captured_requests[0].bundles == bundles + else: + assert len(captured_requests) == 0 + assert scheduling_strategy == "some_default" + + +def test_placement_group_strategy_defaulting(): + """Validate that placement group strategy defaults to SPREAD for TPU configs and PACK for standard.""" + d_id = DeploymentID("strategy_test", "app1") + cluster_node_info_cache = MockClusterNodeInfoCache() + captured_requests = [] + + def mock_create_pg(request): + captured_requests.append(request) + return default_impl.ReplicaPlacementGroup( + placement_group=MockPlacementGroup(request) + ) + + scheduler = default_impl.create_deployment_scheduler( + cluster_node_info_cache, + head_node_id_override="fake-head-node-id", + create_placement_group_fn_override=mock_create_pg, + ) + scheduler.on_deployment_created(d_id, SpreadDeploymentSchedulingPolicy()) + scheduler.on_deployment_deployed(d_id, rconfig(ray_actor_options={"num_cpus": 1})) + + # Case 1: TPU Accelerator Config is set, placement_group_strategy is not. + # Expect strategy defaults to "SPREAD". + r0_id = ReplicaID(unique_id="r0", deployment_id=d_id) + acc_config = TPUAcceleratorConfig(topology="2x2", accelerator_version="v6e") + req_tpu = ReplicaSchedulingRequest( + replica_id=r0_id, + actor_def=MockActorClass(), + actor_resources={"CPU": 1}, + placement_group_bundles=None, + accelerator_config=acc_config, + placement_group_strategy=None, + actor_options={"name": "r0"}, + actor_init_args=(), + on_scheduled=lambda *args, **kwargs: None, + ) + scheduler._pending_replicas[d_id][r0_id] = req_tpu + scheduler._schedule_replica( + scheduling_request=req_tpu, + default_scheduling_strategy="some_default", + target_node_id=None, + target_labels=None, + ) + assert len(captured_requests) == 1 + assert captured_requests[0].strategy == "SPREAD" + + captured_requests.clear() + + # Case 2: TPU Accelerator Config is set, and placement_group_strategy is explicitly provided. + # Expect strategy is respected. + req_tpu_explicit = ReplicaSchedulingRequest( + replica_id=r0_id, + actor_def=MockActorClass(), + actor_resources={"CPU": 1}, + placement_group_bundles=None, + accelerator_config=acc_config, + placement_group_strategy="STRICT_PACK", + actor_options={"name": "r0"}, + actor_init_args=(), + on_scheduled=lambda *args, **kwargs: None, + ) + scheduler._pending_replicas[d_id][r0_id] = req_tpu_explicit + scheduler._schedule_replica( + scheduling_request=req_tpu_explicit, + default_scheduling_strategy="some_default", + target_node_id=None, + target_labels=None, + ) + assert len(captured_requests) == 1 + assert captured_requests[0].strategy == "STRICT_PACK" + + captured_requests.clear() + + # Case 3: Standard GPU/CPU config (bundles set), placement_group_strategy is not. + # Expect strategy defaults to "PACK". + req_std = ReplicaSchedulingRequest( + replica_id=r0_id, + actor_def=MockActorClass(), + actor_resources={"CPU": 1}, + placement_group_bundles=[{"CPU": 1}], + accelerator_config=None, + placement_group_strategy=None, + actor_options={"name": "r0"}, + actor_init_args=(), + on_scheduled=lambda *args, **kwargs: None, + ) + scheduler._pending_replicas[d_id][r0_id] = req_std + scheduler._schedule_replica( + scheduling_request=req_std, + default_scheduling_strategy="some_default", + target_node_id=None, + target_labels=None, + ) + assert len(captured_requests) == 1 + assert captured_requests[0].strategy == "PACK" + + def test_downscale_multiple_deployments(): """Test to make sure downscale prefers replicas without node id and then replicas on a node with fewest replicas of all deployments. @@ -921,7 +1102,7 @@ def test_downscale_single_deployment(): actor_resources={"CPU": 1}, actor_options={}, actor_init_args=(), - on_scheduled=lambda actor_handle, placement_group: actor_handle, + on_scheduled=lambda actor_handle, *args, **kwargs: actor_handle, ), ] }, @@ -1246,7 +1427,10 @@ def test_basic(self): assert len(on_scheduled_mock.call_args_list) == 2 for call in on_scheduled_mock.call_args_list: - assert call.kwargs == {"placement_group": None} + assert call.kwargs == { + "placement_group": None, + "placement_group_manager": None, + } assert len(call.args) == 1 scheduling_strategy = call.args[0]._options["scheduling_strategy"] assert isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy) @@ -1254,7 +1438,7 @@ def test_basic(self): assert len(on_scheduled_mock2.call_args_list) == 1 call = on_scheduled_mock2.call_args_list[0] - assert call.kwargs == {"placement_group": None} + assert call.kwargs == {"placement_group": None, "placement_group_manager": None} assert len(call.args) == 1 scheduling_strategy = call.args[0]._options["scheduling_strategy"] assert isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy) @@ -1270,9 +1454,9 @@ def test_placement_groups(self): scheduler = default_impl.create_deployment_scheduler( cluster_node_info_cache, head_node_id_override="fake-head-node-id", - create_placement_group_fn_override=lambda *args, **kwargs: MockPlacementGroup( # noqa - *args, **kwargs - ), + create_placement_group_fn_override=lambda *args, **kwargs: default_impl.ReplicaPlacementGroup( + placement_group=MockPlacementGroup(*args, **kwargs) + ), # noqa ) _ = ray.util.placement_group @@ -1413,7 +1597,10 @@ def test_heterogeneous_resources(self): scheduling_strategy = call.args[0]._options["scheduling_strategy"] assert isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy) assert scheduling_strategy.node_id == node_id_1 - assert call.kwargs == {"placement_group": None} + assert call.kwargs == { + "placement_group": None, + "placement_group_manager": None, + } def test_max_replicas_per_node(self): """Test that at most `max_replicas_per_node` number of replicas @@ -1431,9 +1618,9 @@ def test_max_replicas_per_node(self): scheduler = default_impl.create_deployment_scheduler( cluster_node_info_cache, head_node_id_override="fake-head-node-id", - create_placement_group_fn_override=lambda *args, **kwargs: MockPlacementGroup( # noqa - *args, **kwargs - ), + create_placement_group_fn_override=lambda *args, **kwargs: default_impl.ReplicaPlacementGroup( + placement_group=MockPlacementGroup(*args, **kwargs) + ), # noqa ) scheduler.on_deployment_created(d_id1, SpreadDeploymentSchedulingPolicy()) scheduler.on_deployment_deployed( @@ -1445,7 +1632,9 @@ def test_max_replicas_per_node(self): state = defaultdict(int) - def on_scheduled(actor_handle, placement_group): + def on_scheduled( + actor_handle, placement_group=None, placement_group_manager=None + ): scheduling_strategy = actor_handle._options["scheduling_strategy"] if isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy): state[scheduling_strategy.node_id] += 1 @@ -1589,9 +1778,9 @@ def test_custom_resources(self): scheduler = default_impl.create_deployment_scheduler( cluster_node_info_cache, head_node_id_override="fake-head-node-id", - create_placement_group_fn_override=lambda *args, **kwargs: MockPlacementGroup( # noqa - *args, **kwargs - ), + create_placement_group_fn_override=lambda *args, **kwargs: default_impl.ReplicaPlacementGroup( + placement_group=MockPlacementGroup(*args, **kwargs) + ), # noqa ) scheduler.on_deployment_created(d_id, SpreadDeploymentSchedulingPolicy()) scheduler.on_deployment_deployed( @@ -1603,7 +1792,9 @@ def test_custom_resources(self): # Despite trying to schedule on node that minimizes fragmentation, # should respect custom resources and schedule onto node2 - def on_scheduled(actor_handle, placement_group): + def on_scheduled( + actor_handle, placement_group=None, placement_group_manager=None + ): scheduling_strategy = actor_handle._options["scheduling_strategy"] assert isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy) assert scheduling_strategy.node_id == node_id_2 @@ -1718,7 +1909,9 @@ def fail_once_create_pg(request): call_count += 1 if call_count == 1: raise RuntimeError("Simulated PG creation failure") - return MockPlacementGroup(request) + return default_impl.ReplicaPlacementGroup( + placement_group=MockPlacementGroup(request) + ) scheduler = default_impl.create_deployment_scheduler( cluster_node_info_cache, @@ -1858,7 +2051,10 @@ def test_pack_prefers_newly_non_idle_node(self): strategy1 = call1.args[0]._options["scheduling_strategy"] assert isinstance(strategy1, NodeAffinitySchedulingStrategy) assert strategy1.node_id == node_id_1 - assert call1.kwargs == {"placement_group": None} + assert call1.kwargs == { + "placement_group": None, + "placement_group_manager": None, + } # The CPU replica should also go to node 1 (now non-idle) rather # than node 2 (idle but tighter fit). The PACK scheduler prefers @@ -1868,7 +2064,10 @@ def test_pack_prefers_newly_non_idle_node(self): strategy2 = call2.args[0]._options["scheduling_strategy"] assert isinstance(strategy2, NodeAffinitySchedulingStrategy) assert strategy2.node_id == node_id_1 - assert call2.kwargs == {"placement_group": None} + assert call2.kwargs == { + "placement_group": None, + "placement_group_manager": None, + } class TestScheduleGangPlacementGroups: diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index 94c84c93505a..eae9927272d3 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -65,7 +65,11 @@ get_capacity_adjusted_num_replicas, get_random_string, ) -from ray.serve.config import DeploymentActorConfig, GangSchedulingConfig +from ray.serve.config import ( + DeploymentActorConfig, + GangSchedulingConfig, + TPUAcceleratorConfig, +) from ray.serve.schema import ReplicaRank from ray.util.placement_group import validate_placement_group @@ -8674,6 +8678,33 @@ def test_gang_downscale_prefers_pending_gang(self, mock_deployment_state_manager ) assert ds.curr_status_info.status == DeploymentStatus.HEALTHY + def test_accelerator_deployment_skips_gang_setup( + self, mock_deployment_state_manager + ): + """A deployment with accelerator_config should not create gang PG state.""" + create_dsm, _, _, _ = mock_deployment_state_manager + dsm: DeploymentStateManager = create_dsm() + deployment_id = DeploymentID(name="accelerator_skips_gang", app_name="app") + + info, version = deployment_info( + num_replicas=2, + version="v1", + accelerator_config=TPUAcceleratorConfig( + topology="4x4", accelerator_version="v6e" + ), + gang_scheduling_config=GangSchedulingConfig(gang_size=2), + ) + + dsm.deploy(deployment_id, info) + ds = dsm._deployment_states[deployment_id] + ds._add_upscale_gang_replicas = MagicMock() + ds._add_upscale_replicas = MagicMock(return_value=[]) + + dsm.update() + + ds._add_upscale_gang_replicas.assert_not_called() + ds._add_upscale_replicas.assert_called_once() + class TestGangHealthCheck: def _deploy_gang(self, mock_deployment_state_manager, gang_size, num_replicas): diff --git a/python/ray/serve/tests/unit/test_deployment_version.py b/python/ray/serve/tests/unit/test_deployment_version.py index 6a4779769fba..7e7e3b6d3dff 100644 --- a/python/ray/serve/tests/unit/test_deployment_version.py +++ b/python/ray/serve/tests/unit/test_deployment_version.py @@ -2,6 +2,7 @@ from ray.serve._private.config import DeploymentConfig from ray.serve._private.deployment_state import DeploymentVersion +from ray.serve.config import TPUAcceleratorConfig def test_validation(): @@ -406,6 +407,52 @@ def test_requires_long_poll_broadcast(): assert not v1.requires_long_poll_broadcast(v2) +def test_accelerator_config(): + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion( + "1", + DeploymentConfig( + accelerator_config=TPUAcceleratorConfig( + topology="4x4", accelerator_version="v6e" + ) + ), + {"num_cpus": 0.1}, + ) + v3 = DeploymentVersion( + "1", + DeploymentConfig( + accelerator_config=TPUAcceleratorConfig( + topology="4x4", accelerator_version="v6e" + ) + ), + {"num_cpus": 0.1}, + ) + v4 = DeploymentVersion( + "1", + DeploymentConfig( + accelerator_config=TPUAcceleratorConfig( + topology="2x2", accelerator_version="v6e" + ) + ), + {"num_cpus": 0.1}, + ) + + # Changing accelerator_config from None -> TPUAcceleratorConfig triggers restart + assert v1 != v2 + assert hash(v1) != hash(v2) + assert v1.requires_actor_restart(v2) + + # Same TPUAcceleratorConfig does not trigger restart + assert v2 == v3 + assert hash(v2) == hash(v3) + assert not v2.requires_actor_restart(v3) + + # Changing topology from 4x4 -> 2x2 triggers restart + assert v3 != v4 + assert hash(v3) != hash(v4) + assert v3.requires_actor_restart(v4) + + if __name__ == "__main__": import sys diff --git a/python/ray/serve/tests/unit/test_schema.py b/python/ray/serve/tests/unit/test_schema.py index 5d2bb53d3558..2a39b92fea53 100644 --- a/python/ray/serve/tests/unit/test_schema.py +++ b/python/ray/serve/tests/unit/test_schema.py @@ -18,6 +18,7 @@ GangPlacementStrategy, GangRuntimeFailurePolicy, GangSchedulingConfig, + TPUAcceleratorConfig, ) from ray.serve.deployment import Deployment, deployment_to_schema, schema_to_deployment from ray.serve.schema import ( @@ -1220,6 +1221,85 @@ def test_schema_to_deployment_gang_scheduling_config_from_dict(): assert dep.num_replicas == 6 +def test_accelerator_config_deployment_schema_roundtrip(): + # Ensure deployment_to_schema -> schema_to_deployment preserves accelerator_config + accelerator_config = TPUAcceleratorConfig(topology="4x4", accelerator_version="v6e") + dc = DeploymentConfig.from_default( + num_replicas=4, + accelerator_config=accelerator_config, + ) + dc.user_configured_option_names = {"num_replicas", "accelerator_config"} + + rc = ReplicaConfig.create(deployment_def="", init_args=(), init_kwargs={}) + dep = Deployment( + name="TpuDep", + deployment_config=dc, + replica_config=rc, + _internal=True, + ) + + schema = deployment_to_schema(dep) + assert isinstance(schema.accelerator_config, TPUAcceleratorConfig) + assert schema.accelerator_config.topology == "4x4" + assert schema.accelerator_config.accelerator_version == "v6e" + assert schema.num_replicas == 4 + + dep2 = schema_to_deployment(schema) + ac2 = dep2._deployment_config.accelerator_config + assert isinstance(ac2, TPUAcceleratorConfig) + assert ac2.topology == "4x4" + assert ac2.accelerator_version == "v6e" + assert dep2.num_replicas == 4 + + +def test_schema_to_deployment_accelerator_config_from_dict(): + # Ensure schema_to_deployment works when accelerator_config comes from a dict + schema = DeploymentSchema.model_validate( + { + "name": "TpuDep", + "num_replicas": 2, + "accelerator_config": { + "kind": "tpu", + "topology": "2x2", + "accelerator_version": "v6e", + }, + } + ) + + assert isinstance(schema.accelerator_config, TPUAcceleratorConfig) + assert schema.accelerator_config.topology == "2x2" + + dep = schema_to_deployment(schema) + ac = dep._deployment_config.accelerator_config + assert isinstance(ac, TPUAcceleratorConfig) + assert ac.topology == "2x2" + assert ac.accelerator_version == "v6e" + assert dep.num_replicas == 2 + + +def test_mutual_exclusivity_accelerator_and_gang(): + # Cannot specify both accelerator_config and gang_scheduling_config + with pytest.raises(ValueError) as e: + DeploymentSchema.model_validate( + { + "name": "TpuDep", + "num_replicas": 2, + "accelerator_config": { + "kind": "tpu", + "topology": "2x2", + "accelerator_version": "v6e", + }, + "gang_scheduling_config": { + "gang_size": 2, + }, + } + ) + assert ( + "Cannot specify both `accelerator_config` and `gang_scheduling_config`" + in str(e.value) + ) + + def test_deployment_actors_deployment_schema_roundtrip(): """Ensure deployment_to_schema -> schema_to_deployment preserves deployment_actors.""" actor_config = DeploymentActorConfig( diff --git a/src/ray/protobuf/serve.proto b/src/ray/protobuf/serve.proto index a92c0ac3248a..01773799add0 100644 --- a/src/ray/protobuf/serve.proto +++ b/src/ray/protobuf/serve.proto @@ -249,6 +249,9 @@ message DeploymentConfig { // rolling upgrade) is distinguishable from an explicit value. When unset, // the Python side falls back to DEFAULT_ROLLING_UPDATE_PERCENTAGE (0.2). optional double rolling_update_percentage = 23; + + // Structured accelerator configuration for a Serve deployment. + bytes accelerator_config = 24; } // Deployment language.