diff --git a/ChangeLog.md b/ChangeLog.md index 6e2d724a..9e406c1a 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,3 +1,106 @@ +# NEAT v4.4.4 +Follow-up release on top of v4.4.3 bundling three lines of work: another perf +pass over the remaining single-thread hot paths (variant overlap checks, +trinucleotide bias mapping, BAM record encoding), a coverage-semantics fix +for the GC bias model added earlier in the 4.4 line, and removal of the +`parallel_mode` config key. The perf changes are behavior-preserving for a +given seed; the GC bias fix changes output bytes when a non-uniform `gc_model` +is configured (a typical model with mean weight ~0.7 previously delivered +~70 % of the requested coverage; v4.4.4 delivers the full coverage). + +**Benchmark (ecoli 10× coverage, 4 threads, identical configs):** + +| Metric | v4.4.3 | v4.4.4 | Improvement | +|------------------------------|----------|----------|-----------------| +| ecoli SE wall time | 1:35 | **1:07** | 1.42× faster | +| ecoli PE wall time | 1:34 | **1:06** | 1.43× faster | +| ecoli SE total CPU | 331 s | 244 s | 26% less | +| ecoli PE total CPU | 338 s | 248 s | 27% less | +| Worker CPU utilization (PE) | 359%/400 | **378%** | 94.5% of theory | +| Peak resident memory | 175 MB | 175 MB | unchanged | + +Cumulative vs the v4.4.2 baseline at the start of this performance arc: +SE 14:55 → 1:07 (**13.3× faster**), PE 14:46 → 1:06 (**13.4× faster**). + +**Hot-path fixes:** + +- `ContigVariants.check_if_del` / `check_if_ins` — replace O(N) linear scans + of `all_dels` / `all_ins` with O(1) lookups into per-position bucket dicts. + Each indel is indexed at every reference position it covers when added; + total memory cost is O(sum of indel lengths). Single-thread profile on + 14 k calls: `check_if_del` 3.62 s → 0.011 s (325×); `check_if_ins` 3.70 s → + 0.007 s (530×). Removes O(N²) scaling that would have hurt larger genomes. + +- `variant_models.map_local_trinuc_bias` — replace 64 regex scans per call + (one `re.finditer` per trinucleotide) with a single vectorized numpy + expression. The cache check is also tightened from `==` (O(N) byte + compare) to `is` (O(1) identity), since callers pass different slice + objects each time anyway. Single-thread profile on 14 k calls: 13.5 s → + 0.81 s (16×). The entire `generate_variants` call dropped from 24.9 s + cumulative to 4.2 s. + +- `OutputFileWriter.write_bam_record` — vectorize BAM record encoding: + - Sequence encoding replaces the per-byte Python loop (28 M + `Seq.__getitem__` calls in the old code) with a 256-entry numpy lookup + plus a single `((codes[0::2] << 4) | codes[1::2]).tobytes()`. + - Quality encoding replaces `"".join([chr(n) for n in quality_array])` + with one `np.asarray(...).astype(np.uint8).tobytes()` call. + - CIGAR ops are now packed in one `struct.pack(f'<{n}I', ...)` instead of + one pack-per-op + `bytearray.extend`. The CIGAR string is parsed in a + single linear pass (the previous code did `re.split` + `re.findall`). + - The fixed-size BAM record header is now one + `struct.pack(' 1`). +Existing configs with `parallel_mode: ...` keep parsing — the key is +silently ignored with a one-line deprecation warning. The option never +actually let users change effective behavior under the old logic (it was +force-overridden to `'contig'` when `threads == 1`, and `'size'` was a +strict superset for `threads > 1`), so this is API surface cleanup rather +than a behavior change. + +**GC bias coverage fix.** When a non-uniform `gc_model` was configured, +`cover_dataset` had been scaling per-chunk reads by `chunk_mean / max_weight`. +That made `coverage` the *peak* coverage at the GC bias maximum, undercounting +total reads by `mean_weight_genome / max_weight` (a typical model with +mean ~0.7, max 1.0 delivered ~70 % of the requested coverage). The user-facing +config (`template_neat_config.yml:18`) documents `coverage` as "average coverage +for the entire genome," so this was a documentation/behavior mismatch. + +The fix: + +- New `compute_genome_wide_gc_mean_weight(reference, gc_model)` helper + (`neat/model_gc_bias/utils.py`) — single-pass vectorized scan over every + length-`window_size` window in the reference, reusing the same prefix-sum + windowing as `cover_dataset`. Uniform models short-circuit without scanning. +- `read_simulator_runner` precomputes this once and stores it on `options` so + all worker chunks see the same denominator (no redundant per-chunk genome + scans). +- `cover_dataset` now scales by `chunk_mean / global_mean`, which averages to + 1.0 across chunks and preserves `options.coverage` as the genome-wide average. +- When `cover_dataset` is called directly outside the runner (e.g., unit tests + with an in-memory `SeqRecord`), `gc_global_mean_weight` is unset and the + scaling falls back to chunk-local mean — i.e., no rescaling, which is the + correct single-chunk behavior. + +The stale `TODO use gc bias to skew this number. Calculate at the runner level.` +at `generate_reads.py:72` is removed; that work is now complete. + +All 603 unit/integration tests pass (up from 599 with three new +`test_compute_genome_wide_gc_mean_*` unit tests plus +`test_gc_bias_preserves_average_coverage`, which runs the full pipeline against +a half-AT / half-GC reference under a non-uniform model and asserts delivered +reads are within 20 % of `coverage × genome_length / read_len`). + # NEAT v4.4.3 Major performance and memory overhaul focused on making NEAT viable for large genomes on supercomputing hardware. No user-visible API changes (other than diff --git a/README.md b/README.md index 24f39272..5b568aff 100755 --- a/README.md +++ b/README.md @@ -47,7 +47,6 @@ To cite this work, please use both of the following: * [`neat model-seq-err`](#neat-model-seq-err) * [`neat model-qual-score`](#neat-model-qual-score) * [`neat model-gc-bias`](#neat-model-gc-bias) - * [`neat vcf_compare`](#neat-vcf_compare) * [Tests](#tests) * [Guide to run locally](#guide-to-run-locally) * [Note on Sensitive Patient Data](#note-on-sensitive-patient-data) @@ -57,15 +56,15 @@ To cite this work, please use both of the following: The most up-to-date requirements are found in `environment.yml`. * Some version of Anaconda to set up the environment -* `python` == 3.10.* -* `poetry` == 1.3.* +* `python` >= 3.10 +* `poetry` >= 2.0 * `biopython` == 1.85 * `pkginfo` * `matplotlib` -* `numpy` +* `numpy` >= 2.2 * `pyyaml` * `scipy` -* `pysam` +* `pysam` >= 0.23 * `frozendict` NEAT assumes a Linux environment and was tested primarily on Ubuntu Linux. It should work on most Linux systems. If you @@ -109,7 +108,7 @@ and run: Assuming you have installed `conda`, run `source activate` or `conda activate`. -Please note that these installation instructions support MacOS, Windows, and Linux. +NEAT is tested on Linux. macOS may work but is not regularly tested; on Windows, use WSL. Alternatively, if you wish to work with NEAT in the development-only environment, you can use `poetry install` within the NEAT repo, after creating the `conda` environment: @@ -203,9 +202,8 @@ More parameters are below: | `mutation_bed` | Full path to a list of regions with a column describing the mutation rate of that region, as a float with values between 0 and 0.3. The mutation rate must be in the third column as, e.g., `mut_rate`=0.00. | | `rng_seed` | Manually enter a seed for the random number generator. Used for repeating runs. Must be an integer. | | `min_mutations` | Set the minimum number of mutations that NEAT should add, per contig. Default is 0. We recommend setting this to at least one for small chromosomes, so NEAT will produce at least one mutation per contig. | -| `threads` | Number of threads to use. More than 1 will use multi-threading to speed up processing. | -| `mode` | `size` or `contig` whether to divide the contigs into blocks or just by contig. By `contig` is the default, but division by `size` may speed up your run. | -| `size` | Default value of 500,000. | +| `threads` | Number of threads to use. More than 1 will use multi-threading to speed up processing. With `threads > 1`, NEAT splits each contig into chunks; with `threads == 1`, one chunk per contig is used. | +| `parallel_block_size` | Per-chunk size in bases when `threads > 1`. Default `0` (auto-tune from total genome length and thread count, targeting ~8 chunks per thread). Set to a positive integer to override. Ignored when `threads == 1`. | | `cleanup_splits` | If running more than one simulation on the same input fasta, you can reuse splits files. By default, this will be set to `False`, and splits files will be deleted at the end of the run. | | `reuse_splits` | If an existing splits file exists in the output folder, it will use those splits, if this value is set to `True`. | @@ -268,7 +266,7 @@ seqkit shuffle -s 42 reads_r2.fastq.gz -o reads_r2.shuffled.fastq.gz ### Estimated runtimes -> **Note:** The tables below are from the original NEAT 4.4 (v4.4.0) benchmark. NEAT v4.4.3 is roughly **9× faster on multi-threaded ecoli** and uses **~3× less memory** thanks to the performance work landed in that release. The relative shape of the tables (size scaling, contig vs. size mode tradeoffs) remains accurate, but absolute runtimes should be divided by ~5–10× for v4.4.3+. See ChangeLog v4.4.3 for a detailed before/after table on ecoli and a c_elegans scale-test. +> **Note:** The tables below are from the original NEAT 4.4 (v4.4.0) benchmark. NEAT v4.4.4 is roughly **13× faster on multi-threaded ecoli** and uses **~3× less memory** thanks to the performance work in v4.4.3 and v4.4.4. The relative shape of the tables (size scaling, contig vs. size mode tradeoffs) remains accurate, but absolute runtimes should be divided by ~10–15× for v4.4.4+. See `ChangeLog.md` (v4.4.3 and v4.4.4 entries) for a detailed before/after table on ecoli and a c_elegans scale-test. To give users a sense of how long `neat read-simulator` runs may take, we benchmarked NEAT 4.4 on several reference genomes. All runs were paired-end, with read length of 150 bp, coverage of 10, fragment mean of 300 bp, and fragment standard deviation of 50 bp. Runtimes are reported as the average across three unique runs (`Avg. time (ms)`) and the corresponding runtime in minutes. Cells marked with N/A indicate that NEAT was not able to run to completion. @@ -280,8 +278,7 @@ The inputs in single-threaded, contig-based mode most closely replicate the beha The configuration used: -- `mode: contig` -- `threads: 1` +- `threads: 1` (NEAT processes one contig per chunk in single-thread mode) - `cleanup_splits: True` | Organism | File size (bytes) | Avg. runtime (ms) | Avg. runtime (min) | @@ -299,10 +296,9 @@ For small genomes, single-threaded performance is already fast (on the order of #### NEAT Multi-Threaded (size mode) -Here we enabled NEAT’s parallelized mode (“small filtering”), which splits contigs into size-based blocks and processes them concurrently. The configuration used: +Here we enabled NEAT’s multi-threaded mode, which splits contigs into size-based blocks and processes them concurrently. The configuration used: -- `mode: size` -- `size: 500000` +- `parallel_block_size: 500000` - `produce_bam: false` - `threads: 7` - `cleanup_splits: True` @@ -329,8 +325,8 @@ The following commands are examples for common types of data to be generated. Th ### Whole genome simulation Simulate whole genome dataset with random variants inserted according to the default model: +Contents of neat_config.yml: ```yml -[contents of neat_config.yml] reference: hg19.fa read_len: 126 produce_bam: True @@ -338,7 +334,8 @@ produce_vcf: True paired_ended: True fragment_mean: 300 fragment_st_dev: 30 - +``` +```bash neat read-simulator \ -c neat_config.yml \ -o /home/me/simulated_reads/ @@ -347,8 +344,8 @@ neat read-simulator \ ### Targeted region simulation Simulate a targeted region of a genome (e.g., an exome) with a targeted bed: +Contents of neat_config.yml: ```yml -[contents of neat_config.yml] reference: hg19.fa read_len: 126 produce_bam: True @@ -357,11 +354,11 @@ paired_ended: True fragment_mean: 300 fragment_st_dev: 30 targed_bed: hg19_exome.bed - +``` +```bash neat read-simulator \ -c neat_config \ -o /home/me/simulated_reads/ - ``` ### Parallelizing simulation Split the contig into blocks rather than reading by contig. Even in single-threaded mode this is likely to be faster. The chunk size auto-tunes from total genome length and thread count, targeting roughly 8 chunks per thread for good load balancing — on small bacterial genomes you get ~500 kb chunks (similar to NEAT's old hardcoded default), on human-scale genomes you get ~6 Mb chunks (a few hundred total instead of thousands). Specify `parallel_block_size` explicitly if you want to override. @@ -377,7 +374,6 @@ paired_ended: True fragment_mean: 350 fragment_st_dev: 50 threads: 12 -parallel_mode: size # parallel_block_size omitted: auto-tuned from genome length and thread count. # Set explicitly (e.g. parallel_block_size: 1000000) only if you have a reason. ``` @@ -417,7 +413,6 @@ produce_bam: true produce_fastq: true produce_vcf: true threads: ${SLURM_CPUS_PER_TASK} -parallel_mode: size YAML neat read-simulator -c "config_${CHROM}.yml" \ @@ -429,8 +424,7 @@ After the array completes, concatenate the per-chromosome outputs: ```bash # BAMs are already coordinate-sorted within each chromosome; cat in chromosome order -# produces a globally-sorted whole-genome BAM with no further sort needed. -samtools cat -o all.bam out/chr1/chr1_golden.bam out/chr2/chr2_golden.bam ... out/chrY/chrY_golden.bam +# produces a globally-sorted whole-genome BAM with no further 2_golden.bam ... out/chrY/chrY_golden.bam samtools index all.bam # FASTQs: gzip-concat preserves a valid gzip stream @@ -445,10 +439,10 @@ bcftools index all.vcf.gz This gives you whole-genome simulation in roughly `single-chromosome-time / nodes` wall time. For a 30× human genome with 24 nodes (each 64 cores), a per-chromosome run takes ~10 min and the array finishes in ~10 min wall, plus a few minutes for the final concat. ### Insert specific variants -Simulate a whole genome dataset with only the variants in the provided VCF file using `-v` and setting mutation rate to 0 with `-M`. +Simulate a whole genome dataset with only the variants in the provided VCF file by setting `include_vcf` to the VCF path and `mutation_rate: 0`: +Contents of neat_config.yml: ```yml -[contents of neat_config.yml] reference: hg19.fa read_len: 126 produce_bam: True @@ -458,7 +452,8 @@ fragment_mean: 300 fragment_st_dev: 30 include_vcf: NA12878.vcf mutation_rate: 0 - +``` +```bash neat read-simulator \ -c neat_config.yml \ -o /home/me/simulated_reads/ @@ -467,29 +462,31 @@ neat read-simulator \ ### Single-end reads Simulate single-end reads by omitting paired-ended options: +Contents of neat_config.yml: ```yml -[contents of neat_config.yml] reference: hg18.fa read_len: 126 produce_bam: True produce_vcf: True - +``` +```bash neat read-simulator \ -c neat_config.yml \ - -o /home/me/simulated_read/ + -o /home/me/simulated_read/ \ -p 126_frags ``` ### Large single-end reads -Simulate PacBio-like reads by providing an error model (note that this is not yet implemented in NEAT 4.0): +Simulate PacBio-like reads by providing an error model. Note: full long-read support is planned for a separate future release and is not yet fully validated in NEAT 4.4. +Contents of neat_config.yml: ```yml -[contents of neat-config.yml] reference: hg19.fa read_len: 5000 error_model: errorModel_pacbio.pickle.gz avg_seq_error: 0.1 - +``` +```bash neat read-simulator \ -c neat_config.yml \ -o /home/me/simulated_reads @@ -601,29 +598,6 @@ neat model-gc-bias \ and creates `gc_model.pickle.gz` model in the working directory. -### `neat vcf_compare` - -Tool for comparing VCF files (Not yet implemented in NEAT 4.4). - -```bash -neat vcf_compare - -r * Reference Fasta \ - -g * Golden VCF \ - -w * Workflow VCF \ - -o * Output Prefix \ - -m Mappability Track \ - -M Maptrack Min Len \ - -t Targetted Regions \ - -T Min Region Len \ - -c Coverage Filter Threshold [15] \ - -a Allele Freq Filter Threshold [0.3] \ - --vcf-out Output Match/FN/FP variants [False] \ - --no-plot No plotting [False] \ - --incl-homs Include homozygous ref calls [False] \ - --incl-fail Include calls that failed filters [False] \ - --fast No equivalent variant detection [False] -``` - ## Tests We provide unit tests (e.g., mutation and sequencing error models) and basic integration tests for the CLI. diff --git a/config_template/simple_template.yml b/config_template/simple_template.yml index 05969722..9eab62b4 100644 --- a/config_template/simple_template.yml +++ b/config_template/simple_template.yml @@ -27,7 +27,6 @@ rng_seed: . min_mutations: . overwrite_output: . -parallel_mode: . parallel_block_size: . threads: . cleanup_splits: . diff --git a/config_template/template_neat_config.yml b/config_template/template_neat_config.yml index 6a2ea6eb..342ccbb2 100644 --- a/config_template/template_neat_config.yml +++ b/config_template/template_neat_config.yml @@ -142,14 +142,11 @@ min_mutations: . # type: bool | required = no | default = false overwrite_output: . -# How to split the input reference for parallelization -# Note: If threads == 1, this option has no effect. -# type = string | required: no | default = contig | values: contig, size -parallel_mode: . - -# Target block size if by = size (overlap = read_len * 2). -# Default is 500000 when by = size. Not used for by = contig -# type = int | required: no | default = 500000 (when by=size) +# Per-chunk size in bases used when threads > 1. +# Default 0 = auto-tune from total genome length and thread count (targeting ~8 +# chunks per thread). Set to a positive integer to override. When threads == 1 +# one chunk per contig is used and this option has no effect. +# type = int | required: no | default = 0 (auto) parallel_block_size: . # Maximum number of concurrent NEAT jobs (threads or hyperthreads) to run diff --git a/neat/common/ploid_functions.py b/neat/common/ploid_functions.py index 1c1d226e..8e7eff80 100644 --- a/neat/common/ploid_functions.py +++ b/neat/common/ploid_functions.py @@ -38,7 +38,6 @@ def pick_ploids(ploidy: int, how_many = 1 else: # if it's polyploid, we'll consider it to be on roughly half the ploids - # TODO may need to improve the modeling for polyploid, maybe how_many = ploidy//2 # wp is just the temporary genotype list, a hat tip to the old version of NEAT diff --git a/neat/gen_mut_model/runner.py b/neat/gen_mut_model/runner.py index f30801a5..6fc1715d 100644 --- a/neat/gen_mut_model/runner.py +++ b/neat/gen_mut_model/runner.py @@ -19,7 +19,7 @@ from ..common import validate_output_path, validate_input_path, open_input, open_output, \ ALLOWED_NUCL, ALL_TRINUCS, HUMAN_WHITELIST from .utils import extract_header, read_and_filter_variants,\ - cluster_list, count_trinucleotides, find_caf, convert_trinuc_transition_matrix, \ + count_trinucleotides, convert_trinuc_transition_matrix, \ convert_snp_transition_matrix, convert_trinuc_mutation_dict, check_homozygous __all__ = [ @@ -68,10 +68,6 @@ def runner(reference_index, indel_count = {} # Total homozygous variants detected homozygous_count = 0 - # detect variants that occur in a significant percentage of the input samples (pos,ref,alt,pop_fraction) - common_variants = [] - # identify regions that have significantly higher local mutation rates than the average - high_mut_regions = [] # Clean up and simplify reference index # simplify naming and filter out actual human genomes from scaffolding @@ -122,9 +118,6 @@ def runner(reference_index, # Running total of how many non-N bases there are in the reference total_reflen += len(reference_index[contig].seq) - reference_index[contig].seq.upper().count('N') - # list to be used for counting variants that occur multiple times in file (i.e. in multiple samples) - vcf_common = {} - # Create a list of variants to process variants_to_process = [x for x in matching_variants if x[0] == contig] @@ -171,51 +164,7 @@ def runner(reference_index, indel_count[indel_len] = 0 indel_count[indel_len] += 1 - my_pop_freq = find_caf(variant[4]) homozygous_count += check_homozygous(variant[4]) - vcf_common[tuple(variant[1:5])] = my_pop_freq - - # identify common mutations - percentile_var = 95 - min_value = np.percentile([vcf_common[n] for n in vcf_common], percentile_var) - for variant, allele_freq in sorted(vcf_common.items()): - if allele_freq >= min_value: - common_variants.append(variant) - # TODO figure out what to do with these common variants - - # Identify areas that have contained significantly higher random mutation rates. - # Added a potential for smaller deltas to handle smaller datasets in 4.0 - dist_thresh = min(2000, int(len(ref_sequence) * 0.01)) - percentile_clust = 97 - # Adjusted the scalar for smaller deltas in 4.0 - scaler = min(1000, dist_thresh//2) - # identify regions with disproportionately more variants in them - variant_pos = sorted([int(n[0]) for n in vcf_common]) - clustered_pos = cluster_list(variant_pos, dist_thresh) - # Since the list is sorted, taking the first and last position gives us the min and max - by_len = [(len(clustered_pos[i]), clustered_pos[i][0], clustered_pos[i][-1], i) - for i in range(len(clustered_pos))] - - candidate_regions = [] - for n in by_len: - ref_scalar = int((n[1] - dist_thresh) / float(scaler)) * scaler - alt_scalar = int((n[2] + dist_thresh) / float(scaler)) * scaler - candidate_regions.append((n[0] / float(alt_scalar - ref_scalar), max([0, ref_scalar]), - min([len(ref_sequence), alt_scalar]))) - minimum_value = np.percentile([n[0] for n in candidate_regions], percentile_clust) - for n in candidate_regions: - if n[0] >= minimum_value: - high_mut_regions.append((contig, n[1], n[2], n[0])) - # collapse overlapping regions - for i in range(len(high_mut_regions) - 1, 0, -1): - if high_mut_regions[i - 1][2] >= high_mut_regions[i][1] and \ - high_mut_regions[i - 1][0] == high_mut_regions[i][0]: - # Might need to research a more accurate way to get the mutation rate for this region - avg_mut_rate = 0.5 * high_mut_regions[i - 1][3] + 0.5 * high_mut_regions[i][3] - high_mut_regions[i - 1] = ( - high_mut_regions[i - 1][0], high_mut_regions[i - 1][1], high_mut_regions[i][2], avg_mut_rate - ) - del high_mut_regions[i] # if for some reason we didn't find any valid input variants, exit gracefully... total_var = snp_count + sum([abs(x) for x in indel_count.values()]) diff --git a/neat/model_fragment_lengths/utils.py b/neat/model_fragment_lengths/utils.py index b295d4c7..aedd934b 100644 --- a/neat/model_fragment_lengths/utils.py +++ b/neat/model_fragment_lengths/utils.py @@ -45,8 +45,6 @@ def filter_lengths(datalist: list, min_reads: int) -> list: """ Filters the datalist to remove outliers. - TODO this function might be useful in the simulation as well - :param datalist: The list to filter :param min_reads: filter reads with less than this many reads. If 0, filter_lengths returns the original datalist. :return: The filtered list diff --git a/neat/model_gc_bias/__init__.py b/neat/model_gc_bias/__init__.py index 42a243ce..6677963c 100644 --- a/neat/model_gc_bias/__init__.py +++ b/neat/model_gc_bias/__init__.py @@ -1 +1,2 @@ from .runner import compute_gc_bias_runner +from .utils import compute_genome_wide_gc_mean_weight diff --git a/neat/model_gc_bias/utils.py b/neat/model_gc_bias/utils.py index fa69e8f6..19248243 100644 --- a/neat/model_gc_bias/utils.py +++ b/neat/model_gc_bias/utils.py @@ -8,6 +8,56 @@ from pathlib import Path +def compute_genome_wide_gc_mean_weight(reference_path: str | Path, gc_model) -> float: + """ + Compute the genome-wide mean GC bias weight across every length-`window_size` window + in the reference. Used at the runner level so that per-chunk read-count scaling can + divide by the global mean (preserving the user-requested average coverage) rather + than by the model's max weight (which would under-deliver coverage). + + For a uniform model this returns weights[0] without scanning the reference. + All-N or sub-window contigs are skipped. + """ + if gc_model.is_uniform: + return float(gc_model.weights[0]) + + window_size = gc_model.window_size + weights_arr = np.asarray(gc_model.weights, dtype=np.float32) + + total_weight = 0.0 + total_positions = 0 + + with pysam.FastaFile(str(reference_path)) as fa: + for contig in fa.references: + span_length = fa.get_reference_length(contig) + if span_length <= window_size: + continue + seq = fa.fetch(contig).upper() + n_positions = span_length - window_size + 1 + seq_arr = np.frombuffer(seq.encode(), dtype=np.uint8) + gc_mask = (seq_arr == ord('G')) | (seq_arr == ord('C')) + n_mask = seq_arr == ord('N') + gc_cumsum = np.empty(span_length + 1, dtype=np.int64) + n_cumsum = np.empty(span_length + 1, dtype=np.int64) + gc_cumsum[0] = 0 + np.cumsum(gc_mask, out=gc_cumsum[1:]) + n_cumsum[0] = 0 + np.cumsum(n_mask, out=n_cumsum[1:]) + positions = np.arange(n_positions, dtype=np.int64) + window_ends = positions + window_size + gc_counts = (gc_cumsum[window_ends] - gc_cumsum[positions]).astype(np.float32) + n_counts = (n_cumsum[window_ends] - n_cumsum[positions]).astype(np.float32) + called = np.maximum(window_size - n_counts, 1.0) + gc_indices = np.clip(np.rint(gc_counts / called * 100).astype(np.int16), 0, 100) + weights = weights_arr[gc_indices] + total_weight += float(weights.sum()) + total_positions += n_positions + + if total_positions == 0: + return float(gc_model.weights[0]) + return total_weight / total_positions + + def calculate_gc_content(sequence: str) -> float: """ Calculates GC content of a sequence. diff --git a/neat/model_sequencing_error/utils.py b/neat/model_sequencing_error/utils.py index 1abffd03..98578001 100644 --- a/neat/model_sequencing_error/utils.py +++ b/neat/model_sequencing_error/utils.py @@ -4,7 +4,6 @@ import logging import numpy as np -# TODO implement plotting import matplotlib.pyplot as plt import sys @@ -144,9 +143,6 @@ def parse_file(input_file: str, quality_scores: list, max_reads: int, qual_offse _LOG.debug("Skipping record that doesn't match read length") continue - # TODO Adding this section to account for quality score "shape" in a fastq - # shape_curves.append(qualities_to_check) - for j in range(read_length): # The qualities of each read_position_scores q_score = qualities_to_check[j] @@ -179,16 +175,6 @@ def parse_file(input_file: str, quality_scores: list, max_reads: int, qual_offse st_d_q = np.std(expanded_counts) avg_std_by_pos.append((average_q, st_d_q)) - # TODO In progress, working on ensuring the error model produces the right shape - # shape_curves = pd.DataFrame(shape_curves) - # columns = list(range(1, 11)) + list(range(15, len(shape_curves[0]), 5)) - # shape_curves_plot = shape_curves.iloc[columns] - # averages = [] - # for name, value in shape_curves_plot.items(): - # averages.append(np.average(value)) - # sns.boxplot(data=shape_curves_plot, fliersize=0) - # plt.show() - # Calculates the average error rate tot_bases = read_length * records_read avg_err = 0 diff --git a/neat/models/error_models.py b/neat/models/error_models.py index 37cec861..c9ed0793 100644 --- a/neat/models/error_models.py +++ b/neat/models/error_models.py @@ -2,8 +2,8 @@ Classes for the error models used in the simulation. This will generate errors of the type contained in variant_models.py, each of which is based on a type from the variants submodule. Also contained is a simple container for storing errors, and a subclass for the simulation type to be performed. -Currently, only the traditional type is supported, but we are working on adding a Markov chain based model in the near -future. +Two quality-score models are available: TraditionalQualityModel here, and the Markov-chain model in +neat.models.markov_quality_model. """ import logging @@ -33,8 +33,8 @@ class TraditionalQualityModel: """ - This is the traditional NEAT model for generating quality scores. This will be replaced by - a Markov model or an optional markov model + Per-position normal-distribution model for generating quality scores. An alternative Markov-chain + model is available in neat.models.markov_quality_model; this one remains the default. :param transition_matrix: 2x2 matrix that gives the probability of each base transitioning to another. :param quality_scores: numpy array of ints of the PHRED quality scores possible from the sequencing machine @@ -104,16 +104,6 @@ def get_quality_scores( return np.clip(np.rint(scores).astype(int), 1, 42) -class MarkovQualityModel: - def __init__(self): - # TODO - pass - - def get_quality_scores(self): - # TODO - pass - - class SequencingErrorModel(SnvModel, DeletionModel, InsertionModel): """ This is a SequencingErrorModel class, based on the old SequencingError. Instead of handling everything, we've diff --git a/neat/models/variant_models.py b/neat/models/variant_models.py index d3c7d855..e3e353c6 100644 --- a/neat/models/variant_models.py +++ b/neat/models/variant_models.py @@ -2,7 +2,6 @@ Classes for the variant models included in NEAT. Every Variant type in variants > variant_types must have a corresponding model in order to be fully implemented. """ -import re import logging import abc @@ -148,30 +147,54 @@ def map_local_trinuc_bias( :param ngaps: A list of dictionaries, each describing an ngap. :return: A list of the bias factors by position. """ - # If the sequence is unchanged, we don't want to process again. - if not self.local_sequence == sequence: - - # start by assuming no bias. Each of the 64 valid trinucleotide combinations mutate with equal frequency. - self.local_trinuc_bias = np.ones(len(sequence), dtype=float) - - # If there are ngaps we'll set the bias rate to zero in those areas - self.local_trinuc_bias[np.where(ngaps == 0)] = 0.0 - - # If the model was set up with no bias, then we skip the biasing part - if not self.no_bias: - # Update the map bias at the central position for that trinuc - for trinuc in ALL_TRINUCS: - for match in re.finditer(trinuc, str(sequence)): - # match.start() + 1 puts us at the center of the trinuc - if match.start() + 1 > len(self.local_trinuc_bias): - _LOG.warning(f"Trinuc bias index out of range: {match.start() + 1} > {len(self.local_trinuc_bias)}") - self.local_trinuc_bias[match.start() + 1] = self.trinuc_mutation_bias[TRINUC_IND[trinuc]] - if len(self.local_trinuc_bias) != len(sequence): - _LOG.warning(f"Trinuc bias length mismatch: {len(self.local_trinuc_bias)} != {len(sequence)}") - - # Now we normalize the bias - self.local_trinuc_bias = self.local_trinuc_bias / sum(self.local_trinuc_bias) - self.local_sequence = sequence + # Cache-hit short-circuit. The old equality check `self.local_sequence == sequence` + # did a full bytewise compare on every call (O(N) per call); identity check is + # O(1) and covers the case where the caller reuses the same Seq object. A new + # slice each call will miss either way, so we don't pay extra for the miss. + if self.local_sequence is sequence: + return + + n = len(sequence) + # Start by assuming no bias. Each of the 64 valid trinucleotide combinations + # mutate with equal frequency. N-gap positions become 0. + self.local_trinuc_bias = np.ones(n, dtype=float) + self.local_trinuc_bias[np.where(ngaps == 0)] = 0.0 + + # If the model was set up with no bias, skip the trinuc weighting entirely. + if not self.no_bias and n >= 3: + # Vectorized trinucleotide scan: replaces 64 regex passes (one per trinuc) + # over the sequence with a single O(N) numpy expression. Each center + # position i in [1, n-1) gets its bias from the trinuc spanning + # [i-1, i, i+1], encoded as base_left*16 + base_mid*4 + base_right + # (matching the TRINUC_IND layout). Centers whose trinuc contains any + # non-ACGT base keep the initial value (1.0 normally, 0.0 if in an N-gap). + seq_bytes = np.frombuffer(str(sequence).upper().encode(), dtype=np.uint8) + nuc_idx = np.full(n, -1, dtype=np.int8) + nuc_idx[seq_bytes == ord('A')] = 0 + nuc_idx[seq_bytes == ord('C')] = 1 + nuc_idx[seq_bytes == ord('G')] = 2 + nuc_idx[seq_bytes == ord('T')] = 3 + tri_left = nuc_idx[:-2] + tri_mid = nuc_idx[1:-1] + tri_right = nuc_idx[2:] + valid = (tri_left >= 0) & (tri_mid >= 0) & (tri_right >= 0) + # Use int32 for the multiplied terms so we don't overflow int8. + trinuc_idx = ( + tri_left.astype(np.int32) * 16 + + tri_mid.astype(np.int32) * 4 + + tri_right.astype(np.int32) + ) + bias_table = np.asarray(self.trinuc_mutation_bias, dtype=float) + # Where the trinuc is valid, look up the bias; otherwise preserve current value. + center_view = self.local_trinuc_bias[1:-1] + looked_up = bias_table[np.where(valid, trinuc_idx, 0)] + self.local_trinuc_bias[1:-1] = np.where(valid, looked_up, center_view) + + # Now we normalize the bias + total = self.local_trinuc_bias.sum() + if total > 0: + self.local_trinuc_bias = self.local_trinuc_bias / total + self.local_sequence = sequence def sample_trinucs(self, rng: Generator) -> int: """ diff --git a/neat/read_simulator/runner.py b/neat/read_simulator/runner.py index 509d6cd4..22574ff9 100644 --- a/neat/read_simulator/runner.py +++ b/neat/read_simulator/runner.py @@ -20,7 +20,8 @@ from .utils import Options, OutputFileWriter, parse_beds, parse_input_vcf from ..common import validate_input_path, validate_output_path from .single_runner import read_simulator_single -from ..models import SequencingErrorModel +from ..models import SequencingErrorModel, GCBiasModel +from ..model_gc_bias import compute_genome_wide_gc_mean_weight from ..variants import ContigVariants from .utils.split_inputs import main as split_main from .utils.stitch_outputs import main as stitch_main @@ -94,9 +95,7 @@ def read_simulator_runner(config: str, output_dir: str, file_prefix: str): # to avoid pathological cases on very small or very large genomes. The FASTA index # (.fai) gives us per-contig lengths without parsing sequences; pysam.FastaFile # builds the .fai on first access if needed. - if (options.threads > 1 - and options.parallel_mode == "size" - and options.parallel_block_size <= 0): + if options.threads > 1 and options.parallel_block_size <= 0: with pysam.FastaFile(str(options.reference)) as _fa: total_bp = sum(_fa.get_reference_length(c) for c in _fa.references) target_chunks = options.threads * 8 @@ -107,6 +106,20 @@ def read_simulator_runner(config: str, output_dir: str, file_prefix: str): ) options.parallel_block_size = auto_size + # Precompute the genome-wide mean GC bias weight once so per-chunk read-count scaling + # in cover_dataset can divide by the global mean (preserving the documented "average + # coverage" semantics) instead of by the model's max weight (which would under-deliver + # coverage on non-uniform models). No-op when gc_model is unset. + if options.gc_model: + gc_model_obj = GCBiasModel.from_file(options.gc_model) + options.gc_global_mean_weight = compute_genome_wide_gc_mean_weight( + options.reference, gc_model_obj + ) + _LOG.debug( + f"Genome-wide GC mean weight: {options.gc_global_mean_weight:.6f} " + f"(max: {gc_model_obj.max_weight:.6f})" + ) + # Split file by chunk for parallel analysis or by contig for either parallel or single analysis _LOG.info("Splitting reference...") (splits_files_dict, count, reference_keys_with_lens) = split_main( diff --git a/neat/read_simulator/utils/generate_reads.py b/neat/read_simulator/utils/generate_reads.py index a8ca23b3..bea6fb19 100644 --- a/neat/read_simulator/utils/generate_reads.py +++ b/neat/read_simulator/utils/generate_reads.py @@ -69,7 +69,6 @@ def cover_dataset( # responsibility_length, not span_length, so chunks don't over-sample their overlap # region (which is also covered by the next chunk). if options.paired_ended: - # TODO use gc bias to skew this number. Calculate at the runner level. number_reads = ceil(responsibility_length * options.coverage / (2 * options.read_len)) else: number_reads = ceil(responsibility_length * options.coverage / options.read_len) @@ -115,10 +114,17 @@ def cover_dataset( mean_weight = total_weight / n_positions - # Scale total reads by the mean/max ratio so that GC-rich regions receive - # proportionally more reads than AT-rich regions across the genome. - # For a uniform model mean_weight == max_weight, so this is a no-op. - number_reads = ceil(number_reads * mean_weight / gc_model.max_weight) + # Scale this chunk's read count by chunk_mean / global_mean. With CDF-biased + # placement, expected genome-wide average coverage = options.coverage × (chunk_mean / + # denominator). Using the genome-wide mean as denominator makes that ratio average + # to 1.0 across chunks, so average coverage matches options.coverage (the documented + # contract). The global mean is precomputed once at the runner level. When unset + # (e.g., cover_dataset is called directly outside the runner), we fall back to the + # chunk's own mean — i.e., no scaling, which is the correct single-chunk behavior. + denominator = options.gc_global_mean_weight if getattr( + options, "gc_global_mean_weight", None + ) else mean_weight + number_reads = ceil(number_reads * mean_weight / denominator) if number_reads == 0: return [] diff --git a/neat/read_simulator/utils/options.py b/neat/read_simulator/utils/options.py index 627eed34..991857e7 100644 --- a/neat/read_simulator/utils/options.py +++ b/neat/read_simulator/utils/options.py @@ -81,7 +81,6 @@ def __init__(self, produce_vcf: bool = False, produce_fastq: bool = True, min_mutations: int = 0, - parallel_mode: str = "contig", parallel_block_size: int = 0, cleanup_splits: bool = True, splits_dir: Path | None = None, @@ -133,10 +132,10 @@ def __init__(self, :param produce_vcf: True to produce a VCF of neat-added variants :param produce_fastq: False to turn off default fastq creation :param min_mutations: If you wish to have a minimunm number of mutations per block, enter it here - :param parallel_mode: If you wish to use block size method, enter 'size' here - :param parallel_block_size: If you use size method, specify a positive integer to set the per-chunk - size in basepairs. The default (0) auto-tunes from total genome length and thread count, targeting - ~8 chunks per thread. Specify a value explicitly to override. + :param parallel_block_size: Per-chunk size in basepairs when threads > 1 (contigs are split into + chunks of this size). Default 0 auto-tunes from total genome length and thread count, targeting + ~8 chunks per thread. Specify a positive integer to override. Ignored when threads == 1, where + one chunk per contig is used. :param cleanup_splits: Set to False in order to preserve splits after run :param reuse_splits: Attempts to reuse existing splits file """ @@ -170,13 +169,22 @@ def __init__(self, self.produce_fastq: bool = produce_fastq self.min_mutations: int = min_mutations - # Parallel features - self.parallel_mode: str = parallel_mode + # Parallel features. The splitting strategy is no longer user-configurable: with + # threads > 1, contigs are split into chunks of `parallel_block_size`; with + # threads == 1, each contig is processed as a single chunk. self.parallel_block_size: int = parallel_block_size self.cleanup_splits: bool = cleanup_splits self.splits_dir: Path | None = splits_dir self.reuse_splits: bool = reuse_splits self.gc_model: Path | None = Path(gc_model) if gc_model else None + # Genome-wide mean GC bias weight, computed once at the runner level when + # gc_model is loaded. cover_dataset divides per-chunk reads by this rather + # than by gc_model.max_weight so that target coverage means *average* + # coverage across the genome (matching the documented contract), not peak + # coverage at the GC bias maximum. None when no gc_model is configured or + # when cover_dataset is called outside the runner (e.g., direct unit tests), + # in which case scaling falls back to the per-chunk mean (no rescaling). + self.gc_global_mean_weight: float | None = None # Actual output files self.fq1: Path | None = fq1 @@ -241,7 +249,6 @@ def from_cli(output_dir: Path, 'rng_seed': (int, None, None, None), 'min_mutations': (int, 0, None, None), 'overwrite_output': (bool, False, None, None), - 'parallel_mode': (str, 'size', 'choice', ['size', 'contig']), 'parallel_block_size': (int, 0, None, None), 'threads': (int, 1, 1, 1000), 'cleanup_splits': (bool, True, None, None), @@ -300,6 +307,13 @@ def check_and_log_error(keyname, value_to_check, crit1, crit2): _LOG.error(f'`{keyname}` must be between {crit1} and {crit2} (input: {value_to_check}).') sys.exit(1) + # YAML keys that have been removed from the schema but which we still want to accept + # silently (with a one-line deprecation warning) so older user configs keep parsing. + # Map: deprecated key -> short reason shown to the user. + DEPRECATED_KEYS = { + "parallel_mode": "splitting strategy is now derived from `threads`", + } + def read_yaml(self, config_yaml: Path, args: dict): """ This sets up the option attributes. It's not perfect, because it sort of kills type hints. @@ -307,6 +321,9 @@ def read_yaml(self, config_yaml: Path, args: dict): """ config = yaml.load(open(config_yaml, 'r'), Loader=Loader) + for dep_key, reason in self.DEPRECATED_KEYS.items(): + if dep_key in config: + _LOG.warning(f"`{dep_key}` is deprecated and ignored; {reason}.") for key, value in config.items(): if key in args: @@ -442,18 +459,14 @@ def log_configuration(self): if self.threads > 1: _LOG.info(f'Running read simulator in parallel mode.') _LOG.info(f'Multithreading - {self.threads} threads (or CPU Max)') - else: - _LOG.info(f"Single threading - 1 thread.") - self.parallel_mode = 'contig' - - if self.parallel_mode == 'size': - _LOG.info(f'Splitting reference into chunks.') + _LOG.info('Splitting reference into chunks.') if self.parallel_block_size > 0: _LOG.info(f' - splitting input into size {self.parallel_block_size}') else: - _LOG.info(f' - chunk size will be auto-tuned from genome length and thread count') - elif self.parallel_mode == 'contig': - _LOG.info(f'Splitting input by contig.') + _LOG.info(' - chunk size will be auto-tuned from genome length and thread count') + else: + _LOG.info('Single threading - 1 thread.') + _LOG.info('Splitting input by contig.') if self.reuse_splits: splits_dir = Path(f'{self.output_dir}/splits/') _LOG.info(f'Reusing existing splits {splits_dir}.') diff --git a/neat/read_simulator/utils/output_file_writer.py b/neat/read_simulator/utils/output_file_writer.py index afe3ccf8..58b9c6d3 100755 --- a/neat/read_simulator/utils/output_file_writer.py +++ b/neat/read_simulator/utils/output_file_writer.py @@ -9,13 +9,13 @@ ] import os -import re import shutil import time from struct import pack import logging from typing import Any +import numpy as np from Bio import bgzf from Bio import SeqIO from pathlib import Path @@ -35,6 +35,18 @@ CIGAR_PACKED = {'M': 0, 'I': 1, 'D': 2, 'N': 3, 'S': 4, 'H': 5, 'P': 6, '=': 7, 'X': 8} SEQ_PACKED = {'=': 0, 'A': 1, 'C': 2, 'M': 3, 'G': 4, 'R': 5, 'S': 6, 'V': 7, 'T': 8, 'W': 9, 'Y': 10, 'H': 11, 'K': 12, 'D': 13, 'B': 14, 'N': 15} + +# 256-entry byte→4-bit lookup tables: write_bam_record encodes the read sequence +# 2 bases per byte and packs each CIGAR op into one uint32. A flat numpy array +# indexed by byte value is ~30x faster than a per-character dict lookup in a Python +# loop, which dominated write_bam_record self-time before vectorization. +SEQ_PACKED_LOOKUP = np.zeros(256, dtype=np.uint8) +for _c, _v in SEQ_PACKED.items(): + SEQ_PACKED_LOOKUP[ord(_c)] = _v +CIGAR_PACKED_LOOKUP = np.zeros(256, dtype=np.uint8) +for _c, _v in CIGAR_PACKED.items(): + CIGAR_PACKED_LOOKUP[ord(_c)] = _v + # TODO figure out an optimum batch size or get rid of this idea BUFFER_BATCH_SIZE = 8000 # write out to file after this many reads @@ -231,8 +243,19 @@ def write_bam_record( cigar = read.make_cigar() - cig_letters = re.split(r"\d+", cigar)[1:] - cig_numbers = [int(n) for n in re.findall(r"\d+", cigar)] + # Parse the CIGAR string in one linear pass instead of two regex scans + # (re.split + re.findall) — equivalent output, no regex overhead. + cig_letters = [] + cig_numbers = [] + cig_pos = 0 + cig_len = len(cigar) + while cig_pos < cig_len: + num_start = cig_pos + while cig_pos < cig_len and cigar[cig_pos].isdigit(): + cig_pos += 1 + cig_numbers.append(int(cigar[num_start:cig_pos])) + cig_letters.append(cigar[cig_pos]) + cig_pos += 1 cig_ops = len(cig_letters) next_ref_id = contig_id @@ -242,27 +265,34 @@ def write_bam_record( else: next_pos = mate_position - encoded_cig = bytearray() - - for i in range(cig_ops): - encoded_cig.extend(pack(' tuple[dict, int, dict]: reference_keys_with_lens[contig] = len(seq_str) split_fasta_dict[contig] = {} - if options.parallel_mode == "contig": + # Single-thread runs use one chunk per contig; multi-thread runs split each + # contig into chunks of `parallel_block_size`. The splitting strategy is no + # longer user-configurable — it's derived from `threads`. + if options.threads == 1: stem = f"{idx:0{pad}d}__{contig}" fa = options.splits_dir / f"{stem}.fa.gz" write_fasta(contig, seq_str, fa) diff --git a/neat/variants/contig_variants.py b/neat/variants/contig_variants.py index 72f5f09b..75f413ec 100644 --- a/neat/variants/contig_variants.py +++ b/neat/variants/contig_variants.py @@ -38,18 +38,24 @@ def __init__(self): "INFO": '.', "FORMAT": 'GT', } - self.all_dels = [] - self.all_ins = [] + # Indels are indexed by every reference position they cover so check_if_del / + # check_if_ins is O(1) on average. The previous representation (`all_dels` / + # `all_ins` flat lists) made each check O(N) and the whole variant-generation + # pass O(N**2). Each Insertion/Deletion of length L occupies L position-buckets; + # since indels here are short (typically 1-10 bp) and the simulator rejects + # overlapping ones per genotype, total memory is O(sum of indel lengths). + self._del_by_position: dict[int, list] = {} + self._ins_by_position: dict[int, list] = {} def check_if_del(self, other): - for deletion in self.all_dels: - if np.array_equal(other.genotype, deletion.genotype) and deletion.contains(other): + for deletion in self._del_by_position.get(other.position1, ()): + if np.array_equal(other.genotype, deletion.genotype): return deletion return None def check_if_ins(self, other): - for insert in self.all_ins: - if np.array_equal(other.genotype, insert.genotype) and insert.contains(other.position1): + for insert in self._ins_by_position.get(other.position1, ()): + if np.array_equal(other.genotype, insert.genotype): return insert return None @@ -65,9 +71,11 @@ def add_variant(self, input_variant): if self.find_dups(input_variant): return 1 if type(input_variant) == Insertion: - self.all_ins.append(input_variant) + for p in range(input_variant.position1, input_variant.position1 + input_variant.length): + self._ins_by_position.setdefault(p, []).append(input_variant) elif type(input_variant) == Deletion: - self.all_dels.append(input_variant) + for p in range(input_variant.position1, input_variant.position1 + input_variant.length): + self._del_by_position.setdefault(p, []).append(input_variant) self.contig_variants[input_variant.position1].append(input_variant) return 0 diff --git a/pyproject.toml b/pyproject.toml index ec50b6db..fad2097a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "neat-genreads" -version = "4.4.3" +version = "4.4.4" description = "NGS Simulation toolkit" readme = "README.md" authors = ["Joshua Allen ", "Keshav Gandhi "] diff --git a/tests/test_models/test_gc_bias_model.py b/tests/test_models/test_gc_bias_model.py index aa9aba6e..b416cc34 100644 --- a/tests/test_models/test_gc_bias_model.py +++ b/tests/test_models/test_gc_bias_model.py @@ -137,3 +137,51 @@ def test_get_gc_bias_weights_gc_rich_heavier_than_at_rich(tmp_path): weights = get_gc_bias_weights(str(bam), str(ref), window_size=100) assert weights[100] > weights[0] + + +# --------------------------------------------------------------------------- +# compute_genome_wide_gc_mean_weight +# --------------------------------------------------------------------------- + +from neat.model_gc_bias.utils import compute_genome_wide_gc_mean_weight + + +def test_compute_genome_wide_gc_mean_uniform_model(tmp_path): + """Uniform model: returns weights[0] without scanning the reference.""" + ref = tmp_path / "ref.fa" + ref.write_text(">chr1\n" + "ACGT" * 1000 + "\n") + model = GCBiasModel([0.7] * 101, window_size=100) + # No need to even index — uniform short-circuits + assert compute_genome_wide_gc_mean_weight(str(ref), model) == pytest.approx(0.7) + + +def test_compute_genome_wide_gc_mean_pure_at_reference(tmp_path): + """A 100%-AT reference samples every window at GC bin 0 → returns weights[0].""" + ref = tmp_path / "ref.fa" + ref.write_text(">chr1\n" + "A" * 2000 + "\n") + weights = [0.0] * 101 + weights[0] = 0.3 + weights[100] = 1.0 + model = GCBiasModel(weights, window_size=100) + pysam_index_fasta(ref) + mean = compute_genome_wide_gc_mean_weight(str(ref), model) + assert mean == pytest.approx(0.3, abs=1e-6) + + +def test_compute_genome_wide_gc_mean_mixed_reference(tmp_path): + """Half-AT/half-GC reference should land between weights[0] and weights[100].""" + ref = tmp_path / "ref.fa" + ref.write_text(">chr1\n" + "A" * 5000 + "G" * 5000 + "\n") + weights = [0.5] * 101 + weights[100] = 1.0 + model = GCBiasModel(weights, window_size=100) + pysam_index_fasta(ref) + mean = compute_genome_wide_gc_mean_weight(str(ref), model) + # ≈ 50 % windows at bin 0 (weight 0.5) + 50 % at bin 100 (weight 1.0), with a 100 bp + # transition zone near the midpoint. Should land near 0.75. + assert 0.7 < mean < 0.8 + + +def pysam_index_fasta(ref_path): + """Build a .fai for pysam.FastaFile.""" + pysam.faidx(str(ref_path)) diff --git a/tests/test_models/test_split_inputs.py b/tests/test_models/test_split_inputs.py index a0b192c8..23e11fa7 100644 --- a/tests/test_models/test_split_inputs.py +++ b/tests/test_models/test_split_inputs.py @@ -22,9 +22,12 @@ # --------------------------------------------------------------------------- def _make_options(tmp_path, mode="contig", read_len=50, block_size=200): + """Build an Options for split_inputs tests. `mode` is a test-side convenience + that maps to `threads` (1 for contig-style, 4 for size-style splitting) — the + splitting strategy is no longer a separately-configurable option.""" opts = Options(rng_seed=0) opts.read_len = read_len - opts.parallel_mode = mode + opts.threads = 1 if mode == "contig" else 4 opts.parallel_block_size = block_size opts.splits_dir = tmp_path / "splits" opts.splits_dir.mkdir(exist_ok=True) diff --git a/tests/test_read_simulator/test_gc_bias_pipeline.py b/tests/test_read_simulator/test_gc_bias_pipeline.py index 15961bf2..d0c14ffb 100644 --- a/tests/test_read_simulator/test_gc_bias_pipeline.py +++ b/tests/test_read_simulator/test_gc_bias_pipeline.py @@ -167,3 +167,106 @@ def test_gc_bias_multi_contig(tmp_path): # chr2 is 100% G, chr1 is 100% A. With our model, chr2 should have significantly more reads. assert chr2_reads > chr1_reads + + +def test_gc_bias_preserves_average_coverage(tmp_path): + """Average coverage across the genome should match the requested coverage even when + a non-uniform GC bias model is applied. Before the v4.4.5 fix, scaling by max_weight + instead of genome-wide mean under-delivered total reads by mean/max.""" + # Reference: 50/50 split AT vs GC. With a model favoring 100% GC, a chunk of 100% GC + # has mean_weight near max, but a chunk of 0% GC has weight ~0.5 (the boost in the + # weights array). Genome-wide mean lies between the two extremes. + ref_seq = "A" * 5000 + "G" * 5000 + ref_path = _write_ref(tmp_path / "ref.fa", ref_seq) + + from neat.models.gc_bias_model import GCBiasModel + weights = [0.5] * 101 + weights[100] = 1.0 + model = GCBiasModel(weights, window_size=100) + model_path = tmp_path / "biased_model.pickle.gz" + model.save(model_path) + + coverage = 30 + read_len = 50 + out_dir = tmp_path / "avg_out" + cfg_path = _write_config( + tmp_path / "avg.yml", + ref_path, + gc_model=str(model_path), + coverage=coverage, + read_len=read_len, + ) + + read_simulator_runner(str(cfg_path), str(out_dir), "sim_avg") + + fq_files = list(out_dir.glob("*.fastq.gz")) + n_reads = 0 + with gzip.open(fq_files[0], "rt") as f: + for line in f: + if line.startswith("@NEAT_generated"): + n_reads += 1 + + # Expected total bases produced (single-ended): coverage × genome_length. + # Expected total reads: expected_bases / read_len. + expected_reads = (coverage * len(ref_seq)) / read_len + delivered_ratio = n_reads / expected_reads + + # Allow 25% tolerance for stochastic sampling, GC bin smoothing, and the boundary + # window (5000 ± window_size) where GC content transitions. Pre-fix the ratio was + # ~mean/max ≈ (0.5+1.0)/2 / 1.0 = 0.75, so even loose bounds catch a regression. + assert 0.8 < delivered_ratio < 1.2, ( + f"Average coverage drifted: delivered {n_reads} reads vs expected " + f"{expected_reads:.0f} (ratio {delivered_ratio:.3f})" + ) + + +def test_gc_bias_preserves_coverage_multi_threaded(tmp_path): + """Same average-coverage contract as the single-threaded test, but with threads>1 and + a parallel_block_size small enough to force sub-contig chunking. Verifies that the + precomputed genome-wide mean is correctly applied across worker processes.""" + # 8 kb reference with 4 alternating 2 kb segments — gives a non-trivial spread of + # per-chunk GC content so the chunks each see different mean_weights. + ref_seq = ("A" * 2000) + ("G" * 2000) + ("A" * 2000) + ("G" * 2000) + ref_path = _write_ref(tmp_path / "ref.fa", ref_seq) + + from neat.models.gc_bias_model import GCBiasModel + weights = [0.5] * 101 + weights[100] = 1.0 + model = GCBiasModel(weights, window_size=100) + model_path = tmp_path / "biased_model.pickle.gz" + model.save(model_path) + + coverage = 30 + read_len = 50 + out_dir = tmp_path / "mt_out" + # parallel_block_size = 2500 with read_len=50 yields overlap=100 and produces 4 chunks + # over the 8 kb reference, exercising the multi-chunk path under threads=2. + cfg_path = _write_config( + tmp_path / "mt.yml", + ref_path, + gc_model=str(model_path), + coverage=coverage, + read_len=read_len, + threads=2, + parallel_block_size=2500, + ) + + read_simulator_runner(str(cfg_path), str(out_dir), "sim_mt") + + fq_files = list(out_dir.glob("*.fastq.gz")) + n_reads = 0 + with gzip.open(fq_files[0], "rt") as f: + for line in f: + if line.startswith("@NEAT_generated"): + n_reads += 1 + + expected_reads = (coverage * len(ref_seq)) / read_len + delivered_ratio = n_reads / expected_reads + + # Same 20 % tolerance as the single-threaded test. Pre-fix the ratio would have been + # ~0.75 (mean/max); the multi-threaded path is where the per-chunk precomputation + # propagation matters most, so this test guards against regressions in that plumbing. + assert 0.8 < delivered_ratio < 1.2, ( + f"Multi-threaded average coverage drifted: delivered {n_reads} reads vs expected " + f"{expected_reads:.0f} (ratio {delivered_ratio:.3f})" + ) diff --git a/tests/test_read_simulator/test_options.py b/tests/test_read_simulator/test_options.py index 33e9c17c..6a82d5d6 100644 --- a/tests/test_read_simulator/test_options.py +++ b/tests/test_read_simulator/test_options.py @@ -73,7 +73,6 @@ def test_from_cli_single_end_with_threads_and_splits(tmp_path: _PathAlias): rng_seed: 42 overwrite_output: true - parallel_mode: size parallel_block_size: 500000 threads: 2 cleanup_splits: false @@ -125,7 +124,6 @@ def test_from_cli_paired_end_fragments(tmp_path: _PathAlias): rng_seed: 7 overwrite_output: true - parallel_mode: contig threads: 1 cleanup_splits: true reuse_splits: false @@ -157,9 +155,10 @@ def test_default_values(): assert opts.produce_vcf is False assert opts.quality_offset == 33 assert opts.threads == 1 - assert opts.parallel_mode == "contig" - # Default is 0 (sentinel for auto-tune from genome length and thread count). - # An explicit positive int in YAML overrides; see runner for the auto-tune logic. + # parallel_block_size default is 0 (sentinel for auto-tune from genome length and + # thread count). An explicit positive int in YAML overrides; see runner for the + # auto-tune logic. The splitting strategy itself is no longer a user-facing option — + # it's derived from `threads` at runtime. assert opts.parallel_block_size == 0 assert opts.cleanup_splits is True assert opts.reuse_splits is False @@ -222,12 +221,15 @@ def test_check_and_log_error_numeric_out_of_range(capsys): def test_check_and_log_error_choice_valid(): - Options.check_and_log_error("parallel_mode", "contig", "choice", ["size", "contig"]) + # The schema's `choice`-type validator isn't currently used by any production + # option, but we still want it covered. Use a synthetic key name so the test + # stays meaningful regardless of which schema fields use this validator. + Options.check_and_log_error("_test_choice_key", "alpha", "choice", ["alpha", "beta"]) def test_check_and_log_error_choice_invalid(): with _pytest.raises(SystemExit): - Options.check_and_log_error("parallel_mode", "bad", "choice", ["size", "contig"]) + Options.check_and_log_error("_test_choice_key", "gamma", "choice", ["alpha", "beta"]) def test_check_options_no_output_files_exits(): @@ -260,12 +262,28 @@ def test_log_configuration_produces_bam_and_vcf(tmp_path: _PathAlias): assert opts.vcf in opts.output_files -def test_log_configuration_threads_one_forces_contig(tmp_path: _PathAlias): +def test_read_yaml_deprecated_parallel_mode_warns(tmp_path: _PathAlias, caplog): + """An old YAML config with `parallel_mode: ...` parses without error and emits a + deprecation warning. The key is silently ignored (the splitting strategy is now + derived from `threads`), so existing user configs keep working across the + deprecation.""" ref = _project_root() / "data" / "H1N1.fa" - opts = Options(reference=ref, output_dir=tmp_path, output_prefix="out", - overwrite_output=True, threads=1, parallel_mode="size") - opts.log_configuration() - assert opts.parallel_mode == "contig" + cfg = _textwrap.dedent( + f""" + reference: {ref} + parallel_mode: size + threads: 1 + """ + ).strip() + "\n" + yml_path = tmp_path / "old_config.yml" + yml_path.write_text(cfg, encoding="utf-8") + import logging as _logging + with caplog.at_level(_logging.WARNING): + Options.from_cli(tmp_path, "out", yml_path) + assert any( + "parallel_mode" in rec.message and "deprecated" in rec.message + for rec in caplog.records + ) def test_log_configuration_fragment_mean_less_than_read_len_exits(tmp_path: _PathAlias): @@ -304,7 +322,6 @@ def test_from_cli_reuse_splits_missing_dir_raises(tmp_path: _PathAlias): produce_bam: false produce_vcf: false threads: 4 - parallel_mode: size parallel_block_size: 500000 cleanup_splits: true reuse_splits: true diff --git a/tests/test_variants/test_contig_variants.py b/tests/test_variants/test_contig_variants.py index b6337824..82701f53 100644 --- a/tests/test_variants/test_contig_variants.py +++ b/tests/test_variants/test_contig_variants.py @@ -108,22 +108,7 @@ def test_get_sample_info_without_metadata_uses_genotype_string(): assert "|" in result or "/" in result -# remove_variant - -def test_remove_variant_method_exists(): - """remove_variant silently no-ops due to variant.position bug. - - The fix is on branch fix/contig-variants-remove-variant with regression - tests in tests/test_variants/test_remove_variant.py. - TODO (post-fix): replace this test with: - cv = ContigVariants() - v = _snv(10) - cv.add_variant(v) - cv.remove_variant(v) - assert 10 not in cv.variant_locations - """ - cv = ContigVariants() - assert callable(cv.remove_variant) +# remove_variant — full behavioral coverage lives in test_remove_variant.py # compile_genotypes_for_location