diff --git a/api_changes/update_260424_next_stable_chunk_boundary.md b/api_changes/update_260424_next_stable_chunk_boundary.md new file mode 100644 index 00000000..6f9aba3d --- /dev/null +++ b/api_changes/update_260424_next_stable_chunk_boundary.md @@ -0,0 +1,16 @@ +This update adds a new public deduplication helper for computing restart-safe chunk boundaries from existing chunk boundary metadata. + +What changed +- Added `xet_data::deduplication::next_stable_chunk_boundary(starting_position, chunk_boundaries) -> Option<usize>`. +- Re-exported it from `xet_data::deduplication` so downstream crates can use it directly. +- The function scans forward from `starting_position` and returns the next chunk boundary that satisfies the stable-boundary condition: + - two consecutive chunk sizes in `[2 * min_chunk, max_chunk - min_chunk)`, + - where `min_chunk` and `max_chunk` are derived from chunking constants. + +Why this matters +- Callers that already have chunk-boundary metadata can locate a stable resume boundary without re-reading file bytes. +- This enables deterministic alignment behavior for resumed/partial workflows that need chunk boundaries robust to prefix changes. + +Usage notes +- `chunk_boundaries` should be monotonically increasing chunk-end offsets produced by the same chunking configuration. +- `starting_position` may be any byte offset (not necessarily a chunk boundary) and is used as the reference offset from which to search for the next stable chunk boundary. diff --git a/xet_data/src/deduplication/chunking.rs b/xet_data/src/deduplication/chunking.rs index 399f1daf..82976e7d 100644 --- a/xet_data/src/deduplication/chunking.rs +++ b/xet_data/src/deduplication/chunking.rs @@ -282,9 +282,15 @@ impl Chunker { /// partition_scan_bytes is the number of bytes to scan at each /// proposed partition boundary in search of a valid chunk. /// -/// Due to a known issue in how we do chunking, note that these -/// partitions are not 100% guaranteed to align. 
See the -/// parallel_chunking.pdf for details. +/// Partition alignment is guaranteed by the hash warmup fix: the +/// chunker feeds `min_chunk - 64 - 1` bytes before scanning for +/// boundaries, ensuring the gear hash window is fully warmed (purely +/// data-dependent) at all accepted trigger positions. This function +/// additionally verifies the absence of hidden triggers by re-chunking +/// with `min_chunk = 0`. See `parallel chunking.lyx` for the proof. +/// +/// For finding stable chunk boundaries from existing chunk boundaries (without +/// data access), see [`next_stable_chunk_boundary`]. pub fn find_partitions( reader: &mut R, file_size: usize, @@ -353,6 +359,53 @@ pub fn find_partitions( Ok(partitions) } +/// Given a list of chunk boundaries in a file and an arbitrary reference position, +/// returns the next stable chunk boundary at or after that position. +/// +/// `starting_position` may be any byte offset in the file; it does not need to +/// be an existing chunk boundary. The search starts at the first chunk boundary +/// `>= starting_position`. +/// +/// A stable chunk boundary is defined such that any possible changes in the data +/// before `starting_position` would produce the same chunk boundaries at the +/// stable boundary and later. The fixed data between `starting_position` and +/// the returned stable boundary is always sufficient to restore the chunker to +/// its original chunk boundaries. +/// +/// The stability condition requires two consecutive chunks after `starting_position`, +/// both with sizes in `[2 * min_chunk, max_chunk - min_chunk)`. The boundary +/// at the end of the second such chunk is the stable chunk boundary. +/// +/// The lower bound is `2 * min_chunk` rather than `min_chunk` (as used in +/// [`find_partitions`]) because this function operates on existing chunk +/// boundaries without data access, and cannot verify the absence of hidden +/// hash triggers in the `[c_k, c_k + min_chunk)` skip zone. 
A shadow-zone +/// trigger can advance a modified chunker by up to `min_chunk`, so the next +/// chunk must be at least `2 * min_chunk` to remain reachable. +/// +/// See `parallel chunking.lyx` for the full proof and `find_stable_start` in +/// `merkle_hash_subtree.rs` for the analogous construction in merkle hashing. +pub fn next_stable_chunk_boundary(starting_position: usize, chunk_boundaries: &[usize]) -> Option<usize> { + let minimum_chunk = *TARGET_CHUNK_SIZE / *MINIMUM_CHUNK_DIVISOR; + let maximum_chunk = *TARGET_CHUNK_SIZE * *MAXIMUM_CHUNK_MULTIPLIER; + + let start_idx = chunk_boundaries.partition_point(|&x| x < starting_position); + + for i in start_idx..chunk_boundaries.len().saturating_sub(2) { + let size_a = chunk_boundaries[i + 1] - chunk_boundaries[i]; + let size_b = chunk_boundaries[i + 2] - chunk_boundaries[i + 1]; + + if size_a >= 2 * minimum_chunk + && size_a < maximum_chunk - minimum_chunk + && size_b >= 2 * minimum_chunk + && size_b < maximum_chunk - minimum_chunk + { + return Some(chunk_boundaries[i + 2]); + } + } + None +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/xet_data/src/deduplication/mod.rs b/xet_data/src/deduplication/mod.rs index b7e9ebfe..f2614984 100644 --- a/xet_data/src/deduplication/mod.rs +++ b/xet_data/src/deduplication/mod.rs @@ -5,7 +5,7 @@ mod defrag_prevention; mod file_deduplication; mod interface; -pub use chunking::{Chunker, find_partitions}; +pub use chunking::{Chunker, find_partitions, next_stable_chunk_boundary}; pub use data_aggregator::DataAggregator; pub use dedup_metrics::DeduplicationMetrics; pub use file_deduplication::FileDeduper; diff --git a/xet_data/src/deduplication/parallel chunking.lyx b/xet_data/src/deduplication/parallel chunking.lyx index 881cbafe..d95427e0 100644 --- a/xet_data/src/deduplication/parallel chunking.lyx +++ b/xet_data/src/deduplication/parallel chunking.lyx @@ -827,9 +827,58 @@ Now, with this implementation, can we still perform parallel chunking? 
\end_layout \begin_layout Standard -I do not believe so. - Due to the hash disagreement, under adverserial settings it is possible - to construct two chunk sequences which will *never* align. + +\series bold +Update: +\series default + The implementation has been fixed. + Instead of skipping +\begin_inset Formula $m$ +\end_inset + + bytes (setting HashStreamStart = +\begin_inset Formula $i+m$ +\end_inset + +), the chunker now starts feeding the hash at +\begin_inset Formula $m-k-1$ +\end_inset + + bytes from the chunk start. + This ensures the hash window has been fed at least +\begin_inset Formula $k+1$ +\end_inset + + bytes by position +\begin_inset Formula $m$ +\end_inset + +, so the hash output at every accepted trigger position ( +\begin_inset Formula $\ge m$ +\end_inset + + from chunk start) depends only on the last +\begin_inset Formula $k$ +\end_inset + + bytes of data, independent of the chunk starting point. + Therefore +\begin_inset Formula $H(a_{1},b)=H(a_{2},b)$ +\end_inset + + for all +\begin_inset Formula $b$ +\end_inset + + where +\begin_inset Formula $b-a_{1}\ge m$ +\end_inset + + and +\begin_inset Formula $b-a_{2}\ge m$ +\end_inset + +, and the parallel chunking proof in Section 2 holds. 
\end_layout \end_body diff --git a/xet_data/src/deduplication/parallel chunking.pdf b/xet_data/src/deduplication/parallel chunking.pdf index 607ec3b7..b6efd9ee 100644 Binary files a/xet_data/src/deduplication/parallel chunking.pdf and b/xet_data/src/deduplication/parallel chunking.pdf differ diff --git a/xet_data/tests/test_stable_chunk_boundary_detection.rs b/xet_data/tests/test_stable_chunk_boundary_detection.rs new file mode 100644 index 00000000..8198f3aa --- /dev/null +++ b/xet_data/tests/test_stable_chunk_boundary_detection.rs @@ -0,0 +1,312 @@ +use std::collections::HashSet; + +use rand::rngs::StdRng; +use rand::{RngExt, SeedableRng}; +use xet_data::deduplication::constants::TARGET_CHUNK_SIZE; +use xet_data::deduplication::{Chunk, Chunker, next_stable_chunk_boundary}; +use xet_runtime::test_set_constants; + +test_set_constants! { + TARGET_CHUNK_SIZE = 1024; +} + +fn make_random_data(seed: u64, len: usize) -> Vec<u8> { + let mut rng = StdRng::seed_from_u64(seed); + let mut data = vec![0u8; len]; + rng.fill(&mut data[..]); + data +} + +fn chunk_data(data: &[u8]) -> Vec<Chunk> { + let mut chunker = Chunker::default(); + chunker.next_block(data, true) +} + +fn get_chunk_boundaries(chunks: &[Chunk]) -> Vec<usize> { + chunks + .iter() + .scan(0usize, |pos, c| { + *pos += c.data.len(); + Some(*pos) + }) + .collect() +} + +fn verify_alignment( + original_boundaries: &[usize], + new_boundaries: &[usize], + stable: usize, + file_size: usize, + starting_position: usize, + mutation_seed: u64, +) { + let orig_set: HashSet<usize> = original_boundaries.iter().copied().collect(); + let new_set: HashSet<usize> = new_boundaries.iter().copied().collect(); + + for &oc in original_boundaries { + if oc >= stable && oc < file_size { + assert!( + new_set.contains(&oc), + "Original chunk boundary {oc} (>= stable {stable}) missing from new chunk boundaries. 
\ + starting_position={starting_position}, mutation_seed={mutation_seed}" + ); + } + } + + for &nc in new_boundaries { + if nc >= stable && nc < file_size { + assert!( + orig_set.contains(&nc), + "New chunk boundary {nc} (>= stable {stable}) not in original chunk boundaries. \ + starting_position={starting_position}, mutation_seed={mutation_seed}" + ); + } + } +} + +/// For a given data buffer, exercise `next_stable_chunk_boundary` at random +/// starting positions across the full data range with random mutations. +fn stress_test_stable_chunk_boundaries(data: &[u8], seed: u64, num_positions: usize, num_mutations: u64) { + let file_size = data.len(); + let chunks = chunk_data(data); + let chunk_boundaries = get_chunk_boundaries(&chunks); + + assert!( + chunk_boundaries.len() > 10, + "Need enough chunks for meaningful testing, got {}", + chunk_boundaries.len() + ); + + let mut rng = StdRng::seed_from_u64(seed); + let mut tested_stable = 0u64; + + for trial in 0..num_positions { + let starting_position = rng.random_range(1..file_size); + + let stable = match next_stable_chunk_boundary(starting_position, &chunk_boundaries) { + Some(s) => s, + None => continue, + }; + + assert!( + chunk_boundaries.contains(&stable), + "Stable chunk boundary {stable} is not a member of original chunk boundaries" + ); + assert!( + stable >= starting_position, + "Stable chunk boundary {stable} must be at or after starting_position {starting_position}" + ); + + tested_stable += 1; + + for mutation_seed in 0..num_mutations { + let combined_seed = (trial as u64) * 10000 + mutation_seed + 1; + let mut modified = data.to_vec(); + let mut mrng = StdRng::seed_from_u64(combined_seed); + mrng.fill(&mut modified[..starting_position]); + + let new_chunks = chunk_data(&modified); + let new_boundaries = get_chunk_boundaries(&new_chunks); + + verify_alignment(&chunk_boundaries, &new_boundaries, stable, file_size, starting_position, combined_seed); + } + } + + let min_expected = (num_positions / 
4).max(1); + assert!( + tested_stable >= min_expected as u64, + "Too few starting positions had stable chunk boundaries: {tested_stable} (expected >= {min_expected})" + ); +} + +#[test] +fn test_stable_chunk_boundary_edge_cases() { + let data = make_random_data(42, 50_000); + let chunks = chunk_data(&data); + let chunk_boundaries = get_chunk_boundaries(&chunks); + + // starting_position at 0: should still return a valid point + let stable_0 = next_stable_chunk_boundary(0, &chunk_boundaries); + if let Some(s) = stable_0 { + assert!(chunk_boundaries.contains(&s)); + } + + // starting_position past all chunk boundaries: should return None + let past_end = *chunk_boundaries.last().unwrap() + 1; + assert!(next_stable_chunk_boundary(past_end, &chunk_boundaries).is_none()); + + // starting_position near end with too few remaining points + if chunk_boundaries.len() >= 2 { + let near_end = chunk_boundaries[chunk_boundaries.len() - 2]; + assert!(next_stable_chunk_boundary(near_end + 1, &chunk_boundaries).is_none()); + } + + // Degenerate inputs + assert!(next_stable_chunk_boundary(0, &[]).is_none()); + assert!(next_stable_chunk_boundary(0, &[100]).is_none()); + assert!(next_stable_chunk_boundary(0, &[100, 200]).is_none()); +} + +#[test] +fn test_stable_chunk_boundary_with_constant_data() { + // Constant data produces max-chunk-sized chunks (forced boundaries). + // With target=1024: max_chunk=2048, min_chunk=128, so max-min=1920. + // Forced boundaries at size 2048 fail the upper bound check of < 1920. 
+ let data = vec![0u8; 50_000]; + let chunks = chunk_data(&data); + let chunk_boundaries = get_chunk_boundaries(&chunks); + + let stable = next_stable_chunk_boundary(0, &chunk_boundaries); + assert!(stable.is_none(), "Constant data should have no stable chunk boundary (all forced cuts)"); +} + +#[test] +fn test_stable_chunk_boundary_smoke_stress() { + let data = make_random_data(42, 50_000); + stress_test_stable_chunk_boundaries(&data, 42, 5, 5); +} + +#[test] +fn test_stable_chunk_boundary_smoke_varied_seeds() { + for seed in [1, 7, 255] { + let data = make_random_data(seed, 50_000); + stress_test_stable_chunk_boundaries(&data, seed + 77, 5, 5); + } +} + +#[cfg(not(feature = "smoke-test"))] +#[test] +fn test_stable_chunk_boundary_stress() { + let data = make_random_data(42, 256_000); + stress_test_stable_chunk_boundaries(&data, 42, 100, 20); +} + +#[cfg(not(feature = "smoke-test"))] +#[test] +fn test_stable_chunk_boundary_stress_varied_seeds() { + for seed in [1, 7, 13, 100, 255, 1024, 42424, 999999] { + let data = make_random_data(seed, 100_000); + stress_test_stable_chunk_boundaries(&data, seed + 77, 50, 20); + } +} + +#[cfg(not(feature = "smoke-test"))] +#[test] +fn test_stable_chunk_boundary_mutation_types() { + let data = make_random_data(42, 100_000); + let chunks = chunk_data(&data); + let chunk_boundaries = get_chunk_boundaries(&chunks); + let file_size = data.len(); + + let mid_idx = chunk_boundaries.len() / 2; + let starting_position = chunk_boundaries[mid_idx]; + + let stable = match next_stable_chunk_boundary(starting_position, &chunk_boundaries) { + Some(s) => s, + None => return, + }; + + // Zero-fill + { + let mut modified = data.to_vec(); + modified[..starting_position].fill(0); + let new_boundaries = get_chunk_boundaries(&chunk_data(&modified)); + verify_alignment(&chunk_boundaries, &new_boundaries, stable, file_size, starting_position, 0); + } + + // 0xFF-fill + { + let mut modified = data.to_vec(); + modified[..starting_position].fill(0xFF); + 
let new_boundaries = get_chunk_boundaries(&chunk_data(&modified)); + verify_alignment(&chunk_boundaries, &new_boundaries, stable, file_size, starting_position, 1); + } + + // Reverse the prefix + { + let mut modified = data.to_vec(); + modified[..starting_position].reverse(); + let new_boundaries = get_chunk_boundaries(&chunk_data(&modified)); + verify_alignment(&chunk_boundaries, &new_boundaries, stable, file_size, starting_position, 2); + } + + // XOR with a pattern + { + let mut modified = data.to_vec(); + for (i, byte) in modified[..starting_position].iter_mut().enumerate() { + *byte ^= (i & 0xFF) as u8; + } + let new_boundaries = get_chunk_boundaries(&chunk_data(&modified)); + verify_alignment(&chunk_boundaries, &new_boundaries, stable, file_size, starting_position, 3); + } + + // Many different random fills + for seed in 0..200 { + let mut modified = data.to_vec(); + let mut rng = StdRng::seed_from_u64(seed + 5000); + rng.fill(&mut modified[..starting_position]); + let new_boundaries = get_chunk_boundaries(&chunk_data(&modified)); + verify_alignment(&chunk_boundaries, &new_boundaries, stable, file_size, starting_position, seed + 5000); + } +} + +#[cfg(not(feature = "smoke-test"))] +#[test] +fn test_stable_chunk_boundary_is_tight() { + // Verify the stable chunk boundary is actually needed: the chunk boundary + // just before stable should NOT always be stable (there should exist some + // mutation that breaks it). 
+ let data = make_random_data(42, 256_000); + let chunks = chunk_data(&data); + let chunk_boundaries = get_chunk_boundaries(&chunks); + + let mut found_non_stable_predecessor = false; + + for &starting_position in chunk_boundaries.iter().take(chunk_boundaries.len() / 2) { + if starting_position == 0 { + continue; + } + + let stable = match next_stable_chunk_boundary(starting_position, &chunk_boundaries) { + Some(s) => s, + None => continue, + }; + + let stable_idx = chunk_boundaries.iter().position(|&x| x == stable).unwrap(); + if stable_idx == 0 { + continue; + } + let predecessor = chunk_boundaries[stable_idx - 1]; + if predecessor <= starting_position { + continue; + } + + let orig_set: HashSet<usize> = chunk_boundaries.iter().copied().collect(); + for seed in 0..200 { + let mut modified = data.to_vec(); + let mut rng = StdRng::seed_from_u64(seed + 90000); + rng.fill(&mut modified[..starting_position]); + let new_boundaries = get_chunk_boundaries(&chunk_data(&modified)); + let new_set: HashSet<usize> = new_boundaries.iter().copied().collect(); + + if !new_set.contains(&predecessor) + || new_boundaries + .iter() + .any(|&nc| nc >= predecessor && nc < stable && !orig_set.contains(&nc)) + { + found_non_stable_predecessor = true; + break; + } + } + + if found_non_stable_predecessor { + break; + } + } + + assert!( + found_non_stable_predecessor, + "Could not find any case where the predecessor of a stable chunk boundary was actually unstable. \ + This suggests the stability condition may be too conservative." + ); +}