1+ """
2+ Unit tests for neat/quality_score_modeling/markov_utils.py
3+ """
4+
5+ import pytest
6+
7+ from neat .quality_score_modeling .markov_utils import (
8+ _down_bin_quality ,
9+ read_quality_lists ,
10+ compute_initial_distribution ,
11+ compute_position_distributions ,
12+ compute_transition_distributions ,
13+ build_markov_model ,
14+ )
15+
16+
17+ # ---------------------------------------------------------------------------
18+ # Helpers
19+ # ---------------------------------------------------------------------------
20+
21+ def _write_fastq (path , reads ):
22+ """Write a list of (seq, qual_string) tuples as a FASTQ file."""
23+ lines = []
24+ for i , (seq , qual ) in enumerate (reads ):
25+ lines += [f"@read{ i } " , seq , "+" , qual ]
26+ path .write_text ("\n " .join (lines ) + "\n " )
27+
28+
29+ # ---------------------------------------------------------------------------
30+ # _down_bin_quality
31+ # ---------------------------------------------------------------------------
32+
33+ def test_down_bin_exact_match ():
34+ assert _down_bin_quality (30 , [10 , 20 , 30 , 40 ]) == 30
35+
36+
37+ def test_down_bin_between_bins_maps_down ():
38+ assert _down_bin_quality (25 , [10 , 20 , 30 , 40 ]) == 20
39+
40+
41+ def test_down_bin_below_min_maps_to_first_bin ():
42+ assert _down_bin_quality (5 , [10 , 20 , 30 ]) == 10
43+
44+
45+ def test_down_bin_above_max_maps_to_last_bin ():
46+ assert _down_bin_quality (99 , [10 , 20 , 30 ]) == 30
47+
48+
49+ def test_down_bin_empty_allowed_returns_q_unchanged ():
50+ assert _down_bin_quality (25 , []) == 25
51+
52+
53+ def test_down_bin_single_bin ():
54+ assert _down_bin_quality (0 , [20 ]) == 20
55+ assert _down_bin_quality (20 , [20 ]) == 20
56+ assert _down_bin_quality (40 , [20 ]) == 20
57+
58+
59+ # ---------------------------------------------------------------------------
60+ # compute_initial_distribution
61+ # ---------------------------------------------------------------------------
62+
63+ def test_compute_initial_distribution_basic ():
64+ quals = [[30 , 31 , 32 ], [30 , 28 , 27 ], [35 , 30 , 29 ]]
65+ result = compute_initial_distribution (quals )
66+ assert result [30 ] == 2.0
67+ assert result [35 ] == 1.0
68+ assert 28 not in result # position 0 only
69+ assert 31 not in result
70+
71+
72+ def test_compute_initial_distribution_empty_input ():
73+ assert compute_initial_distribution ([]) == {}
74+
75+
76+ def test_compute_initial_distribution_skips_empty_reads ():
77+ result = compute_initial_distribution ([[], [30 , 31 ]])
78+ assert result == {30 : 1.0 }
79+
80+
81+ def test_compute_initial_distribution_single_read ():
82+ result = compute_initial_distribution ([[40 , 38 , 35 ]])
83+ assert result == {40 : 1.0 }
84+
85+
86+ # ---------------------------------------------------------------------------
87+ # compute_position_distributions
88+ # ---------------------------------------------------------------------------
89+
90+ def test_compute_position_distributions_basic ():
91+ quals = [[30 , 35 , 40 ], [30 , 36 , 41 ]]
92+ result = compute_position_distributions (quals , 3 )
93+ assert len (result ) == 3
94+ assert result [0 ][30 ] == 2.0
95+ assert result [1 ][35 ] == 1.0
96+ assert result [1 ][36 ] == 1.0
97+ assert result [2 ][40 ] == 1.0
98+ assert result [2 ][41 ] == 1.0
99+
100+
101+ def test_compute_position_distributions_skips_length_mismatch ():
102+ quals = [[30 , 31 , 32 ], [30 , 31 ]] # second read is wrong length
103+ result = compute_position_distributions (quals , 3 )
104+ assert result [0 ][30 ] == 1.0 # only first read counted
105+ assert 31 not in result [0 ]
106+
107+
108+ def test_compute_position_distributions_zero_length_returns_empty ():
109+ assert compute_position_distributions ([[30 , 31 ]], 0 ) == []
110+
111+
112+ def test_compute_position_distributions_single_position ():
113+ quals = [[40 ], [38 ], [40 ]]
114+ result = compute_position_distributions (quals , 1 )
115+ assert len (result ) == 1
116+ assert result [0 ][40 ] == 2.0
117+ assert result [0 ][38 ] == 1.0
118+
119+
120+ # ---------------------------------------------------------------------------
121+ # compute_transition_distributions
122+ # ---------------------------------------------------------------------------
123+
124+ def test_compute_transition_distributions_basic ():
125+ quals = [[30 , 31 , 32 ], [30 , 31 , 33 ]]
126+ result = compute_transition_distributions (quals , 3 )
127+ assert len (result ) == 2 # read_length - 1
128+ assert result [0 ][30 ][31 ] == 2.0 # 30→31 seen twice at position 0
129+ assert result [1 ][31 ][32 ] == 1.0 # 31→32 once at position 1
130+ assert result [1 ][31 ][33 ] == 1.0 # 31→33 once at position 1
131+
132+
133+ def test_compute_transition_distributions_read_length_one_returns_empty ():
134+ assert compute_transition_distributions ([[30 ]], 1 ) == []
135+
136+
137+ def test_compute_transition_distributions_read_length_zero_returns_empty ():
138+ assert compute_transition_distributions ([], 0 ) == []
139+
140+
141+ def test_compute_transition_distributions_skips_length_mismatch ():
142+ quals = [[30 , 31 , 32 ], [30 , 31 ]] # second read is wrong length
143+ result = compute_transition_distributions (quals , 3 )
144+ assert result [0 ][30 ][31 ] == 1.0 # only first read counted
145+
146+
147+ def test_compute_transition_distributions_self_transitions ():
148+ """A read with a constant quality score produces only self-transitions."""
149+ quals = [[35 , 35 , 35 , 35 ]]
150+ result = compute_transition_distributions (quals , 4 )
151+ assert len (result ) == 3
152+ for pos in result :
153+ assert pos [35 ][35 ] == 1.0
154+ assert len (pos ) == 1
155+
156+
157+ # ---------------------------------------------------------------------------
158+ # read_quality_lists
159+ # ---------------------------------------------------------------------------
160+
161+ def test_read_quality_lists_basic (tmp_path ):
162+ fq = tmp_path / "test.fastq"
163+ # 'I' = ASCII 73, offset 33 → score 40
164+ _write_fastq (fq , [("ACGTA" , "IIIII" ), ("ACGTA" , "IIIII" )])
165+ quals , read_length = read_quality_lists ([str (fq )], max_reads = 100 , offset = 33 )
166+ assert read_length == 5
167+ assert len (quals ) == 2
168+ assert quals [0 ] == [40 , 40 , 40 , 40 , 40 ]
169+
170+
171+ def test_read_quality_lists_applies_offset (tmp_path ):
172+ fq = tmp_path / "test.fastq"
173+ # '!' = ASCII 33, offset 33 → score 0
174+ _write_fastq (fq , [("ACGT" , "!!!!" )])
175+ quals , _ = read_quality_lists ([str (fq )], max_reads = 100 , offset = 33 )
176+ assert quals [0 ] == [0 , 0 , 0 , 0 ]
177+
178+
179+ def test_read_quality_lists_respects_max_reads (tmp_path ):
180+ fq = tmp_path / "test.fastq"
181+ _write_fastq (fq , [("ACGT" , "IIII" )] * 10 )
182+ quals , _ = read_quality_lists ([str (fq )], max_reads = 3 , offset = 33 )
183+ assert len (quals ) == 3
184+
185+
186+ def test_read_quality_lists_applies_binning (tmp_path ):
187+ fq = tmp_path / "test.fastq"
188+ # Scores 40 ('I'), binned to allowed [10, 20, 30] → maps to 30
189+ _write_fastq (fq , [("ACGT" , "IIII" )])
190+ quals , _ = read_quality_lists ([str (fq )], max_reads = 100 , offset = 33 ,
191+ allowed_quality_scores = [10 , 20 , 30 ])
192+ assert quals [0 ] == [30 , 30 , 30 , 30 ]
193+
194+
195+ def test_read_quality_lists_missing_file_raises ():
196+ with pytest .raises (FileNotFoundError ):
197+ read_quality_lists (["/nonexistent/path.fastq" ], max_reads = 10 , offset = 33 )
198+
199+
200+ def test_read_quality_lists_empty_file_returns_empty (tmp_path ):
201+ fq = tmp_path / "empty.fastq"
202+ fq .write_text ("" )
203+ quals , read_length = read_quality_lists ([str (fq )], max_reads = 100 , offset = 33 )
204+ assert quals == []
205+ assert read_length == 0
206+
207+
208+ # ---------------------------------------------------------------------------
209+ # build_markov_model (end-to-end)
210+ # ---------------------------------------------------------------------------
211+
212+ def test_build_markov_model_basic (tmp_path ):
213+ fq = tmp_path / "test.fastq"
214+ _write_fastq (fq , [("ACGTA" , "IIIII" ), ("ACGTA" , "IIIII" )])
215+ init , pos_dists , trans_dists , max_q , read_len = build_markov_model (
216+ [str (fq )], max_reads = 100 , offset = 33
217+ )
218+ assert read_len == 5
219+ assert max_q == 40
220+ assert init == {40 : 2.0 }
221+ assert len (pos_dists ) == 5
222+ assert len (trans_dists ) == 4 # read_length - 1
223+ assert trans_dists [0 ][40 ][40 ] == 2.0
224+
225+
226+ def test_build_markov_model_max_quality_capped_by_bins (tmp_path ):
227+ fq = tmp_path / "test.fastq"
228+ _write_fastq (fq , [("ACGT" , "IIII" )]) # score 40
229+ _ , _ , _ , max_q , _ = build_markov_model (
230+ [str (fq )], max_reads = 100 , offset = 33 ,
231+ allowed_quality_scores = [10 , 20 , 30 ]
232+ )
233+ assert max_q == 30 # capped to max allowed bin
234+
235+
236+ def test_build_markov_model_no_reads_raises (tmp_path ):
237+ fq = tmp_path / "empty.fastq"
238+ fq .write_text ("" )
239+ with pytest .raises (ValueError , match = "No quality scores" ):
240+ build_markov_model ([str (fq )], max_reads = 100 , offset = 33 )
0 commit comments