Skip to content

Commit cdb1abb

Browse files
committed
Got bam production up and running again
1 parent 44bb242 commit cdb1abb

6 files changed

Lines changed: 87 additions & 74 deletions

File tree

neat/read_simulator/runner.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def read_simulator_runner(config: str, output_dir: str, file_prefix: str):
111111

112112
# Creates files and sets up objects for files that can be written to as needed.
113113
# Also creates headers for bam and vcf.
114-
output_file_writer = OutputFileWriter(options=options, bam_header=bam_header)
114+
output_file_writer = OutputFileWriter(options=options, header=bam_header)
115115

116116
# Split file by chunk for parallel analysis or by contig for either parallel or single analysis
117117
_LOG.info("Splitting reference...")
@@ -140,7 +140,7 @@ def read_simulator_runner(config: str, output_dir: str, file_prefix: str):
140140
# Create local filenames based on fasta indexing scheme.
141141
fq1 = None
142142
fq2 = None
143-
block_reads_pickle = None
143+
bam = None
144144
if options.produce_fastq:
145145
fq1 = current_output_dir / options.fq1.name
146146
# validate to double-check we don't have name collisions
@@ -150,14 +150,15 @@ def read_simulator_runner(config: str, output_dir: str, file_prefix: str):
150150
fq2 = current_output_dir / options.fq2.name
151151
validate_output_path(fq2, True, False)
152152
if options.produce_bam:
153-
block_reads_pickle = current_output_dir / "reads.p.gz"
154-
validate_output_path(block_reads_pickle, True, False)
155-
current_options = options.copy_with_changes(splits_file, current_output_dir, fq1, fq2, block_reads_pickle)
153+
bam = current_output_dir / options.bam.name
154+
validate_output_path(bam, True, False)
155+
current_options = options.copy_with_changes(splits_file, current_output_dir, fq1, fq2, bam)
156156
if options.threads == 1:
157157
idx, contig, local_variants, files_written = read_simulator_single(
158158
1,
159159
start,
160160
current_options,
161+
bam_header,
161162
contig,
162163
contig_index,
163164
input_variants_dict[contig],
@@ -178,6 +179,7 @@ def read_simulator_runner(config: str, output_dir: str, file_prefix: str):
178179
thread_idx,
179180
start,
180181
current_options,
182+
bam_header,
181183
contig,
182184
contig_index,
183185
thread_input_variants,
@@ -208,12 +210,12 @@ def read_simulator_runner(config: str, output_dir: str, file_prefix: str):
208210

209211
start = time.time()
210212
if options.produce_bam:
211-
stitch_main(output_file_writer, output_files, contig_dict, options.read_len, options.threads)
213+
stitch_main(output_file_writer, output_files, options.threads)
212214
else:
213215
stitch_main(output_file_writer, output_files)
214216
_LOG.info(f"It took {time.time()-start} s to write the bam file")
215217

216-
output_file_writer.close_files()
218+
output_file_writer.flush_and_close_files()
217219
_LOG.info(f"Read simulator complete in {time.time() - analysis_start} s")
218220

219221
def write_final_vcf(

neat/read_simulator/single_runner.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
Runner for read-simulator in single-ended mode
33
"""
44
import gzip
5+
import os
56
import pickle
6-
from Bio import SeqIO
7+
8+
import pysam
9+
from Bio import SeqIO, bgzf
710
import logging
811
from pathlib import Path
912

@@ -21,6 +24,7 @@ def read_simulator_single(
2124
thread_idx: int,
2225
block_start: int,
2326
local_options: Options,
27+
bam_header: list | None,
2428
contig_name: str,
2529
contig_index: int,
2630
input_variants_local: ContigVariants,
@@ -33,6 +37,7 @@ def read_simulator_single(
3337
:param thread_idx: index of current thread
3438
:param block_start: Where on the reference does this block start? For a full contig, this will be 0.
3539
:param local_options: options for current thread and reference chunk
40+
:param bam_header: Pass in the outer bam header.
3641
:param contig_name: The original list of contig names.
3742
:param contig_index: The index of the contig which this chunk comes from
3843
:param input_variants_local: The input variants for this block
@@ -83,7 +88,7 @@ def read_simulator_single(
8388
# We'll also keep track here of what files we are producing.
8489
# We don't really need to write out the VCF. We should be able to store it in memory
8590
local_options.produce_vcf = False
86-
local_output_file_writer = OutputFileWriter(options=local_options)
91+
local_output_file_writer = OutputFileWriter(options=local_options, header=bam_header)
8792
"""
8893
Begin Analysis
8994
"""
@@ -100,7 +105,7 @@ def read_simulator_single(
100105
)
101106

102107
if local_options.produce_fastq or local_options.produce_bam:
103-
generate_reads(
108+
reads_to_write = generate_reads(
104109
thread_idx,
105110
local_seq_record,
106111
seq_error_model,
@@ -115,11 +120,38 @@ def read_simulator_single(
115120
contig_index,
116121
)
117122

118-
local_output_file_writer.close_files()
123+
# TODO write the per-block bam here. For some reason moving it entirely out of the loop slows it down in all cases
124+
# Hopefully this lets us take advantage of multithreading for writing the bams. Should make the final stitching
125+
# easier -- basically, that gets moved to here and there will just be a straight file dump.
126+
bam_handle = bgzf.BgzfWriter(local_output_file_writer.bam, 'a', compresslevel=6)
127+
for read_data in reads_to_write:
128+
read1 = read_data[0]
129+
read2 = read_data[1]
130+
if read1:
131+
local_output_file_writer.write_bam_record(
132+
read1,
133+
contig_index,
134+
bam_handle,
135+
local_options.read_len
136+
)
137+
if read2:
138+
local_output_file_writer.write_bam_record(
139+
read2,
140+
contig_index,
141+
bam_handle,
142+
local_options.read_len
143+
)
144+
bam_handle.flush()
145+
bam_handle.close()
146+
sorted_bam = local_output_file_writer.bam.with_suffix(".sorted.bam")
147+
pysam.sort("-@", str(local_options.threads), "-o", str(sorted_bam), str(local_output_file_writer.bam))
148+
os.rename(str(sorted_bam), str(local_output_file_writer.bam))
149+
150+
local_output_file_writer.flush_and_close_files()
119151
file_dict = {
120152
"fq1": local_output_file_writer.fq1,
121153
"fq2": local_output_file_writer.fq2,
122-
"reads": local_options.reads_pickle,
154+
"bam": local_options.bam,
123155
}
124156
return (
125157
thread_idx,

neat/read_simulator/utils/generate_reads.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,5 @@ def generate_reads(
309309
else:
310310
reads_to_write.append((read_1, None))
311311

312-
if options.produce_fastq:
313-
# _LOG.info(f"Chunk fastq(s) written in: {(time.time() - t)/60:.2f} m")
314-
pass
315-
if options.produce_bam:
316-
with open_output(options.reads_pickle) as reads:
317-
pickle.dump(reads_to_write, reads)
318-
# _LOG.info(f"Chunk bam(s) written.")
319-
320312
_LOG.info(f"Finished sampling reads for thread {thread_index} in {(time.time() - start_time)/60:.2f} m")
313+
return reads_to_write

neat/read_simulator/utils/options.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ def __init__(self,
5656
fq2: Path | None = None,
5757
vcf: Path | None = None,
5858
bam: Path | None = None,
59-
reads_pickle: Path | None = None,
6059
output_prefix: str = "neat_sim",
6160
overwrite_output: bool = False,
6261
rng_seed: int | None = None,
@@ -180,8 +179,6 @@ def __init__(self,
180179
self.fq2: Path | None = fq2
181180
self.vcf: Path | None = vcf
182181
self.bam: Path | None = bam
183-
# kind of a temporary holding pen for bam files
184-
self.reads_pickle: Path | None = reads_pickle
185182

186183
# Set the rng for the run
187184
self.rng = self.set_random_seed()
@@ -337,7 +334,7 @@ def copy_with_changes(self,
337334
current_output_dir: Path | None = None,
338335
fq1: Path | None = None,
339336
fq2: Path | None = None,
340-
reads_pickle: Path | None = None,
337+
bam: Path | None = None
341338
):
342339
return_options = deepcopy(self)
343340
if reference is not None:
@@ -348,9 +345,8 @@ def copy_with_changes(self,
348345
return_options.fq1 = fq1
349346
if fq2 is not None:
350347
return_options.fq2 = fq2
351-
if reads_pickle is not None:
352-
return_options.reads_pickle = reads_pickle
353-
self.bam = None
348+
if bam is not None:
349+
return_options.bam = bam
354350
return return_options
355351

356352
def set_random_seed(self) -> Generator:

neat/read_simulator/utils/output_file_writer.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,24 @@
88
"OutputFileWriter"
99
]
1010

11+
import os
1112
import re
1213
from struct import pack
1314
import logging
15+
from typing import Any
1416

1517
from Bio import bgzf
1618
from pathlib import Path
1719

20+
from Bio.bgzf import BgzfWriter
21+
1822
from .read import Read
1923
from .options import Options
2024

2125
_LOG = logging.getLogger(__name__)
2226

2327

2428
# Some Constants
25-
# TODO make bam compression a configurable option
26-
BAM_COMPRESSION_LEVEL = 6
2729
CIGAR_PACKED = {'M': 0, 'I': 1, 'D': 2, 'N': 3, 'S': 4, 'H': 5, 'P': 6, '=': 7, 'X': 8}
2830
SEQ_PACKED = {'=': 0, 'A': 1, 'C': 2, 'M': 3, 'G': 4, 'R': 5, 'S': 6, 'V': 7,
2931
'T': 8, 'W': 9, 'Y': 10, 'H': 11, 'K': 12, 'D': 13, 'B': 14, 'N': 15}
@@ -62,16 +64,16 @@ class OutputFileWriter:
6264
in the various formats.
6365
6466
:param options: Options for the current run.
65-
:param bam_header: A dictionary of lengths of each contig from the reference, keyed by contig id.
67+
:param header: A dictionary of lengths of each contig from the reference, keyed by contig id.
6668
"""
6769
def __init__(self,
6870
options: Options,
69-
bam_header: dict = None):
71+
header: dict = None):
7072

7173
self.paired_ended = options.paired_ended
72-
self.bam_header = bam_header
74+
self.bam_header = header
7375

74-
file_handles = {}
76+
file_handles: dict[Path, Any] = {}
7577

7678
# Set up filenames based on booleans
7779
if options.fq1 is not None:
@@ -91,7 +93,6 @@ def __init__(self,
9193
vcf = None
9294
if options.bam is not None:
9395
bam = options.bam
94-
file_handles[bam] = bgzf.BgzfWriter(bam, 'w', compresslevel=BAM_COMPRESSION_LEVEL)
9596
else:
9697
bam = None
9798

@@ -119,26 +120,27 @@ def __init__(self,
119120
f'#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tNEAT_simulated_sample\n'
120121
self.files_to_write[self.vcf].write(vcf_header)
121122

122-
if options.produce_bam and self.bam_header:
123+
if options.produce_bam:
123124
# bam header
124-
bam_handle = self.files_to_write[self.bam]
125+
bam_handle = bgzf.BgzfWriter(self.bam, 'w', compresslevel=6)
125126
bam_handle.write("BAM\1")
126127
# Without a header, we can't write these as bams.
127-
bam_header = "@HD\tVN:1.4\tSO:coordinate\n"
128+
header = "@HD\tVN:1.4\tSO:coordinate\n"
128129
for item in self.bam_header:
129-
bam_header += f'@SQ\tSN:{item}\tLN:{str(self.bam_header[item])}\n'
130-
bam_header += "@RG\tID:NEAT\tSM:NEAT\tLB:NEAT\tPL:NEAT\n"
131-
header_bytes = len(bam_header)
130+
header += f'@SQ\tSN:{item}\tLN:{str(self.bam_header[item])}\n'
131+
header += "@RG\tID:NEAT\tSM:NEAT\tLB:NEAT\tPL:NEAT\n"
132+
header_bytes = len(header)
132133
num_refs = len(self.bam_header)
133134
bam_handle.write(pack('<i', header_bytes))
134-
bam_handle.write(bam_header)
135+
bam_handle.write(header)
135136
bam_handle.write(pack('<i', num_refs))
136137
# Contigs and lengths. If we can skip writing this out for intermediate files, great
137138
for item in self.bam_header:
138139
name_length = len(item) + 1
139140
bam_handle.write(pack('<i', name_length))
140141
bam_handle.write(f'{item}\0')
141142
bam_handle.write(pack('<i', self.bam_header[item]))
143+
bam_handle.flush()
142144
bam_handle.close()
143145

144146
def write_fastq_record(self, filename: Path, record: str):
@@ -148,10 +150,15 @@ def write_fastq_record(self, filename: Path, record: str):
148150
_LOG.error(f"Tried to write fastq record to unknown file {filename}")
149151
raise ValueError
150152

151-
def close_files(self):
153+
def flush_and_close_files(self):
152154
for file_name in self.files_to_write:
153155
file_handle = self.files_to_write[file_name]
154-
file_handle.close()
156+
try:
157+
file_handle.flush()
158+
os.fsync(file_handle.fileno())
159+
file_handle.close()
160+
except ValueError:
161+
_LOG.debug(f"file {file_name} already closed")
155162

156163
def write_vcf_record(self, line: str):
157164
"""
@@ -171,14 +178,15 @@ def write_bam_record(
171178
self,
172179
read: Read,
173180
contig_id: int,
174-
bam_handle,
181+
bam_handle: BgzfWriter,
175182
read_length: int
176183
):
177184
"""
178185
Takes a read object and writes it out as a bam record
179186
180187
:param read: Read object to write to file
181188
:param contig_id: the index of the reference for this
189+
:param bam_handle: the handle to write data to
182190
:param read_length: the length of the read to output
183191
"""
184192
read_bin = reg2bin(read.position, read.end_point)

0 commit comments

Comments
 (0)