diff --git a/src/rail/cli/rail_project/project_commands.py b/src/rail/cli/rail_project/project_commands.py index 6b92bad..22b79ea 100644 --- a/src/rail/cli/rail_project/project_commands.py +++ b/src/rail/cli/rail_project/project_commands.py @@ -187,17 +187,6 @@ def subsample_command( return ok -@project_cli.command(name="sbatch") -@project_options.run_mode() -@project_options.site() -@project_options.args() -def sbatch_command( - run_mode: project_options.RunMode, site: str, args: list[str] -) -> int: # pragma: no cover - """Wrap a rail_pipe command with site-based arguements for slurm""" - return execution.sbatch_wrap(run_mode, site, args) - - @project_cli.command(name="reduce") @project_options.config_file() @project_options.run_mode() diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index 2091adf..9ec6c86 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -3,7 +3,9 @@ import enum import os import subprocess +from pathlib import Path import time +from typing import Any class RunMode(enum.Enum): @@ -14,29 +16,44 @@ class RunMode(enum.Enum): slurm = 2 -S3DF_SLURM_OPTIONS: list[str] = [ - "-p", - "milano", - "--account", - "rubin:commissioning@milano", - "--mem", - "16G", - "--parsable", -] -PERLMUTTER_SLURM_OPTIONS: list[str] = [ - "--account", - "m1727", - "--constraint", - "cpu", - "--qos", - "regular", - "--parsable", -] - -SLURM_OPTIONS = { - "s3df": S3DF_SLURM_OPTIONS, - "perlmutter": PERLMUTTER_SLURM_OPTIONS, -} +S3DF_SITE_CONFIG: dict[str, Any] = dict( + slurm_batch_size=8, + slurm_options=[ + "-p=milano", + "--account=rubin:commissioning@milano", + "--mem=16G", + "--parsable", + ], + srun_command="srun", + sbatch_commands=["sbatch"], +) +PERLMUTTER_SITE_CONFIG: dict[str, Any] = dict( + slurm_batch_size=32, + slurm_options=[ + "--account=m1727", + "--constraint=cpu", + "--qos=regular", + "--parsable", + ], + srun_command="srun", + sbatch_commands=["sbatch"], +) +TEST_SITE_CONFIG: dict[str, Any] = dict( + slurm_batch_size=4, + slurm_options=[ + "--dummy=test", + ], + srun_command="echo 0 srun", + sbatch_commands=["echo", "0", "sbatch"], +) + +DEFAULT_SITE_CONFIGS: dict[str, dict[str, Any]] = dict( + test=TEST_SITE_CONFIG, + perlmutter=PERLMUTTER_SITE_CONFIG, + s3df=S3DF_SITE_CONFIG, +) + +BASH_LINE = "#!/usr/bin/bash" def handle_command( @@ -86,8 +103,6 @@ def handle_command( def handle_commands( run_mode: RunMode, command_lines: list[list[str]], - script_path: str | None = None, - site: str | None = None, ) -> int: # pragma: no cover """Run a multiple commands in the mode requested @@ -99,15 +114,11 @@ def handle_commands( command_lines: list[list[str]] List of commands to run, each one is the list of tokens in the command line - script_path: str | None - Path to write the slurm submit script to - Returns ------- int: Status returned by the commands. 0 for success, exit code otherwise """ - if run_mode in [RunMode.dry_run, RunMode.bash]: for command_ in command_lines: retcode = handle_command(run_mode, command_) @@ -115,79 +126,219 @@ def handle_commands( return retcode return 0 - # At this point we are using slurm and need a script to send to batch - if script_path is None: - raise ValueError( - "handle_commands with run_mode == RunMode.slurm requires a path to a script to write", - ) + raise RuntimeError( + "handle_commands should only be called with run_mode in [RunMode.dry_run, RunMode.bash]", + ) - if site is None: - raise ValueError( - "handle_commands with run_mode == RunMode.slurm requires a site", - ) - slurm_options = SLURM_OPTIONS[site] +def write_run_script( + command_lines: list[list[str]], + script_path: Path, +) -> None: # pragma: no cover + """Write a script to run multiple commands + + Parameters + ---------- + command_lines: + List of commands to run, each one is the list of tokens in the command line + script_path: + Path to write the script to + """ try: os.makedirs(os.path.dirname(script_path)) except FileExistsError: pass - with open(script_path, "w", encoding="utf-8") as fout: - fout.write("#!/usr/bin/bash\n\n") - for command_ in command_lines: - com_line = " ".join(command_) - fout.write(f"{com_line}\n") - script_out = script_path.replace(".sh", ".out") + contents = f"{BASH_LINE}\n\n" + for command_ in command_lines: + com_line = " ".join(command_) + contents += f"{com_line}\n" + contents += "echo Done!\n" + script_path.write_text(contents) + script_path.chmod(0o755) - command_line = ( - ["srun"] + slurm_options + ["--output", script_out, "--error", script_path] - ) + +def write_submit_script( + scripts_in_batch: list[str], + batch_submit_script: Path, + slurm_options: list[str], + exec_command: str = "srun", +) -> None: # pragma: no cover + """Write a script to run multiple commands + + Parameters + ---------- + scripts_in_batch: + List of scripts we are going to run + + batch_submit_script: + Path to write the script to + + slurm_options: + List of options for slurm + + exec_commands: + Command used to execute commands + """ try: - with subprocess.Popen( - command_line, - stdout=subprocess.PIPE, - ) as srun: - assert srun.stdout - line = srun.stdout.read().decode().strip() - ret_val = int(line.split("|")[0]) - except TypeError as msg: - raise TypeError(f"Bad slurm submit: {msg}") from msg + os.makedirs(os.path.dirname(batch_submit_script)) + except FileExistsError: + pass + + contents = f"{BASH_LINE}\n\n" + for opt_ in slurm_options: + contents += f"#SBATCH {opt_}\n" + + for script_ in scripts_in_batch: + contents += f"{exec_command} {script_}\n" + + contents += "echo Done!\n" + batch_submit_script.write_text(contents) + batch_submit_script.chmod(0o755) + - return ret_val +def submit_slurm_job( + script_path: Path | str, + sbatch_commands: list[str], +) -> str: + """Submit a SLURM job and return the job ID.""" + result = subprocess.run( + sbatch_commands + [str(script_path)], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode == 0: + # Parse job ID from output like "Submitted batch job 12345" + job_id = result.stdout.strip().split()[-1] + return job_id + raise RuntimeError(f"SLURM submission failed: {result.stderr}") -def sbatch_wrap( - run_mode: RunMode, site: str, args: list[str] +def handle_all_commands( + run_mode: RunMode, + all_commands: list[tuple[list[list[str]], str]], + script_path: str | None = None, + site_config: dict[str, Any] | None = None, ) -> int: # pragma: no cover - """Wrap a rail_pipe command with site-based arguements + """Run all the commands in the mode requested Parameters ---------- - run_mode: RunMode + run_mode: How to run the command, e.g., dry_run, bash or slurm - site: str - Execution site, used to set sbatch options + all_commands: + List of commands and associated place to write scripts - args: list[str] - Additional arguments + script_path: + Path to write the slurm submit script to + + site_config: + Config for site we are running at Returns ------- - int - Status. 0 for success, exit code otherwise + int: + Status returned by the commands. 0 for success, exit code otherwise """ - try: - slurm_options = SLURM_OPTIONS[site] - except KeyError as msg: - raise KeyError( - f"{site} is not a recognized site, options are {SLURM_OPTIONS.keys()}" - ) from msg - command_line = ( - ["sbatch"] - + slurm_options - + ["rail-project", "--run_mode", "slurm"] - + list(args) - ) - return handle_command(run_mode, command_line) + if run_mode in [RunMode.dry_run, RunMode.bash]: + ok = 0 + for commands_, _script_path in all_commands: + try: + handle_commands( + run_mode, + commands_, + ) + except Exception as msg: # pragma: no cover + print(msg) + ok |= 1 + + return ok + + # At this point we are using slurm and need a script to send to batch + if script_path is None: + raise ValueError( + "handle_all_commands with run_mode == RunMode.slurm requires a path to a script to write", + ) + + assert site_config is not None + return run_batches(all_commands, Path(script_path), site_config) + + +def run_batches( + all_commands: list[tuple[list[list[str]], str]], + script_path: Path, + site_config: dict[str, Any], +) -> int: # pragma: no cover + """Run all the commands in the mode requested + + Parameters + ---------- + all_commands: + List of commands lists and associated locations for scripts + + script_path: + Path to write the slurm submit script to + + site_config: + Which site we are running at + + Returns + ------- + int: + Status returned by the commands. 0 for success, exit code otherwise + """ + slurm_options = site_config.get("slurm_options", []).copy() + batch_size = site_config.get("slurm_batch_size", 4) + srun_command = site_config.get("srun_command", "srun") + sbatch_commands = site_config.get("sbatch_commands", ["sbatch"]) + + job_idx = 0 + start = 0 + stop = len(all_commands) + status = 0 + + while start < stop: + command_batch = all_commands[start : start + batch_size] + + batch_submit_script = Path(str(script_path).replace(".sh", f"_{job_idx}.sh")) + batch_log = Path(str(script_path).replace(".sh", f"_{job_idx}.log")) + batch_err_log = Path(str(script_path).replace(".sh", f"_{job_idx}.err")) + scripts_in_batch: list[str] = [] + + for commands_, script_path_ in command_batch: + try: + write_run_script( + commands_, + Path(script_path_), + ) + scripts_in_batch.append(script_path_) + except Exception as msg: # pragma: no cover + print(msg) + status |= 1 + + try: + slurm_options += [ + f"--output={batch_log}", + f"--error={batch_err_log}", + f"--ntasks={batch_size}", + ] + + write_submit_script( + scripts_in_batch, + batch_submit_script, + slurm_options, + srun_command, + ) + submit_slurm_job(batch_submit_script, sbatch_commands) + except Exception as msg: # pragma: no cover + print(msg) + status |= 1 + + start += batch_size + job_idx += 1 + + return status diff --git a/src/rail/projects/pipeline_holder.py b/src/rail/projects/pipeline_holder.py index 2222fb7..3f1811a 100644 --- a/src/rail/projects/pipeline_holder.py +++ b/src/rail/projects/pipeline_holder.py @@ -965,7 +965,7 @@ def make_pipeline_catalog_commands( sink_dir = os.path.dirname(sink_catalog) script_path = os.path.join( sink_dir, - f"submit_{pipeline_name}_{selection}_{flavor}.sh", + f"run_{pipeline_name}_{selection}_{flavor}.sh", ) ceci_commands = project.generate_ceci_command( pipeline_path=pipeline_path, diff --git a/src/rail/projects/project.py b/src/rail/projects/project.py index b341576..4fb69af 100644 --- a/src/rail/projects/project.py +++ b/src/rail/projects/project.py @@ -84,6 +84,7 @@ class RailProject(Configurable): # pylint: disable=too-many-public-methods config_options: dict[str, StageParameter] = dict( Name=StageParameter(str, None, fmt="%s", required=True, msg="Project name"), + SiteConfig=StageParameter(dict, {}, fmt="%s", msg="Site configuration options"), Includes=StageParameter(list, [], fmt="%s", msg="Files to include"), Baseline=StageParameter( dict, None, fmt="%s", required=True, msg="Baseline analysis configuration" @@ -724,21 +725,18 @@ def run_pipeline_single( kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor") sink_dir = self.get_path("ceci_output_dir", flavor=flavor, **kwcopy) - script_path = os.path.join(sink_dir, f"submit_{pipeline_name}.sh") + script_path = os.path.join(sink_dir, f"run_{pipeline_name}.sh") + top_script_path = os.path.join(sink_dir, f"submit_{pipeline_name}.sh") commands = self.make_pipeline_single_input_command( pipeline_name, flavor, **kwcopy ) - try: - statuscode = execution.handle_commands( - run_mode, - [commands], - script_path, - site=kwargs.get("site", None), - ) - except Exception as msg: # pragma: no cover - print(msg) - statuscode = 1 - return statuscode + site_config = self.get_site_config(kwcopy.get("site")) + return execution.handle_all_commands( + run_mode, + [([commands], script_path)], + top_script_path, + site_config=site_config, + ) def run_pipeline_catalog( self, @@ -767,24 +765,28 @@ def run_pipeline_catalog( kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor") - all_commands = self.make_pipeline_catalog_commands( - pipeline_name, flavor, **kwcopy - ) - ok = 0 - for commands, script_path in all_commands: - try: - execution.handle_commands( - run_mode, - commands, - script_path, - site=kwargs.get("site", None), + sink_dir = self.get_path("ceci_output_dir", flavor=flavor, **kwcopy) + submit_script_path = os.path.join(sink_dir, f"submit_{pipeline_name}.sh") + + if run_mode in [execution.RunMode.slurm]: + if kwargs.get("site") is None: + raise ValueError( + "Running with --run-mode slurm requires setting the --site. " + f"Possible values are {list(execution.DEFAULT_SITE_CONFIGS.keys())}" ) - except Exception as msg: # pragma: no cover - print(msg) - ok |= 1 - return ok + site_config = self.get_site_config(kwcopy.get("site")) + + all_commands = self.make_pipeline_catalog_commands( + pipeline_name, flavor, **kwcopy + ) + return execution.handle_all_commands( + run_mode, + all_commands, + submit_script_path, + site_config=site_config, + ) def add_flavor(self, name: str, **kwargs: Any) -> RailFlavor: """Add a new flavor to the Project""" @@ -800,6 +802,16 @@ def add_flavor(self, name: str, **kwargs: Any) -> RailFlavor: self._flavors[new_flavor.config.name] = new_flavor return new_flavor + def get_site_config(self, site: str | None) -> dict: + """Get the site configuration for a particular site""" + if site is None: + return {} + if site in self.config.SiteConfig: + return self.config.SiteConfig[site] + if site in execution.DEFAULT_SITE_CONFIGS: + return execution.DEFAULT_SITE_CONFIGS[site] + raise KeyError(f"Could not get configuration for site: {site}") + def write_yaml(self, yaml_file: str) -> None: """Write this project to a yaml file""" the_dict = self.to_yaml_dict()