Skip to content

Commit db72f9b

Browse files
authored
feat: resource requirements validation (#40)
* chore: created CWL examples * chore: added tests * feat: added ResourceRequirement validator * fix: remove test print and pre-commit check * fix: modified exception handling * feat: added Production case * feat: added nested_wf case * fix: pre-commit * fix: refactored code * fix: refactored tests * fix: fixed duplicate test * feat: added Production ResReq check in SubmissionModel * feat: added ResourceRequirement validation in SubmissionModel * fix: rebase and fixed tests * docs: updated docstring * chore: added validation to transformations * fix: fixed tests and added docstring * fix: added ResReq validation for Production * fix: removed validation case * fix: fixed tests * fix: pixi.lock * fix: pixi.lock * fix: fixed mypy * fix: added ExpressionTool and updated code
1 parent 4cf8c29 commit db72f9b

3 files changed

Lines changed: 224 additions & 15 deletions

File tree

src/dirac_cwl/submission_models.py

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from cwl_utils.parser.cwl_v1_2 import (
1414
CommandLineTool,
1515
ExpressionTool,
16+
ResourceRequirement,
1617
Workflow,
1718
)
1819
from pydantic import BaseModel, ConfigDict, field_serializer, model_validator
@@ -55,6 +56,23 @@ class BaseJobModel(BaseModel):
5556

5657
task: CommandLineTool | Workflow | ExpressionTool
5758

59+
@model_validator(mode="before")
def validate_job(cls, values):
    """Run the pre-construction checks on the job's CWL task.

    The task's ResourceRequirements are checked first, then its execution-hooks
    and scheduling hints are parsed. Nothing is caught here, so any exception
    raised by those checks propagates to the caller.

    :param values: Model values dictionary.
    :return: The values dictionary that was passed in.
    """
    cwl_task = values.get("task")

    # Resource limits are checked before the hints are parsed.
    validate_resource_requirements(cwl_task)

    # The hints are parsed only for validation; the parsed objects are discarded.
    ExecutionHooksHint.from_cwl(cwl_task)
    SchedulingHint.from_cwl(cwl_task)

    return values
75+
5876
@field_serializer("task")
5977
def serialize_task(self, value):
6078
"""Serialize CWL task object to dictionary.
@@ -68,17 +86,6 @@ def serialize_task(self, value):
6886
else:
6987
raise TypeError(f"Cannot serialize type {type(value)}")
7088

71-
@model_validator(mode="before")
72-
def validate_hints(cls, values):
73-
"""Validate execution hooks and scheduling hints in the task.
74-
75-
:param values: Model values dictionary.
76-
:return: Validated values dictionary.
77-
"""
78-
task = values.get("task")
79-
ExecutionHooksHint.from_cwl(task), SchedulingHint.from_cwl(task)
80-
return values
81-
8289

8390
class JobSubmissionModel(BaseJobModel):
8491
"""Job definition sent to the router."""
@@ -119,14 +126,20 @@ def serialize_task(self, value):
119126
raise TypeError(f"Cannot serialize type {type(value)}")
120127

121128
@model_validator(mode="before")
122-
def validate_hints(cls, values):
123-
"""Validate transformation execution hooks and scheduling hints in the task.
129+
def validate_transformation(cls, values):
    """Run the pre-construction checks on the transformation's CWL task.

    The task's ResourceRequirements are checked first, then its transformation
    execution-hooks and scheduling hints are parsed. Nothing is caught here, so
    any exception raised by those checks propagates to the caller.

    :param values: Model values dictionary.
    :return: The values dictionary that was passed in.
    """
    cwl_task = values.get("task")

    # Resource limits are checked before the hints are parsed.
    validate_resource_requirements(cwl_task)

    # The hints are parsed only for validation; the parsed objects are discarded.
    TransformationExecutionHooksHint.from_cwl(cwl_task)
    SchedulingHint.from_cwl(cwl_task)

    return values
131144

132145

@@ -155,3 +168,56 @@ def serialize_task(self, value):
155168
return save(value)
156169
else:
157170
raise TypeError(f"Cannot serialize type {type(value)}")
171+
172+
@model_validator(mode="before")
def validate_production(cls, values):
    """Check the ResourceRequirements of the production's CWL task.

    :param values: Model values dictionary.
    :return: The values dictionary that was passed in.
    """
    # Only resource limits are checked for productions; no hints are parsed here.
    validate_resource_requirements(values.get("task"))
    return values
181+
182+
183+
# -----------------------------------------------------------------------------
184+
# ResourceRequirement validations
185+
# -----------------------------------------------------------------------------
186+
# Temporary code, waiting on cwltool PR: https://github.com/common-workflow-language/cwltool/pull/2179.
187+
188+
189+
def validate_resource_requirements(task: CommandLineTool | Workflow | ExpressionTool):
    """Walk a CWL task and validate every ResourceRequirement found in it.

    The task's own ``requirements`` are checked first. For a Workflow, each step
    is then visited in order: the step's ``requirements`` are checked, followed
    by a recursive check of whatever the step runs.

    :param task: The task to validate.
    :raises ValueError: If any ResourceRequirement has min > max.
    """

    def _check_holder(holder):
        # Objects without a ``requirements`` attribute (or with it unset) are skipped.
        for candidate in getattr(holder, "requirements", None) or []:
            if isinstance(candidate, ResourceRequirement):
                _validate_min_max(candidate)

    _check_holder(task)

    # Only workflows carry steps to descend into.
    if not isinstance(task, Workflow):
        return

    for wf_step in task.steps or []:
        _check_holder(wf_step)
        if wf_step.run:
            validate_resource_requirements(wf_step.run)
208+
209+
210+
def _validate_min_max(req: "ResourceRequirement"):
    """Check that min does not exceed max for any resource.

    A min/max pair is compared only when both bounds are real numbers. A bound
    that is unset (``None``) or given as a CWL expression string cannot be
    checked statically, so such pairs are skipped rather than compared.

    :param req: The ResourceRequirement to validate.
    :raises ValueError: If min > max for any resource.
    """

    def _is_number(bound):
        # bool is an int subclass; it is never a meaningful resource amount.
        return isinstance(bound, (int, float)) and not isinstance(bound, bool)

    for name, lo, hi in [
        ("cores", req.coresMin, req.coresMax),
        ("ram", req.ramMin, req.ramMax),
        ("tmpdir", req.tmpdirMin, req.tmpdirMax),
        ("outdir", req.outdirMin, req.outdirMax),
    ]:
        # Type check instead of truthiness, so a bound of 0 is still validated
        # (e.g. min=4, max=0) and expression strings never reach the comparison.
        if not (_is_number(lo) and _is_number(hi)):
            continue
        if lo > hi:
            raise ValueError(f"{name}Min ({lo}) exceeds {name}Max ({hi})")

test/test_resource_requirements.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
"""Integration tests for CWL Resource Requirements validation."""
2+
3+
from typing import Optional
4+
5+
import pytest
6+
from cwl_utils.parser.cwl_v1_2 import CommandLineTool, ExpressionTool, ResourceRequirement, Workflow, WorkflowStep
7+
8+
from dirac_cwl.submission_models import JobSubmissionModel, ProductionSubmissionModel, TransformationSubmissionModel
9+
10+
# -----------------------------------------------------------------------------
11+
# Helper functions
12+
# -----------------------------------------------------------------------------
13+
14+
15+
def create_commandlinetool(
    requirements: Optional[list] = None,
    inputs: Optional[list] = None,
    outputs: Optional[list] = None,
) -> CommandLineTool:
    """Build a CommandLineTool, substituting an empty list for any falsy argument."""
    tool_kwargs = {
        "requirements": requirements or [],
        "inputs": inputs or [],
        "outputs": outputs or [],
    }
    return CommandLineTool(**tool_kwargs)
26+
27+
28+
def create_workflow(
    requirements: Optional[list] = None,
    steps: Optional[list[WorkflowStep]] = None,
    inputs: Optional[list] = None,
    outputs: Optional[list] = None,
) -> Workflow:
    """Build a Workflow, substituting an empty list for any falsy argument."""
    workflow_kwargs = {
        "requirements": requirements or [],
        "steps": steps or [],
        "inputs": inputs or [],
        "outputs": outputs or [],
    }
    return Workflow(**workflow_kwargs)
41+
42+
43+
def create_step(
    requirements: Optional[list] = None,
    run: Optional[CommandLineTool | Workflow | ExpressionTool] = None,
    in_: Optional[list] = None,
    out: Optional[list] = None,
) -> WorkflowStep:
    """Create a WorkflowStep with the given requirements, run, inputs, and outputs.

    Falsy list arguments are replaced by an empty list; ``run`` is passed
    through unchanged.

    :param requirements: Requirements attached to the step itself.
    :param run: The process the step runs. ExpressionTool is accepted because
        the tests in this module build steps around one.
    :param in_: Step inputs.
    :param out: Step outputs.
    :return: The constructed WorkflowStep.
    """
    return WorkflowStep(
        requirements=requirements or [],
        run=run,
        in_=in_ or [],
        out=out or [],
    )
56+
57+
58+
def create_expressiontool(
    requirements: Optional[list] = None,
    inputs: Optional[list] = None,
    outputs: Optional[list] = None,
) -> ExpressionTool:
    """Build an ExpressionTool with an empty expression, substituting an empty list for any falsy argument."""
    tool_kwargs = {
        "expression": "",
        "requirements": requirements or [],
        "inputs": inputs or [],
        "outputs": outputs or [],
    }
    return ExpressionTool(**tool_kwargs)
70+
71+
72+
def assert_submission_fails(task):
    """Assert that every submission model rejects the task with a ValueError.

    The Job, Transformation and Production submission models are tried in that
    order; each must raise ValueError when constructed with the task.

    :param task: CWL task to submit (Workflow, CommandLineTool, ExpressionTool, etc.)
    """
    submission_models = (
        JobSubmissionModel,
        TransformationSubmissionModel,
        ProductionSubmissionModel,
    )
    for model_cls in submission_models:
        with pytest.raises(ValueError):
            model_cls(task=task)
83+
84+
85+
# -----------------------------------------------------------------------------
86+
# Resource requirements tests
87+
# -----------------------------------------------------------------------------
88+
@pytest.mark.parametrize(
    "bad_min_max_reqs",
    [
        ResourceRequirement(coresMin=4, coresMax=2),
        ResourceRequirement(ramMin=2048, ramMax=1024),
        ResourceRequirement(tmpdirMin=1024, tmpdirMax=512),
        ResourceRequirement(outdirMin=512, outdirMax=256),
    ],
)
def test_bad_min_max_resource_reqs(bad_min_max_reqs):
    """Test invalid min/max resource requirements in CWL objects.

    Each parametrized requirement has min > max for one resource. It is placed
    at every level the validator inspects (tool, step, step.run, workflow,
    nested workflows) and submission must fail in each case.
    """
    # CommandLineTool with bad minmax reqs
    clt = create_commandlinetool(requirements=[bad_min_max_reqs])
    assert_submission_fails(clt)

    # ExpressionTool with bad minmax reqs
    expression_tool = create_expressiontool(requirements=[bad_min_max_reqs])
    assert_submission_fails(expression_tool)

    # WorkflowStep.run with bad minmax reqs
    step_bad_run = create_step(run=clt)
    workflow = create_workflow(steps=[step_bad_run])
    assert_submission_fails(workflow)

    step_bad_run = create_step(run=expression_tool)
    workflow = create_workflow(steps=[step_bad_run])
    assert_submission_fails(workflow)

    # WorkflowStep with bad minmax reqs (the tool it runs is clean)
    clt = create_commandlinetool()
    step = create_step(run=clt, requirements=[bad_min_max_reqs])
    workflow = create_workflow(steps=[step])
    assert_submission_fails(workflow)

    # Same case with a clean ExpressionTool as the step's run target.
    expression_tool = create_expressiontool()
    step = create_step(run=expression_tool, requirements=[bad_min_max_reqs])
    workflow = create_workflow(steps=[step])
    assert_submission_fails(workflow)

    # Workflow with bad minmax reqs
    workflow = create_workflow(requirements=[bad_min_max_reqs])
    assert_submission_fails(workflow)

    # NestedWorkflow with bad minmax reqs
    nest_workflow = create_workflow(requirements=[bad_min_max_reqs])
    step = create_step(run=nest_workflow)
    workflow = create_workflow(steps=[step])
    assert_submission_fails(workflow)

    # DeepNestedWorkflow with bad minmax reqs
    deep_workflow = create_workflow(requirements=[bad_min_max_reqs])
    deep_step = create_step(run=deep_workflow)
    nest_workflow = create_workflow(steps=[deep_step])
    step = create_step(run=nest_workflow)
    workflow = create_workflow(steps=[step])
    assert_submission_fails(workflow)

test/test_workflows.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def test_run_job_success(cli_runner, cleanup, pi_test_files, cwl_file, inputs):
186186
),
187187
# The description file points to itself (another circular dependency)
188188
(
189-
"test/workflows/bad_references/reference_circular1.cwl",
189+
"test/workflows/bad_references/reference_itself.cwl",
190190
[],
191191
"Recursingintostep",
192192
),
@@ -463,7 +463,7 @@ def run_and_capture():
463463
),
464464
# The description file points to itself (another circular dependency)
465465
(
466-
"test/workflows/bad_references/reference_circular1.cwl",
466+
"test/workflows/bad_references/reference_itself.cwl",
467467
"Recursingintostep",
468468
),
469469
],

0 commit comments

Comments
 (0)