diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml index 0fc335e..0966b28 100644 --- a/.github/workflows/codspeed.yml +++ b/.github/workflows/codspeed.yml @@ -1,9 +1,5 @@ name: CodSpeed Benchmarks -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: true - on: push: branches: @@ -18,21 +14,9 @@ jobs: strategy: matrix: include: - - runner: ubuntu-latest - mode: instrumentation - bench: [hello_world, bfs] - runner: codspeed-macro mode: walltime - bench: - [ - dna_matcher, - lut_grayscale_bench, - lut_filters_bench, - simd_brightness_bench, - simd_filters_bench, - blob_corruption_checker, - blob_corruption_checker, - ] + bench: [dna_matcher] steps: - uses: actions/checkout@v4 with: diff --git a/Cargo.lock b/Cargo.lock index c337c95..7054777 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,7 +424,10 @@ dependencies = [ "codspeed-divan-compat", "image", "image-compare", + "memchr", + "memmap2", "rand", + "rayon", ] [[package]] @@ -749,6 +752,15 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "memmap2" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7" +dependencies = [ + "libc", +] + [[package]] name = "minimal-lexical" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index f4172be..c603079 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,9 @@ path = "src/lib.rs" rand = "0.8" image = "0.25" image-compare = "0.5.0" +rayon = "1.10" +memmap2 = "0.9" +memchr = "2.7.6" [dev-dependencies] divan = { version = "4.0.2", package = "codspeed-divan-compat" } diff --git a/benches/blob_corruption_checker.rs b/benches/blob_corruption_checker.rs index 3ad54d6..43ee022 100644 --- a/benches/blob_corruption_checker.rs +++ 
b/benches/blob_corruption_checker.rs
@@ -1,14 +1,102 @@
 use divan::Bencher;
-use eurorust_2025_workshop::blob_corruption_checker::find_corruptions_sequential;
+use eurorust_2025_workshop::blob_corruption_checker::{
+    find_corruptions_parallel, find_corruptions_sequential, find_corruptions_simd,
+    find_corruptions_simd_parallel,
+};
 
 fn main() {
     divan::main();
 }
 
+/// Baseline: benchmarks `find_corruptions_sequential` so the other variants can be compared to it.
 #[divan::bench(sample_count = 3, sample_size = 5)]
-fn corruption_check(bencher: Bencher) {
+fn corruption_check_sequential(bencher: Bencher) {
     bencher.bench_local(|| {
         let corruptions = divan::black_box(find_corruptions_sequential(
+            "reference.bin",
+            "corrupted.bin",
+            1024, // 1KB chunks
+        ));
+
+        assert_eq!(corruptions.len(), 50, "Should find 50 corruptions");
+
+        // All corruptions should be 1KB aligned
+        for corruption in &corruptions {
+            assert_eq!(corruption.offset % 1024, 0, "Corruption offset should be 1KB aligned");
+            assert_eq!(corruption.length % 1024, 0, "Corruption length should be multiple of 1KB");
+        }
+
+        // Check specific corruptions
+        assert_eq!(corruptions[0].offset, 14801920, "First corruption offset");
+        assert_eq!(corruptions[0].length, 2048, "First corruption length");
+        assert_eq!(corruptions[25].offset, 243891200, "Middle corruption offset");
+        assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
+        assert_eq!(corruptions[49].offset, 507871232, "Last corruption offset");
+        assert_eq!(corruptions[49].length, 5120, "Last corruption length");
+    });
+}
+
+/// Benchmarks `find_corruptions_parallel`.
+#[divan::bench(sample_count = 3, sample_size = 5)]
+fn corruption_check_parallel(bencher: Bencher) {
+    bencher.bench_local(|| {
+        let corruptions = divan::black_box(find_corruptions_parallel(
+            "reference.bin",
+            "corrupted.bin",
+            1024, // 1KB chunks
+        ));
+
+        assert_eq!(corruptions.len(), 50, "Should find 50 corruptions");
+
+        // All corruptions should be 1KB aligned
+        for corruption in &corruptions {
+            assert_eq!(corruption.offset % 1024, 0, "Corruption offset should be 1KB aligned");
+            assert_eq!(corruption.length % 1024, 0, "Corruption length should be multiple of 1KB");
+        }
+
+        // Check specific corruptions
+        assert_eq!(corruptions[0].offset, 14801920, "First corruption offset");
+        assert_eq!(corruptions[0].length, 2048, "First corruption length");
+        assert_eq!(corruptions[25].offset, 243891200, "Middle corruption offset");
+        assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
+        assert_eq!(corruptions[49].offset, 507871232, "Last corruption offset");
+        assert_eq!(corruptions[49].length, 5120, "Last corruption length");
+    });
+}
+
+/// Benchmarks `find_corruptions_simd`.
+#[divan::bench(sample_count = 3, sample_size = 5)]
+fn corruption_check_simd(bencher: Bencher) {
+    bencher.bench_local(|| {
+        let corruptions = divan::black_box(find_corruptions_simd(
+            "reference.bin",
+            "corrupted.bin",
+            1024, // 1KB chunks
+        ));
+
+        assert_eq!(corruptions.len(), 50, "Should find 50 corruptions");
+
+        // All corruptions should be 1KB aligned
+        for corruption in &corruptions {
+            assert_eq!(corruption.offset % 1024, 0, "Corruption offset should be 1KB aligned");
+            assert_eq!(corruption.length % 1024, 0, "Corruption length should be multiple of 1KB");
+        }
+
+        // Check specific corruptions
+        assert_eq!(corruptions[0].offset, 14801920, "First corruption offset");
+        assert_eq!(corruptions[0].length, 2048, "First corruption length");
+        assert_eq!(corruptions[25].offset, 243891200, "Middle corruption offset");
+        assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
+        assert_eq!(corruptions[49].offset, 507871232, "Last corruption offset");
+        assert_eq!(corruptions[49].length, 5120, "Last corruption length");
+    });
+}
+
+/// Benchmarks `find_corruptions_simd_parallel`.
+#[divan::bench(sample_count = 3, sample_size = 5)]
+fn corruption_check_simd_parallel(bencher: Bencher) {
+    bencher.bench_local(|| {
+        let corruptions = divan::black_box(find_corruptions_simd_parallel(
             "reference.bin",
             "corrupted.bin",
             1024, // 1KB chunks
diff --git a/benches/dna_matcher.rs b/benches/dna_matcher.rs
index c955168..8719714 100644
--- a/benches/dna_matcher.rs
+++ b/benches/dna_matcher.rs
@@ -5,7 +5,7 @@ fn main() {
 }
 
 #[divan::bench(sample_count = 2, sample_size = 3)]
-fn dna_matcher() {
+fn dna_matcher_old() {
     let genome = std::fs::read_to_string("genome.fasta").expect(
         "Failed to read genome.fasta\n\n Make sure to run 'cargo run --release --bin generate_fasta'",
     );
@@ -22,3 +22,66 @@ fn dna_matcher() {
         matches.len()
     );
 }
+
+#[divan::bench(sample_count = 2, sample_size = 3)]
+fn dna_matcher_memchr() {
+    let genome = std::fs::read_to_string("genome.fasta").expect(
+        "Failed to read genome.fasta\n\n Make sure to run 'cargo run --release --bin generate_fasta'",
+    );
+    let pattern = "AGTCCGTA";
+
+    let matches = divan::black_box(memchr_search(
+        divan::black_box(&genome),
+        divan::black_box(pattern),
+    ));
+
+    assert!(
+        matches.len() == 4927,
+        "Expected 4927 matches, found {}",
+        matches.len()
+    );
+}
+
+#[divan::bench(sample_count = 2, sample_size = 3)]
+fn dna_matcher_mmap() {
+    let file = std::fs::File::open("genome.fasta").expect(
+        "Failed to read genome.fasta\n\n Make sure to run 'cargo run --release --bin generate_fasta'",
+    );
+    // SAFETY: the mapping is only read. This assumes genome.fasta is not truncated or
+    // rewritten while the benchmark runs.
+    let mmap = unsafe { memmap2::Mmap::map(&file).expect("Failed to mmap genome.fasta") };
+    let pattern = b"AGTCCGTA";
+
+    let matches = divan::black_box(memchr_search_bytes(
+        divan::black_box(&mmap),
+        divan::black_box(pattern),
+    ));
+
+    assert!(
+        matches.len() == 4927,
+        "Expected 4927 matches, found {}",
+        matches.len()
+    );
+}
+
+#[divan::bench(sample_count = 2, sample_size = 3)]
+fn dna_matcher() {
+    let file = std::fs::File::open("genome.fasta").expect(
+        "Failed to read genome.fasta\n\n Make sure to run 'cargo run --release --bin generate_fasta'",
+    );
+    // SAFETY: the mapping is only read. This assumes genome.fasta is not truncated or
+    // rewritten while the benchmark runs.
+    let mmap = unsafe { memmap2::Mmap::map(&file).expect("Failed to mmap genome.fasta") };
+    let pattern = b"AGTCCGTA";
+
+    let matches = divan::black_box(memchr_search_bytes_parallel(
+        divan::black_box(&mmap),
+        divan::black_box(pattern),
+    ));
+
+    assert!(
+        matches.len() == 4927,
+        "Expected 4927 matches, found {}",
+        matches.len()
+    );
+}
diff --git a/src/blob_corruption_checker.rs b/src/blob_corruption_checker.rs
index 2515c20..f2a5498 100644
--- a/src/blob_corruption_checker.rs
+++ b/src/blob_corruption_checker.rs
@@ -1,5 +1,8 @@
 use std::fs::File;
 use std::io::{BufReader, Read};
+use std::simd::{prelude::*, LaneCount, SupportedLaneCount};
+use memmap2::Mmap;
+use rayon::prelude::*;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Corruption {
@@ -60,6 +63,188 @@ pub fn find_corruptions_sequential(
     corruptions
 }
 
+/// Memory-maps the reference and corrupted files for read-only access.
+///
+/// # Panics
+///
+/// Panics if either file cannot be opened or mapped.
+fn map_file_pair(reference_path: &str, corrupted_path: &str) -> (Mmap, Mmap) {
+    let ref_file = File::open(reference_path).unwrap();
+    let corrupt_file = File::open(corrupted_path).unwrap();
+
+    // SAFETY: the mappings are only read. This assumes neither file is truncated or
+    // rewritten while mapped, which the caller has to guarantee.
+    let ref_mmap = unsafe { Mmap::map(&ref_file).unwrap() };
+    let corrupt_mmap = unsafe { Mmap::map(&corrupt_file).unwrap() };
+
+    (ref_mmap, corrupt_mmap)
+}
+
+/// Merges runs of adjacent corrupted chunks into [`Corruption`] ranges.
+///
+/// `chunk_flags` holds one `(offset, is_corrupted)` entry per chunk in ascending offset
+/// order; every chunk is `chunk_size` bytes long except possibly the last, which ends at
+/// `file_len`.
+fn merge_corrupted_chunks(
+    chunk_flags: &[(usize, bool)],
+    chunk_size: usize,
+    file_len: usize,
+) -> Vec<Corruption> {
+    let mut corruptions: Vec<Corruption> = Vec::new();
+
+    for &(offset, is_corrupted) in chunk_flags {
+        if !is_corrupted {
+            continue;
+        }
+
+        let chunk_len = ((offset + chunk_size).min(file_len) - offset) as u64;
+        let offset = offset as u64;
+
+        // Extend the previous range when this chunk directly follows it
+        if let Some(last) = corruptions.last_mut() {
+            if last.offset + last.length == offset {
+                last.length += chunk_len;
+                continue;
+            }
+        }
+
+        // First corruption, or a gap since the previous one: start a new range
+        corruptions.push(Corruption {
+            offset,
+            length: chunk_len,
+        });
+    }
+
+    corruptions
+}
+
+/// Finds corrupted regions by comparing fixed-size chunks of both files on the rayon pool.
+///
+/// Adjacent mismatching chunks are merged into one [`Corruption`]. Only the part covered
+/// by both files is compared.
+///
+/// # Panics
+///
+/// Panics if a file cannot be opened or mapped, or if `chunk_size` is zero.
+pub fn find_corruptions_parallel(
+    reference_path: &str,
+    corrupted_path: &str,
+    chunk_size: usize,
+) -> Vec<Corruption> {
+    let (ref_mmap, corrupt_mmap) = map_file_pair(reference_path, corrupted_path);
+    // Clamp to the shorter file so the chunk slices below stay in bounds for both mappings
+    let file_len = ref_mmap.len().min(corrupt_mmap.len());
+
+    // Process chunks in parallel; collecting an indexed parallel iterator keeps the
+    // chunks in offset order, so no sort is needed afterwards
+    let chunk_mismatches: Vec<(usize, bool)> = (0..file_len)
+        .step_by(chunk_size)
+        .collect::<Vec<_>>()
+        .par_iter()
+        .map(|&offset| {
+            let end = (offset + chunk_size).min(file_len);
+            let matches = ref_mmap[offset..end] == corrupt_mmap[offset..end];
+            (offset, !matches)
+        })
+        .collect();
+
+    merge_corrupted_chunks(&chunk_mismatches, chunk_size, file_len)
+}
+
+/// Compares two byte slices for equality, `LANES` bytes at a time.
+///
+/// Slices of different lengths are never equal. Bytes left over after the last full
+/// `LANES`-wide block are compared with a plain slice comparison.
+fn chunks_equal_simd<const LANES: usize>(a: &[u8], b: &[u8]) -> bool
+where
+    LaneCount<LANES>: SupportedLaneCount,
+{
+    if a.len() != b.len() {
+        return false;
+    }
+
+    let a_blocks = a.chunks_exact(LANES);
+    let b_blocks = b.chunks_exact(LANES);
+
+    // Handle remaining bytes
+    if a_blocks.remainder() != b_blocks.remainder() {
+        return false;
+    }
+
+    // Process SIMD chunks, stopping at the first block that differs
+    a_blocks.zip(b_blocks).all(|(a_block, b_block)| {
+        let a_simd = Simd::<u8, LANES>::from_slice(a_block);
+        let b_simd = Simd::<u8, LANES>::from_slice(b_block);
+        !a_simd.simd_ne(b_simd).any()
+    })
+}
+
+/// Finds corrupted regions by comparing fixed-size chunks one after another with SIMD.
+///
+/// Adjacent mismatching chunks are merged into one [`Corruption`]. Only the part covered
+/// by both files is compared.
+///
+/// # Panics
+///
+/// Panics if a file cannot be opened or mapped, or if `chunk_size` is zero.
+pub fn find_corruptions_simd(
+    reference_path: &str,
+    corrupted_path: &str,
+    chunk_size: usize,
+) -> Vec<Corruption> {
+    let (ref_mmap, corrupt_mmap) = map_file_pair(reference_path, corrupted_path);
+    // Clamp to the shorter file so the chunk slices below stay in bounds for both mappings
+    let file_len = ref_mmap.len().min(corrupt_mmap.len());
+
+    // Process chunks sequentially with SIMD comparisons; offsets are produced in
+    // ascending order, so no sort is needed afterwards
+    let chunk_mismatches: Vec<(usize, bool)> = (0..file_len)
+        .step_by(chunk_size)
+        .map(|offset| {
+            let end = (offset + chunk_size).min(file_len);
+            let matches =
+                chunks_equal_simd::<64>(&ref_mmap[offset..end], &corrupt_mmap[offset..end]);
+            (offset, !matches)
+        })
+        .collect();
+
+    merge_corrupted_chunks(&chunk_mismatches, chunk_size, file_len)
+}
+
+/// Finds corrupted regions by comparing fixed-size chunks with SIMD on the rayon pool.
+///
+/// Adjacent mismatching chunks are merged into one [`Corruption`]. Only the part covered
+/// by both files is compared.
+///
+/// # Panics
+///
+/// Panics if a file cannot be opened or mapped, or if `chunk_size` is zero.
+pub fn find_corruptions_simd_parallel(
+    reference_path: &str,
+    corrupted_path: &str,
+    chunk_size: usize,
+) -> Vec<Corruption> {
+    let (ref_mmap, corrupt_mmap) = map_file_pair(reference_path, corrupted_path);
+    // Clamp to the shorter file so the chunk slices below stay in bounds for both mappings
+    let file_len = ref_mmap.len().min(corrupt_mmap.len());
+
+    // Process chunks in parallel with SIMD comparisons; collecting an indexed parallel
+    // iterator keeps the chunks in offset order, so no sort is needed afterwards
+    let chunk_mismatches: Vec<(usize, bool)> = (0..file_len)
+        .step_by(chunk_size)
+        .collect::<Vec<_>>()
+        .par_iter()
+        .map(|&offset| {
+            let end = (offset + chunk_size).min(file_len);
+            let matches =
+                chunks_equal_simd::<64>(&ref_mmap[offset..end], &corrupt_mmap[offset..end]);
+            (offset, !matches)
+        })
+        .collect();
+
+    merge_corrupted_chunks(&chunk_mismatches, chunk_size, file_len)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -98,4 +283,109 @@ mod tests {
         );
         assert_eq!(corruptions[49].length, 5120, "Last corruption length");
     }
+
+    #[test]
+    fn test_find_corruptions_parallel() {
+        let corruptions = find_corruptions_parallel("reference.bin", "corrupted.bin", 1024);
+
+        assert_eq!(corruptions.len(), 50, "Should find 50 corruptions");
+
+        // All corruptions should be 1KB aligned
+        for corruption in &corruptions {
+            assert_eq!(
+                corruption.offset % 1024,
+                0,
+                "Corruption offset should be 1KB aligned"
+            );
+            assert_eq!(
+                corruption.length % 1024,
+                0,
+                "Corruption length should be multiple of 1KB"
+            );
+        }
+
+        // Check specific corruptions
+        assert_eq!(corruptions[0].offset, 14801920, "First corruption offset");
+        assert_eq!(corruptions[0].length, 2048, "First corruption length");
+        assert_eq!(
+            corruptions[25].offset, 243891200,
+            "Middle corruption offset"
+        );
+        assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
+        assert_eq!(
+            corruptions[49].offset, 507871232,
+            "Last corruption offset"
+        );
+        assert_eq!(corruptions[49].length, 5120, "Last corruption length");
+    }
+
+    #[test]
+    fn test_find_corruptions_simd() {
+        let corruptions = find_corruptions_simd("reference.bin", "corrupted.bin", 1024);
+
+        assert_eq!(corruptions.len(), 50, "Should find 50 corruptions");
+
+        // All corruptions should be 1KB aligned
+        for corruption in &corruptions {
+            assert_eq!(
+                corruption.offset % 1024,
+                0,
+                "Corruption offset should be 1KB aligned"
+            );
+            assert_eq!(
+                corruption.length % 1024,
+                0,
+                "Corruption length should be multiple of 1KB"
+            );
+        }
+
+        // Check specific corruptions
+        assert_eq!(corruptions[0].offset, 14801920, "First corruption offset");
+        assert_eq!(corruptions[0].length, 2048, "First corruption length");
+        assert_eq!(
+            corruptions[25].offset, 243891200,
+            "Middle corruption offset"
+        );
+        assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
+        assert_eq!(
+            corruptions[49].offset, 507871232,
+            "Last corruption offset"
+        );
+        assert_eq!(corruptions[49].length, 5120, "Last corruption length");
+    }
+
+    #[test]
+    fn test_find_corruptions_simd_parallel() {
+        let corruptions = find_corruptions_simd_parallel("reference.bin", "corrupted.bin", 1024);
+
+        assert_eq!(corruptions.len(), 50, "Should find 50 corruptions");
+
+        // All corruptions should be 1KB aligned
+        for corruption in &corruptions {
+            assert_eq!(
+                corruption.offset % 1024,
+                0,
+                "Corruption offset should be 1KB aligned"
+            );
+            assert_eq!(
+                corruption.length % 1024,
+                0,
+                "Corruption length should be multiple of 1KB"
+            );
+        }
+
+        // Check specific corruptions
+        assert_eq!(corruptions[0].offset, 14801920, "First corruption offset");
+        assert_eq!(corruptions[0].length, 2048, "First corruption length");
+        assert_eq!(
+            corruptions[25].offset, 243891200,
+            "Middle corruption offset"
+        );
+        assert_eq!(corruptions[25].length, 4096, "Middle corruption length");
+        assert_eq!(
+            corruptions[49].offset, 507871232,
+            "Last corruption offset"
+        );
+        assert_eq!(corruptions[49].length, 5120, "Last corruption length");
+    }
 }
diff --git a/src/dna_matcher.rs b/src/dna_matcher.rs
index d99c90e..70cf63d 100644
--- a/src/dna_matcher.rs
+++ b/src/dna_matcher.rs
@@ -1,10 +1,80 @@
-/// Naive approach: Read the entire file as a string and filter lines
+/// Returns every non-header line of `genome` that contains `pattern`.
+///
+/// Kept for existing callers; the search itself is done by [`memchr_search`].
 pub fn naive_dna_matcher(genome: &str, pattern: &str) -> Vec<String> {
-    genome
-        .lines()
-        .filter(|line| !line.starts_with('>')) // Skip headers
-        .filter(|line| line.contains(pattern))
-        .map(|s| s.to_string())
+    memchr_search(genome, pattern)
+}
+
+/// `&str` front end for [`memchr_search_bytes`]; see there for the matching rules.
+pub fn memchr_search(genome: &str, pattern: &str) -> Vec<String> {
+    memchr_search_bytes(genome.as_bytes(), pattern.as_bytes())
+        .into_iter()
+        // Lines are cut at `\n` out of a valid `str`, so they are valid UTF-8 themselves
+        .map(|bytes| String::from_utf8(bytes).expect("Invalid UTF-8"))
+        .collect()
+}
+
+/// Returns each non-header line of `genome` that contains `pattern`, in file order.
+///
+/// Lines starting with `>` are treated as headers and skipped. A line is reported once
+/// even if `pattern` occurs in it several times.
+pub fn memchr_search_bytes(genome: &[u8], pattern: &[u8]) -> Vec<Vec<u8>> {
+    let mut lines = Vec::new();
+    // Matches before this position belong to a line that has already been handled
+    let mut next_unseen = 0;
+
+    for match_pos in memchr::memmem::find_iter(genome, pattern) {
+        if match_pos < next_unseen {
+            continue;
+        }
+
+        // Walk back to find the start of the line
+        let start = memchr::memrchr(b'\n', &genome[..match_pos])
+            .map(|pos| pos + 1)
+            .unwrap_or(0);
+
+        // Extract the full sequence line containing the match
+        let end = memchr::memchr(b'\n', &genome[start..])
+            .map(|pos| start + pos)
+            .unwrap_or(genome.len());
+        next_unseen = end + 1;
+
+        // Check if it's a header
+        if genome.get(start) != Some(&b'>') {
+            lines.push(genome[start..end].to_vec());
+        }
+    }
+
+    lines
+}
+
+/// Parallel variant of [`memchr_search_bytes`].
+///
+/// The genome is split at line boundaries into chunks of at least 1 MiB, the chunks are
+/// searched on the rayon pool, and the per-chunk results are collected into one vector.
+pub fn memchr_search_bytes_parallel(genome: &[u8], pattern: &[u8]) -> Vec<Vec<u8>> {
+    use rayon::prelude::*;
+
+    let chunk_size = genome.len() / rayon::current_num_threads().max(1);
+    let chunk_size = chunk_size.max(1024 * 1024); // At least 1MB per chunk
+
+    // Split genome into chunks at newline boundaries
+    let mut chunk_starts = vec![0];
+    let mut pos = chunk_size;
+    while pos < genome.len() {
+        if let Some(newline_pos) = memchr::memchr(b'\n', &genome[pos..]) {
+            chunk_starts.push(pos + newline_pos + 1);
+            pos += newline_pos + 1 + chunk_size;
+        } else {
+            break;
+        }
+    }
+    chunk_starts.push(genome.len());
+
+    // Chunks start and end on line boundaries, so each one can be searched on its own
+    chunk_starts
+        .par_windows(2)
+        .flat_map(|window| memchr_search_bytes(&genome[window[0]..window[1]], pattern))
         .collect()
 }
 
@@ -44,4 +114,28 @@ mod tests {
             pattern
         );
     }
+
+    #[test]
+    fn test_parallel_matcher_on_genome_file() {
+        // Read the actual genome.fasta file
+        let genome = std::fs::read("genome.fasta")
+            .expect("Failed to read genome.fasta\n\n Make sure to run 'cargo run --release --bin generate_fasta'");
+        let pattern = b"AGTCCGTA";
+
+        let matches = memchr_search_bytes_parallel(&genome, pattern);
+
+        // With fixed seed (42), we should always get exactly 4927 matches
+        assert_eq!(
+            matches.len(),
+            4927,
+            "Expected 4927 matches with seed 42, found {}",
+            matches.len()
+        );
+
+        println!(
+            "✓ Found {} sequences containing pattern '{}' (parallel)",
+            matches.len(),
+            String::from_utf8_lossy(pattern)
+        );
+    }
 }