Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions PostProcess/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import List, Tuple, Optional
from time import time

import fcntl
import pysam
import sys
import os
Expand Down Expand Up @@ -59,8 +60,15 @@ def load_annotation_bed(annotation_fname: str) -> TabixFile:
Raises:
SamtoolsError: If the annotation BED file cannot be loaded or indexed.
"""
# check that tabix index is available for all annotation bed
pysam.tabix_index(annotation_fname, force=True, preset="bed")
# check that tabix index is available and up-to-date; use a lock file so
# that parallel annotation processes don't write the .tbi simultaneously
tbi_path = annotation_fname + ".tbi"
lock_path = annotation_fname + ".tbi.lock"
with open(lock_path, "w") as lock_fh:
fcntl.flock(lock_fh, fcntl.LOCK_EX)
if (not os.path.isfile(tbi_path)
or os.path.getmtime(tbi_path) < os.path.getmtime(annotation_fname)):
pysam.tabix_index(annotation_fname, force=True, preset="bed")
try: # return tabix indexes for each annotation bed
return pysam.TabixFile(annotation_fname)
except (SamtoolsError, Exception) as e:
Expand Down
124 changes: 107 additions & 17 deletions PostProcess/convert_gnomAD_vcfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import multiprocessing
import subprocess
import itertools
import pysam
import gzip
import time
Expand All @@ -44,34 +45,65 @@
BCFTOOLSNORM = "bcftools norm"


def parse_commandline(args: List[str]) -> Tuple[str, str, bool, bool, bool, int]:
def parse_commandline(args: List[str]) -> Tuple[str, str, bool, bool, bool, int, frozenset, float, str]:
"""Parse and validate command line arguments for gnomAD VCF conversion.

This function checks the provided arguments for correctness and extracts the
necessary parameters for the gnomAD VCF conversion process. It ensures that
the directory, joint processing flag, and thread count are valid before
returning the parsed values.

Positional args (6 required, 7th optional):
gnomad_vcfs_dir samples_ids joint keep multiallelic threads [filter_pass_values]

Optional keyword args (may appear anywhere after the positional args):
--af-threshold FLOAT Only include variants with AF_joint >= this value
(default: 0.0, i.e. no filtering).
--output-dir PATH Directory for converted output VCFs. Created if
absent. Default: same directory as each input VCF.

Args:
args (List[str]): A list of command line arguments.

Returns:
Tuple[str, str, bool, bool, bool, int]: A tuple containing the gnomAD
VCF directory, sample IDs, a flag indicating whether to use joint
VCF processing, a flag indicating whether to keep files, a flag for
multiallelic processing, and the number of threads to use.
Tuple containing: gnomAD VCF directory, sample IDs, joint flag, keep flag,
multiallelic flag, threads, filter_pass_values frozenset, af_threshold float,
and output_dir string (empty string means same dir as input).

Raises:
ValueError: If the number of arguments is incorrect, if the specified
directory is not valid, or if the number of threads is out of
allowed range.
"""

if len(args) != 6:
# Extract optional keyword args before counting positional args
af_threshold = 0.0
output_dir = ""
positional = []
i = 0
while i < len(args):
if args[i] == "--af-threshold":
if i + 1 >= len(args):
raise ValueError("Missing value for --af-threshold")
af_threshold = float(args[i + 1])
if not (0.0 <= af_threshold < 1.0):
raise ValueError(f"--af-threshold must be in [0, 1), got {af_threshold}")
i += 2
elif args[i] == "--output-dir":
if i + 1 >= len(args):
raise ValueError("Missing value for --output-dir")
output_dir = args[i + 1]
i += 2
else:
positional.append(args[i])
i += 1

if len(positional) not in (6, 7):
raise ValueError(
"Wrong number of input arguments, cannot proceed with gnomAD VCF conversion"
)
gnomad_vcfs_dir, samples_ids, joint, keep, multiallelic, threads = args
gnomad_vcfs_dir, samples_ids, joint, keep, multiallelic, threads = positional[:6]
filter_pass_values = frozenset(positional[6].split(",")) if len(positional) == 7 else frozenset({"PASS", "."})
if not os.path.isdir(gnomad_vcfs_dir):
raise ValueError(
f"The specified gnomAD VCF directory is not a directory ({gnomad_vcfs_dir})"
Expand All @@ -82,7 +114,9 @@ def parse_commandline(args: List[str]) -> Tuple[str, str, bool, bool, bool, int]
joint = joint == "True"
keep = keep == "True"
multiallelic = multiallelic == "True"
return gnomad_vcfs_dir, samples_ids, joint, keep, multiallelic, threads
if output_dir:
os.makedirs(output_dir, exist_ok=True)
return gnomad_vcfs_dir, samples_ids, joint, keep, multiallelic, threads, filter_pass_values, af_threshold, output_dir


def read_samples_ids(samples_ids: str):
Expand Down Expand Up @@ -194,21 +228,23 @@ def variant_observed(allele_count: Tuple[int]) -> bool:
return any((ac > 0 for ac in allele_count))


def format_variant_record(variant: pysam.VariantRecord, genotypes: str) -> str:
def format_variant_record(variant: pysam.VariantRecord, genotypes: str, af_field: str = "AF") -> str:
"""
Formats a variant record into a string with specified genotypes and variant
information.

Args:
variant (pysam.VariantRecord): The variant record to be formatted.
genotypes (str): Genotypes information to be included in the format.
af_field (str): INFO field name to read allele frequency from. Use
"AF_joint" for gnomAD joint VCFs, "AF" otherwise. Default: "AF".

Returns:
str: Formatted variant record as a tab-separated string.
"""

try:
af = ",".join(list(map(str, variant.info["AF"])))
af = ",".join(list(map(str, variant.info[af_field])))
except KeyError: # catch potential AF missing in variant INFO field
af = ",".join(["0.0" for _ in variant.alts]) # type: ignore
variant_format = [
Expand All @@ -226,7 +262,15 @@ def format_variant_record(variant: pysam.VariantRecord, genotypes: str) -> str:
return "\t".join([f"{e}" if e is not None else "." for e in variant_format])


def convert_vcf(vcf_fname: str, samples: List[str], joint: bool, keep: bool):
def convert_vcf(
vcf_fname: str,
samples: List[str],
joint: bool,
keep: bool,
filter_pass_values: frozenset = frozenset({"PASS", "."}),
af_threshold: float = 0.0,
output_dir: str = "",
):
"""
Converts a VCF file by updating the header, filtering variants, and creating
a new compressed VCF file.
Expand All @@ -236,6 +280,12 @@ def convert_vcf(vcf_fname: str, samples: List[str], joint: bool, keep: bool):
samples (List[str]): List of sample names.
keep (bool): Flag to determine whether to keep variants that do not pass
the filter.
filter_pass_values (frozenset): Set of VCF FILTER values to treat as
passing. Default: frozenset({"PASS", "."}).
af_threshold (float): Minimum AF_joint value required to include a variant.
Default 0.0 (no filtering).
output_dir (str): Directory for the output VCF. If empty, the output is
written alongside the input file. Default: "".

Returns:
str: Path to the converted and compressed VCF file.
Expand All @@ -245,11 +295,28 @@ def convert_vcf(vcf_fname: str, samples: List[str], joint: bool, keep: bool):
IOError: If an error occurs during the conversion process.
"""

vcf_outfname = f"{os.path.splitext(vcf_fname)[0]}.gz" # replace bgz with gz
# Determine output path: use output_dir if provided, else same dir as input
out_basename = f"{os.path.splitext(os.path.basename(vcf_fname))[0]}.gz"
if output_dir:
vcf_outfname = os.path.join(output_dir, out_basename)
else:
vcf_outfname = os.path.join(os.path.dirname(vcf_fname), out_basename)
try:
vcf = load_vcf(vcf_fname) # use pysam for optimized VCF loading
except OSError as e:
raise OSError(f"An error occurred while loading {vcf_fname}") from e
# Pre-flight: warn if no accepted FILTER records found in the first 10,000 rows.
# If every record has a hard-fail filter the output will be silently empty.
_sample = list(itertools.islice(vcf.fetch(), 10_000))
_n_ok = sum(1 for v in _sample if filter_pass_values & set(v.filter.keys()))
if not keep and _n_ok == 0 and _sample:
sys.stderr.write(
f"WARNING: {vcf_fname}: sampled {len(_sample)} records, none have "
f"FILTER in {filter_pass_values} — all variants will be skipped. "
f"Re-run with keep=True or adjust --vcf-filter-pass-values.\n"
)
vcf.reset() # rewind before the main loop below
af_field = "AF_joint" if joint else "AF" # field name for AF threshold filtering
samples_ac = [
f"AC_joint_{sample}" if joint else f"AC_{sample}" for sample in samples
] # recover allele count field for each input sample
Expand All @@ -258,16 +325,25 @@ def convert_vcf(vcf_fname: str, samples: List[str], joint: bool, keep: bool):
# write the upated header to the converted vcf
outfile.write(update_header(vcf.header.copy(), samples, joint))
for variant in vcf:
if not keep and "PASS" not in variant.filter.keys():
if not keep and not (filter_pass_values & set(variant.filter.keys())):
continue
# AF threshold filter (skipped when af_threshold == 0.0)
if af_threshold > 0.0:
try:
af_vals = variant.info[af_field]
af_max = max(af_vals) if hasattr(af_vals, "__len__") else af_vals
except KeyError:
af_max = 0.0
if af_max < af_threshold:
continue
genotypes = "\t".join(
[
GT[1] if variant_observed(variant.info[sac]) else GT[0]
for sac in samples_ac
]
)
outfile.write(
f"{format_variant_record(variant, genotypes)}\n"
f"{format_variant_record(variant, genotypes, af_field)}\n"
) # write the formatted VCF line to the out vcf
except IOError as e:
raise IOError(f"An error occurred while converting {vcf_fname}") from e
Expand Down Expand Up @@ -314,7 +390,14 @@ def bcftools_merge(vcf_fname: str, multiallelic: bool) -> str:


def run_conversion_pipeline(
vcf_fname: str, samples: List[str], joint: bool, keep: bool, multiallelic: bool
vcf_fname: str,
samples: List[str],
joint: bool,
keep: bool,
multiallelic: bool,
filter_pass_values: frozenset = frozenset({"PASS", "."}),
af_threshold: float = 0.0,
output_dir: str = "",
) -> None:
"""
Runs a conversion pipeline to process a VCF file by adding genotypes and merging
Expand All @@ -327,13 +410,17 @@ def run_conversion_pipeline(
the filter.
multiallelic (bool): Flag indicating whether to handle multiallelic variants
during merging.
filter_pass_values (frozenset): Set of VCF FILTER values to treat as passing.
af_threshold (float): Minimum AF_joint value required to include a variant.
Default 0.0 (no filtering).
output_dir (str): Directory for output VCFs. Default "" (same as input).

Returns:
None
"""

# add genotypes to input VCF
vcf_genotypes = convert_vcf(vcf_fname, samples, joint, keep)
vcf_genotypes = convert_vcf(vcf_fname, samples, joint, keep, filter_pass_values, af_threshold, output_dir)
# merge variants into mutlialleic/biallelic sites
vcf_merged = bcftools_merge(vcf_genotypes, multiallelic)
assert os.path.isfile(vcf_merged) and os.stat(vcf_merged).st_size > 0
Expand All @@ -354,7 +441,7 @@ def convert_gnomad_vcfs() -> None:
"""

# read input arguments
gnomad_vcfs_dir, samples_ids, joint, keep, multiallelic, threads = (
gnomad_vcfs_dir, samples_ids, joint, keep, multiallelic, threads, filter_pass_values, af_threshold, output_dir = (
parse_commandline(sys.argv[1:])
)
start = time.time()
Expand All @@ -372,6 +459,9 @@ def convert_gnomad_vcfs() -> None:
joint=joint,
keep=keep,
multiallelic=multiallelic,
filter_pass_values=filter_pass_values,
af_threshold=af_threshold,
output_dir=output_dir,
)
pool.map(partial_run_conversion_pipeline, gnomad_vcfs)
pool.close()
Expand Down
4 changes: 3 additions & 1 deletion PostProcess/new_simple_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,10 @@ def iupac_decomposition(split, guide_no_bulge, guide_no_pam, cluster_to_save):
haploSamples = {0: [], 1: []}
for count, sample in enumerate(sampleSet[i]):
sampleInfo = sample.split(":")
gt_alleles = sampleInfo[1].split("|")
for haplo in totalDict:
if sampleInfo[1].split("|")[haplo] != "0":
allele = gt_alleles[haplo] if len(gt_alleles) > haplo else gt_alleles[0] # haploid guard
if allele != "0":
haploSamples[haplo].append(sampleInfo[0])
totalDict[0][0][(pos_c, elem)] = [
listReplaceTarget,
Expand Down
24 changes: 21 additions & 3 deletions PostProcess/pool_post_analisi_indel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,29 @@

from multiprocessing import Pool

import gzip
import subprocess
import sys
import os


def _chrom_from_vcf(vcf_path: str) -> str:
"""Return the chromosome name from the first data line of a VCF (plain or gzipped)."""
open_fn = gzip.open if vcf_path.endswith(".gz") else open
with open_fn(vcf_path, "rt") as fh:
for line in fh:
if not line.startswith("#"):
return line.split("\t")[0]
raise ValueError(f"No data lines found in VCF: {vcf_path}")


def _normalize_chrom(chrom: str) -> str:
"""Ensure chromosome name has chr prefix (e.g. '22' -> 'chr22').
Some VCF datasets (e.g. 1000G GRCh38) store chromosomes without the
prefix in the CHROM field, while the genome indices use 'chr'-prefixed names.
"""
return chrom if chrom.startswith("chr") else "chr" + chrom

# post-analysis script name
POSTANALYSIS = "./post_analisi_indel.sh"

Expand All @@ -26,8 +45,7 @@


def start_analysis(fname: str) -> None:
chrom = next((e for e in fname.split(".") if e.startswith("chr")), None)
assert chrom
chrom = _normalize_chrom(_chrom_from_vcf(os.path.join(vcf_folder, fname)))
code = subprocess.call(
f"{POSTANALYSIS} {output_folder} {ref_folder} {vcf_folder} {guide_file} "
f"{mm} {bDNA} {bRNA} {annotation_file} {pam_file} {dict_folder} "
Expand All @@ -41,6 +59,6 @@ def start_analysis(fname: str) -> None:


# chromosome-wise vcfs list
chrs = [f for f in os.listdir(vcf_folder) if "vcf.gz" in f]
chrs = [f for f in os.listdir(vcf_folder) if f.endswith(".vcf.gz")]
with Pool(processes=ncpus) as pool: # run chrom-wise post-analysis in parallel
pool.map(start_analysis, chrs)
Loading
Loading