-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathsolution.py
More file actions
81 lines (61 loc) · 2.44 KB
/
solution.py
File metadata and controls
81 lines (61 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""Your Round 3 solution — DNA sequence matcher.
The FASTA file is memory-mapped and split into records, and each record is
scanned for the requested byte pattern. ``find_matches`` is the entry point:
it reports every match position (overlapping matches included) per record.
"""
from concurrent.futures import ThreadPoolExecutor
from mmap import mmap, ACCESS_READ
from os import fstat
def _scan_record(record: bytes, pattern: bytes) -> tuple[str, list[int]] | None:
""" Scan one FASTA record for all occurrences of ``pattern``.
Returns the record id and every zero-based match position, or ``None`` if
the record is empty or does not contain the pattern.
"""
if not record.strip():
return None
# Parition DNA record into header and DNA sequence
header, _, body = record.partition(b'\n')
record_id = header.strip().decode('ascii')
# Clean up data before parsing
sequence = (
body.replace(b'\n', b'')
.replace(b'\r', b'')
.replace(b' ', b'')
)
positions: list[int] = []
start = 0
# Advance by one after each hit so overlapping matches are included.
while True:
pos = sequence.find(pattern, start)
if pos == -1:
break
positions.append(pos)
start = pos + 1
if not positions:
return None
return record_id, positions
def find_matches(fasta_path: str, pattern: bytes) -> list[tuple[str, list[int]]]:
    """Find every FASTA record whose sequence contains ``pattern``.

    The file is memory-mapped and cut into records at each ``>`` that opens a
    line (offset 0 or directly after a newline). A ``>`` appearing elsewhere,
    for example inside a header description, is left as part of the record
    it occurs in. Any bytes before the first header line are ignored.

    Args:
        fasta_path: Path of the FASTA file to read.
        pattern: Byte string to search for in each record's sequence.

    Returns:
        ``[(record_id, [positions...]), ...]`` in file order, containing only
        the records with at least one match. An empty file yields ``[]``.
    """
    with open(fasta_path, 'rb') as f:
        # An empty file cannot be memory-mapped, so answer it up front.
        if fstat(f.fileno()).st_size == 0:
            return []

        with mmap(f.fileno(), 0, access=ACCESS_READ) as text:
            records: list[bytes] = []

            # Locate the first header: a '>' at the very start of the file or
            # immediately after a newline.
            if text[:1] == b'>':
                start = 0
            else:
                newline = text.find(b'\n>')
                start = -1 if newline == -1 else newline + 1

            while start != -1:
                # ``start`` indexes a header's '>'; the record runs up to the
                # next line-initial '>' (or to the end of the file).
                newline = text.find(b'\n>', start + 1)
                if newline == -1:
                    record = text[start + 1:]
                    start = -1
                else:
                    # Keep the trailing newline; begin the next record at '>'.
                    record = text[start + 1:newline + 1]
                    start = newline + 1
                if record.strip():
                    records.append(record)

    # Slicing the mmap copied each record into ``bytes``, so the scans can run
    # after the file is closed. ``map`` yields results in input order, which
    # preserves file order.
    with ThreadPoolExecutor() as executor:
        results = executor.map(lambda record: _scan_record(record, pattern), records)
        return [result for result in results if result is not None]