From 2498b6111088a07f0fdcc5c9688a7970980f2484 Mon Sep 17 00:00:00 2001 From: Julian Regalado Date: Fri, 29 May 2026 10:49:15 +0200 Subject: [PATCH 1/6] Added aditional options --- scripts/launch_runs.py | 150 +++++++++++++++++------------------------ scripts/make_units.py | 89 ++++++++++++------------ 2 files changed, 108 insertions(+), 131 deletions(-) diff --git a/scripts/launch_runs.py b/scripts/launch_runs.py index 9ec3900..732a6d1 100755 --- a/scripts/launch_runs.py +++ b/scripts/launch_runs.py @@ -22,32 +22,37 @@ help="Path to stats file", ) parser.add_argument( - "-w", "--workflow", action="store", + choices=["local", "prod", "prod-legacy", "prod-test", "caterpillar"], default="prod", - choices=["prod", "prod-legacy", "prod-test", "caterpillar"], help="Workflow to use", ) parser.add_argument( - "-r", - "--run", + "--pixi-path", action="store", - default="local", - choices=["local", "slurm"], - help="How to run jobs?", + type=Path, + default=None, + help="Path passed to `pixi run --manifest-path`. Overrides --workflow defaults.", ) parser.add_argument( - "--snakemake-submit", - action="store_true", - default=False, - help="Submit Snakemake as an HPC job", + "--workflow-path", + action="store", + type=Path, + default=None, + help="Path to workflow repo root (must contain `workflow/Snakefile`). Overrides --workflow defaults.", ) parser.add_argument( - "--snakemake-logger", + "--scheduler", action="store", - default="--logger snkmt --logger-snkmt-db snkmt.db", - help="Snakemake logger command", + default="slurm", + help="HPC scheduler", +) +parser.add_argument( + "--partition", + action="store", + default="compsnake", + help="Slurm partition (only used when --scheduler slurm).", ) parser.add_argument( "-l", @@ -59,7 +64,6 @@ ) args, extra_args = parser.parse_known_args() extra_args = " ".join(extra_args) -extra_args += " --jobs 300 --retries 1" # Set logger @@ -72,75 +76,6 @@ ) -# Infer pixi_env/workflow paths, and add extra options -if args.workflow == "prod": - pixi_env = workflow_path = "/projects/caeg/apps/aeDNA" - import os - - if os.environ.get("CAEG_QC_USER") and os.environ.get("CAEG_QC_PASSWORD"): - # extra_args += """ --config 'report={multiqc_db_url: "postgresql+psycopg2://dandypdb01fl.unicph.domain:5432/caeg_qc"}'""" - extra_args += " --profile /projects/caeg/data/resources/profile_production" -elif args.workflow == "prod-legacy": - pixi_env = "/projects/caeg/apps/aeDNA" - workflow_path = "/projects/caeg/apps/aeDNA-legacy" -elif args.workflow == "prod-test": - pixi_env = workflow_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA" -elif args.workflow == "caterpillar": - pixi_env = workflow_path = "/projects/caeg/people/lnc113/workflows/caterpillar" - - -# Infer hostname, HPC account and partition -import socket - -hostname = socket.gethostname() -if hostname.startswith("dandy"): - hpc_snakemake_account = hpc_job_account = "prod" - hpc_job_partition = "compregular" - hpc_snakemake_partition = "compsnake" - hpc_snakemake_qos = "" - hpc_job_qos = "" -elif hostname.startswith("rubus"): - hpc_snakemake_account = hpc_job_account = "bench" - hpc_snakemake_partition = hpc_job_partition = "rubus" - hpc_snakemake_qos = "long" - hpc_job_qos = "normal" -else: - logging.error(f"Host {hostname} not supported yet!") - exit(-1) - - -# Workflow run command -if args.snakemake_submit: - logging.info(f"Workflows will be submitted to the {args.run} HPC, on:") - cmd = f"sbatch --chdir {{id}} --job-name {{id}}" - if hpc_snakemake_account: - logging.info(f" - account: {hpc_snakemake_account}") - cmd += f" --account {hpc_snakemake_account}" - if hpc_snakemake_partition: - logging.info(f" - partition: {hpc_snakemake_partition}") - cmd += f" --partition {hpc_snakemake_partition}" - if hpc_snakemake_qos: - logging.info(f" - qos: {hpc_snakemake_qos}") - cmd += f" --qos {hpc_snakemake_qos}" - cmd += f" --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap=" -else: - logging.info(f"Workflows will be run locally on host {hostname}") - cmd = f"env --chdir={{id}} bash -c " - - -# Jobs run command -if args.run == "local": - logging.info(f"Running jobs locally") -elif args.run == "slurm": - logging.info( - f"Jobs will be submitted to the {args.run} HPC, on account '{hpc_job_account}' and partition '{hpc_job_partition}'." - ) - extra_args += ( - f" --executor {args.run} --default-resources slurm_account={hpc_job_account} slurm_partition={hpc_job_partition}" - + (f" --slurm-qos {hpc_job_qos}" if hpc_job_qos else "") - ) - - # Read job list logging.info("Reading input file(s)") df = pd.concat( @@ -151,17 +86,54 @@ for job_list in args.job_list ] ) + + n_jobs = df.shape[0] logging.info(f"Launching {n_jobs} jobs") logging.debug(df) -# Print command -logging.info("Build command") +logging.info("Build base command") +local_repo_root = Path(__file__).resolve(strict=True).parent.parent +if args.pixi_path is not None: + pixi_path = str(args.pixi_path) +elif args.workflow == "local": + pixi_path = str(local_repo_root) +elif args.workflow == "prod": + pixi_path = "/projects/caeg/apps/aeDNA" +elif args.workflow == "prod-legacy": + pixi_path = "/projects/caeg/apps/aeDNA" +elif args.workflow == "prod-test": + pixi_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA" +elif args.workflow == "caterpillar": + pixi_path = "/projects/caeg/people/lnc113/workflows/caterpillar" + +if args.workflow_path is not None: + workflow_path = str(args.workflow_path) +elif args.workflow == "local": + workflow_path = str(local_repo_root) +elif args.workflow == "prod": + workflow_path = "/projects/caeg/apps/aeDNA" +elif args.workflow == "prod-legacy": + workflow_path = "/projects/caeg/apps/aeDNA-legacy" +elif args.workflow == "prod-test": + workflow_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA" +elif args.workflow == "caterpillar": + workflow_path = "/projects/caeg/people/lnc113/workflows/caterpillar" + +cmd = f"pixi run --manifest-path {pixi_path} snakemake --snakefile {workflow_path}/workflow/Snakefile --workflow-profile /projects/caeg/data/resources/profile {extra_args}" + + +# Print command for id in df.index: - print( - f'{cmd.format(id=id)}"pixi run --manifest-path {pixi_env} snakemake --snakefile {workflow_path}/workflow/Snakefile --workflow-profile /projects/caeg/data/resources/profile {args.snakemake_logger} {extra_args}"; sleep 0.5' - ) + if args.scheduler == "": + print(f"env --chdir={id} {cmd}") + elif args.scheduler == "slurm": + print( + f'sbatch --chdir {id} --job-name {id} --account prod --partition {args.partition} --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap "{cmd} --profile /projects/caeg/data/resources/profile_production --executor slurm --retries 1"; sleep 1' + ) + else: + logging.warning(f"HPC scheduler {args.scheduler} not supported!") exit(0) diff --git a/scripts/make_units.py b/scripts/make_units.py index 9d596eb..6c5f8be 100755 --- a/scripts/make_units.py +++ b/scripts/make_units.py @@ -42,14 +42,14 @@ def gzip_n_lines(in_gzip): "--in-regex", action="store", type=str, - default="^(?!Undetermined).*.(sam|bam|cram|fastq|fq)(.gz)?$", + default="^(?!Undetermined).*.(sam|bam|fastq|fq)(.gz)?$", help="Regex to filter input files", ) parser.add_argument( "-r", "--regex", action="store", - default=r"\/(?P\d{8})_(?P[A-Z]{1,2}\d{5})_(?P\d{4})_(?P[AB])(?P[A-Z0-9]{9})(_(?P[A-Z0-9]+))?(_\w+)?\/(?P[^\/]+)\/(?PLV\d{10})(-(?P[^_]+))?-(?P[^_]+)_(?PS\d+)_(?PL\d{3})_(?PR[12])_001", + default=r"\/(?P\d{8})_(?PA\d{5})_(?P\d{4})_(?P[AB])(?PH[A-Z0-9]{8})(_(?P[A-Z0-9]+))?(_\w+)?\/(?P[^\/]+)\/(?PLV\d{10})-(?P[^_-]+)-(?P[^_]+)_(?PS\d+)_(?PL\d{3})_(?PR[12])_001", help="Regex to extract identifiers. For help, see: https://docs.python.org/3/library/re.html#regular-expression-syntax", ) parser.add_argument( @@ -243,7 +243,7 @@ def gzip_n_lines(in_gzip): # Add row to DF row = pd.DataFrame([row]) - logging.debug(f"\n{row.iloc[0]}") + logging.debug(row) units = pd.concat([units, row]) @@ -252,14 +252,6 @@ def gzip_n_lines(in_gzip): exit(0) -# Add metadata -for metadata_default in args.metadata_default: - if metadata_default.find("=") > 0: - key, value = metadata_default.split("=") - if key not in units: - units[key] = value - - ###################### ### FORMAT COLUMNS ### ###################### @@ -268,9 +260,12 @@ def gzip_n_lines(in_gzip): units["date"] = pd.to_datetime(units["date"]) -# Remove adapters if file SAM/BAM/CRAM -units.loc[units.data.str.contains(r"\.(?:sam|bam|cram)$"), "adapters"] = pd.NA - +# Add metadata +for metadata_default in args.metadata_default: + if metadata_default.find("=") > 0: + key, value = metadata_default.split("=") + if key not in units: + units[key] = value # Fix invalid values fix_cols = units.columns.drop("data") @@ -310,27 +305,39 @@ def gzip_n_lines(in_gzip): # Add workflow_ver (current workflow version), if present in out_path if "workflow_ver" in out_path_wildcards: - import git - - repo = git.Repo(Path(__file__).resolve(strict=True).parent.parent) - commits = pd.DataFrame( - [[commit.hexsha, commit.committed_date] for commit in repo.iter_commits()], - columns=["hexsha", "date"], - ).sort_values(by="date") - tags = pd.DataFrame( - [[tag.commit.hexsha, tag.name] for tag in repo.tags], columns=["hexsha", "tag"] - ) - commits = pd.merge(commits, tags, how="left", on="hexsha") - # if no tag, use commit hexsha - commits["tag"] = commits["tag"].fillna(commits["hexsha"]) + repo_root = Path(__file__).resolve(strict=True).parent.parent + workflow_ver = None + + # Prefer GitPython if available, but do not require it. + try: + import git # type: ignore + + repo = git.Repo(repo_root) + try: + workflow_ver = repo.git.describe("--tags", "--always") + except Exception: + workflow_ver = repo.head.commit.hexsha[:12] + except ModuleNotFoundError: + logging.warning( + "GitPython not installed; falling back to `git describe` for workflow_ver." + ) - # Sanity check - commits_no_tag = commits[commits.tag.str.len() == 40] - assert all( - commits_no_tag["hexsha"].eq(commits_no_tag["tag"]) - ), "Commits HEX SHA do not match!" + if workflow_ver is None: + import subprocess + + try: + workflow_ver = subprocess.check_output( + ["git", "-C", str(repo_root), "describe", "--tags", "--always"], + text=True, + stderr=subprocess.DEVNULL, + ).strip() + except Exception: + workflow_ver = "unknown" + logging.warning( + "Could not determine workflow_ver from git; using 'unknown'." + ) - units["workflow_ver"] = commits.iloc[-1]["tag"] + units["workflow_ver"] = workflow_ver # Reorder columns @@ -348,8 +355,8 @@ def gzip_n_lines(in_gzip): # Sort rows units = units.sort_values(by=list(units.columns.drop(["size_kb", "n_reads"]).values)) logging.info(f"Units file has {units.shape[0]} rows and {units.shape[1]} columns.") -logging.debug(f"\n{units}") -logging.debug(f"\n{units.dtypes}") +logging.debug(units) +logging.debug(units.dtypes) ##################### @@ -360,7 +367,7 @@ def gzip_n_lines(in_gzip): for keys, units in units.groupby(out_path_wildcards, group_keys=True): name = dict(zip(out_path_wildcards, keys)) logging.debug(f"Group units with output path wildcards: {name}") - logging.debug(f"\n{units}") + logging.debug(units) # Create output path out_path = Path(args.out_path.format(**name)) datasets.append((out_path, units)) @@ -391,7 +398,7 @@ def gzip_n_lines(in_gzip): logging.debug(pd.concat(out_stats)) with open(args.out_stats, "x") as out_stat_fh: - np.set_printoptions(legacy="1.21") + np.set_printoptions(legacy="1.25") out_stat_fh.write(f"# {args}\n") pd.concat(out_stats).dropna(axis=1, how="all").to_csv( out_stat_fh, @@ -403,14 +410,12 @@ def gzip_n_lines(in_gzip): ) -for out_path, units in datasets: - if args.dryrun: - assert not out_path.exists() - else: +if not args.dryrun: + for out_path, units in datasets: # Create folders out_path.mkdir(parents=True, exist_ok=args.force) # Save units.tsv file - units.drop(["extra_file_md5"], axis=1).dropna(axis=1, how="all").to_csv( + units.dropna(axis=1, how="all").to_csv( out_path / "units.tsv", sep="\t", index=False, From 62c04ee00fc4b9a97c5359fbfede96e856c735b6 Mon Sep 17 00:00:00 2001 From: Julian Regalado Date: Fri, 29 May 2026 12:59:36 +0200 Subject: [PATCH 2/6] added --account and --jobs parameters --- scripts/launch_runs.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/scripts/launch_runs.py b/scripts/launch_runs.py index 732a6d1..58040d8 100755 --- a/scripts/launch_runs.py +++ b/scripts/launch_runs.py @@ -54,6 +54,19 @@ default="compsnake", help="Slurm partition (only used when --scheduler slurm).", ) +parser.add_argument( + "--account", + action="store", + default="prod", + help="Slurm account (only used when --scheduler slurm).", +) +parser.add_argument( + "--jobs", + action="store", + type=int, + default=100, + help="Maximum number of parallel jobs submitted to slurm (passed as --jobs to snakemake).", +) parser.add_argument( "-l", "--loglevel", @@ -131,7 +144,7 @@ print(f"env --chdir={id} {cmd}") elif args.scheduler == "slurm": print( - f'sbatch --chdir {id} --job-name {id} --account prod --partition {args.partition} --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap "{cmd} --profile /projects/caeg/data/resources/profile_production --executor slurm --retries 1"; sleep 1' + f'sbatch --chdir {id} --job-name {id} --account {args.account} --partition {args.partition} --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap "{cmd} --profile /projects/caeg/data/resources/profile_production --executor slurm --jobs {args.jobs} --retries 1"; sleep 1' ) else: logging.warning(f"HPC scheduler {args.scheduler} not supported!") From a3fb3d7b2b7b59fe921e40fe5354781106dba684 Mon Sep 17 00:00:00 2001 From: Julian Regalado Date: Mon, 1 Jun 2026 10:51:17 +0200 Subject: [PATCH 3/6] Improve make_units robustness for minimal input paths and missing dependencies add fallback defaults for missing out-path wildcards (project, library, flowcell_pos, flowcell, lane, date, workflow_ver, extra_file_md5, sample) warn when out-path fields are missing from parsed metadata and defaults are applied normalize date after metadata injection and replace invalid dates with today to keep formatting stable make workflow version detection resilient when GitPython is unavailable by falling back to git describe (and finally unknown) replace assert on missing --extra-file with argparse parser error for a clean, user-friendly failure message --- scripts/make_units.py | 46 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/scripts/make_units.py b/scripts/make_units.py index 6c5f8be..7dff10a 100755 --- a/scripts/make_units.py +++ b/scripts/make_units.py @@ -173,7 +173,11 @@ def gzip_n_lines(in_gzip): else: args.extra_file = Path(args.extra_file) extra_file_name = args.extra_file.name -assert args.extra_file.exists(), "Extra file does not exist." +if not args.extra_file.exists(): + parser.error( + "Extra file does not exist: " + f"{args.extra_file}. Pass --extra-file name:/path/to/file with a valid path." + ) # Add extra metadata @@ -191,6 +195,20 @@ def gzip_n_lines(in_gzip): set([key.split("[")[0] for _, key, _, _ in Formatter().parse(args.out_path) if key]) ) +# Default values used when an out-path wildcard is not present in parsed metadata. +# Keep date as datetime so format specs like {date:%Y%m%d} work. +out_path_defaults = { + "sample": "sample", + "project": "project", + "library": "library", + "flowcell_pos": "X", + "flowcell": "HXXXXXXXX", + "lane": "L001", + "date": pd.Timestamp.today().normalize(), + "workflow_ver": "unknown", + "extra_file_md5": "noextra", +} + ########### ### LOG ### @@ -255,11 +273,6 @@ def gzip_n_lines(in_gzip): ###################### ### FORMAT COLUMNS ### ###################### -# Format date column (if present) -if "date" in units.columns.values: - units["date"] = pd.to_datetime(units["date"]) - - # Add metadata for metadata_default in args.metadata_default: if metadata_default.find("=") > 0: @@ -267,10 +280,31 @@ def gzip_n_lines(in_gzip): if key not in units: units[key] = value +# Add missing fields used in out-path formatting. +missing_out_path_fields = [ + key for key in out_path_wildcards if key not in units.columns.values +] +if missing_out_path_fields: + logging.warning( + "Missing out-path fields in parsed metadata: %s. Applying defaults.", + ", ".join(sorted(missing_out_path_fields)), + ) +for key in missing_out_path_fields: + units[key] = out_path_defaults.get(key, f"unknown_{key}") + # Fix invalid values fix_cols = units.columns.drop("data") units[fix_cols] = units[fix_cols].replace(args.rm_chars, value="", regex=True) +# Normalize date column after metadata/default injection. +if "date" in units.columns.values: + units["date"] = pd.to_datetime(units["date"], errors="coerce") + if units["date"].isna().any(): + logging.warning( + "Some date values could not be parsed. Using today's date for invalid rows." + ) + units.loc[units["date"].isna(), "date"] = out_path_defaults["date"] + # Fix seq_type info and collapse if "read" in units: From cb448227d3f9c1a968616640574097a4c2f085da Mon Sep 17 00:00:00 2001 From: Julian Regalado Date: Mon, 1 Jun 2026 12:00:38 +0200 Subject: [PATCH 4/6] Addressed PR #29 --- scripts/make_units.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/scripts/make_units.py b/scripts/make_units.py index 7dff10a..3682507 100755 --- a/scripts/make_units.py +++ b/scripts/make_units.py @@ -49,7 +49,7 @@ def gzip_n_lines(in_gzip): "-r", "--regex", action="store", - default=r"\/(?P\d{8})_(?PA\d{5})_(?P\d{4})_(?P[AB])(?PH[A-Z0-9]{8})(_(?P[A-Z0-9]+))?(_\w+)?\/(?P[^\/]+)\/(?PLV\d{10})-(?P[^_-]+)-(?P[^_]+)_(?PS\d+)_(?PL\d{3})_(?PR[12])_001", + default=r"\/(?P\d{8})_(?P[A-Z]{1,2}\d{5})_(?P\d{4})_(?P[AB])(?P[A-Z0-9]{9})(_(?P[A-Z0-9]+))?(_\w+)?\/(?P[^\/]+)\/(?PLV\d{10})(-(?P[^_]+))?-(?P[^_]+)_(?PS\d+)_(?PL\d{3})_(?PR[12])_001", help="Regex to extract identifiers. For help, see: https://docs.python.org/3/library/re.html#regular-expression-syntax", ) parser.add_argument( @@ -389,8 +389,8 @@ def gzip_n_lines(in_gzip): # Sort rows units = units.sort_values(by=list(units.columns.drop(["size_kb", "n_reads"]).values)) logging.info(f"Units file has {units.shape[0]} rows and {units.shape[1]} columns.") -logging.debug(units) -logging.debug(units.dtypes) +logging.debug(f"\n{units}") +logging.debug(f"\n{units.dtypes}") ##################### @@ -444,8 +444,10 @@ def gzip_n_lines(in_gzip): ) -if not args.dryrun: - for out_path, units in datasets: +for out_path, units in datasets: + if args.dryrun: + assert not out_path.exists() + else: # Create folders out_path.mkdir(parents=True, exist_ok=args.force) # Save units.tsv file From 58d187f1df1855e59924f2e90d56f8291f3b7917 Mon Sep 17 00:00:00 2001 From: Julian Regalado Date: Mon, 1 Jun 2026 13:36:19 +0200 Subject: [PATCH 5/6] Resolving second round of comments --- scripts/launch_runs.py | 8 +++---- scripts/make_units.py | 48 +++++++++++------------------------------- 2 files changed, 15 insertions(+), 41 deletions(-) diff --git a/scripts/launch_runs.py b/scripts/launch_runs.py index 58040d8..39ae80a 100755 --- a/scripts/launch_runs.py +++ b/scripts/launch_runs.py @@ -77,7 +77,7 @@ ) args, extra_args = parser.parse_known_args() extra_args = " ".join(extra_args) - +extra_args += " --jobs 300 --retries 1" # Set logger loglevel = getattr(logging, args.loglevel.upper(), None) @@ -100,11 +100,9 @@ ] ) - n_jobs = df.shape[0] logging.info(f"Launching {n_jobs} jobs") -logging.debug(df) - +logging.debug(f"\n{df}") logging.info("Build base command") local_repo_root = Path(__file__).resolve(strict=True).parent.parent @@ -144,7 +142,7 @@ print(f"env --chdir={id} {cmd}") elif args.scheduler == "slurm": print( - f'sbatch --chdir {id} --job-name {id} --account {args.account} --partition {args.partition} --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap "{cmd} --profile /projects/caeg/data/resources/profile_production --executor slurm --jobs {args.jobs} --retries 1"; sleep 1' + f'sbatch --chdir {id} --job-name {id} --account {args.account} --partition {args.partition} --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap "{cmd} --profile /projects/caeg/data/resources/profile_production --executor slurm {extra_args}"; sleep 1' ) else: logging.warning(f"HPC scheduler {args.scheduler} not supported!") diff --git a/scripts/make_units.py b/scripts/make_units.py index 3682507..8b0e7c3 100755 --- a/scripts/make_units.py +++ b/scripts/make_units.py @@ -91,6 +91,7 @@ def gzip_n_lines(in_gzip): action="store", nargs="+", default=[ + f"date={pd.Timestamp.today().normalize().strftime('%Y-%m-%d')}", "sample=Lib", "material=DNA", "flowcell_pos=X", @@ -195,8 +196,6 @@ def gzip_n_lines(in_gzip): set([key.split("[")[0] for _, key, _, _ in Formatter().parse(args.out_path) if key]) ) -# Default values used when an out-path wildcard is not present in parsed metadata. -# Keep date as datetime so format specs like {date:%Y%m%d} work. out_path_defaults = { "sample": "sample", "project": "project", @@ -261,7 +260,7 @@ def gzip_n_lines(in_gzip): # Add row to DF row = pd.DataFrame([row]) - logging.debug(row) + logging.debug(f"\n{row.iloc[0]}") units = pd.concat([units, row]) @@ -339,39 +338,16 @@ def gzip_n_lines(in_gzip): # Add workflow_ver (current workflow version), if present in out_path if "workflow_ver" in out_path_wildcards: - repo_root = Path(__file__).resolve(strict=True).parent.parent - workflow_ver = None - - # Prefer GitPython if available, but do not require it. - try: - import git # type: ignore - - repo = git.Repo(repo_root) - try: - workflow_ver = repo.git.describe("--tags", "--always") - except Exception: - workflow_ver = repo.head.commit.hexsha[:12] - except ModuleNotFoundError: - logging.warning( - "GitPython not installed; falling back to `git describe` for workflow_ver." - ) - - if workflow_ver is None: - import subprocess - - try: - workflow_ver = subprocess.check_output( - ["git", "-C", str(repo_root), "describe", "--tags", "--always"], - text=True, - stderr=subprocess.DEVNULL, - ).strip() - except Exception: - workflow_ver = "unknown" - logging.warning( - "Could not determine workflow_ver from git; using 'unknown'." - ) + import git - units["workflow_ver"] = workflow_ver + repo = git.Repo(Path(__file__).resolve(strict=True).parent.parent) + tag_recent = [ + [tag.name, commit.hexsha] + for commit in repo.iter_commits() + for tag in repo.tags + if commit.hexsha == tag.commit.hexsha + ][0] + units["workflow_ver"] = tag_recent[0] # Reorder columns @@ -401,7 +377,7 @@ def gzip_n_lines(in_gzip): for keys, units in units.groupby(out_path_wildcards, group_keys=True): name = dict(zip(out_path_wildcards, keys)) logging.debug(f"Group units with output path wildcards: {name}") - logging.debug(units) + logging.debug(f"\n{units}") # Create output path out_path = Path(args.out_path.format(**name)) datasets.append((out_path, units)) From 64724de0b9c84695b3e6139dbd14752d30e736de Mon Sep 17 00:00:00 2001 From: Julian Regalado Date: Tue, 2 Jun 2026 09:30:06 +0200 Subject: [PATCH 6/6] Resolving third round of comments --- scripts/launch_runs.py | 19 ++++++++++++------- scripts/make_units.py | 26 ++++---------------------- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/scripts/launch_runs.py b/scripts/launch_runs.py index 39ae80a..9cc3957 100755 --- a/scripts/launch_runs.py +++ b/scripts/launch_runs.py @@ -60,13 +60,6 @@ default="prod", help="Slurm account (only used when --scheduler slurm).", ) -parser.add_argument( - "--jobs", - action="store", - type=int, - default=100, - help="Maximum number of parallel jobs submitted to slurm (passed as --jobs to snakemake).", -) parser.add_argument( "-l", "--loglevel", @@ -88,6 +81,18 @@ datefmt="%Y-%m-%d %H:%M:%S", ) +# Jobs run command +if args.run == "local": + logging.info(f"Running jobs locally") +elif args.run == "slurm": + logging.info( + f"Jobs will be submitted to the {args.run} HPC, on account '{hpc_job_account}' and partition '{hpc_job_partition}'." + ) + extra_args += ( + f" --executor {args.run} --default-resources slurm_account={hpc_job_account} slurm_partition={hpc_job_partition}" + + (f" --slurm-qos {hpc_job_qos}" if hpc_job_qos else "") + ) + # Read job list logging.info("Reading input file(s)") diff --git a/scripts/make_units.py b/scripts/make_units.py index 8b0e7c3..36718a6 100755 --- a/scripts/make_units.py +++ b/scripts/make_units.py @@ -196,18 +196,6 @@ def gzip_n_lines(in_gzip): set([key.split("[")[0] for _, key, _, _ in Formatter().parse(args.out_path) if key]) ) -out_path_defaults = { - "sample": "sample", - "project": "project", - "library": "library", - "flowcell_pos": "X", - "flowcell": "HXXXXXXXX", - "lane": "L001", - "date": pd.Timestamp.today().normalize(), - "workflow_ver": "unknown", - "extra_file_md5": "noextra", -} - ########### ### LOG ### @@ -288,8 +276,6 @@ def gzip_n_lines(in_gzip): "Missing out-path fields in parsed metadata: %s. Applying defaults.", ", ".join(sorted(missing_out_path_fields)), ) -for key in missing_out_path_fields: - units[key] = out_path_defaults.get(key, f"unknown_{key}") # Fix invalid values fix_cols = units.columns.drop("data") @@ -339,15 +325,11 @@ def gzip_n_lines(in_gzip): # Add workflow_ver (current workflow version), if present in out_path if "workflow_ver" in out_path_wildcards: import git - repo = git.Repo(Path(__file__).resolve(strict=True).parent.parent) - tag_recent = [ - [tag.name, commit.hexsha] - for commit in repo.iter_commits() - for tag in repo.tags - if commit.hexsha == tag.commit.hexsha - ][0] - units["workflow_ver"] = tag_recent[0] + try: + units["workflow_ver"] = repo.git.describe("--tags", "--exact-match") + except Exception: + units["workflow_ver"] = repo.head.commit.hexsha # Reorder columns