Skip to content

Commit 3227b4a

Browse files
authored
feat: get the slurm stuff working (#56)
* feat: get the slurm stuff working * wip: refactor execution and slurm stuff * wip: fixing up paths in slurm handling * fix: tweak execution.py * fix: more tweaking execution * wip: tweak run script file name * Added site config to rail project yaml * wip: fixing slurm_options merging * wip: fixing slurm_options merging
1 parent d894471 commit 3227b4a

4 files changed

Lines changed: 272 additions & 120 deletions

File tree

src/rail/cli/rail_project/project_commands.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -187,17 +187,6 @@ def subsample_command(
187187
return ok
188188

189189

190-
@project_cli.command(name="sbatch")
191-
@project_options.run_mode()
192-
@project_options.site()
193-
@project_options.args()
194-
def sbatch_command(
195-
run_mode: project_options.RunMode, site: str, args: list[str]
196-
) -> int: # pragma: no cover
197-
"""Wrap a rail_pipe command with site-based arguements for slurm"""
198-
return execution.sbatch_wrap(run_mode, site, args)
199-
200-
201190
@project_cli.command(name="reduce")
202191
@project_options.config_file()
203192
@project_options.run_mode()

src/rail/projects/execution.py

Lines changed: 232 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import enum
44
import os
55
import subprocess
6+
from pathlib import Path
67
import time
8+
from typing import Any
79

810

911
class RunMode(enum.Enum):
@@ -14,29 +16,44 @@ class RunMode(enum.Enum):
1416
slurm = 2
1517

1618

17-
S3DF_SLURM_OPTIONS: list[str] = [
18-
"-p",
19-
"milano",
20-
"--account",
21-
"rubin:commissioning@milano",
22-
"--mem",
23-
"16G",
24-
"--parsable",
25-
]
26-
PERLMUTTER_SLURM_OPTIONS: list[str] = [
27-
"--account",
28-
"m1727",
29-
"--constraint",
30-
"cpu",
31-
"--qos",
32-
"regular",
33-
"--parsable",
34-
]
35-
36-
SLURM_OPTIONS = {
37-
"s3df": S3DF_SLURM_OPTIONS,
38-
"perlmutter": PERLMUTTER_SLURM_OPTIONS,
39-
}
19+
# Per-site defaults used when batching work through slurm.  Each config holds:
#   slurm_batch_size : number of run scripts grouped into one submitted job
#   slurm_options    : options written as "#SBATCH <option>" lines
#   srun_command     : command prefixed to each run script inside the job
#   sbatch_commands  : argv prefix used to submit the job script
S3DF_SITE_CONFIG: dict[str, Any] = {
    "slurm_batch_size": 8,
    "slurm_options": [
        # Long form on purpose: a short option does not take "=", so
        # "-p=milano" would hand slurm the partition name "=milano".
        "--partition=milano",
        "--account=rubin:commissioning@milano",
        "--mem=16G",
        "--parsable",
    ],
    "srun_command": "srun",
    "sbatch_commands": ["sbatch"],
}
PERLMUTTER_SITE_CONFIG: dict[str, Any] = {
    "slurm_batch_size": 32,
    "slurm_options": [
        "--account=m1727",
        "--constraint=cpu",
        "--qos=regular",
        "--parsable",
    ],
    "srun_command": "srun",
    "sbatch_commands": ["sbatch"],
}
# The test site swaps the slurm executables for "echo"
TEST_SITE_CONFIG: dict[str, Any] = {
    "slurm_batch_size": 4,
    "slurm_options": [
        "--dummy=test",
    ],
    "srun_command": "echo 0 srun",
    "sbatch_commands": ["echo", "0", "sbatch"],
}

# Site name -> site config
DEFAULT_SITE_CONFIGS: dict[str, dict[str, Any]] = {
    "test": TEST_SITE_CONFIG,
    "perlmutter": PERLMUTTER_SITE_CONFIG,
    "s3df": S3DF_SITE_CONFIG,
}

# First line of every generated script
BASH_LINE = "#!/usr/bin/bash"
4057

4158

4259
def handle_command(
@@ -86,8 +103,6 @@ def handle_command(
86103
def handle_commands(
87104
run_mode: RunMode,
88105
command_lines: list[list[str]],
89-
script_path: str | None = None,
90-
site: str | None = None,
91106
) -> int: # pragma: no cover
92107
"""Run a multiple commands in the mode requested
93108
@@ -99,95 +114,231 @@ def handle_commands(
99114
command_lines: list[list[str]]
100115
List of commands to run, each one is the list of tokens in the command line
101116
102-
script_path: str | None
103-
Path to write the slurm submit script to
104-
105117
Returns
106118
-------
107119
int:
108120
Status returned by the commands. 0 for success, exit code otherwise
109121
"""
110-
111122
if run_mode in [RunMode.dry_run, RunMode.bash]:
112123
for command_ in command_lines:
113124
retcode = handle_command(run_mode, command_)
114125
if retcode:
115126
return retcode
116127
return 0
117128

118-
# At this point we are using slurm and need a script to send to batch
119-
if script_path is None:
120-
raise ValueError(
121-
"handle_commands with run_mode == RunMode.slurm requires a path to a script to write",
122-
)
129+
raise RuntimeError(
130+
"handle_commands should only be called with run_mode in [RunMode.dry_run, RunMode.bash]",
131+
)
123132

124-
if site is None:
125-
raise ValueError(
126-
"handle_commands with run_mode == RunMode.slurm requires a site",
127-
)
128133

129-
slurm_options = SLURM_OPTIONS[site]
134+
def write_run_script(
    command_lines: list[list[str]],
    script_path: Path,
) -> None:  # pragma: no cover
    """Write an executable bash script that runs a list of commands

    The script starts with the bash line, then has one line per command
    (tokens joined with single spaces), and ends with ``echo Done!``.

    Parameters
    ----------
    command_lines:
        List of commands to run, each one is the list of tokens in the command line

    script_path:
        Path to write the script to
    """
    script_path = Path(script_path)
    # parents/exist_ok handle both an existing directory and a bare file
    # name; os.makedirs("") would raise FileNotFoundError in the latter case
    script_path.parent.mkdir(parents=True, exist_ok=True)

    lines = [BASH_LINE, ""]
    lines.extend(" ".join(command_) for command_ in command_lines)
    lines.append("echo Done!")
    script_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    # Make the script executable so it can be launched directly
    script_path.chmod(0o755)
142160

143-
command_line = (
144-
["srun"] + slurm_options + ["--output", script_out, "--error", script_path]
145-
)
161+
162+
def write_submit_script(
    scripts_in_batch: list[str],
    batch_submit_script: Path,
    slurm_options: list[str],
    exec_command: str = "srun",
) -> None:  # pragma: no cover
    """Write an executable slurm submit script that launches a batch of scripts

    The script starts with the bash line, then has one ``#SBATCH`` line per
    slurm option, one ``<exec_command> <script>`` line per script, and ends
    with ``echo Done!``.

    Parameters
    ----------
    scripts_in_batch:
        List of scripts we are going to run

    batch_submit_script:
        Path to write the script to

    slurm_options:
        List of options for slurm

    exec_command:
        Command used to execute each script
    """
    batch_submit_script = Path(batch_submit_script)
    # parents/exist_ok handle both an existing directory and a bare file
    # name; os.makedirs("") would raise FileNotFoundError in the latter case
    batch_submit_script.parent.mkdir(parents=True, exist_ok=True)

    lines = [BASH_LINE, ""]
    lines.extend(f"#SBATCH {opt_}" for opt_ in slurm_options)
    lines.extend(f"{exec_command} {script_}" for script_ in scripts_in_batch)
    lines.append("echo Done!")
    batch_submit_script.write_text("\n".join(lines) + "\n", encoding="utf-8")
    # Make the script executable so it can be launched directly
    batch_submit_script.chmod(0o755)
199+
156200

157-
return ret_val
201+
def submit_slurm_job(
202+
script_path: Path | str,
203+
sbatch_commands: list[str],
204+
) -> str:
205+
"""Submit a SLURM job and return the job ID."""
206+
result = subprocess.run(
207+
sbatch_commands + [str(script_path)],
208+
capture_output=True,
209+
text=True,
210+
check=False,
211+
)
212+
213+
if result.returncode == 0:
214+
# Parse job ID from output like "Submitted batch job 12345"
215+
job_id = result.stdout.strip().split()[-1]
216+
return job_id
217+
raise RuntimeError(f"SLURM submission failed: {result.stderr}")
158218

159219

160-
def sbatch_wrap(
161-
run_mode: RunMode, site: str, args: list[str]
220+
def handle_all_commands(
    run_mode: RunMode,
    all_commands: list[tuple[list[list[str]], str]],
    script_path: str | None = None,
    site_config: dict[str, Any] | None = None,
) -> int:  # pragma: no cover
    """Run all the commands in the mode requested

    Parameters
    ----------
    run_mode:
        How to run the command, e.g., dry_run, bash or slurm

    all_commands:
        List of commands and associated place to write scripts

    script_path:
        Path to write the slurm submit script to

    site_config:
        Config for site we are running at

    Returns
    -------
    int:
        0 if everything succeeded, 1 if any group of commands failed

    Raises
    ------
    ValueError
        If slurm mode is requested without a script_path or a site_config
    """
    if run_mode in [RunMode.dry_run, RunMode.bash]:
        ok = 0
        for commands_, _script_path in all_commands:
            try:
                retcode = handle_commands(
                    run_mode,
                    commands_,
                )
            except Exception as msg:  # pragma: no cover
                print(msg)
                retcode = 1
            # A non-zero return code is a failure too, not only an exception;
            # keep going so the remaining command groups still run
            if retcode:
                ok |= 1

        return ok

    # At this point we are using slurm and need a script to send to batch
    if script_path is None:
        raise ValueError(
            "handle_all_commands with run_mode == RunMode.slurm requires a path to a script to write",
        )

    # Raise rather than assert: asserts are stripped when python runs with -O
    if site_config is None:
        raise ValueError(
            "handle_all_commands with run_mode == RunMode.slurm requires a site_config",
        )

    return run_batches(all_commands, Path(script_path), site_config)
269+
270+
271+
def run_batches(
    all_commands: list[tuple[list[list[str]], str]],
    script_path: Path,
    site_config: dict[str, Any],
) -> int:  # pragma: no cover
    """Write the run scripts and submit them to slurm in batches

    The commands are split into groups of ``slurm_batch_size``.  For each
    group, one run script is written per entry, then a submit script named
    ``<stem>_<idx>.sh`` is written next to ``script_path`` and submitted.
    Failures are printed and reflected in the returned status; later batches
    are still attempted.

    Parameters
    ----------
    all_commands:
        List of commands lists and associated locations for scripts

    script_path:
        Path to write the slurm submit script to

    site_config:
        Config for the site we are running at; the keys read are
        ``slurm_options``, ``slurm_batch_size``, ``srun_command`` and
        ``sbatch_commands``

    Returns
    -------
    int:
        0 if every script was written and every batch submitted, 1 otherwise

    Raises
    ------
    ValueError
        If ``slurm_batch_size`` is less than 1
    """
    base_options = list(site_config.get("slurm_options", []))
    batch_size = site_config.get("slurm_batch_size", 4)
    srun_command = site_config.get("srun_command", "srun")
    sbatch_commands = site_config.get("sbatch_commands", ["sbatch"])

    # A batch size below 1 would never advance through the commands
    if batch_size < 1:
        raise ValueError(f"slurm_batch_size must be at least 1, got {batch_size}")

    script_path = Path(script_path)
    status = 0

    for job_idx, start in enumerate(range(0, len(all_commands), batch_size)):
        command_batch = all_commands[start : start + batch_size]

        # Build the names from the stem so only the file suffix is replaced,
        # not a ".sh" that happens to appear elsewhere in the path
        batch_stem = f"{script_path.stem}_{job_idx}"
        batch_submit_script = script_path.with_name(f"{batch_stem}.sh")
        batch_log = script_path.with_name(f"{batch_stem}.log")
        batch_err_log = script_path.with_name(f"{batch_stem}.err")
        scripts_in_batch: list[str] = []

        for commands_, script_path_ in command_batch:
            try:
                write_run_script(
                    commands_,
                    Path(script_path_),
                )
                scripts_in_batch.append(script_path_)
            except Exception as msg:  # pragma: no cover
                print(msg)
                status |= 1

        # Nothing was written for this batch (status already flags that),
        # so there is nothing to submit
        if not scripts_in_batch:
            continue

        # Start from the site options for every batch, so the per-batch
        # options do not pile up from one submit script to the next
        batch_options = base_options + [
            f"--output={batch_log}",
            f"--error={batch_err_log}",
            f"--ntasks={batch_size}",
        ]

        try:
            write_submit_script(
                scripts_in_batch,
                batch_submit_script,
                batch_options,
                srun_command,
            )
            submit_slurm_job(batch_submit_script, sbatch_commands)
        except Exception as msg:  # pragma: no cover
            print(msg)
            status |= 1

    return status

0 commit comments

Comments
 (0)