diff --git a/scripts/launch_runs.py b/scripts/launch_runs.py index 9ec3900..9cc3957 100755 --- a/scripts/launch_runs.py +++ b/scripts/launch_runs.py @@ -22,32 +22,43 @@ help="Path to stats file", ) parser.add_argument( - "-w", "--workflow", action="store", + choices=["local", "prod", "prod-legacy", "prod-test", "caterpillar"], default="prod", - choices=["prod", "prod-legacy", "prod-test", "caterpillar"], help="Workflow to use", ) parser.add_argument( - "-r", - "--run", + "--pixi-path", action="store", - default="local", - choices=["local", "slurm"], - help="How to run jobs?", + type=Path, + default=None, + help="Path passed to `pixi run --manifest-path`. Overrides --workflow defaults.", +) +parser.add_argument( + "--workflow-path", + action="store", + type=Path, + default=None, + help="Path to workflow repo root (must contain `workflow/Snakefile`). Overrides --workflow defaults.", ) parser.add_argument( - "--snakemake-submit", - action="store_true", - default=False, - help="Submit Snakemake as an HPC job", + "--scheduler", + action="store", + default="slurm", + help="HPC scheduler", ) parser.add_argument( - "--snakemake-logger", + "--partition", action="store", - default="--logger snkmt --logger-snkmt-db snkmt.db", - help="Snakemake logger command", + default="compsnake", + help="Slurm partition (only used when --scheduler slurm).", +) +parser.add_argument( + "--account", + action="store", + default="prod", + help="Slurm account (only used when --scheduler slurm).", ) parser.add_argument( "-l", @@ -61,7 +72,6 @@ extra_args = " ".join(extra_args) extra_args += " --jobs 300 --retries 1" - # Set logger loglevel = getattr(logging, args.loglevel.upper(), None) logging.basicConfig( @@ -71,63 +81,6 @@ datefmt="%Y-%m-%d %H:%M:%S", ) - -# Infer pixi_env/workflow paths, and add extra options -if args.workflow == "prod": - pixi_env = workflow_path = "/projects/caeg/apps/aeDNA" - import os - - if os.environ.get("CAEG_QC_USER") and os.environ.get("CAEG_QC_PASSWORD"): - # extra_args += """ --config 'report={multiqc_db_url: "postgresql+psycopg2://dandypdb01fl.unicph.domain:5432/caeg_qc"}'""" - extra_args += " --profile /projects/caeg/data/resources/profile_production" -elif args.workflow == "prod-legacy": - pixi_env = "/projects/caeg/apps/aeDNA" - workflow_path = "/projects/caeg/apps/aeDNA-legacy" -elif args.workflow == "prod-test": - pixi_env = workflow_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA" -elif args.workflow == "caterpillar": - pixi_env = workflow_path = "/projects/caeg/people/lnc113/workflows/caterpillar" - - -# Infer hostname, HPC account and partition -import socket - -hostname = socket.gethostname() -if hostname.startswith("dandy"): - hpc_snakemake_account = hpc_job_account = "prod" - hpc_job_partition = "compregular" - hpc_snakemake_partition = "compsnake" - hpc_snakemake_qos = "" - hpc_job_qos = "" -elif hostname.startswith("rubus"): - hpc_snakemake_account = hpc_job_account = "bench" - hpc_snakemake_partition = hpc_job_partition = "rubus" - hpc_snakemake_qos = "long" - hpc_job_qos = "normal" -else: - logging.error(f"Host {hostname} not supported yet!") - exit(-1) - - -# Workflow run command -if args.snakemake_submit: - logging.info(f"Workflows will be submitted to the {args.run} HPC, on:") - cmd = f"sbatch --chdir {{id}} --job-name {{id}}" - if hpc_snakemake_account: - logging.info(f" - account: {hpc_snakemake_account}") - cmd += f" --account {hpc_snakemake_account}" - if hpc_snakemake_partition: - logging.info(f" - partition: {hpc_snakemake_partition}") - cmd += f" --partition {hpc_snakemake_partition}" - if hpc_snakemake_qos: - logging.info(f" - qos: {hpc_snakemake_qos}") - cmd += f" --qos {hpc_snakemake_qos}" - cmd += f" --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap=" -else: - logging.info(f"Workflows will be run locally on host {hostname}") - cmd = f"env --chdir={{id}} bash -c " - - # Jobs run command if args.run == "local": logging.info(f"Running jobs locally") @@ -151,17 +104,52 @@ for job_list in args.job_list ] ) + n_jobs = df.shape[0] logging.info(f"Launching {n_jobs} jobs") -logging.debug(df) +logging.debug(f"\n{df}") +logging.info("Build base command") +local_repo_root = Path(__file__).resolve(strict=True).parent.parent -# Print command -logging.info("Build command") +if args.pixi_path is not None: + pixi_path = str(args.pixi_path) +elif args.workflow == "local": + pixi_path = str(local_repo_root) +elif args.workflow == "prod": + pixi_path = "/projects/caeg/apps/aeDNA" +elif args.workflow == "prod-legacy": + pixi_path = "/projects/caeg/apps/aeDNA" +elif args.workflow == "prod-test": + pixi_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA" +elif args.workflow == "caterpillar": + pixi_path = "/projects/caeg/people/lnc113/workflows/caterpillar" + +if args.workflow_path is not None: + workflow_path = str(args.workflow_path) +elif args.workflow == "local": + workflow_path = str(local_repo_root) +elif args.workflow == "prod": + workflow_path = "/projects/caeg/apps/aeDNA" +elif args.workflow == "prod-legacy": + workflow_path = "/projects/caeg/apps/aeDNA-legacy" +elif args.workflow == "prod-test": + workflow_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA" +elif args.workflow == "caterpillar": + workflow_path = "/projects/caeg/people/lnc113/workflows/caterpillar" +cmd = f"pixi run --manifest-path {pixi_path} snakemake --snakefile {workflow_path}/workflow/Snakefile --workflow-profile /projects/caeg/data/resources/profile {extra_args}" + + +# Print command for id in df.index: - print( - f'{cmd.format(id=id)}"pixi run --manifest-path {pixi_env} snakemake --snakefile {workflow_path}/workflow/Snakefile --workflow-profile /projects/caeg/data/resources/profile {args.snakemake_logger} {extra_args}"; sleep 0.5' - ) + if args.scheduler == "": + print(f"env --chdir={id} {cmd}") + elif args.scheduler == "slurm": + print( + f'sbatch --chdir {id} --job-name {id} --account {args.account} --partition {args.partition} --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap "{cmd} --profile /projects/caeg/data/resources/profile_production --executor slurm {extra_args}"; sleep 1' + ) + else: + logging.warning(f"HPC scheduler {args.scheduler} not supported!") exit(0) diff --git a/scripts/make_units.py b/scripts/make_units.py index 9d596eb..36718a6 100755 --- a/scripts/make_units.py +++ b/scripts/make_units.py @@ -42,7 +42,7 @@ def gzip_n_lines(in_gzip): "--in-regex", action="store", type=str, - default="^(?!Undetermined).*.(sam|bam|cram|fastq|fq)(.gz)?$", + default="^(?!Undetermined).*.(sam|bam|fastq|fq)(.gz)?$", help="Regex to filter input files", ) parser.add_argument( @@ -91,6 +91,7 @@ def gzip_n_lines(in_gzip): action="store", nargs="+", default=[ + f"date={pd.Timestamp.today().normalize().strftime('%Y-%m-%d')}", "sample=Lib", "material=DNA", "flowcell_pos=X", @@ -173,7 +174,11 @@ def gzip_n_lines(in_gzip): else: args.extra_file = Path(args.extra_file) extra_file_name = args.extra_file.name -assert args.extra_file.exists(), "Extra file does not exist." +if not args.extra_file.exists(): + parser.error( + "Extra file does not exist: " + f"{args.extra_file}. Pass --extra-file name:/path/to/file with a valid path." + ) # Add extra metadata @@ -252,6 +257,9 @@ def gzip_n_lines(in_gzip): exit(0) +###################### +### FORMAT COLUMNS ### +###################### # Add metadata for metadata_default in args.metadata_default: if metadata_default.find("=") > 0: @@ -259,23 +267,29 @@ def gzip_n_lines(in_gzip): if key not in units: units[key] = value - -###################### -### FORMAT COLUMNS ### -###################### -# Format date column (if present) -if "date" in units.columns.values: - units["date"] = pd.to_datetime(units["date"]) - - -# Remove adapters if file SAM/BAM/CRAM -units.loc[units.data.str.contains(r"\.(?:sam|bam|cram)$"), "adapters"] = pd.NA - +# Add missing fields used in out-path formatting. +missing_out_path_fields = [ + key for key in out_path_wildcards if key not in units.columns.values +] +if missing_out_path_fields: + logging.warning( + "Missing out-path fields in parsed metadata: %s. Applying defaults.", + ", ".join(sorted(missing_out_path_fields)), + ) # Fix invalid values fix_cols = units.columns.drop("data") units[fix_cols] = units[fix_cols].replace(args.rm_chars, value="", regex=True) +# Normalize date column after metadata/default injection. +if "date" in units.columns.values: + units["date"] = pd.to_datetime(units["date"], errors="coerce") + if units["date"].isna().any(): + logging.warning( + "Some date values could not be parsed. Using today's date for invalid rows." + ) + units.loc[units["date"].isna(), "date"] = out_path_defaults["date"] + # Fix seq_type info and collapse if "read" in units: @@ -311,26 +325,11 @@ def gzip_n_lines(in_gzip): # Add workflow_ver (current workflow version), if present in out_path if "workflow_ver" in out_path_wildcards: import git - repo = git.Repo(Path(__file__).resolve(strict=True).parent.parent) - commits = pd.DataFrame( - [[commit.hexsha, commit.committed_date] for commit in repo.iter_commits()], - columns=["hexsha", "date"], - ).sort_values(by="date") - tags = pd.DataFrame( - [[tag.commit.hexsha, tag.name] for tag in repo.tags], columns=["hexsha", "tag"] - ) - commits = pd.merge(commits, tags, how="left", on="hexsha") - # if no tag, use commit hexsha - commits["tag"] = commits["tag"].fillna(commits["hexsha"]) - - # Sanity check - commits_no_tag = commits[commits.tag.str.len() == 40] - assert all( - commits_no_tag["hexsha"].eq(commits_no_tag["tag"]) - ), "Commits HEX SHA do not match!" - - units["workflow_ver"] = commits.iloc[-1]["tag"] + try: + units["workflow_ver"] = repo.git.describe("--tags", "--exact-match") + except Exception: + units["workflow_ver"] = repo.head.commit.hexsha # Reorder columns @@ -391,7 +390,7 @@ def gzip_n_lines(in_gzip): logging.debug(pd.concat(out_stats)) with open(args.out_stats, "x") as out_stat_fh: - np.set_printoptions(legacy="1.21") + np.set_printoptions(legacy="1.25") out_stat_fh.write(f"# {args}\n") pd.concat(out_stats).dropna(axis=1, how="all").to_csv( out_stat_fh, @@ -410,7 +409,7 @@ def gzip_n_lines(in_gzip): # Create folders out_path.mkdir(parents=True, exist_ok=args.force) # Save units.tsv file - units.drop(["extra_file_md5"], axis=1).dropna(axis=1, how="all").to_csv( + units.dropna(axis=1, how="all").to_csv( out_path / "units.tsv", sep="\t", index=False,