|
4 | 4 | //! QualiaColumn SoA layout introduced by D-CSV-5b. |
5 | 5 | //! |
6 | 6 | //! Yields `(row_index, &QualiaI4Row)` tuples. Pure iterator scaffold; the |
7 | | -//! `par_qualia_stream` rayon-parallel variant is sprint-13+ once rayon is |
8 | | -//! wired into the ndarray feature gate. |
| 7 | +//! `par_qualia_stream` rayon-parallel variant is wired in sprint-13 (D-CSV-17) |
| 8 | +//! behind `#[cfg(feature = "rayon")]`. |
9 | 9 |
|
10 | 10 | // NOTE: do NOT import lance-graph-contract here (would create circular dep |
11 | 11 | // since contract is *consumer* of ndarray). Define a minimal local mirror |
@@ -108,6 +108,50 @@ impl<'a> ExactSizeIterator for QualiaStream<'a> { |
108 | 108 | } |
109 | 109 | } |
110 | 110 |
|
| 111 | +// ─── Rayon-parallel variant (D-CSV-17, sprint-13) ───────────────────────────── |
| 112 | + |
| 113 | +#[cfg(feature = "rayon")] |
| 114 | +use rayon::prelude::*; |
| 115 | + |
| 116 | +/// Rayon-parallel forward-iterator over a borrowed `&[QualiaI4Row]` slice. |
| 117 | +/// |
| 118 | +/// Yields `(row_index, &QualiaI4Row)` tuples. Unlike the scalar |
| 119 | +/// `QualiaStream`, iteration order is **not** guaranteed to be ascending |
| 120 | +/// by index; rayon's work-stealing scheduler may process chunks |
| 121 | +/// out-of-order. See §6 of pr-sprint-13-rayon-streams.md for the |
| 122 | +/// determinism contract callers must respect. |
| 123 | +/// |
| 124 | +/// Returns `impl IndexedParallelIterator` (not bare `ParallelIterator`) so |
| 125 | +/// callers can use `enumerate()`, `zip()`, and `collect()` with guaranteed |
| 126 | +/// index-order preservation (rayon's `IndexedParallelIterator::collect` is |
| 127 | +/// contract-guaranteed to preserve original order). |
| 128 | +/// |
| 129 | +/// # Chunk-size note |
| 130 | +/// |
| 131 | +/// `QualiaI4Row` is 8 bytes (`repr(C, align(8))`), so 8 rows fit one 64-byte |
| 132 | +/// cache line. For folds into ordered structures, call `.with_min_len(8)` to |
| 133 | +/// align chunks to cache-line boundaries (see OQ-CSV-8). |
| 134 | +/// |
| 135 | +/// # Example |
| 136 | +/// |
| 137 | +/// ``` |
| 138 | +/// # #[cfg(feature = "rayon")] { |
| 139 | +/// use ndarray::hpc::stream::qualia::{QualiaI4Row, par_qualia_stream}; |
| 140 | +/// use rayon::prelude::*; |
| 141 | +/// |
| 142 | +/// let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect(); |
| 143 | +/// let total_nonzero: usize = par_qualia_stream(&rows) |
| 144 | +/// .filter(|(_, r)| r.0 != 0) |
| 145 | +/// .count(); |
| 146 | +/// assert_eq!(total_nonzero, 1023); // QualiaI4Row(0) is the lone zero |
| 147 | +/// # } |
| 148 | +/// ``` |
| 149 | +#[cfg(feature = "rayon")] |
| 150 | +#[inline] |
| 151 | +pub fn par_qualia_stream(rows: &[QualiaI4Row]) -> impl IndexedParallelIterator<Item = (usize, &QualiaI4Row)> { |
| 152 | + rows.par_iter().enumerate() |
| 153 | +} |
| 154 | + |
111 | 155 | // ─── Tests ──────────────────────────────────────────────────────────────────── |
112 | 156 |
|
113 | 157 | #[cfg(test)] |
@@ -199,3 +243,79 @@ mod tests { |
199 | 243 | assert_eq!(ExactSizeIterator::len(&stream), 0); |
200 | 244 | } |
201 | 245 | } |
| 246 | + |
| 247 | +// ─── Rayon par_* tests (D-CSV-17) ───────────────────────────────────────────── |
| 248 | + |
| 249 | +#[cfg(all(test, feature = "rayon"))] |
| 250 | +mod par_tests { |
| 251 | + use super::{par_qualia_stream, QualiaI4Row, QualiaStream}; |
| 252 | + use rayon::prelude::*; |
| 253 | + use std::sync::atomic::{AtomicUsize, Ordering}; |
| 254 | + |
| 255 | + /// T-P-Q-1: par_qualia_stream yields all N items. |
| 256 | + #[test] |
| 257 | + fn test_par_qualia_yields_all() { |
| 258 | + let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect(); |
| 259 | + let count = par_qualia_stream(&rows).count(); |
| 260 | + assert_eq!(count, 1024); |
| 261 | + } |
| 262 | + |
| 263 | + /// T-P-Q-2: par_qualia_stream on empty slice yields zero items. |
| 264 | + #[test] |
| 265 | + fn test_par_qualia_empty() { |
| 266 | + let rows: Vec<QualiaI4Row> = vec![]; |
| 267 | + let count = par_qualia_stream(&rows).count(); |
| 268 | + assert_eq!(count, 0); |
| 269 | + } |
| 270 | + |
| 271 | + /// T-P-Q-3: par_iter result equals serial iter result (as sorted sets). |
| 272 | + /// Both iterators produce the same (index XOR value) pairs; sorting |
| 273 | + /// makes the comparison order-independent. |
| 274 | + #[test] |
| 275 | + fn test_par_qualia_matches_serial() { |
| 276 | + let rows: Vec<QualiaI4Row> = (0u64..256).map(QualiaI4Row).collect(); |
| 277 | + let mut par: Vec<u64> = par_qualia_stream(&rows) |
| 278 | + .map(|(i, r)| (i as u64) ^ r.0) |
| 279 | + .collect(); |
| 280 | + let mut ser: Vec<u64> = QualiaStream::new(&rows) |
| 281 | + .map(|(i, r)| (i as u64) ^ r.0) |
| 282 | + .collect(); |
| 283 | + par.sort_unstable(); |
| 284 | + ser.sort_unstable(); |
| 285 | + assert_eq!(par, ser); |
| 286 | + } |
| 287 | + |
| 288 | + /// T-P-Q-4: par_iter with filter is correct (count of even-valued rows). |
| 289 | + #[test] |
| 290 | + fn test_par_qualia_with_filter() { |
| 291 | + let rows: Vec<QualiaI4Row> = (0u64..512).map(QualiaI4Row).collect(); |
| 292 | + let count_even = par_qualia_stream(&rows) |
| 293 | + .filter(|(_, r)| r.0 % 2 == 0) |
| 294 | + .count(); |
| 295 | + // Values 0, 2, 4, ..., 510 → 256 even values. |
| 296 | + assert_eq!(count_even, 256); |
| 297 | + } |
| 298 | + |
| 299 | + /// T-P-Q-5: with_min_len(8) knob compiles and yields all items. |
| 300 | + /// 8 rows × 8 B = 64 B = one cache line (OQ-CSV-8 fixed chunk for QualiaI4). |
| 301 | + #[test] |
| 302 | + fn test_par_qualia_min_len() { |
| 303 | + let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect(); |
| 304 | + let count = par_qualia_stream(&rows).with_min_len(8).count(); |
| 305 | + assert_eq!(count, 1024); |
| 306 | + } |
| 307 | + |
| 308 | + /// T-P-Q-6: thread-safety — QualiaI4Row is Send + Sync; verified by |
| 309 | + /// mutating an AtomicUsize from the parallel for_each closure. |
| 310 | + #[test] |
| 311 | + fn test_par_qualia_send_sync() { |
| 312 | + fn assert_send_sync<T: Send + Sync>() {} |
| 313 | + assert_send_sync::<QualiaI4Row>(); |
| 314 | + let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect(); |
| 315 | + let counter = AtomicUsize::new(0); |
| 316 | + par_qualia_stream(&rows).for_each(|_| { |
| 317 | + counter.fetch_add(1, Ordering::Relaxed); |
| 318 | + }); |
| 319 | + assert_eq!(counter.load(Ordering::Relaxed), 1024); |
| 320 | + } |
| 321 | +} |
0 commit comments