Skip to content

Commit 6ca1ecb

Browse files
committed
Add pod failure policy to termination
1 parent a547925 commit 6ca1ecb

10 files changed

Lines changed: 272 additions & 119 deletions

File tree

cli/polyaxon/_flow/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@
9999
V1ScheduleKind,
100100
)
101101
from polyaxon._flow.templates import V1Template
102-
from polyaxon._flow.termination import V1Termination
102+
from polyaxon._flow.termination import (
103+
V1Termination,
104+
V1Culling,
105+
V1ActivityProbe,
106+
)
103107
from polyaxon._flow.trigger_policies import V1TriggerPolicy
104108

105109
# Forward references for operations and components

cli/polyaxon/_flow/component/component.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ class V1Component(
392392
"""
393393

394394
_IDENTIFIER = "component"
395-
_CUSTOM_DUMP_FIELDS = {"run"}
395+
_CUSTOM_DUMP_FIELDS = {"run", "termination"}
396396

397397
kind: Literal["component"] = _IDENTIFIER
398398
inputs: Optional[List[V1IO]] = None

cli/polyaxon/_flow/operations/compiled_operation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
class V1CompiledOperation(BaseOp, RunMixin):
1212
_IDENTIFIER = "compiled_operation"
13-
_CUSTOM_DUMP_FIELDS = {"run"}
13+
_CUSTOM_DUMP_FIELDS = {"run", "termination"}
1414

1515
kind: Literal["compiled_operation"] = _IDENTIFIER
1616
inputs: Optional[List[V1IO]] = None

cli/polyaxon/_flow/operations/operation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ class V1Operation(BaseOp, TemplateMixinConfig):
530530
"run_patch",
531531
"patch_strategy",
532532
]
533-
_CUSTOM_DUMP_FIELDS = {"component"}
533+
_CUSTOM_DUMP_FIELDS = {"component", "termination"}
534534

535535
kind: Literal["operation"] = _IDENTIFIER
536536
params: Optional[Dict[StrictStr, V1Param]] = None

cli/polyaxon/_flow/run/ray/autoscaler.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
from typing import Optional, Union
22

3-
from clipped.compact.pydantic import Field
3+
from clipped.compact.pydantic import (
4+
Field,
5+
field_validator,
6+
validation_before,
7+
validation_always,
8+
)
49
from clipped.types.ref_or_obj import RefField
510

6-
from polyaxon._k8s import k8s_schemas
11+
from polyaxon._k8s import k8s_schemas, k8s_validation
712
from polyaxon._schemas.base import BaseSchemaModel
813

914

@@ -77,7 +82,14 @@ class V1RayAutoscalerOptions(BaseSchemaModel):
7782
"""
7883

7984
_IDENTIFIER = "ray_autoscaler_options"
85+
_SWAGGER_FIELDS = [
86+
"resources",
87+
]
8088

8189
upscaling_mode: Optional[str] = Field(alias="upscalingMode", default=None)
8290
image_pull_policy: Optional[str] = Field(alias="imagePullPolicy", default=None)
8391
resources: Optional[Union[k8s_schemas.V1ResourceRequirements, RefField]] = None
92+
93+
@field_validator("resources", **validation_always, **validation_before)
94+
def validate_container(cls, v):
95+
return k8s_validation.validate_k8s_resource_requirements(v)

cli/polyaxon/_flow/termination/__init__.py

Lines changed: 77 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,118 +1,17 @@
1-
from typing import Optional, List
1+
from typing import Optional, Union
22

3-
from clipped.compact.pydantic import Field
4-
from clipped.types.ref_or_obj import IntOrRef
3+
from clipped.compact.pydantic import (
4+
Field,
5+
field_validator,
6+
validation_always,
7+
validation_before,
8+
)
9+
from clipped.types.ref_or_obj import IntOrRef, RefField
10+
from polyaxon._k8s import k8s_schemas, k8s_validation
511

612
from polyaxon._schemas.base import BaseSchemaModel
713

814

9-
class V1ActivityProbeHttp(BaseSchemaModel):
10-
"""HTTP-based activity probe configuration for detecting service activity.
11-
12-
Used with service culling to check for activity by polling an HTTP endpoint.
13-
Commonly used with Jupyter notebooks to poll the `/api/status` endpoint.
14-
15-
Args:
16-
path: str, optional - The HTTP path to poll for activity status
17-
port: int, optional - The port number where the service is listening
18-
19-
## YAML usage
20-
21-
```yaml
22-
>>> probe:
23-
>>> http:
24-
>>> path: "/api/status"
25-
>>> port: 8888
26-
```
27-
28-
## Python usage
29-
30-
```python
31-
>>> from polyaxon.schemas import V1ActivityProbeHttp
32-
>>> probe = V1ActivityProbeHttp(
33-
>>> path="/api/status",
34-
>>> port=8888
35-
>>> )
36-
```
37-
38-
## Fields
39-
40-
### path
41-
42-
The HTTP path to the activity status endpoint. For Jupyter notebooks,
43-
this is typically `/api/status` which returns information about
44-
last activity, active kernels, and connections.
45-
46-
```yaml
47-
>>> probe:
48-
>>> http:
49-
>>> path: "/api/status"
50-
```
51-
52-
### port
53-
54-
The port number where the service is listening. For Jupyter notebooks,
55-
this is typically 8888.
56-
57-
```yaml
58-
>>> probe:
59-
>>> http:
60-
>>> port: 8888
61-
```
62-
"""
63-
64-
path: Optional[str] = None
65-
port: Optional[int] = None
66-
67-
68-
class V1ActivityProbeExec(BaseSchemaModel):
69-
"""Command-based activity probe configuration for detecting service activity.
70-
71-
Used with service culling to check for activity by executing a custom command.
72-
The command should return exit code 0 if there was activity, or exit code 1 if idle.
73-
74-
Args:
75-
command: List[str], optional - The command to execute for checking activity
76-
77-
## YAML usage
78-
79-
```yaml
80-
>>> probe:
81-
>>> exec:
82-
>>> command: ["bash", "-c", "check-activity.sh"]
83-
```
84-
85-
## Python usage
86-
87-
```python
88-
>>> from polyaxon.schemas import V1ActivityProbeExec
89-
>>> probe = V1ActivityProbeExec(
90-
>>> command=["bash", "-c", "check-activity.sh"]
91-
>>> )
92-
```
93-
94-
## Fields
95-
96-
### command
97-
98-
The command to execute inside the container to check for activity.
99-
The command should return:
100-
- Exit code 0: Activity detected (service is active)
101-
- Exit code 1: No activity detected (service is idle)
102-
103-
The command is executed directly (not in a shell) unless you explicitly
104-
invoke a shell as shown in the example.
105-
106-
```yaml
107-
>>> probe:
108-
>>> exec:
109-
>>> command: ["bash", "-c", "test -f /tmp/activity && exit 0 || exit 1"]
110-
```
111-
"""
112-
113-
command: Optional[List[str]] = None
114-
115-
11615
class V1ActivityProbe(BaseSchemaModel):
11716
"""Activity probe configuration for detecting service activity during culling checks.
11817
@@ -178,8 +77,21 @@ class V1ActivityProbe(BaseSchemaModel):
17877
```
17978
"""
18079

181-
var_exec: Optional[V1ActivityProbeExec] = Field(None, alias="exec")
182-
http: Optional[V1ActivityProbeHttp] = None
80+
_IDENTIFIER = "probe"
81+
_SWAGGER_FIELDS = [
82+
"exec",
83+
"http",
84+
]
85+
var_exec: Optional[k8s_schemas.V1ExecAction] = Field(None, alias="exec")
86+
http: Optional[k8s_schemas.V1HTTPGetAction] = None
87+
88+
@field_validator("var_exec", **validation_always, **validation_before)
89+
def validate_var_exec(cls, v):
90+
return k8s_validation.validate_k8s_exec_action(v)
91+
92+
@field_validator("http", **validation_always, **validation_before)
93+
def validate_http(cls, v):
94+
return k8s_validation.validate_k8s_http_get_action(v)
18395

18496

18597
class V1Culling(BaseSchemaModel):
@@ -244,6 +156,7 @@ class V1Termination(BaseSchemaModel):
244156
timeout: int, optional
245157
culling: V1Culling, optional
246158
probe: V1ActivityProbe, optional
159+
pod_failure_policy: V1PodFailurePolicy, optional
247160
248161
## YAML usage
249162
@@ -258,6 +171,11 @@ class V1Termination(BaseSchemaModel):
258171
>>> http:
259172
>>> path: "/api/status"
260173
>>> port: 8888
174+
>>> podFailurePolicy:
175+
>>> rules:
176+
>>> - action: Ignore
177+
>>> onPodConditions:
178+
>>> - type: DisruptionTarget
261179
```
262180
263181
## Python usage
@@ -399,12 +317,59 @@ class V1Termination(BaseSchemaModel):
399317
400318
See [services timeout preset documentation](/docs/core/scheduling-presets/services-timeout/)
401319
for detailed examples and use cases.
320+
321+
### podFailurePolicy
322+
323+
> **Note**: Available from v2.13. Requires Kubernetes v1.25+.
324+
325+
Pod failure policy configuration that defines fine-grained rules for how pod failures
326+
should be handled. This feature allows you to:
327+
- Fail jobs immediately on certain exit codes (non-retriable errors)
328+
- Ignore failures due to involuntary disruptions (preemption, eviction)
329+
- Control which failures count towards the backoff limit
330+
331+
```yaml
332+
>>> termination:
333+
>>> maxRetries: 3
334+
>>> podFailurePolicy:
335+
>>> rules:
336+
>>> # Fail immediately on exit code 42 (non-retriable error)
337+
>>> - action: FailJob
338+
>>> onExitCodes:
339+
>>> containerName: main
340+
>>> operator: In
341+
>>> values: [42]
342+
>>> # Ignore pod disruptions (preemption, eviction)
343+
>>> - action: Ignore
344+
>>> onPodConditions:
345+
>>> - type: DisruptionTarget
346+
```
347+
348+
Available actions:
349+
- `FailJob`: Mark the job as failed immediately without further retries
350+
- `Ignore`: Don't count this failure towards the backoff limit
351+
- `Count`: Count towards backoff limit (default behavior)
352+
- `FailIndex`: Fail the index for indexed jobs
353+
354+
See [Kubernetes Pod Failure Policy](https://kubernetes.io/docs/tasks/job/pod-failure-policy/)
355+
for more details.
402356
"""
403357

404358
_IDENTIFIER = "termination"
359+
_SWAGGER_FIELDS = [
360+
"podFailurePolicy",
361+
]
362+
_CUSTOM_DUMP_FIELDS = {"probe"}
405363

406364
max_retries: Optional[IntOrRef] = Field(alias="maxRetries", default=None)
407365
ttl: Optional[IntOrRef] = None
408366
timeout: Optional[IntOrRef] = None
409367
culling: Optional[V1Culling] = None
410368
probe: Optional[V1ActivityProbe] = None
369+
pod_failure_policy: Optional[Union[k8s_schemas.V1PodFailurePolicy, RefField]] = (
370+
Field(None, alias="podFailurePolicy")
371+
)
372+
373+
@field_validator("pod_failure_policy", **validation_always, **validation_before)
374+
def validate_pod_failure_policy(cls, v):
375+
return k8s_validation.validate_k8s_pod_failure_policy(v)

cli/polyaxon/_k8s/custom_resources/setter.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ def set_termination(custom_object: Dict, termination: V1Termination) -> Dict:
3434
http_spec["port"] = termination.probe.http.port
3535
probe_spec["http"] = http_spec
3636
termination_spec["probe"] = probe_spec
37+
if termination.pod_failure_policy:
38+
termination_spec["podFailurePolicy"] = termination.pod_failure_policy.to_dict()
3739

3840
custom_object["termination"] = termination_spec
3941
return custom_object

cli/polyaxon/_k8s/k8s_schemas.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@
2828
V1SecretVolumeSource = client.V1SecretVolumeSource
2929
V1ConfigMapVolumeSource = client.V1ConfigMapVolumeSource
3030
V1LocalObjectReference = client.V1LocalObjectReference
31+
V1PodFailurePolicy = client.V1PodFailurePolicy
32+
V1ExecAction = client.V1ExecAction
33+
V1HTTPGetAction = client.V1HTTPGetAction

cli/polyaxon/_k8s/k8s_validation.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,21 @@ def validate_k8s_local_object_reference(
149149
value: Optional[Union[k8s_schemas.V1LocalObjectReference, Dict]],
150150
):
151151
return _validate_schema(value, k8s_schemas.V1LocalObjectReference)
152+
153+
154+
def validate_k8s_pod_failure_policy(
155+
value: Optional[Union[k8s_schemas.V1PodFailurePolicy, Dict]],
156+
):
157+
return _validate_schema(value, k8s_schemas.V1PodFailurePolicy)
158+
159+
160+
def validate_k8s_exec_action(
161+
value: Optional[Union[k8s_schemas.V1ExecAction, Dict]],
162+
):
163+
return _validate_schema(value, k8s_schemas.V1ExecAction)
164+
165+
166+
def validate_k8s_http_get_action(
167+
value: Optional[Union[k8s_schemas.V1HTTPGetAction, Dict]],
168+
):
169+
return _validate_schema(value, k8s_schemas.V1HTTPGetAction)

0 commit comments

Comments
 (0)