Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,4 @@ scripts/
```

Each round's `data/` directory is generated locally and gitignored.
This is jsummer10's PR
41 changes: 35 additions & 6 deletions rounds/1_histogram/solution.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,43 @@
"""Your Round 1 solution — byte-pair histogram.

**Edit this file.** It currently delegates to ``baseline.py`` so everything
passes out of the box. Replace the body of ``compute_histogram`` with your
own faster implementation.
This version keeps the same contract as ``baseline.py`` but replaces the
per-bigram Python loop with NumPy operations over the whole byte buffer.
"""

from __future__ import annotations
from pathlib import Path

import numpy as np

# Directory named ``data`` located next to this module.
DATA_DIR = Path(__file__).parent / "data"
# Payload inside DATA_DIR that is fed to ``compute_histogram`` when this
# module is executed as a script.
FIXTURE_PATH = DATA_DIR / "fixture_payload.bin"


def compute_histogram(path: str) -> dict[bytes, int]:
    """Frequency of every 2-byte bigram in the file at ``path``.

    A bigram is each overlapping pair of adjacent bytes, ``data[i:i + 2]``.

    Args:
        path: File to read; it is opened in binary mode and read in full.

    Returns:
        A dict mapping every bigram that occurs at least once (as a 2-byte
        ``bytes`` object) to the number of times it occurs. Files shorter
        than two bytes contain no bigrams and yield an empty dict.
    """
    # Read the whole file into memory as a single bytes object.
    with open(path, "rb") as f:
        data = f.read()

    # Fewer than two bytes means there is no pair to count.
    if len(data) < 2:
        return {}

    # View the bytes object as a uint8 NumPy array.
    byte_values = np.frombuffer(data, dtype=np.uint8)

    # Encode each overlapping 2-byte window as one big-endian uint16 token:
    # (first_byte << 8) | second_byte. The astype() call makes the only
    # writable copy; the shift and OR then run in place on it.
    bigrams = byte_values[:-1].astype(np.uint16)
    bigrams <<= 8
    bigrams |= byte_values[1:]

    # One bin per possible token so the bin index is the token itself.
    counts = np.bincount(bigrams, minlength=1 << 16)

    # Only visit bins that were actually hit; flatnonzero returns them in
    # ascending token order.
    return {
        int(token).to_bytes(2, "big"): int(counts[token])
        for token in np.flatnonzero(counts)
    }


if __name__ == "__main__":
    # Script entry point: run the histogram once over the local fixture.
    compute_histogram(str(FIXTURE_PATH))
62 changes: 53 additions & 9 deletions rounds/2_corruption/solution.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,58 @@
"""Your Round 2 solution corruption scanner.
"""Your Round 2 solution - corruption scanner."""

**Edit this file.** It currently delegates to ``baseline.py`` so everything
passes out of the box. Replace the body of ``find_corruptions`` with your
own faster implementation.
"""
from __future__ import annotations

from .baseline import find_corruptions as _baseline
import mmap


# Number of bytes compared per C-level slice comparison.
_BLOCK_SIZE = 4096


def find_corruptions(ref_path: str, cor_path: str) -> list[tuple[int, int]]:
    """Return ``[(offset, length), ...]`` for every differing byte range.

    Both files are memory-mapped and compared block by block. Identical
    blocks are skipped with a single bytes comparison; only blocks that
    differ are scanned byte by byte. A range of differing bytes may span
    any number of blocks.

    Args:
        ref_path: Path of the reference file.
        cor_path: Path of the possibly corrupted file.

    Returns:
        The maximal runs of differing bytes as ``(offset, length)`` tuples,
        in ascending offset order. Two empty files yield ``[]``.

    Raises:
        ValueError: If the two files differ in length.
    """
    with open(ref_path, "rb") as ref_file, open(cor_path, "rb") as cor_file:
        # seek(0, 2) moves to the end of the file and returns its size.
        size = ref_file.seek(0, 2)
        if size != cor_file.seek(0, 2):
            raise ValueError("reference and corrupted files differ in length")
        if size == 0:
            # An empty file cannot be memory-mapped, and has nothing to diff.
            return []

        with mmap.mmap(ref_file.fileno(), 0, access=mmap.ACCESS_READ) as ref:
            with mmap.mmap(cor_file.fileno(), 0, access=mmap.ACCESS_READ) as cor:
                ranges: list[tuple[int, int]] = []
                # -1 means there is no currently open corruption range.
                run_start = -1

                for block_start in range(0, size, _BLOCK_SIZE):
                    # Slicing clamps at the end of the map, so the final
                    # block is simply shorter.
                    block_end = block_start + _BLOCK_SIZE
                    ref_block = ref[block_start:block_end]
                    cor_block = cor[block_start:block_end]

                    # Most blocks are expected to be identical, so skip them
                    # with one bytes comparison instead of a per-byte loop.
                    if ref_block == cor_block:
                        if run_start != -1:
                            ranges.append((run_start, block_start - run_start))
                            run_start = -1
                        continue

                    # Scan the already-copied slices rather than indexing the
                    # maps again. run_start lives outside this loop so a
                    # range can continue across block edges.
                    pairs = zip(ref_block, cor_block)
                    for pos, (ref_byte, cor_byte) in enumerate(pairs, block_start):
                        if ref_byte != cor_byte:
                            if run_start == -1:
                                run_start = pos
                        elif run_start != -1:
                            ranges.append((run_start, pos - run_start))
                            run_start = -1

                # Close a corruption range that reaches the end of the file.
                if run_start != -1:
                    ranges.append((run_start, size - run_start))

                return ranges
72 changes: 68 additions & 4 deletions rounds/3_dna/solution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,77 @@
own faster implementation.
"""

from .baseline import find_matches as _baseline
from concurrent.futures import ThreadPoolExecutor
from mmap import mmap, ACCESS_READ
from os import fstat


def _scan_record(record: bytes, pattern: bytes) -> tuple[str, list[int]] | None:
""" Scan one FASTA record for all occurrences of ``pattern``.

Returns the record id and every zero-based match position, or ``None`` if
the record is empty or does not contain the pattern.
"""

if not record.strip():
return None

# Parition DNA record into header and DNA sequence
header, _, body = record.partition(b'\n')
record_id = header.strip().decode('ascii')

# Clean up data before parsing
sequence = (
body.replace(b'\n', b'')
.replace(b'\r', b'')
.replace(b' ', b'')
)

positions: list[int] = []
start = 0

# Advance by one after each hit so overlapping matches are included.
while True:
pos = sequence.find(pattern, start)
if pos == -1:
break
positions.append(pos)
start = pos + 1

if not positions:
return None

return record_id, positions


def find_matches(fasta_path: str, pattern: bytes) -> list[tuple[str, list[int]]]:
    """Find every FASTA record whose sequence contains ``pattern``.

    The file is memory-mapped and cut into records at each ``>`` byte; each
    non-blank record is then scanned with ``_scan_record``.

    Args:
        fasta_path: Path of the FASTA file to scan.
        pattern: Byte pattern to search for in each record's sequence.

    Returns:
        ``[(record_id, [positions...]), ...]`` in file order, containing
        only the records that have at least one match. An empty file
        yields ``[]``.
    """
    with open(fasta_path, 'rb') as f:
        # An empty file cannot be memory-mapped and holds no records.
        if fstat(f.fileno()).st_size == 0:
            return []

        with mmap(f.fileno(), 0, access=ACCESS_READ) as text:
            # Copy each record (the bytes between one '>' and the next) out
            # of the map so scanning does not depend on the map staying open.
            records: list[bytes] = []
            start = text.find(b'>')
            while start != -1:
                end = text.find(b'>', start + 1)
                stop = len(text) if end == -1 else end
                record = text[start + 1:stop]
                if record.strip():
                    records.append(record)
                start = end

    if not records:
        return []

    # Scan records concurrently; map() yields results in submission order,
    # which keeps the output in file order.
    # NOTE(review): on GIL builds of CPython these bytes operations likely do
    # not run in parallel -- measure against a plain loop before relying on
    # the pool for speed.
    with ThreadPoolExecutor() as executor:
        results = list(
            executor.map(lambda record: _scan_record(record, pattern), records)
        )

    return [result for result in results if result is not None]
Loading