33from __future__ import annotations
44import re
55from concurrent .futures import ThreadPoolExecutor , as_completed
6+ import mmap
7+ import os
68
79
810def find_matches (fasta_path : str , pattern : bytes ) -> list [tuple [str , list [int ]]]:
@@ -11,38 +13,39 @@ def find_matches(fasta_path: str, pattern: bytes) -> list[tuple[str, list[int]]]
1113 Returns ``[(record_id, [positions...]), ...]`` in file order.
1214 """
1315 # Read as bytes — no decode overhead, pattern stays as bytes.
14- with open (fasta_path , "rb" ) as f :
15- data = f .read ()
16-
17- # Pre-compile a lookahead regex so overlapping matches are found in one pass.
1816 regex = re .compile (b"(?=" + re .escape (pattern ) + b")" )
19-
20- def process_record (record : bytes ) -> tuple [str , list [int ]] | None :
21- if not record .strip ():
22- return None
23- lines = record .split (b"\n " )
24- record_id = lines [0 ].strip ().decode ("ascii" )
17+ with open (fasta_path , "rb" ) as f :
18+ with mmap .mmap (f .fileno (), 0 , access = mmap .ACCESS_READ ) as mm :
19+ data = bytes (mm )
20+ size = len (data )
21+
22+ # Find record boundaries without copying data
23+ offsets = [0 ]
24+ pos = data .find (b">" , 1 )
25+ while pos != - 1 :
26+ offsets .append (pos )
27+ pos = data .find (b">" , pos + 1 )
28+ offsets .append (size )
29+
30+ def process_record (
31+ start : int , end : int , idx : int
32+ ) -> tuple [int , tuple [str , list [int ]]] | None :
33+ chunk = data [start :end ]
34+ lines = chunk .split (b"\n " )
35+ record_id = lines [0 ][1 :].strip ().decode ("ascii" )
2536 sequence = b"" .join (lines [1 :]).replace (b" " , b"" )
2637 positions = [m .start () for m in regex .finditer (sequence )]
2738 if positions :
28- return (record_id , positions )
39+ return (idx , ( record_id , positions ) )
2940 return None
3041
31- # Split on b'>' — first chunk is empty for well-formed files.
32- records = data .split (b">" )[1 :] # skip leading empty chunk
33-
34- results : list [tuple [str , list [int ]]] = []
35-
36- # re operations release the GIL, so ThreadPoolExecutor gives real parallelism.
37- with ThreadPoolExecutor () as executor :
38- # Submit in order, preserve file order via index.
39- futures = {executor .submit (process_record , r ): i for i , r in enumerate (records )}
40- ordered : list [tuple [int , tuple [str , list [int ]]]] = []
41- for future in as_completed (futures ):
42- result = future .result ()
43- if result is not None :
44- ordered .append ((futures [future ], result ))
42+ max_workers = min (32 , (os .cpu_count () or 1 ) * 2 )
43+ with ThreadPoolExecutor (max_workers = max_workers ) as executor :
44+ futures = [
45+ executor .submit (process_record , offsets [i ], offsets [i + 1 ], i )
46+ for i in range (len (offsets ) - 1 )
47+ ]
48+ results = [r for f in as_completed (futures ) if (r := f .result ()) is not None ]
4549
46- ordered .sort (key = lambda x : x [0 ])
47- results = [r for _ , r in ordered ]
48- return results
50+ results .sort (key = lambda x : x [0 ])
51+ return [r for _ , r in results ]
0 commit comments