Skip to content

Commit 1eec5e9

Browse files
committed
Add autoscaling support for Dask and Ray clusters
1 parent f36159f commit 1eec5e9

10 files changed

Lines changed: 368 additions & 5 deletions

File tree

cli/polyaxon/_flow/run/dask/dask.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from typing import Optional, Union
22
from typing_extensions import Literal
33

4-
from clipped.types.ref_or_obj import RefField
4+
from clipped.compact.pydantic import Field
5+
from clipped.types.ref_or_obj import IntOrRef, RefField
56

67
from polyaxon._flow.run.base import BaseRun
78
from polyaxon._flow.run.dask.replica import V1DaskReplica
@@ -22,6 +23,8 @@ class V1DaskCluster(BaseRun, DestinationImageMixin):
2223
kind: str, should be equal `daskcluster`
2324
worker: V1DaskReplica, optional - Worker replica specification
2425
scheduler: V1DaskReplica, optional - Scheduler replica specification
26+
min_replicas: int, optional - Minimum number of workers for autoscaling
27+
max_replicas: int, optional - Maximum number of workers for autoscaling
2528
2629
## YAML usage
2730
@@ -44,6 +47,25 @@ class V1DaskCluster(BaseRun, DestinationImageMixin):
4447
>>> memory: 1Gi
4548
>>> cpu: 1
4649
```
50+
51+
## Autoscaling
52+
53+
Enable autoscaling by setting `minReplicas` and `maxReplicas`.
54+
When both are set, a DaskAutoscaler resource will be created.
55+
56+
```yaml
57+
>>> run:
58+
>>> kind: daskcluster
59+
>>> minReplicas: 1
60+
>>> maxReplicas: 10
61+
>>> worker:
62+
>>> replicas: 2
63+
>>> container:
64+
>>> image: daskdev/dask:latest
65+
>>> scheduler:
66+
>>> container:
67+
>>> image: daskdev/dask:latest
68+
```
4769
"""
4870

4971
_IDENTIFIER = V1RunKind.DASKCLUSTER
@@ -52,6 +74,8 @@ class V1DaskCluster(BaseRun, DestinationImageMixin):
5274
kind: Literal[_IDENTIFIER] = _IDENTIFIER
5375
worker: Optional[Union[V1DaskReplica, RefField]] = None
5476
scheduler: Optional[Union[V1DaskReplica, RefField]] = None
77+
min_replicas: Optional[IntOrRef] = Field(alias="minReplicas", default=None)
78+
max_replicas: Optional[IntOrRef] = Field(alias="maxReplicas", default=None)
5579

5680
def apply_image_destination(self, image: str):
5781
if self.worker:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
from polyaxon._flow.run.ray.autoscaler import V1RayAutoscalerOptions
12
from polyaxon._flow.run.ray.ray import V1RayCluster
23
from polyaxon._flow.run.ray.replica import V1RayReplica
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from typing import Optional, Union
2+
3+
from clipped.compact.pydantic import Field
4+
from clipped.types.ref_or_obj import RefField
5+
6+
from polyaxon._k8s import k8s_schemas
7+
from polyaxon._schemas.base import BaseSchemaModel
8+
9+
10+
class V1RayAutoscalerOptions(BaseSchemaModel):
    """Settings that tune how the Ray autoscaler behaves.

    Args:
        upscaling_mode: str, optional - one of "Conservative", "Default", or "Aggressive"
        image_pull_policy: str, optional - one of "IfNotPresent", "Always", or "Never"
        resources: V1ResourceRequirements, optional - resources given to the autoscaler container

    ## YAML usage

    ```yaml
    >>> run:
    >>>   kind: raycluster
    >>>   autoscalerOptions:
    >>>     upscalingMode: Default
    >>>     imagePullPolicy: IfNotPresent
    ```

    ## Python usage

    ```python
    >>> from polyaxon.schemas import V1RayAutoscalerOptions
    >>> autoscaler_options = V1RayAutoscalerOptions(
    >>>     upscaling_mode="Default",
    >>>     image_pull_policy="IfNotPresent",
    >>> )
    ```

    ## Fields

    ### upscalingMode

    Sets how eagerly the autoscaler adds workers.

    - `Conservative`: upscaling is rate-limited; the number of pending worker pods
      is at most the size of the Ray cluster.
    - `Default`: upscaling is not rate-limited.
    - `Aggressive`: an alias for Default; upscaling is not rate-limited.

    ```yaml
    >>> autoscalerOptions:
    >>>   upscalingMode: Default
    ```

    ### imagePullPolicy

    Image pull policy applied to the autoscaler container.

    ```yaml
    >>> autoscalerOptions:
    >>>   imagePullPolicy: IfNotPresent
    ```

    ### resources

    Resource requests and limits for the autoscaler container.

    ```yaml
    >>> autoscalerOptions:
    >>>   resources:
    >>>     limits:
    >>>       cpu: "500m"
    >>>       memory: "512Mi"
    >>>     requests:
    >>>       cpu: "250m"
    >>>       memory: "256Mi"
    ```
    """

    _IDENTIFIER = "ray_autoscaler_options"

    # Snake-case attributes are exposed under camelCase aliases in YAML/JSON.
    upscaling_mode: Optional[str] = Field(default=None, alias="upscalingMode")
    image_pull_policy: Optional[str] = Field(default=None, alias="imagePullPolicy")
    # `resources` has the same spelling in both cases, so it needs no alias.
    resources: Optional[Union[k8s_schemas.V1ResourceRequirements, RefField]] = None

cli/polyaxon/_flow/run/ray/ray.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
from typing_extensions import Literal
33

44
from clipped.compact.pydantic import Field
5-
from clipped.types.ref_or_obj import RefField
5+
from clipped.types.ref_or_obj import BoolOrRef, RefField
66

77
from polyaxon._flow.run.base import BaseRun
88
from polyaxon._flow.run.enums import V1RunKind
9+
from polyaxon._flow.run.ray.autoscaler import V1RayAutoscalerOptions
910
from polyaxon._flow.run.ray.replica import V1RayReplica
1011
from polyaxon._flow.run.resources import V1RunResources
1112
from polyaxon._flow.run.utils import DestinationImageMixin
@@ -27,6 +28,8 @@ class V1RayCluster(BaseRun, DestinationImageMixin):
2728
ray_version: str, optional
2829
head: [V1RayReplica](/docs/experimentation/distributed/ray-replica/), optional
2930
workers: Dict[str, [V1RayReplica](/docs/experimentation/distributed/ray-replica/)], optional
31+
enable_in_tree_autoscaling: bool, optional
32+
autoscaler_options: [V1RayAutoscalerOptions](/docs/experimentation/distributed/ray-autoscaler/), optional
3033
3134
3235
## YAML usage
@@ -40,6 +43,8 @@ class V1RayCluster(BaseRun, DestinationImageMixin):
4043
>>> rayVersion:
4144
>>> head:
4245
>>> workers:
46+
>>> enableInTreeAutoscaling:
47+
>>> autoscalerOptions:
4348
```
4449
4550
## Python usage
@@ -52,6 +57,7 @@ class V1RayCluster(BaseRun, DestinationImageMixin):
5257
>>> ray_version="2.5.0",
5358
>>> head=V1RayReplica(...),
5459
>>> workers={"small-group": V1RayReplica(...)},
60+
>>> enable_in_tree_autoscaling=True,
5561
>>> )
5662
```
5763
@@ -132,10 +138,42 @@ class V1RayCluster(BaseRun, DestinationImageMixin):
132138
>>> ...
133139
>>> ...
134140
```
141+
142+
### enableInTreeAutoscaling
143+
144+
Enable the KubeRay in-tree autoscaler for automatic worker scaling.
145+
When enabled, the Ray autoscaler will automatically scale workers based on workload demands.
146+
147+
> **Note**: The `idleTimeoutSeconds` for the autoscaler is derived from `termination.culling.timeout`.
148+
149+
```yaml
150+
>>> run:
151+
>>> kind: raycluster
152+
>>> enableInTreeAutoscaling: true
153+
>>> workers:
154+
>>> gpu-workers:
155+
>>> minReplicas: 0
156+
>>> maxReplicas: 10
157+
>>> ...
158+
```
159+
160+
### autoscalerOptions
161+
162+
Optional configuration for the Ray autoscaler behavior.
163+
164+
```yaml
165+
>>> run:
166+
>>> kind: raycluster
167+
>>> enableInTreeAutoscaling: true
168+
>>> autoscalerOptions:
169+
>>> upscalingMode: Default
170+
>>> imagePullPolicy: IfNotPresent
171+
>>> ...
172+
```
135173
"""
136174

137175
_IDENTIFIER = V1RunKind.RAYCLUSTER
138-
_CUSTOM_DUMP_FIELDS = {"head", "workers"}
176+
_CUSTOM_DUMP_FIELDS = {"head", "workers", "autoscaler_options"}
139177
_FIELDS_DICT_PATCH = ["workers"]
140178

141179
kind: Literal[_IDENTIFIER] = _IDENTIFIER
@@ -147,6 +185,12 @@ class V1RayCluster(BaseRun, DestinationImageMixin):
147185
ray_version: Optional[str] = Field(alias="rayVersion", default=None)
148186
head: Optional[Union[V1RayReplica, RefField]] = None
149187
workers: Optional[Dict[str, Union[V1RayReplica, RefField]]] = Field(default=None)
188+
enable_in_tree_autoscaling: Optional[BoolOrRef] = Field(
189+
alias="enableInTreeAutoscaling", default=None
190+
)
191+
autoscaler_options: Optional[Union[V1RayAutoscalerOptions, RefField]] = Field(
192+
alias="autoscalerOptions", default=None
193+
)
150194

151195
def apply_image_destination(self, image: str):
152196
if self.head:

cli/polyaxon/_k8s/converter/converters/dask_cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,6 @@ def _get_replica(replica: Optional[V1DaskReplica]) -> Optional[ReplicaSpec]:
6464
notifications=plugins.notifications,
6565
labels=labels,
6666
annotations=self.annotations,
67+
min_replicas=cluster.min_replicas,
68+
max_replicas=cluster.max_replicas,
6769
)

cli/polyaxon/_k8s/converter/converters/ray_cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,6 @@ def _get_replica(replica: Optional[V1RayReplica]) -> Optional[ReplicaSpec]:
7777
notifications=plugins.notifications,
7878
labels=labels,
7979
annotations=self.annotations,
80+
enable_in_tree_autoscaling=cluster.enable_in_tree_autoscaling,
81+
autoscaler_options=cluster.autoscaler_options,
8082
)

cli/polyaxon/_k8s/custom_resources/dask_cluster.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ def get_dask_cluster_custom_resource(
8585
notifications: List[V1Notification],
8686
labels: Dict[str, str],
8787
annotations: Dict[str, str],
88+
min_replicas: Optional[int] = None,
89+
max_replicas: Optional[int] = None,
8890
) -> Dict:
8991
template_spec = {}
9092
get_dask_replicas_template(
@@ -160,6 +162,10 @@ def get_dask_cluster_custom_resource(
160162
],
161163
)
162164
template_spec = {"replicaSpecs": template_spec, "service": service}
165+
# Add autoscaling configuration if both min and max replicas are set
166+
if min_replicas is not None and max_replicas is not None:
167+
template_spec["minReplicas"] = min_replicas
168+
template_spec["maxReplicas"] = max_replicas
163169
custom_object = {"daskClusterSpec": template_spec}
164170
custom_object = set_termination(
165171
custom_object=custom_object, termination=termination

cli/polyaxon/_k8s/custom_resources/ray_cluster.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Dict, List, Optional
22

33
from polyaxon._flow import V1Notification, V1Termination
4+
from polyaxon._flow.run.ray.autoscaler import V1RayAutoscalerOptions
45
from polyaxon._k8s import k8s_schemas
56
from polyaxon._k8s.converter.pod.spec import get_pod_spec, get_pod_template_spec
67
from polyaxon._k8s.custom_resources.operation import (
@@ -138,6 +139,24 @@ def get_ray_worker_replicas_template(
138139
template_spec["workers"] = workers
139140

140141

142+
def _get_autoscaler_options(
    autoscaler_options: Optional[V1RayAutoscalerOptions],
) -> Optional[Dict]:
    """Translate user-supplied autoscaler options into a camelCase dict.

    Args:
        autoscaler_options: Object exposing `upscaling_mode`,
            `image_pull_policy` and `resources`, or None.

    Returns:
        A dict with the keys `upscalingMode`, `imagePullPolicy` and
        `resources` for every option whose value is truthy, or None when
        no options object was passed or none of its values is set.
    """
    if not autoscaler_options:
        return None

    # Ordered (output key, value) pairs; the order fixes the dict key order.
    candidates = (
        ("upscalingMode", autoscaler_options.upscaling_mode),
        ("imagePullPolicy", autoscaler_options.image_pull_policy),
        ("resources", autoscaler_options.resources),
    )
    # Falsy values (None, empty string, ...) are omitted from the output.
    spec = {key: value for key, value in candidates if value}
    return spec or None
158+
159+
141160
def get_ray_cluster_custom_resource(
142161
resource_name: str,
143162
namespace: str,
@@ -153,6 +172,8 @@ def get_ray_cluster_custom_resource(
153172
ray_version: Optional[str],
154173
labels: Dict[str, str],
155174
annotations: Dict[str, str],
175+
enable_in_tree_autoscaling: Optional[bool] = None,
176+
autoscaler_options: Optional[V1RayAutoscalerOptions] = None,
156177
) -> Dict:
157178
template_spec = {}
158179

@@ -181,6 +202,14 @@ def get_ray_cluster_custom_resource(
181202
if ray_version:
182203
template_spec["rayVersion"] = ray_version
183204

205+
# Add autoscaling configuration
206+
if enable_in_tree_autoscaling is not None:
207+
template_spec["enableInTreeAutoscaling"] = enable_in_tree_autoscaling
208+
209+
autoscaler_opts = _get_autoscaler_options(autoscaler_options)
210+
if autoscaler_opts:
211+
template_spec["autoscalerOptions"] = autoscaler_opts
212+
184213
custom_object = {"rayClusterSpec": template_spec}
185214
custom_object = set_termination(
186215
custom_object=custom_object, termination=termination

0 commit comments

Comments
 (0)