Skip to content

Commit f9ab30b

Browse files
committed
test: add integration tests for CWL submission pipeline
1 parent 1355892 commit f9ab30b

1 file changed

Lines changed: 200 additions & 0 deletions

File tree

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
from unittest.mock import AsyncMock, MagicMock, patch
5+
6+
import yaml
7+
8+
from diracx.cli._submission.pipeline import submit_cwl
9+
10+
11+
def _cwl_workflow(tmp_path: Path) -> Path:
    """Write a minimal CWL ``CommandLineTool`` and return the file's path.

    The tool declares a ``File`` input named ``script`` and a ``string``
    input named ``message``. It also carries a ``dirac:Job`` hint whose
    ``input_sandbox`` entry sources the ``script`` input.

    Args:
        tmp_path: Directory in which ``workflow.cwl`` is created.

    Returns:
        Path to the written ``workflow.cwl`` file.
    """
    # NOTE: this is a plain module-level helper. It must not be decorated
    # with ``@staticmethod``: outside a class body the decorator is
    # meaningless, and before Python 3.10 the resulting object is not even
    # callable.
    cwl = {
        "cwlVersion": "v1.2",
        "class": "CommandLineTool",
        "label": "integration-test",
        "hints": [
            {
                "class": "dirac:Job",
                "schema_version": "1.0",
                "type": "User",
                "input_sandbox": [{"source": "script"}],
            }
        ],
        "inputs": [
            {"id": "script", "type": "File"},
            {"id": "message", "type": "string"},
        ],
        "outputs": [],
        "baseCommand": ["python"],
        "$namespaces": {"dirac": "https://diracgrid.org/cwl#"},
    }
    f = tmp_path / "workflow.cwl"
    f.write_text(yaml.dump(cwl))
    return f
37+
38+
39+
class TestIntegration:
    """Integration tests for ``submit_cwl`` with the DiracX client mocked.

    Every test patches ``AsyncDiracClient`` (as seen from the pipeline
    module) and ``diracx.api.jobs.create_sandbox`` so that nothing leaves
    the process, then inspects the body handed to ``jobs.submit_cwl_jobs``.
    """

    @staticmethod
    def _make_client(*job_ids: int) -> AsyncMock:
        """Return a mock client whose ``submit_cwl_jobs`` yields one job per id."""
        client = AsyncMock()
        client.jobs.submit_cwl_jobs = AsyncMock(
            return_value=[
                MagicMock(
                    job_id=job_id,
                    status="Submitting",
                    minor_status="Initializing Job",
                )
                for job_id in job_ids
            ]
        )
        return client

    @staticmethod
    async def _submit(
        workflow: Path,
        inputs_file: Path,
        client: AsyncMock,
        sandbox_pfn: str | None = None,
    ) -> tuple[list, AsyncMock]:
        """Run ``submit_cwl`` against ``client`` with sandbox creation patched.

        Args:
            workflow: Path of the CWL workflow file to submit.
            inputs_file: Path of the YAML inputs file passed as the only
                entry of ``input_files``.
            client: Mock returned when the patched ``AsyncDiracClient`` is
                entered as an async context manager.
            sandbox_pfn: Value the patched ``create_sandbox`` returns.

        Returns:
            A ``(results, mock_create_sandbox)`` tuple so callers can assert
            on both the submission result and the sandbox-upload calls.
        """
        with (
            patch(
                "diracx.cli._submission.pipeline.AsyncDiracClient"
            ) as mock_client_cls,
            patch(
                "diracx.api.jobs.create_sandbox",
                new_callable=AsyncMock,
                return_value=sandbox_pfn,
            ) as mock_create_sb,
        ):
            # ``async with AsyncDiracClient() as c`` must hand back our mock.
            mock_client_cls.return_value.__aenter__ = AsyncMock(
                return_value=client
            )
            # Returning False from __aexit__ lets exceptions propagate.
            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)

            results = await submit_cwl(
                workflow=workflow,
                input_files=[inputs_file],
                cli_args=[],
                range_spec=None,
                yes=True,
            )
        return results, mock_create_sb

    async def test_submit_with_local_file_sandbox(self, tmp_path):
        """Submit a job with a local File input → should upload sandbox and rewrite."""
        workflow = _cwl_workflow(tmp_path)
        local_script = tmp_path / "run.py"
        local_script.write_text("print('hello')")

        inputs_file = tmp_path / "inputs.yaml"
        inputs_file.write_text(
            yaml.dump(
                {
                    "script": {"class": "File", "path": str(local_script)},
                    "message": "integration test",
                }
            )
        )

        mock_client = self._make_client(2001)
        fake_pfn = "SB:SandboxSE|/S3/bucket/sha256:abc123.tar.zst"

        results, _ = await self._submit(
            workflow, inputs_file, mock_client, fake_pfn
        )

        assert len(results) == 1
        call_body = mock_client.jobs.submit_cwl_jobs.call_args[0][0]
        submitted_inputs = call_body.inputs[0]
        # The local path must have been replaced by the sandbox PFN.
        assert submitted_inputs["script"]["path"] == fake_pfn
        assert submitted_inputs["message"] == "integration test"

    async def test_submit_with_lfn_no_sandbox(self, tmp_path):
        """Submit with LFN input → no sandbox upload."""
        workflow = _cwl_workflow(tmp_path)

        inputs_file = tmp_path / "inputs.yaml"
        inputs_file.write_text(
            yaml.dump(
                {
                    "script": {"class": "File", "path": "LFN:/lhcb/scripts/run.py"},
                    "message": "lfn test",
                }
            )
        )

        mock_client = self._make_client(2002)

        _, mock_create_sb = await self._submit(workflow, inputs_file, mock_client)

        # Actually verify the "no sandbox upload" claim of this test.
        mock_create_sb.assert_not_called()

        call_body = mock_client.jobs.submit_cwl_jobs.call_args[0][0]
        submitted_inputs = call_body.inputs[0]
        # The LFN must be passed through untouched.
        assert submitted_inputs["script"]["path"] == "LFN:/lhcb/scripts/run.py"

    async def test_multi_doc_yaml_parametric(self, tmp_path):
        """Multi-doc YAML creates multiple jobs sharing one sandbox."""
        workflow = _cwl_workflow(tmp_path)
        local_script = tmp_path / "run.py"
        local_script.write_text("print('hello')")

        sweep_file = tmp_path / "sweep.yaml"
        doc1 = {
            "script": {"class": "File", "path": str(local_script)},
            "message": "job 1",
        }
        doc2 = {
            "script": {"class": "File", "path": str(local_script)},
            "message": "job 2",
        }
        # Two YAML documents in one file, separated by the "---" marker.
        sweep_file.write_text(yaml.dump(doc1) + "---\n" + yaml.dump(doc2))

        mock_client = self._make_client(3001, 3002)
        fake_pfn = "SB:SandboxSE|/S3/bucket/sha256:shared.tar.zst"

        _, mock_create_sb = await self._submit(
            workflow, sweep_file, mock_client, fake_pfn
        )

        # Sandbox uploaded once (both jobs share same local file)
        mock_create_sb.assert_called_once()

        # Two jobs submitted
        call_body = mock_client.jobs.submit_cwl_jobs.call_args[0][0]
        assert len(call_body.inputs) == 2
        assert call_body.inputs[0]["message"] == "job 1"
        assert call_body.inputs[1]["message"] == "job 2"

0 commit comments

Comments
 (0)