Skip to content

Commit eb8cee5

Browse files
committed
Add -R Cmdline Option
One given `@RG` SAM header line will replace existing read group lines in the header. This read group will be added to all reads in the output file.
1 parent 1cacf88 commit eb8cee5

4 files changed

Lines changed: 248 additions & 13 deletions

File tree

src/distributed/dispatcher.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@ use std::{
88
};
99

1010
use bio::data_structures::suffix_array::SuffixArray;
11+
use bstr::{BStr, BString, ByteSlice};
1112
use log::{debug, info, warn};
1213
use mio::{net, Events, Interest, Poll, Token};
1314
use noodles::{
1415
bam,
15-
sam::{self, alignment::io::Write as AlignmentWrite},
16+
sam::{
17+
self,
18+
alignment::io::Write as AlignmentWrite,
19+
header::record::value::map::{Map, ReadGroup},
20+
},
1621
};
1722
use rayon::prelude::*;
1823

@@ -56,6 +61,7 @@ pub struct Dispatcher<'a, 'b> {
5661
reference_path: &'b Path,
5762
out_file_path: &'b Path,
5863
force_overwrite: bool,
64+
read_group: Option<(BString, Map<ReadGroup>)>,
5965
alignment_parameters: &'a AlignmentParameters,
6066
connections: HashMap<Token, Connection>,
6167
accept_connections: bool,
@@ -71,6 +77,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
7177
out_file_path: &'b str,
7278
force_overwrite: bool,
7379
alignment_parameters: &'a AlignmentParameters,
80+
read_group: Option<(BString, Map<ReadGroup>)>,
7481
) -> Result<Self> {
7582
let reads_path = Path::new(reads_path);
7683
let out_file_path = Path::new(out_file_path);
@@ -87,6 +94,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
8794
out_file_path,
8895
force_overwrite,
8996
alignment_parameters,
97+
read_group,
9098
connections: HashMap::new(),
9199
accept_connections: true,
92100
})
@@ -133,7 +141,11 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
133141
);
134142

135143
let mut input_source = InputSource::from_path(self.reads_path)?;
136-
let out_header = create_bam_header(input_source.header(), &identifier_position_map)?;
144+
let out_header = create_bam_header(
145+
input_source.header(),
146+
&identifier_position_map,
147+
self.read_group.clone(),
148+
)?;
137149
out_file.write_header(&out_header)?;
138150
let mut task_queue = input_source.task_queue(self.alignment_parameters.chunk_size);
139151
self.run_inner(
@@ -229,6 +241,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
229241
identifier_position_map,
230242
original_symbols,
231243
self.alignment_parameters,
244+
self.read_group.as_ref().map(|(id, _map)| id.as_bstr()),
232245
out_header,
233246
out_file,
234247
)?;
@@ -331,6 +344,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
331344
identifier_position_map: &FastaIdPositions,
332345
original_symbols: &OriginalSymbols,
333346
alignment_parameters: &AlignmentParameters,
347+
read_group: Option<&BStr>,
334348
out_header: &sam::Header,
335349
out_file: &mut bam::io::Writer<W>,
336350
) -> Result<()>
@@ -350,6 +364,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
350364
original_symbols,
351365
Some(&duration),
352366
alignment_parameters,
367+
read_group,
353368
&mut rng,
354369
)
355370
})

src/main.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use bstr::BString;
12
use clap::{crate_authors, crate_description, value_parser, Arg, ArgAction, ArgMatches, Command};
23
use log::{error, info, warn};
4+
use noodles::sam::{self, header::record::value::map::Map, header::record::value::map::ReadGroup};
35
use simple_logger::SimpleLogger;
46
use time::UtcOffset;
57

@@ -37,6 +39,21 @@ fn define_cli() -> ArgMatches {
3739
.ok_or_else(|| clap::Error::new(clap::error::ErrorKind::ValueValidation))
3840
}
3941

42+
fn parse_validate_read_group(raw_arg: &str) -> Result<(BString, Map<ReadGroup>), clap::Error> {
43+
let mut read_groups = raw_arg
44+
.replace(r"\t", "\t")
45+
.parse()
46+
.map(|header: sam::Header| header.read_groups().to_owned())
47+
.map_err(|_e| clap::Error::new(clap::error::ErrorKind::ValueValidation))?;
48+
if read_groups.len() != 1 {
49+
return Err(clap::Error::new(clap::error::ErrorKind::ValueValidation));
50+
}
51+
let rg = read_groups
52+
.pop()
53+
.expect("As per the above check, there is exactly one RG");
54+
Ok(rg)
55+
}
56+
4057
Command::new(CRATE_NAME)
4158
.about(crate_description!())
4259
.version(build_info::get_software_version())
@@ -261,6 +278,15 @@ fn define_cli() -> ArgMatches {
261278
it already exists."))
262279
.action(ArgAction::SetTrue)
263280
)
281+
.arg(
282+
Arg::new("read_group")
283+
.short('R')
284+
.long("read_group")
285+
.help("Read group SAM header line. The given read group ID will be \
286+
added to every read in the output file. \
287+
Example: '@RG\tID:identifier1\tSM:sample2'.")
288+
.value_parser(parse_validate_read_group)
289+
)
264290
)
265291
.subcommand(
266292
Command::new("worker")
@@ -328,6 +354,7 @@ fn start_mapper(map_matches: &ArgMatches, _seed: u64) {
328354
.get_one::<String>("output")
329355
.expect("Presence is ensured by CLI definition");
330356
let force_overwrite = map_matches.get_flag("force_overwrite");
357+
let read_group = map_matches.get_one("read_group").cloned();
331358

332359
let num_threads = *map_matches
333360
.get_one("num_threads")
@@ -350,6 +377,7 @@ fn start_mapper(map_matches: &ArgMatches, _seed: u64) {
350377
out_file_path,
351378
force_overwrite,
352379
&alignment_parameters,
380+
read_group,
353381
)
354382
.and_then(|mut dispatcher| dispatcher.run(port))
355383
} else {
@@ -359,6 +387,7 @@ fn start_mapper(map_matches: &ArgMatches, _seed: u64) {
359387
out_file_path,
360388
force_overwrite,
361389
&alignment_parameters,
390+
read_group,
362391
)
363392
} {
364393
error!("{}", e);

src/map/mapping.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ use std::{
1212
};
1313

1414
use bio::{alphabets::dna, data_structures::suffix_array::SuffixArray};
15-
use bstr::{BString, ByteSlice};
15+
use bstr::{BStr, BString, ByteSlice};
1616
use clap::crate_description;
1717
use log::{debug, info, trace};
1818
use min_max_heap::MinMaxHeap;
1919
use noodles::{
2020
bam,
21-
sam::{self, alignment::io::Write as SamWrite},
21+
sam::{
22+
self,
23+
alignment::io::Write as SamWrite,
24+
header::record::value::map::{Map, ReadGroup},
25+
},
2226
};
2327
use rand::RngCore;
2428
use rayon::prelude::*;
@@ -56,6 +60,7 @@ pub fn run(
5660
out_file_path: &str,
5761
force_overwrite: bool,
5862
alignment_parameters: &AlignmentParameters,
63+
read_group: Option<(BString, Map<ReadGroup>)>,
5964
) -> Result<()> {
6065
let reads_path = Path::new(reads_path);
6166
let out_file_path = Path::new(out_file_path);
@@ -97,7 +102,11 @@ pub fn run(
97102

98103
info!("Map reads");
99104
let mut input_source = InputSource::from_path(reads_path)?;
100-
let out_header = create_bam_header(input_source.header(), &identifier_position_map)?;
105+
let out_header = create_bam_header(
106+
input_source.header(),
107+
&identifier_position_map,
108+
read_group.clone(),
109+
)?;
101110
out_file.write_header(&out_header)?;
102111
run_inner(
103112
input_source.task_queue(alignment_parameters.chunk_size),
@@ -107,6 +116,7 @@ pub fn run(
107116
&identifier_position_map,
108117
&original_symbols,
109118
&out_header,
119+
read_group.as_ref().map(|(id, _map)| id.as_bstr()),
110120
&mut out_file,
111121
)?;
112122

@@ -125,6 +135,7 @@ fn run_inner<S, T, W>(
125135
identifier_position_map: &FastaIdPositions,
126136
original_symbols: &OriginalSymbols,
127137
out_header: &sam::Header,
138+
read_group: Option<&BStr>,
128139
out_file: &mut bam::io::Writer<W>,
129140
) -> Result<()>
130141
where
@@ -269,6 +280,7 @@ where
269280
original_symbols,
270281
Some(&duration),
271282
alignment_parameters,
283+
read_group,
272284
&mut rng,
273285
)
274286
},
@@ -288,6 +300,7 @@ where
288300
pub fn create_bam_header(
289301
src_header: Option<&sam::Header>,
290302
identifier_position_map: &FastaIdPositions,
303+
read_group: Option<(BString, Map<ReadGroup>)>,
291304
) -> Result<sam::Header> {
292305
use sam::header::record::value::map::{self, Map};
293306

@@ -343,9 +356,13 @@ pub fn create_bam_header(
343356
sam_header_builder = sam_header_builder.add_comment(comment.as_bstr());
344357
}
345358

346-
for (id, read_group) in src_header.read_groups() {
347-
sam_header_builder =
348-
sam_header_builder.add_read_group(id.as_bstr(), read_group.clone());
359+
if let Some((id, map)) = read_group {
360+
sam_header_builder = sam_header_builder.add_read_group(id, map);
361+
} else {
362+
for (id, read_group) in src_header.read_groups() {
363+
sam_header_builder =
364+
sam_header_builder.add_read_group(id.as_bstr(), read_group.clone());
365+
}
349366
}
350367
}
351368

@@ -390,6 +407,7 @@ pub fn intervals_to_bam<R, S>(
390407
original_symbols: &OriginalSymbols,
391408
duration: Option<&Duration>,
392409
alignment_parameters: &AlignmentParameters,
410+
read_group: Option<&BStr>,
393411
rng: &mut R,
394412
) -> Result<sam::alignment::RecordBuf>
395413
where
@@ -517,6 +535,7 @@ where
517535
duration,
518536
Some(alternative_hits),
519537
original_symbols,
538+
read_group,
520539
);
521540
}
522541
None => {
@@ -544,6 +563,7 @@ where
544563
duration,
545564
None,
546565
original_symbols,
566+
read_group,
547567
)
548568
}
549569

@@ -712,6 +732,7 @@ fn create_bam_record(
712732
// Contains valid content for the `YA` tag
713733
alternative_hits: Option<AlternativeAlignments>,
714734
original_symbols: &OriginalSymbols,
735+
read_group: Option<&BStr>,
715736
) -> Result<sam::alignment::RecordBuf> {
716737
let mut bam_builder = sam::alignment::RecordBuf::builder();
717738

@@ -822,9 +843,18 @@ fn create_bam_record(
822843
.into_iter()
823844
// Remove BWA (+ mapAD) specific auxiliary fields (avoids potential confusion)
824845
.filter(|(tag, _v)| !tag_filter.contains(&tag))
846+
// Don't copy input read group if one is given via cmdline
847+
.filter(|(tag, _v)| !(tag == b"RG" && read_group.is_some()))
825848
.map(|(tag, value)| Ok((tag.into(), value.into())))
826849
.collect::<Result<Vec<_>>>()?;
827850

851+
if let Some(read_group) = read_group {
852+
aux_data.push((
853+
sam::alignment::record::data::field::tag::Tag::READ_GROUP,
854+
sam::alignment::record_buf::data::field::Value::String(read_group.to_owned()),
855+
));
856+
}
857+
828858
if let Some(hit_interval) = hit_interval {
829859
aux_data.push((
830860
sam::alignment::record::data::field::tag::Tag::ALIGNMENT_SCORE,

0 commit comments

Comments
 (0)