Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions src/rail/cli/rail_project/project_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,6 @@ def subsample_command(
return ok


@project_cli.command(name="sbatch")
@project_options.run_mode()
@project_options.site()
@project_options.args()
def sbatch_command(
run_mode: project_options.RunMode, site: str, args: list[str]
) -> int: # pragma: no cover
"""Wrap a rail_pipe command with site-based arguements for slurm"""
return execution.sbatch_wrap(run_mode, site, args)


@project_cli.command(name="reduce")
@project_options.config_file()
@project_options.run_mode()
Expand Down
313 changes: 232 additions & 81 deletions src/rail/projects/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import enum
import os
import subprocess
from pathlib import Path
import time
from typing import Any


class RunMode(enum.Enum):
Expand All @@ -14,29 +16,44 @@ class RunMode(enum.Enum):
slurm = 2


S3DF_SLURM_OPTIONS: list[str] = [
"-p",
"milano",
"--account",
"rubin:commissioning@milano",
"--mem",
"16G",
"--parsable",
]
PERLMUTTER_SLURM_OPTIONS: list[str] = [
"--account",
"m1727",
"--constraint",
"cpu",
"--qos",
"regular",
"--parsable",
]

SLURM_OPTIONS = {
"s3df": S3DF_SLURM_OPTIONS,
"perlmutter": PERLMUTTER_SLURM_OPTIONS,
}
S3DF_SITE_CONFIG: dict[str, Any] = dict(
slurm_batch_size=8,
slurm_options=[
"-p=milano",
"--account=rubin:commissioning@milano",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'll always be against having site-specific stuff in what is meant to be a general codebase. I'm not sure that's possible if you want a script that does the slurm submission internal to RAIL

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Ultimately it doesn't matter because I'm not the one using this software 😅)

"--mem=16G",
"--parsable",
],
srun_command="srun",
sbatch_commands=["sbatch"],
)
# Site configuration for "perlmutter": number of scripts per slurm batch,
# the slurm options, and the commands used to launch (srun) and submit
# (sbatch) work.
PERLMUTTER_SITE_CONFIG: dict[str, Any] = {
    "slurm_batch_size": 32,
    "slurm_options": [
        "--account=m1727",
        "--constraint=cpu",
        "--qos=regular",
        "--parsable",
    ],
    "srun_command": "srun",
    "sbatch_commands": ["sbatch"],
}
# Site configuration for "test": the srun and sbatch commands are replaced
# by `echo` invocations, so the command lines are printed instead of being
# handed to slurm.
TEST_SITE_CONFIG: dict[str, Any] = {
    "slurm_batch_size": 4,
    "slurm_options": ["--dummy=test"],
    "srun_command": "echo 0 srun",
    "sbatch_commands": ["echo", "0", "sbatch"],
}

# Default site configurations, keyed by site name
DEFAULT_SITE_CONFIGS: dict[str, dict[str, Any]] = {
    "test": TEST_SITE_CONFIG,
    "perlmutter": PERLMUTTER_SITE_CONFIG,
    "s3df": S3DF_SITE_CONFIG,
}

# Interpreter (shebang) line placed at the top of the generated shell scripts
BASH_LINE = "#!/usr/bin/bash"


def handle_command(
Expand Down Expand Up @@ -86,8 +103,6 @@ def handle_command(
def handle_commands(
run_mode: RunMode,
command_lines: list[list[str]],
script_path: str | None = None,
site: str | None = None,
) -> int: # pragma: no cover
"""Run a multiple commands in the mode requested

Expand All @@ -99,95 +114,231 @@ def handle_commands(
command_lines: list[list[str]]
List of commands to run, each one is the list of tokens in the command line

script_path: str | None
Path to write the slurm submit script to

Returns
-------
int:
Status returned by the commands. 0 for success, exit code otherwise
"""

if run_mode in [RunMode.dry_run, RunMode.bash]:
for command_ in command_lines:
retcode = handle_command(run_mode, command_)
if retcode:
return retcode
return 0

# At this point we are using slurm and need a script to send to batch
if script_path is None:
raise ValueError(
"handle_commands with run_mode == RunMode.slurm requires a path to a script to write",
)
raise RuntimeError(
"handle_commands should only be called with run_mode in [RunMode.dry_run, RunMode.bash]",
)

if site is None:
raise ValueError(
"handle_commands with run_mode == RunMode.slurm requires a site",
)

slurm_options = SLURM_OPTIONS[site]
def write_run_script(
command_lines: list[list[str]],
script_path: Path,
) -> None: # pragma: no cover
"""Write a script to run multiple commands

Parameters
----------
command_lines:
List of commands to run, each one is the list of tokens in the command line

script_path:
Path to write the script to
"""
try:
os.makedirs(os.path.dirname(script_path))
except FileExistsError:
pass
with open(script_path, "w", encoding="utf-8") as fout:
fout.write("#!/usr/bin/bash\n\n")
for command_ in command_lines:
com_line = " ".join(command_)
fout.write(f"{com_line}\n")

script_out = script_path.replace(".sh", ".out")
contents = f"{BASH_LINE}\n\n"
for command_ in command_lines:
com_line = " ".join(command_)
contents += f"{com_line}\n"
contents += "echo Done!\n"
script_path.write_text(contents)
script_path.chmod(0o755)

command_line = (
["srun"] + slurm_options + ["--output", script_out, "--error", script_path]
)

def write_submit_script(
scripts_in_batch: list[str],
batch_submit_script: Path,
slurm_options: list[str],
exec_command: str = "srun",
) -> None: # pragma: no cover
"""Write a script to run multiple commands

Parameters
----------
scripts_in_batch:
List of scripts we are going to run

batch_submit_script:
Path to write the script to

slurm_options:
List of options for slurm

exec_command:
Command used to execute commands
"""
try:
with subprocess.Popen(
command_line,
stdout=subprocess.PIPE,
) as srun:
assert srun.stdout
line = srun.stdout.read().decode().strip()
ret_val = int(line.split("|")[0])
except TypeError as msg:
raise TypeError(f"Bad slurm submit: {msg}") from msg
os.makedirs(os.path.dirname(batch_submit_script))
except FileExistsError:
pass

contents = f"{BASH_LINE}\n\n"
for opt_ in slurm_options:
contents += f"#SBATCH {opt_}\n"

for script_ in scripts_in_batch:
contents += f"{exec_command} {script_}\n"

contents += "echo Done!\n"
batch_submit_script.write_text(contents)
batch_submit_script.chmod(0o755)


return ret_val
def submit_slurm_job(
script_path: Path | str,
sbatch_commands: list[str],
) -> str:
"""Submit a SLURM job and return the job ID."""
result = subprocess.run(
sbatch_commands + [str(script_path)],
capture_output=True,
text=True,
check=False,
)

if result.returncode == 0:
# Parse job ID from output like "Submitted batch job 12345"
job_id = result.stdout.strip().split()[-1]
return job_id
raise RuntimeError(f"SLURM submission failed: {result.stderr}")


def sbatch_wrap(
run_mode: RunMode, site: str, args: list[str]
def handle_all_commands(
run_mode: RunMode,
all_commands: list[tuple[list[list[str]], str]],
script_path: str | None = None,
site_config: dict[str, Any] | None = None,
) -> int: # pragma: no cover
"""Wrap a rail_pipe command with site-based arguements
"""Run all the commands in the mode requested

Parameters
----------
run_mode: RunMode
run_mode:
How to run the command, e.g., dry_run, bash or slurm

site: str
Execution site, used to set sbatch options
all_commands:
List of commands and associated place to write scripts

args: list[str]
Additional arguments
script_path:
Path to write the slurm submit script to

site_config:
Config for site we are running at

Returns
-------
int
Status. 0 for success, exit code otherwise
int:
Status returned by the commands. 0 for success, exit code otherwise
"""
try:
slurm_options = SLURM_OPTIONS[site]
except KeyError as msg:
raise KeyError(
f"{site} is not a recognized site, options are {SLURM_OPTIONS.keys()}"
) from msg
command_line = (
["sbatch"]
+ slurm_options
+ ["rail-project", "--run_mode", "slurm"]
+ list(args)
)
return handle_command(run_mode, command_line)
if run_mode in [RunMode.dry_run, RunMode.bash]:
ok = 0
for commands_, _script_path in all_commands:
try:
handle_commands(
run_mode,
commands_,
)
except Exception as msg: # pragma: no cover
print(msg)
ok |= 1

return ok

# At this point we are using slurm and need a script to send to batch
if script_path is None:
raise ValueError(
"handle_all_commands with run_mode == RunMode.slurm requires a path to a script to write",
)

assert site_config is not None
return run_batches(all_commands, Path(script_path), site_config)


def run_batches(
    all_commands: list[tuple[list[list[str]], str]],
    script_path: Path,
    site_config: dict[str, Any],
) -> int:  # pragma: no cover
    """Write the run scripts, group them into batches and submit each batch

    Parameters
    ----------
    all_commands:
        List of commands lists and associated locations for scripts

    script_path:
        Template path for the slurm submit scripts.  Batch number N uses
        <stem>_N<suffix> for the submit script (suffix defaults to .sh) and
        <stem>_N.log / <stem>_N.err for the job output and error logs

    site_config:
        Config for the site we are running at.  The keys read here are
        slurm_options, slurm_batch_size, srun_command and sbatch_commands

    Returns
    -------
    int:
        0 if every run script was written and every batch was submitted,
        1 if any of those steps failed

    Raises
    ------
    ValueError
        If slurm_batch_size is smaller than 1, or if script_path has no
        file name to derive the per-batch names from
    """
    base_options = list(site_config.get("slurm_options", []))
    batch_size = site_config.get("slurm_batch_size", 4)
    srun_command = site_config.get("srun_command", "srun")
    sbatch_commands = site_config.get("sbatch_commands", ["sbatch"])

    if batch_size < 1:
        # A batch size of zero would never advance through all_commands
        raise ValueError(
            f"slurm_batch_size must be a positive integer, got {batch_size}"
        )

    script_path = Path(script_path)
    stem = script_path.stem
    # Only the file name is rewritten, so a ".sh" elsewhere in the path
    # (e.g. in a directory name) is left untouched
    suffix = script_path.suffix or ".sh"
    status = 0

    for job_idx, start in enumerate(range(0, len(all_commands), batch_size)):
        command_batch = all_commands[start : start + batch_size]

        batch_submit_script = script_path.with_name(f"{stem}_{job_idx}{suffix}")
        batch_log = script_path.with_name(f"{stem}_{job_idx}.log")
        batch_err_log = script_path.with_name(f"{stem}_{job_idx}.err")

        scripts_in_batch: list[str] = []
        for commands_, script_path_ in command_batch:
            try:
                write_run_script(commands_, Path(script_path_))
            except Exception as msg:  # pragma: no cover
                print(msg)
                status |= 1
            else:
                scripts_in_batch.append(script_path_)

        if not scripts_in_batch:
            # Every run script of this batch failed to write (already
            # recorded in status), so there is nothing worth submitting
            continue

        # Build the options afresh for each batch; extending one shared list
        # would carry the log paths of earlier batches into later scripts.
        # NOTE(review): --ntasks is the configured batch size even when the
        # final batch is shorter -- confirm this is intended
        batch_options = [
            *base_options,
            f"--output={batch_log}",
            f"--error={batch_err_log}",
            f"--ntasks={batch_size}",
        ]

        try:
            write_submit_script(
                scripts_in_batch,
                batch_submit_script,
                batch_options,
                srun_command,
            )
            submit_slurm_job(batch_submit_script, sbatch_commands)
        except Exception as msg:  # pragma: no cover
            print(msg)
            status |= 1

    return status
Loading