Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 66 additions & 78 deletions scripts/launch_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,43 @@
help="Path to stats file",
)
parser.add_argument(
"-w",
"--workflow",
action="store",
choices=["local", "prod", "prod-legacy", "prod-test", "caterpillar"],
default="prod",
choices=["prod", "prod-legacy", "prod-test", "caterpillar"],
help="Workflow to use",
)
parser.add_argument(
"-r",
"--run",
"--pixi-path",
action="store",
default="local",
choices=["local", "slurm"],
help="How to run jobs?",
type=Path,
default=None,
help="Path passed to `pixi run --manifest-path`. Overrides --workflow defaults.",
)
parser.add_argument(
"--workflow-path",
action="store",
type=Path,
default=None,
help="Path to workflow repo root (must contain `workflow/Snakefile`). Overrides --workflow defaults.",
)
parser.add_argument(
"--snakemake-submit",
action="store_true",
default=False,
help="Submit Snakemake as an HPC job",
"--scheduler",
action="store",
default="slurm",
help="HPC scheduler",
)
parser.add_argument(
"--snakemake-logger",
"--partition",
action="store",
default="--logger snkmt --logger-snkmt-db snkmt.db",
help="Snakemake logger command",
default="compsnake",
help="Slurm partition (only used when --scheduler slurm).",
)
parser.add_argument(
"--account",
action="store",
default="prod",
help="Slurm account (only used when --scheduler slurm).",
)
parser.add_argument(
"-l",
Expand All @@ -61,7 +72,6 @@
extra_args = " ".join(extra_args)
extra_args += " --jobs 300 --retries 1"


# Set logger
loglevel = getattr(logging, args.loglevel.upper(), None)
logging.basicConfig(
Expand All @@ -71,63 +81,6 @@
datefmt="%Y-%m-%d %H:%M:%S",
)


# Infer pixi_env/workflow paths, and add extra options
if args.workflow == "prod":
pixi_env = workflow_path = "/projects/caeg/apps/aeDNA"
import os

if os.environ.get("CAEG_QC_USER") and os.environ.get("CAEG_QC_PASSWORD"):
# extra_args += """ --config 'report={multiqc_db_url: "postgresql+psycopg2://dandypdb01fl.unicph.domain:5432/caeg_qc"}'"""
extra_args += " --profile /projects/caeg/data/resources/profile_production"
elif args.workflow == "prod-legacy":
pixi_env = "/projects/caeg/apps/aeDNA"
workflow_path = "/projects/caeg/apps/aeDNA-legacy"
elif args.workflow == "prod-test":
pixi_env = workflow_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA"
elif args.workflow == "caterpillar":
pixi_env = workflow_path = "/projects/caeg/people/lnc113/workflows/caterpillar"


# Infer hostname, HPC account and partition
import socket

hostname = socket.gethostname()
if hostname.startswith("dandy"):
hpc_snakemake_account = hpc_job_account = "prod"
hpc_job_partition = "compregular"
hpc_snakemake_partition = "compsnake"
hpc_snakemake_qos = ""
hpc_job_qos = ""
elif hostname.startswith("rubus"):
hpc_snakemake_account = hpc_job_account = "bench"
hpc_snakemake_partition = hpc_job_partition = "rubus"
hpc_snakemake_qos = "long"
hpc_job_qos = "normal"
else:
logging.error(f"Host {hostname} not supported yet!")
exit(-1)


# Workflow run command
if args.snakemake_submit:
logging.info(f"Workflows will be submitted to the {args.run} HPC, on:")
cmd = f"sbatch --chdir {{id}} --job-name {{id}}"
if hpc_snakemake_account:
logging.info(f" - account: {hpc_snakemake_account}")
cmd += f" --account {hpc_snakemake_account}"
if hpc_snakemake_partition:
logging.info(f" - partition: {hpc_snakemake_partition}")
cmd += f" --partition {hpc_snakemake_partition}"
if hpc_snakemake_qos:
logging.info(f" - qos: {hpc_snakemake_qos}")
cmd += f" --qos {hpc_snakemake_qos}"
cmd += f" --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap="
else:
logging.info(f"Workflows will be run locally on host {hostname}")
cmd = f"env --chdir={{id}} bash -c "


# Jobs run command
if args.run == "local":
logging.info(f"Running jobs locally")
Expand All @@ -151,17 +104,52 @@
for job_list in args.job_list
]
)

n_jobs = df.shape[0]
logging.info(f"Launching {n_jobs} jobs")
logging.debug(df)
logging.debug(f"\n{df}")

logging.info("Build base command")
local_repo_root = Path(__file__).resolve(strict=True).parent.parent

# Print command
logging.info("Build command")
if args.pixi_path is not None:
pixi_path = str(args.pixi_path)
elif args.workflow == "local":
pixi_path = str(local_repo_root)
elif args.workflow == "prod":
pixi_path = "/projects/caeg/apps/aeDNA"
elif args.workflow == "prod-legacy":
pixi_path = "/projects/caeg/apps/aeDNA"
elif args.workflow == "prod-test":
pixi_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA"
elif args.workflow == "caterpillar":
pixi_path = "/projects/caeg/people/lnc113/workflows/caterpillar"

if args.workflow_path is not None:
workflow_path = str(args.workflow_path)
elif args.workflow == "local":
workflow_path = str(local_repo_root)
elif args.workflow == "prod":
workflow_path = "/projects/caeg/apps/aeDNA"
elif args.workflow == "prod-legacy":
workflow_path = "/projects/caeg/apps/aeDNA-legacy"
elif args.workflow == "prod-test":
workflow_path = "/projects/caeg/people/lnc113/workflows/aeDNA/aeDNA"
elif args.workflow == "caterpillar":
workflow_path = "/projects/caeg/people/lnc113/workflows/caterpillar"

cmd = f"pixi run --manifest-path {pixi_path} snakemake --snakefile {workflow_path}/workflow/Snakefile --workflow-profile /projects/caeg/data/resources/profile {extra_args}"


# Print command
for id in df.index:
print(
f'{cmd.format(id=id)}"pixi run --manifest-path {pixi_env} snakemake --snakefile {workflow_path}/workflow/Snakefile --workflow-profile /projects/caeg/data/resources/profile {args.snakemake_logger} {extra_args}"; sleep 0.5'
)
if args.scheduler == "":
print(f"env --chdir={id} {cmd}")
elif args.scheduler == "slurm":
print(
f'sbatch --chdir {id} --job-name {id} --account {args.account} --partition {args.partition} --cpus-per-task 1 --mem 1G --time 5-00 --no-requeue --wrap "{cmd} --profile /projects/caeg/data/resources/profile_production --executor slurm {extra_args}"; sleep 1'
)
else:
logging.warning(f"HPC scheduler {args.scheduler} not supported!")

exit(0)
69 changes: 34 additions & 35 deletions scripts/make_units.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def gzip_n_lines(in_gzip):
"--in-regex",
action="store",
type=str,
default="^(?!Undetermined).*.(sam|bam|cram|fastq|fq)(.gz)?$",
default="^(?!Undetermined).*.(sam|bam|fastq|fq)(.gz)?$",
help="Regex to filter input files",
)
parser.add_argument(
Expand Down Expand Up @@ -91,6 +91,7 @@ def gzip_n_lines(in_gzip):
action="store",
nargs="+",
default=[
f"date={pd.Timestamp.today().normalize().strftime('%Y-%m-%d')}",
"sample=Lib",
"material=DNA",
"flowcell_pos=X",
Expand Down Expand Up @@ -173,7 +174,11 @@ def gzip_n_lines(in_gzip):
else:
args.extra_file = Path(args.extra_file)
extra_file_name = args.extra_file.name
assert args.extra_file.exists(), "Extra file does not exist."
if not args.extra_file.exists():
parser.error(
"Extra file does not exist: "
f"{args.extra_file}. Pass --extra-file name:/path/to/file with a valid path."
)


# Add extra metadata
Expand Down Expand Up @@ -252,30 +257,39 @@ def gzip_n_lines(in_gzip):
exit(0)


######################
### FORMAT COLUMNS ###
######################
# Add metadata
for metadata_default in args.metadata_default:
if metadata_default.find("=") > 0:
key, value = metadata_default.split("=")
if key not in units:
units[key] = value


######################
### FORMAT COLUMNS ###
######################
# Format date column (if present)
if "date" in units.columns.values:
units["date"] = pd.to_datetime(units["date"])


# Remove adapters if file SAM/BAM/CRAM
units.loc[units.data.str.contains(r"\.(?:sam|bam|cram)$"), "adapters"] = pd.NA

# Add missing fields used in out-path formatting.
missing_out_path_fields = [
key for key in out_path_wildcards if key not in units.columns.values
]
if missing_out_path_fields:
logging.warning(
"Missing out-path fields in parsed metadata: %s. Applying defaults.",
", ".join(sorted(missing_out_path_fields)),
Comment on lines +271 to +277

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this clash with lines 263-268?

)

# Fix invalid values
fix_cols = units.columns.drop("data")
units[fix_cols] = units[fix_cols].replace(args.rm_chars, value="", regex=True)

# Normalize date column after metadata/default injection.
if "date" in units.columns.values:
units["date"] = pd.to_datetime(units["date"], errors="coerce")
if units["date"].isna().any():
logging.warning(
"Some date values could not be parsed. Using today's date for invalid rows."
)
units.loc[units["date"].isna(), "date"] = out_path_defaults["date"]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a bit dangerous.

Suggested change
units.loc[units["date"].isna(), "date"] = out_path_defaults["date"]



# Fix seq_type info and collapse
if "read" in units:
Expand Down Expand Up @@ -311,26 +325,11 @@ def gzip_n_lines(in_gzip):
# Add workflow_ver (current workflow version), if present in out_path
if "workflow_ver" in out_path_wildcards:
import git

repo = git.Repo(Path(__file__).resolve(strict=True).parent.parent)
commits = pd.DataFrame(
[[commit.hexsha, commit.committed_date] for commit in repo.iter_commits()],
columns=["hexsha", "date"],
).sort_values(by="date")
tags = pd.DataFrame(
[[tag.commit.hexsha, tag.name] for tag in repo.tags], columns=["hexsha", "tag"]
)
commits = pd.merge(commits, tags, how="left", on="hexsha")
# if no tag, use commit hexsha
commits["tag"] = commits["tag"].fillna(commits["hexsha"])

# Sanity check
commits_no_tag = commits[commits.tag.str.len() == 40]
assert all(
commits_no_tag["hexsha"].eq(commits_no_tag["tag"])
), "Commits HEX SHA do not match!"

units["workflow_ver"] = commits.iloc[-1]["tag"]
try:
units["workflow_ver"] = repo.git.describe("--tags", "--exact-match")
except Exception:
units["workflow_ver"] = repo.head.commit.hexsha


# Reorder columns
Expand Down Expand Up @@ -391,7 +390,7 @@ def gzip_n_lines(in_gzip):
logging.debug(pd.concat(out_stats))

with open(args.out_stats, "x") as out_stat_fh:
np.set_printoptions(legacy="1.21")
np.set_printoptions(legacy="1.25")
out_stat_fh.write(f"# {args}\n")
pd.concat(out_stats).dropna(axis=1, how="all").to_csv(
out_stat_fh,
Expand All @@ -410,7 +409,7 @@ def gzip_n_lines(in_gzip):
# Create folders
out_path.mkdir(parents=True, exist_ok=args.force)
# Save units.tsv file
units.drop(["extra_file_md5"], axis=1).dropna(axis=1, how="all").to_csv(
units.dropna(axis=1, how="all").to_csv(

@fgvieira fgvieira Jun 2, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why include the config md5 in units.tsv?

out_path / "units.tsv",
sep="\t",
index=False,
Expand Down
Loading