From b2a749d1f9bb64b9733e9d4c6b8539a736c1e1eb Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 21 Jan 2026 15:25:22 -0800 Subject: [PATCH 1/9] feat: get the slurm stuff working --- src/rail/projects/execution.py | 24 +++++++++++++++++++++--- src/rail/projects/project.py | 8 ++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index 2091adf..d45fa24 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -32,8 +32,13 @@ class RunMode(enum.Enum): "regular", "--parsable", ] +TEST_SLURM_OPTIONS: list[str] = [ + "--dummy", + "test", +] SLURM_OPTIONS = { + "test": TEST_SLURM_OPTIONS, "s3df": S3DF_SLURM_OPTIONS, "perlmutter": PERLMUTTER_SLURM_OPTIONS, } @@ -139,11 +144,18 @@ def handle_commands( fout.write(f"{com_line}\n") script_out = script_path.replace(".sh", ".out") + script_err = script_path.replace(".sh", ".err") + + if site in ['test']: + exec_command = ["echo", "0" "|"] + else: + exec_command = ["srun"] command_line = ( - ["srun"] + slurm_options + ["--output", script_out, "--error", script_path] + exec_command + slurm_options + ["--output", script_out, "--error", script_err, script_path] ) try: + print(command_line) with subprocess.Popen( command_line, stdout=subprocess.PIPE, @@ -184,10 +196,16 @@ def sbatch_wrap( raise KeyError( f"{site} is not a recognized site, options are {SLURM_OPTIONS.keys()}" ) from msg + + if site in ['test']: + exec_command = ["echo", "0" "| " "sbatch"] + else: + exec_command = ["sbatch"] + command_line = ( - ["sbatch"] + exec_command + slurm_options - + ["rail-project", "--run_mode", "slurm"] + + ["rail-project", "--run_mode", "bash", "--site", site] + list(args) ) return handle_command(run_mode, command_line) diff --git a/src/rail/projects/project.py b/src/rail/projects/project.py index b341576..118cbdd 100644 --- a/src/rail/projects/project.py +++ b/src/rail/projects/project.py @@ -767,6 +767,14 @@ def run_pipeline_catalog( kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor") + + if run_mode in [execution.RunMode.slurm]: + if kwargs.get('site') is None: + raise ValueError( + "Running with --run-mode slurm requires setting the --site. " + f"Possible values are {list(execution.SLURM_OPTIONS.keys())}" + ) + all_commands = self.make_pipeline_catalog_commands( pipeline_name, flavor, **kwcopy ) From 6c0c31fd79c062fc114c4cb28f0039f5471862b5 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 13:29:48 -0800 Subject: [PATCH 2/9] wip: refactor execution and slurm stuff --- src/rail/cli/rail_project/project_commands.py | 11 - src/rail/projects/execution.py | 271 ++++++++++++------ src/rail/projects/project.py | 45 ++- 3 files changed, 208 insertions(+), 119 deletions(-) diff --git a/src/rail/cli/rail_project/project_commands.py b/src/rail/cli/rail_project/project_commands.py index 6b92bad..22b79ea 100644 --- a/src/rail/cli/rail_project/project_commands.py +++ b/src/rail/cli/rail_project/project_commands.py @@ -187,17 +187,6 @@ def subsample_command( return ok -@project_cli.command(name="sbatch") -@project_options.run_mode() -@project_options.site() -@project_options.args() -def sbatch_command( - run_mode: project_options.RunMode, site: str, args: list[str] -) -> int: # pragma: no cover - """Wrap a rail_pipe command with site-based arguements for slurm""" - return execution.sbatch_wrap(run_mode, site, args) - - @project_cli.command(name="reduce") @project_options.config_file() @project_options.run_mode() diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index d45fa24..e702522 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -3,6 +3,7 @@ import enum import os import subprocess +from pathlib import Path import time @@ -14,27 +15,22 @@ class RunMode(enum.Enum): slurm = 2 +# FIXME: we probably want to get this from the config file + S3DF_SLURM_OPTIONS: list[str] = [ - "-p", - "milano", - "--account", - "rubin:commissioning@milano", - "--mem", - "16G", + "-p=milano", + "--account=rubin:commissioning@milano", + "--mem=16G", "--parsable", ] PERLMUTTER_SLURM_OPTIONS: list[str] = [ - "--account", - "m1727", - "--constraint", - "cpu", - "--qos", - "regular", + "--account=m1727", + "--constraint=cpu", + "--qos=regular", "--parsable", ] TEST_SLURM_OPTIONS: list[str] = [ - "--dummy", - "test", + "--dummy=test", ] SLURM_OPTIONS = { @@ -43,6 +39,12 @@ class RunMode(enum.Enum): "perlmutter": PERLMUTTER_SLURM_OPTIONS, } +BATCH_SIZES = { + "test": 8, + "s3df": 8, + "perlmutter": 32, +} + def handle_command( run_mode: RunMode, @@ -91,8 +93,6 @@ def handle_command( def handle_commands( run_mode: RunMode, command_lines: list[list[str]], - script_path: str | None = None, - site: str | None = None, ) -> int: # pragma: no cover """Run a multiple commands in the mode requested @@ -104,15 +104,11 @@ def handle_commands( command_lines: list[list[str]] List of commands to run, each one is the list of tokens in the command line - script_path: str | None - Path to write the slurm submit script to - Returns ------- int: Status returned by the commands. 0 for success, exit code otherwise """ - if run_mode in [RunMode.dry_run, RunMode.bash]: for command_ in command_lines: retcode = handle_command(run_mode, command_) @@ -120,92 +116,205 @@ def handle_commands( return retcode return 0 - # At this point we are using slurm and need a script to send to batch - if script_path is None: - raise ValueError( - "handle_commands with run_mode == RunMode.slurm requires a path to a script to write", - ) + raise RuntimeError( + "handle_commands should only be called with run_mode in [RunMode.dry_run, RunMode.bash]", + ) - if site is None: - raise ValueError( - "handle_commands with run_mode == RunMode.slurm requires a site", - ) - slurm_options = SLURM_OPTIONS[site] +def write_run_script( + command_lines: list[list[str]], + script_path: str | Path, +) -> None: # pragma: no cover + """Write a script to run multiple commands + + Parameters + ---------- + command_lines: + List of commands to run, each one is the list of tokens in the command line + script_path: + Path to write the script to + """ try: os.makedirs(os.path.dirname(script_path)) except FileExistsError: pass + with open(script_path, "w", encoding="utf-8") as fout: fout.write("#!/usr/bin/bash\n\n") for command_ in command_lines: com_line = " ".join(command_) fout.write(f"{com_line}\n") + fout.write("echo Done!\n") - script_out = script_path.replace(".sh", ".out") - script_err = script_path.replace(".sh", ".err") - if site in ['test']: - exec_command = ["echo", "0" "|"] - else: - exec_command = ["srun"] +def write_submit_script( + scripts_in_batch: list[str], + batch_submit_script: str | Path, + slurm_options: list[str], + exec_command: str = "srun", +) -> None: # pragma: no cover + """Write a script to run multiple commands + + Parameters + ---------- + scripts_in_batch: + List of scripts we are going to run + + batch_submit_script: + Path to write the script to + + slurm_options: + List of options for slurm - command_line = ( - exec_command + slurm_options + ["--output", script_out, "--error", script_err, script_path] + exec_commands: + Command used to execute commands + """ + with open(batch_submit_script, "w", encoding="utf-8") as fout: + fout.write("#!/usr/bin/bash\n\n") + for opt_ in slurm_options: + fout.write(f"#SBATCH {opt_}\n") + + for script_ in scripts_in_batch: + fout.write(f"{exec_command} {script_}\n") + + fout.write("echo Done!") + + +def submit_slurm_job(script_path: Path | str) -> str: + """Submit a SLURM job and return the job ID.""" + result = subprocess.run( + ["sbatch", str(script_path)], capture_output=True, text=True, check=False, ) - try: - print(command_line) - with subprocess.Popen( - command_line, - stdout=subprocess.PIPE, - ) as srun: - assert srun.stdout - line = srun.stdout.read().decode().strip() - ret_val = int(line.split("|")[0]) - except TypeError as msg: - raise TypeError(f"Bad slurm submit: {msg}") from msg - - return ret_val - - -def sbatch_wrap( - run_mode: RunMode, site: str, args: list[str] + + if result.returncode == 0: + # Parse job ID from output like "Submitted batch job 12345" + job_id = result.stdout.strip().split()[-1] + return job_id + raise RuntimeError(f"SLURM submission failed: {result.stderr}") + + +def handle_all_commands( + run_mode: RunMode, + all_commands: list[tuple[list[list[str]], str]], + script_path: str | None = None, + site: str | None = None, ) -> int: # pragma: no cover - """Wrap a rail_pipe command with site-based arguements + """Run all the commands in the mode requested Parameters ---------- - run_mode: RunMode + run_mode: How to run the command, e.g., dry_run, bash or slurm - site: str - Execution site, used to set sbatch options + all_commands: + List of commands and associated place to write scripts + + script_path: + Path to write the slurm submit script to - args: list[str] - Additional arguments + site: + Which site we are running at Returns ------- - int - Status. 0 for success, exit code otherwise + int: + Status returned by the commands. 0 for success, exit code otherwise """ - try: - slurm_options = SLURM_OPTIONS[site] - except KeyError as msg: - raise KeyError( - f"{site} is not a recognized site, options are {SLURM_OPTIONS.keys()}" - ) from msg - - if site in ['test']: - exec_command = ["echo", "0" "| " "sbatch"] - else: - exec_command = ["sbatch"] + if run_mode in [RunMode.dry_run, RunMode.bash]: + ok = 0 + for commands_, _script_path in all_commands: + try: + handle_commands( + run_mode, + commands_, + ) + except Exception as msg: # pragma: no cover + print(msg) + ok |= 1 + + return ok - command_line = ( - exec_command - + slurm_options - + ["rail-project", "--run_mode", "bash", "--site", site] - + list(args) - ) - return handle_command(run_mode, command_line) + # At this point we are using slurm and need a script to send to batch + if script_path is None: + raise ValueError( + "handle_all_commands with run_mode == RunMode.slurm requires a path to a script to write", + ) + + if site is None: + raise ValueError( + "handle_all_commands with run_mode == RunMode.slurm requires a site", + ) + + return run_batches(all_commands, script_path, site) + + +def run_batches( + all_commands: list[tuple[list[list[str]], str]], + script_path: str, + site: str, +) -> int: # pragma: no cover + """Run all the commands in the mode requested + + Parameters + ---------- + all_commands: + List of commands lists and associated locations for scripts + + script_path: + Path to write the slurm submit script to + + site: + Which site we are running at + + Returns + ------- + int: + Status returned by the commands. 0 for success, exit code otherwise + """ + slurm_options = SLURM_OPTIONS[site] + batch_size = BATCH_SIZES[site] + + if site in ["test"]: + exec_command = "echo 0 | sbatch" + else: + exec_command = "sbatch" + + job_idx = 0 + start = 0 + stop = len(all_commands) + status = 0 + + while start < stop: + command_batch = all_commands[start : start + batch_size] + + batch_submit_script = script_path.replace(".sh", f"_{job_idx}.sh") + scripts_in_batch: list[str] = [] + + for commands_, script_path_ in command_batch: + try: + write_run_script( + commands_, + script_path_, + ) + scripts_in_batch.append(script_path_) + except Exception as msg: # pragma: no cover + print(msg) + status |= 1 + + try: + write_submit_script( + scripts_in_batch, + batch_submit_script, + slurm_options, + exec_command, + ) + submit_slurm_job(batch_submit_script) + except Exception as msg: # pragma: no cover + print(msg) + status |= 1 + + start += batch_size + job_idx += 1 + + return status diff --git a/src/rail/projects/project.py b/src/rail/projects/project.py index 118cbdd..487b76b 100644 --- a/src/rail/projects/project.py +++ b/src/rail/projects/project.py @@ -724,21 +724,17 @@ def run_pipeline_single( kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor") sink_dir = self.get_path("ceci_output_dir", flavor=flavor, **kwcopy) - script_path = os.path.join(sink_dir, f"submit_{pipeline_name}.sh") + script_path = os.path.join(sink_dir, f"run_{pipeline_name}.sh") + top_script_path = os.path.join(sink_dir, f"submit_{pipeline_name}.sh") commands = self.make_pipeline_single_input_command( pipeline_name, flavor, **kwcopy ) - try: - statuscode = execution.handle_commands( - run_mode, - [commands], - script_path, - site=kwargs.get("site", None), - ) - except Exception as msg: # pragma: no cover - print(msg) - statuscode = 1 - return statuscode + return execution.handle_all_commands( + run_mode, + [([commands], script_path)], + top_script_path, + site=kwcopy.get('site'), + ) def run_pipeline_catalog( self, @@ -768,8 +764,11 @@ def run_pipeline_catalog( kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor") + sink_dir = self.get_path("ceci_output_dir", flavor=flavor, **kwcopy) + submit_script_path = os.path.join(sink_dir, f"submit_{pipeline_name}.sh") + if run_mode in [execution.RunMode.slurm]: - if kwargs.get('site') is None: + if kwargs.get("site") is None: raise ValueError( "Running with --run-mode slurm requires setting the --site. " f"Possible values are {list(execution.SLURM_OPTIONS.keys())}" @@ -779,20 +778,12 @@ def run_pipeline_catalog( pipeline_name, flavor, **kwcopy ) - ok = 0 - for commands, script_path in all_commands: - try: - execution.handle_commands( - run_mode, - commands, - script_path, - site=kwargs.get("site", None), - ) - except Exception as msg: # pragma: no cover - print(msg) - ok |= 1 - - return ok + return execution.handle_all_commands( + run_mode, + all_commands, + submit_script_path, + site=kwcopy.get('site'), + ) def add_flavor(self, name: str, **kwargs: Any) -> RailFlavor: """Add a new flavor to the Project""" From ffbdf66f9236ef5c5b4946d30ca302b65f08d1b0 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 13:52:05 -0800 Subject: [PATCH 3/9] wip: fixing up paths in slurm handling --- src/rail/projects/execution.py | 50 ++++++++++++++++++++-------------- src/rail/projects/project.py | 4 +-- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index e702522..0f2a7b3 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -123,7 +123,7 @@ def handle_commands( def write_run_script( command_lines: list[list[str]], - script_path: str | Path, + script_path: Path, ) -> None: # pragma: no cover """Write a script to run multiple commands @@ -140,17 +140,18 @@ def write_run_script( except FileExistsError: pass - with open(script_path, "w", encoding="utf-8") as fout: - fout.write("#!/usr/bin/bash\n\n") - for command_ in command_lines: - com_line = " ".join(command_) - fout.write(f"{com_line}\n") - fout.write("echo Done!\n") + contents = "#!/usr/bin/bash\n\n" + for command_ in command_lines: + com_line = " ".join(command_) + contents += f"{com_line}\n" + contents += "echo Done!\n" + script_path.write_text(contents) + script_path.chmod(0o755) def write_submit_script( scripts_in_batch: list[str], - batch_submit_script: str | Path, + batch_submit_script: Path, slurm_options: list[str], exec_command: str = "srun", ) -> None: # pragma: no cover @@ -170,21 +171,30 @@ def write_submit_script( exec_commands: Command used to execute commands """ - with open(batch_submit_script, "w", encoding="utf-8") as fout: - fout.write("#!/usr/bin/bash\n\n") - for opt_ in slurm_options: - fout.write(f"#SBATCH {opt_}\n") + try: + os.makedirs(os.path.dirname(batch_submit_script)) + except FileExistsError: + pass + + contents = "#!/usr/bin/bash\n\n" + for opt_ in slurm_options: + contents += f"#SBATCH {opt_}\n" - for script_ in scripts_in_batch: - fout.write(f"{exec_command} {script_}\n") + for script_ in scripts_in_batch: + contents += f"{exec_command} {script_}\n" - fout.write("echo Done!") + contents += "echo Done!" + batch_submit_script.write_text(contents) + batch_submit_script.chmod(0o755) def submit_slurm_job(script_path: Path | str) -> str: """Submit a SLURM job and return the job ID.""" result = subprocess.run( - ["sbatch", str(script_path)], capture_output=True, text=True, check=False, + ["sbatch", str(script_path)], + capture_output=True, + text=True, + check=False, ) if result.returncode == 0: @@ -246,12 +256,12 @@ def handle_all_commands( "handle_all_commands with run_mode == RunMode.slurm requires a site", ) - return run_batches(all_commands, script_path, site) + return run_batches(all_commands, Path(script_path), site) def run_batches( all_commands: list[tuple[list[list[str]], str]], - script_path: str, + script_path: Path, site: str, ) -> int: # pragma: no cover """Run all the commands in the mode requested @@ -288,14 +298,14 @@ def run_batches( while start < stop: command_batch = all_commands[start : start + batch_size] - batch_submit_script = script_path.replace(".sh", f"_{job_idx}.sh") + batch_submit_script = Path(str(script_path).replace(".sh", f"_{job_idx}.sh")) scripts_in_batch: list[str] = [] for commands_, script_path_ in command_batch: try: write_run_script( commands_, - script_path_, + Path(script_path_), ) scripts_in_batch.append(script_path_) except Exception as msg: # pragma: no cover diff --git a/src/rail/projects/project.py b/src/rail/projects/project.py index 487b76b..a7ec2eb 100644 --- a/src/rail/projects/project.py +++ b/src/rail/projects/project.py @@ -733,7 +733,7 @@ def run_pipeline_single( run_mode, [([commands], script_path)], top_script_path, - site=kwcopy.get('site'), + site=kwcopy.get("site"), ) def run_pipeline_catalog( @@ -782,7 +782,7 @@ def run_pipeline_catalog( run_mode, all_commands, submit_script_path, - site=kwcopy.get('site'), + site=kwcopy.get("site"), ) def add_flavor(self, name: str, **kwargs: Any) -> RailFlavor: From 3ea656ed1d81a0b79f604904a70ca24650a3161d Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 14:01:55 -0800 Subject: [PATCH 4/9] fix: tweak execution.py --- src/rail/projects/execution.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index 0f2a7b3..35d01f3 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -45,6 +45,9 @@ class RunMode(enum.Enum): "perlmutter": 32, } +BASH_LINE = "#!/usr/bin/bash" + + def handle_command( run_mode: RunMode, @@ -140,7 +143,7 @@ def write_run_script( except FileExistsError: pass - contents = "#!/usr/bin/bash\n\n" + contents = f"{BASH_LINE}\n\n" for command_ in command_lines: com_line = " ".join(command_) contents += f"{com_line}\n" @@ -176,22 +179,25 @@ def write_submit_script( except FileExistsError: pass - contents = "#!/usr/bin/bash\n\n" + contents = f"{BASH_LINE}\n\n" for opt_ in slurm_options: contents += f"#SBATCH {opt_}\n" for script_ in scripts_in_batch: contents += f"{exec_command} {script_}\n" - contents += "echo Done!" + contents += "echo Done!\n" batch_submit_script.write_text(contents) batch_submit_script.chmod(0o755) -def submit_slurm_job(script_path: Path | str) -> str: +def submit_slurm_job( + script_path: Path | str, + sbatch_command: str, +) -> str: """Submit a SLURM job and return the job ID.""" result = subprocess.run( - ["sbatch", str(script_path)], + [sbatch_command, str(script_path)], capture_output=True, text=True, check=False, @@ -286,9 +292,11 @@ def run_batches( batch_size = BATCH_SIZES[site] if site in ["test"]: - exec_command = "echo 0 | sbatch" + srun_command = "echo 0 srun" + sbatch_command = "echo 0 sbatch" else: - exec_command = "sbatch" + srun_command = "srun" + sbatch_command = "sbatch" job_idx = 0 start = 0 @@ -317,9 +325,9 @@ def run_batches( scripts_in_batch, batch_submit_script, slurm_options, - exec_command, + srun_command, ) - submit_slurm_job(batch_submit_script) + submit_slurm_job(batch_submit_script, sbatch_command) except Exception as msg: # pragma: no cover print(msg) status |= 1 From d6c9710a2bb79f5eef7f9fef5a5f00ab0ec5d359 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 14:08:05 -0800 Subject: [PATCH 5/9] fix: more tweaking execution --- src/rail/projects/execution.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index 35d01f3..27f281a 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -193,11 +193,11 @@ def write_submit_script( def submit_slurm_job( script_path: Path | str, - sbatch_command: str, + sbatch_commands: list[str], ) -> str: """Submit a SLURM job and return the job ID.""" result = subprocess.run( - [sbatch_command, str(script_path)], + sbatch_commands + [str(script_path)], capture_output=True, text=True, check=False, @@ -293,10 +293,10 @@ def run_batches( if site in ["test"]: srun_command = "echo 0 srun" - sbatch_command = "echo 0 sbatch" + sbatch_commands = ["echo", "0", "sbatch"] else: srun_command = "srun" - sbatch_command = "sbatch" + sbatch_commands = ["sbatch"] job_idx = 0 start = 0 @@ -327,7 +327,7 @@ def run_batches( slurm_options, srun_command, ) - submit_slurm_job(batch_submit_script, sbatch_command) + submit_slurm_job(batch_submit_script, sbatch_commands) except Exception as msg: # pragma: no cover print(msg) status |= 1 From aefbb75ce4bac14f5e090e4804f6d9a4ceeccc40 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 14:15:33 -0800 Subject: [PATCH 6/9] wip: tweak run script file name --- src/rail/projects/pipeline_holder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rail/projects/pipeline_holder.py b/src/rail/projects/pipeline_holder.py index 2222fb7..3f1811a 100644 --- a/src/rail/projects/pipeline_holder.py +++ b/src/rail/projects/pipeline_holder.py @@ -965,7 +965,7 @@ def make_pipeline_catalog_commands( sink_dir = os.path.dirname(sink_catalog) script_path = os.path.join( sink_dir, - f"submit_{pipeline_name}_{selection}_{flavor}.sh", + f"run_{pipeline_name}_{selection}_{flavor}.sh", ) ceci_commands = project.generate_ceci_command( pipeline_path=pipeline_path, From 01db39d4936a678ab4f02fd63816cc4f72dacef6 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 15:05:26 -0800 Subject: [PATCH 7/9] Added site config to rail project yaml --- src/rail/projects/execution.py | 106 +++++++++++++++++---------------- src/rail/projects/project.py | 21 +++++-- 2 files changed, 73 insertions(+), 54 deletions(-) diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index 27f281a..5b4f45b 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -5,6 +5,7 @@ import subprocess from pathlib import Path import time +from typing import Any class RunMode(enum.Enum): @@ -15,40 +16,46 @@ class RunMode(enum.Enum): slurm = 2 -# FIXME: we probably want to get this from the config file - -S3DF_SLURM_OPTIONS: list[str] = [ - "-p=milano", - "--account=rubin:commissioning@milano", - "--mem=16G", - "--parsable", -] -PERLMUTTER_SLURM_OPTIONS: list[str] = [ - "--account=m1727", - "--constraint=cpu", - "--qos=regular", - "--parsable", -] -TEST_SLURM_OPTIONS: list[str] = [ - "--dummy=test", -] - -SLURM_OPTIONS = { - "test": TEST_SLURM_OPTIONS, - "s3df": S3DF_SLURM_OPTIONS, - "perlmutter": PERLMUTTER_SLURM_OPTIONS, -} - -BATCH_SIZES = { - "test": 8, - "s3df": 8, - "perlmutter": 32, -} +S3DF_SITE_CONFIG: dict[str, Any] = dict( + slurm_batch_size=8, + slurm_options=[ + "-p=milano", + "--account=rubin:commissioning@milano", + "--mem=16G", + "--parsable", + ], + srun_command="srun", + sbatch_commands=["sbatch"], +) +PERLMUTTER_SITE_CONFIG: dict[str, Any] = dict( + slurm_batch_size=32, + slurm_options=[ + "--account=m1727", + "--constraint=cpu", + "--qos=regular", + "--parsable", + ], + srun_command="srun", + sbatch_commands=["sbatch"], +) +TEST_SITE_CONFIG: dict[str, Any] = dict( + slurm_batch_size=4, + slurm_options=[ + "--dummy=test", + ], + srun_command="echo 0 srun", + sbatch_commands=["echo", "0", "sbatch"], +) + +DEFAULT_SITE_CONFIGS: dict[str, dict[str, Any]] = dict( + test=TEST_SITE_CONFIG, + perlmutter=PERLMUTTER_SITE_CONFIG, + s3df=S3DF_SITE_CONFIG, +) BASH_LINE = "#!/usr/bin/bash" - def handle_command( run_mode: RunMode, command_line: list[str], @@ -214,7 +221,7 @@ def handle_all_commands( run_mode: RunMode, all_commands: list[tuple[list[list[str]], str]], script_path: str | None = None, - site: str | None = None, + site_config: dict[str, Any] | None = None, ) -> int: # pragma: no cover """Run all the commands in the mode requested @@ -229,8 +236,8 @@ def handle_all_commands( script_path: Path to write the slurm submit script to - site: - Which site we are running at + site_config: + Config for site we are running at Returns ------- @@ -257,18 +264,14 @@ def handle_all_commands( "handle_all_commands with run_mode == RunMode.slurm requires a path to a script to write", ) - if site is None: - raise ValueError( - "handle_all_commands with run_mode == RunMode.slurm requires a site", - ) - - return run_batches(all_commands, Path(script_path), site) + assert site_config is not None + return run_batches(all_commands, Path(script_path), site_config) def run_batches( all_commands: list[tuple[list[list[str]], str]], script_path: Path, - site: str, + site_config: dict[str, Any], ) -> int: # pragma: no cover """Run all the commands in the mode requested @@ -280,7 +283,7 @@ def run_batches( script_path: Path to write the slurm submit script to - site: + site_config: Which site we are running at Returns @@ -288,15 +291,10 @@ def run_batches( int: Status returned by the commands. 0 for success, exit code otherwise """ - slurm_options = SLURM_OPTIONS[site] - batch_size = BATCH_SIZES[site] - - if site in ["test"]: - srun_command = "echo 0 srun" - sbatch_commands = ["echo", "0", "sbatch"] - else: - srun_command = "srun" - sbatch_commands = ["sbatch"] + slurm_options = site_config.get("slurm_options", []) + batch_size = site_config.get("slurm_batch_size", 4) + srun_command = site_config.get("srun_command", "srun") + sbatch_commands = site_config.get("sbatch_commands", ["sbatch"]) job_idx = 0 start = 0 @@ -307,6 +305,8 @@ def run_batches( command_batch = all_commands[start : start + batch_size] batch_submit_script = Path(str(script_path).replace(".sh", f"_{job_idx}.sh")) + batch_log = Path(str(script_path).replace(".sh", f"_{job_idx}.log")) + batch_err_log = Path(str(script_path).replace(".sh", f"_{job_idx}.err")) scripts_in_batch: list[str] = [] for commands_, script_path_ in command_batch: @@ -321,6 +321,12 @@ def run_batches( status |= 1 try: + slurm_options.update( + output=f"{batch_log}", + error=f"{batch_err_log}", + ntasks=batch_size, + ) + write_submit_script( scripts_in_batch, batch_submit_script, diff --git a/src/rail/projects/project.py b/src/rail/projects/project.py index a7ec2eb..4fb69af 100644 --- a/src/rail/projects/project.py +++ b/src/rail/projects/project.py @@ -84,6 +84,7 @@ class RailProject(Configurable): # pylint: disable=too-many-public-methods config_options: dict[str, StageParameter] = dict( Name=StageParameter(str, None, fmt="%s", required=True, msg="Project name"), + SiteConfig=StageParameter(dict, {}, fmt="%s", msg="Site configuration options"), Includes=StageParameter(list, [], fmt="%s", msg="Files to include"), Baseline=StageParameter( dict, None, fmt="%s", required=True, msg="Baseline analysis configuration" @@ -729,11 +730,12 @@ def run_pipeline_single( commands = self.make_pipeline_single_input_command( pipeline_name, flavor, **kwcopy ) + site_config = self.get_site_config(kwcopy.get("site")) return execution.handle_all_commands( run_mode, [([commands], script_path)], top_script_path, - site=kwcopy.get("site"), + site_config=site_config, ) def run_pipeline_catalog( @@ -771,18 +773,19 @@ def run_pipeline_catalog( if kwargs.get("site") is None: raise ValueError( "Running with --run-mode slurm requires setting the --site. " - f"Possible values are {list(execution.SLURM_OPTIONS.keys())}" + f"Possible values are {list(execution.DEFAULT_SITE_CONFIGS.keys())}" ) + site_config = self.get_site_config(kwcopy.get("site")) + all_commands = self.make_pipeline_catalog_commands( pipeline_name, flavor, **kwcopy ) - return execution.handle_all_commands( run_mode, all_commands, submit_script_path, - site=kwcopy.get("site"), + site_config=site_config, ) def add_flavor(self, name: str, **kwargs: Any) -> RailFlavor: @@ -799,6 +802,16 @@ def add_flavor(self, name: str, **kwargs: Any) -> RailFlavor: self._flavors[new_flavor.config.name] = new_flavor return new_flavor + def get_site_config(self, site: str | None) -> dict: + """Get the site configuration for a particular site""" + if site is None: + return {} + if site in self.config.SiteConfig: + return self.config.SiteConfig[site] + if site in execution.DEFAULT_SITE_CONFIGS: + return execution.DEFAULT_SITE_CONFIGS[site] + raise KeyError(f"Could not get configuration for site: {site}") + def write_yaml(self, yaml_file: str) -> None: """Write this project to a yaml file""" the_dict = self.to_yaml_dict() From e36b11162ec01328e5c3ab4e20abed4ae586576b Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 15:10:11 -0800 Subject: [PATCH 8/9] wip: fixing slurm_options merging --- src/rail/projects/execution.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index 5b4f45b..5a67d82 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -291,7 +291,7 @@ def run_batches( int: Status returned by the commands. 0 for success, exit code otherwise """ - slurm_options = site_config.get("slurm_options", []) + slurm_options = site_config.get("slurm_options", []).copy() batch_size = site_config.get("slurm_batch_size", 4) srun_command = site_config.get("srun_command", "srun") sbatch_commands = site_config.get("sbatch_commands", ["sbatch"]) @@ -321,10 +321,10 @@ def run_batches( status |= 1 try: - slurm_options.update( - output=f"{batch_log}", - error=f"{batch_err_log}", - ntasks=batch_size, + slurm_options += [ + f"--output={batch_log}", + f"--error={batch_err_log}", + f"--ntasks={batch_size}", ) write_submit_script( From 838dfa0aff68f75fcda44b27d89fe198f5874ae6 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 22 Jan 2026 15:10:55 -0800 Subject: [PATCH 9/9] wip: fixing slurm_options merging --- src/rail/projects/execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rail/projects/execution.py b/src/rail/projects/execution.py index 5a67d82..9ec6c86 100644 --- a/src/rail/projects/execution.py +++ b/src/rail/projects/execution.py @@ -325,7 +325,7 @@ def run_batches( f"--output={batch_log}", f"--error={batch_err_log}", f"--ntasks={batch_size}", - ) + ] write_submit_script( scripts_in_batch,