diff --git a/src/dirac_cwl/submission_models.py b/src/dirac_cwl/submission_models.py
index 3033629..53fc620 100644
--- a/src/dirac_cwl/submission_models.py
+++ b/src/dirac_cwl/submission_models.py
@@ -13,6 +13,7 @@
 from cwl_utils.parser.cwl_v1_2 import (
     CommandLineTool,
     ExpressionTool,
+    ResourceRequirement,
     Workflow,
 )
 from pydantic import BaseModel, ConfigDict, field_serializer, model_validator
@@ -55,6 +56,23 @@ class BaseJobModel(BaseModel):
 
     task: CommandLineTool | Workflow | ExpressionTool
 
+    @model_validator(mode="before")
+    def validate_job(cls, values):
+        """Validate job workflow.
+
+        :param values: Model values dictionary.
+        :return: Validated values dictionary.
+        """
+        task = values.get("task")
+
+        # ResourceRequirement validation
+        validate_resource_requirements(task)
+
+        # Hints validation
+        ExecutionHooksHint.from_cwl(task), SchedulingHint.from_cwl(task)
+
+        return values
+
     @field_serializer("task")
     def serialize_task(self, value):
         """Serialize CWL task object to dictionary.
@@ -68,17 +86,6 @@ def serialize_task(self, value):
         else:
             raise TypeError(f"Cannot serialize type {type(value)}")
 
-    @model_validator(mode="before")
-    def validate_hints(cls, values):
-        """Validate execution hooks and scheduling hints in the task.
-
-        :param values: Model values dictionary.
-        :return: Validated values dictionary.
-        """
-        task = values.get("task")
-        ExecutionHooksHint.from_cwl(task), SchedulingHint.from_cwl(task)
-        return values
-
 
 class JobSubmissionModel(BaseJobModel):
     """Job definition sent to the router."""
@@ -119,14 +126,20 @@ def serialize_task(self, value):
             raise TypeError(f"Cannot serialize type {type(value)}")
 
     @model_validator(mode="before")
-    def validate_hints(cls, values):
-        """Validate transformation execution hooks and scheduling hints in the task.
+    def validate_transformation(cls, values):
+        """Validate transformation workflow.
 
         :param values: Model values dictionary.
         :return: Validated values dictionary.
         """
         task = values.get("task")
+
+        # ResourceRequirement validation
+        validate_resource_requirements(task)
+
+        # Hints validation
         TransformationExecutionHooksHint.from_cwl(task), SchedulingHint.from_cwl(task)
+
         return values
 
 
@@ -155,3 +168,67 @@ def serialize_task(self, value):
             return save(value)
         else:
             raise TypeError(f"Cannot serialize type {type(value)}")
+
+    @model_validator(mode="before")
+    def validate_production(cls, values):
+        """Validate production workflow.
+
+        :param values: Model values dictionary.
+        :return: Validated values dictionary.
+        """
+        task = values.get("task")
+
+        # ResourceRequirement validation
+        validate_resource_requirements(task)
+
+        return values
+
+
+# -----------------------------------------------------------------------------
+# ResourceRequirement validations
+# -----------------------------------------------------------------------------
+# Temporary code, waiting on cwltool PR: https://github.com/common-workflow-language/cwltool/pull/2179.
+
+
+def validate_resource_requirements(task: CommandLineTool | Workflow | ExpressionTool):
+    """Validate ResourceRequirements of a task recursively.
+
+    :param task: The task to validate.
+    :raises ValueError: If any ResourceRequirement has min > max.
+    """
+    # Validate task-level requirements
+    for req in getattr(task, "requirements", None) or []:
+        if isinstance(req, ResourceRequirement):
+            _validate_min_max(req)
+
+    # Recurse into workflow steps
+    if isinstance(task, Workflow):
+        for step in task.steps or []:
+            for req in getattr(step, "requirements", None) or []:
+                if isinstance(req, ResourceRequirement):
+                    _validate_min_max(req)
+            if step.run:
+                validate_resource_requirements(step.run)
+
+
+def _validate_min_max(req: ResourceRequirement):
+    """Check that min does not exceed max for any resource.
+
+    Only numeric bounds are compared: a bound written as a CWL expression
+    (a string) cannot be evaluated at submission time and is skipped, while
+    an explicit ``0`` is a real bound rather than "unset".
+
+    :param req: The ResourceRequirement to validate.
+    :raises ValueError: If min > max for any resource.
+    """
+    for name, lo, hi in [
+        ("cores", req.coresMin, req.coresMax),
+        ("ram", req.ramMin, req.ramMax),
+        ("tmpdir", req.tmpdirMin, req.tmpdirMax),
+        ("outdir", req.outdirMin, req.outdirMax),
+    ]:
+        # Unset (None) or expression (str) bounds cannot be compared numerically.
+        if not isinstance(lo, (int, float)) or not isinstance(hi, (int, float)):
+            continue
+        if lo > hi:
+            raise ValueError(f"{name}Min ({lo}) exceeds {name}Max ({hi})")
diff --git a/test/test_resource_requirements.py b/test/test_resource_requirements.py
new file mode 100644
index 0000000..2465e9b
--- /dev/null
+++ b/test/test_resource_requirements.py
@@ -0,0 +1,144 @@
+"""Integration tests for CWL Resource Requirements validation."""
+
+from typing import Optional
+
+import pytest
+from cwl_utils.parser.cwl_v1_2 import CommandLineTool, ExpressionTool, ResourceRequirement, Workflow, WorkflowStep
+
+from dirac_cwl.submission_models import JobSubmissionModel, ProductionSubmissionModel, TransformationSubmissionModel
+
+# -----------------------------------------------------------------------------
+# Helper functions
+# -----------------------------------------------------------------------------
+
+
+def create_commandlinetool(
+    requirements: Optional[list] = None,
+    inputs: Optional[list] = None,
+    outputs: Optional[list] = None,
+) -> CommandLineTool:
+    """Create a CommandLineTool with the given requirements, inputs, and outputs."""
+    return CommandLineTool(
+        requirements=requirements or [],
+        inputs=inputs or [],
+        outputs=outputs or [],
+    )
+
+
+def create_workflow(
+    requirements: Optional[list] = None,
+    steps: Optional[list[WorkflowStep]] = None,
+    inputs: Optional[list] = None,
+    outputs: Optional[list] = None,
+) -> Workflow:
+    """Create a Workflow with the given requirements, steps, inputs, and outputs."""
+    return Workflow(
+        requirements=requirements or [],
+        steps=steps or [],
+        inputs=inputs or [],
+        outputs=outputs or [],
+    )
+
+
+def create_step(
+    requirements: Optional[list] = None,
+    run: Optional[CommandLineTool | Workflow] = None,
+    in_: Optional[list] = None,
+    out: Optional[list] = None,
+) -> WorkflowStep:
+    """Create a WorkflowStep with the given requirements, run, inputs, and outputs."""
+    return WorkflowStep(
+        requirements=requirements or [],
+        run=run,
+        in_=in_ or [],
+        out=out or [],
+    )
+
+
+def create_expressiontool(
+    requirements: Optional[list] = None,
+    inputs: Optional[list] = None,
+    outputs: Optional[list] = None,
+) -> ExpressionTool:
+    """Create an ExpressionTool with the given requirements, inputs, and outputs."""
+    return ExpressionTool(
+        expression="",
+        requirements=requirements or [],
+        inputs=inputs or [],
+        outputs=outputs or [],
+    )
+
+
+def assert_submission_fails(task):
+    """Assert that Job, Transformation and Production submissions all fail with ValueError.
+
+    :param task: CWL task to submit (Workflow, CommandLineTool or ExpressionTool).
+    """
+    with pytest.raises(ValueError):
+        JobSubmissionModel(task=task)
+    with pytest.raises(ValueError):
+        TransformationSubmissionModel(task=task)
+    with pytest.raises(ValueError):
+        ProductionSubmissionModel(task=task)
+
+
+# -----------------------------------------------------------------------------
+# Resource requirements tests
+# -----------------------------------------------------------------------------
+@pytest.mark.parametrize(
+    "bad_min_max_reqs",
+    [
+        ResourceRequirement(coresMin=4, coresMax=2),
+        ResourceRequirement(ramMin=2048, ramMax=1024),
+        ResourceRequirement(tmpdirMin=1024, tmpdirMax=512),
+        ResourceRequirement(outdirMin=512, outdirMax=256),
+        ResourceRequirement(coresMin=1, coresMax=0),  # zero is a real bound, not "unset"
+    ],
+)
+def test_bad_min_max_resource_reqs(bad_min_max_reqs):
+    """Test invalid min/max resource requirements in CWL objects."""
+    # CommandlineTool with bad minmax reqs
+    clt = create_commandlinetool(requirements=[bad_min_max_reqs])
+    assert_submission_fails(clt)
+
+    # ExpressionTool with bad minmax reqs
+    expression_tool = create_expressiontool(requirements=[bad_min_max_reqs])
+    assert_submission_fails(expression_tool)
+
+    # WorkflowStep.run with bad minmax reqs
+    step_bad_run = create_step(run=clt)
+    workflow = create_workflow(steps=[step_bad_run])
+    assert_submission_fails(workflow)
+
+    step_bad_run = create_step(run=expression_tool)
+    workflow = create_workflow(steps=[step_bad_run])
+    assert_submission_fails(workflow)
+
+    # WorkflowStep with bad minmax reqs
+    clt = create_commandlinetool()
+    step = create_step(run=clt, requirements=[bad_min_max_reqs])
+    workflow = create_workflow(steps=[step])
+    assert_submission_fails(workflow)
+
+    expression_tool = create_expressiontool()
+    step = create_step(run=expression_tool, requirements=[bad_min_max_reqs])
+    workflow = create_workflow(steps=[step])
+    assert_submission_fails(workflow)
+
+    # Workflow with bad minmax reqs
+    workflow = create_workflow(requirements=[bad_min_max_reqs])
+    assert_submission_fails(workflow)
+
+    # NestedWorkflow with bad minmax reqs
+    nest_workflow = create_workflow(requirements=[bad_min_max_reqs])
+    step = create_step(run=nest_workflow)
+    workflow = create_workflow(steps=[step])
+    assert_submission_fails(workflow)
+
+    # DeepNestedWorkflow with bad minmax reqs
+    deep_workflow = create_workflow(requirements=[bad_min_max_reqs])
+    deep_step = create_step(run=deep_workflow)
+    nest_workflow = create_workflow(steps=[deep_step])
+    step = create_step(run=nest_workflow)
+    workflow = create_workflow(steps=[step])
+    assert_submission_fails(workflow)
diff --git a/test/test_workflows.py b/test/test_workflows.py
index 0963908..22d8226 100644
--- a/test/test_workflows.py
+++ b/test/test_workflows.py
@@ -186,7 +186,7 @@ def test_run_job_success(cli_runner, cleanup, pi_test_files, cwl_file, inputs):
         ),
         # The description file points to itself (another circular dependency)
         (
-            "test/workflows/bad_references/reference_circular1.cwl",
+            "test/workflows/bad_references/reference_itself.cwl",
             [],
             "Recursingintostep",
         ),
@@ -463,7 +463,7 @@ def run_and_capture():
         ),
         # The description file points to itself (another circular dependency)
         (
-            "test/workflows/bad_references/reference_circular1.cwl",
+            "test/workflows/bad_references/reference_itself.cwl",
             "Recursingintostep",
         ),
     ],