|
| 1 | +// SPDX-License-Identifier: Apache-2.0 |
| 2 | +// SPDX-FileCopyrightText: Copyright the Vortex contributors |
| 3 | +// |
| 4 | +//! End-to-end smoke test on a realistically-sized input. Not part of the unit |
| 5 | +//! suite; run with `cargo test -p vortex-onpair --test big_data -- --nocapture`. |
| 6 | +
|
| 7 | +use std::sync::LazyLock; |
| 8 | +use std::time::Instant; |
| 9 | + |
| 10 | +use vortex_array::IntoArray; |
| 11 | +use vortex_array::VortexSessionExecute; |
| 12 | +use vortex_array::accessor::ArrayAccessor; |
| 13 | +use vortex_array::arrays::VarBinArray; |
| 14 | +use vortex_array::arrays::VarBinViewArray; |
| 15 | +use vortex_array::dtype::DType; |
| 16 | +use vortex_array::dtype::Nullability; |
| 17 | +use vortex_array::session::ArraySession; |
| 18 | +use vortex_onpair::DEFAULT_DICT12_CONFIG; |
| 19 | +use vortex_onpair::onpair_compress; |
| 20 | +use vortex_session::VortexSession; |
| 21 | + |
| 22 | +static SESSION: LazyLock<VortexSession> = |
| 23 | + LazyLock::new(|| VortexSession::empty().with::<ArraySession>()); |
| 24 | + |
| 25 | +/// Fake-but-realistic corpus: 100k log/URL-like rows drawn from a handful of |
| 26 | +/// templates with varying tail content. Models the kind of column OnPair |
| 27 | +/// actually targets (high lexical repetition, short-to-medium strings). |
| 28 | +fn corpus(n: usize) -> Vec<String> { |
| 29 | + let templates: &[&str] = &[ |
| 30 | + "GET /api/v1/users/{id}/profile HTTP/1.1", |
| 31 | + "POST /api/v1/users/{id}/sessions HTTP/1.1", |
| 32 | + "GET /static/js/app.{id}.js HTTP/1.1", |
| 33 | + "GET /static/css/app.{id}.css HTTP/1.1", |
| 34 | + "https://www.example.com/products/{id}", |
| 35 | + "https://cdn.example.com/img/{id}.webp", |
| 36 | + "https://api.example.com/v2/orders/{id}", |
| 37 | + "ftp://files.example.com/dump/{id}.tar.gz", |
| 38 | + "ssh://deploy@build-{id}.internal:22", |
| 39 | + "redis://cache-{id}.svc.cluster.local:6379", |
| 40 | + "INFO request_id={id} method=GET status=200", |
| 41 | + "WARN request_id={id} method=POST status=429", |
| 42 | + "ERROR request_id={id} method=PUT status=500", |
| 43 | + ]; |
| 44 | + let mut out = Vec::with_capacity(n); |
| 45 | + let mut state = 0x9e37_79b9_7f4a_7c15_u64; |
| 46 | + for _ in 0..n { |
| 47 | + state = state |
| 48 | + .wrapping_mul(6364136223846793005) |
| 49 | + .wrapping_add(1442695040888963407); |
| 50 | + let pick = (state as usize) % templates.len(); |
| 51 | + let id = state as u32; |
| 52 | + out.push(templates[pick].replace("{id}", &format!("{:08x}", id))); |
| 53 | + } |
| 54 | + out |
| 55 | +} |
| 56 | + |
| 57 | +#[test] |
| 58 | +#[cfg_attr(miri, ignore)] |
| 59 | +fn smoke_100k_rows() { |
| 60 | + let n = 100_000; |
| 61 | + let strings = corpus(n); |
| 62 | + let raw_bytes: usize = strings.iter().map(|s| s.len()).sum(); |
| 63 | + |
| 64 | + let varbin = VarBinArray::from_iter( |
| 65 | + strings.iter().map(|s| Some(s.as_bytes())), |
| 66 | + DType::Utf8(Nullability::NonNullable), |
| 67 | + ); |
| 68 | + |
| 69 | + let t0 = Instant::now(); |
| 70 | + let arr = onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG) |
| 71 | + .expect("compress"); |
| 72 | + let compress_elapsed = t0.elapsed(); |
| 73 | + |
| 74 | + let column_bytes = arr.column_bytes().len(); |
| 75 | + let ratio = raw_bytes as f64 / column_bytes as f64; |
| 76 | + eprintln!( |
| 77 | + "compressed {} rows ({} bytes) -> {} bytes (ratio {:.2}x) in {:?}", |
| 78 | + n, raw_bytes, column_bytes, ratio, compress_elapsed |
| 79 | + ); |
| 80 | + eprintln!("dict_size={} bits={}", arr.dict_size(), arr.bits()); |
| 81 | + |
| 82 | + let mut ctx = SESSION.create_execution_ctx(); |
| 83 | + |
| 84 | + // Full canonicalisation round-trip. |
| 85 | + let t0 = Instant::now(); |
| 86 | + let decoded = arr |
| 87 | + .clone() |
| 88 | + .into_array() |
| 89 | + .execute::<VarBinViewArray>(&mut ctx) |
| 90 | + .expect("canonicalize"); |
| 91 | + let decompress_elapsed = t0.elapsed(); |
| 92 | + eprintln!("canonicalized in {:?}", decompress_elapsed); |
| 93 | + |
| 94 | + assert_eq!(decoded.len(), n); |
| 95 | + decoded |
| 96 | + .with_iterator(|iter| { |
| 97 | + for (i, got) in iter.enumerate() { |
| 98 | + let want = strings[i].as_bytes(); |
| 99 | + assert_eq!(got, Some(want), "row {} mismatch", i); |
| 100 | + } |
| 101 | + Ok::<_, vortex_error::VortexError>(()) |
| 102 | + }) |
| 103 | + .unwrap(); |
| 104 | + eprintln!("roundtrip OK on all {} rows", n); |
| 105 | + |
| 106 | + // Predicate spot-checks: numbers must match a brute-force scan. |
| 107 | + let column = arr.column().expect("materialize column"); |
| 108 | + |
| 109 | + let needle_eq = strings[42].as_bytes(); |
| 110 | + let want_eq = strings.iter().filter(|s| s.as_bytes() == needle_eq).count(); |
| 111 | + let bits = column.equals_bitmap(needle_eq).unwrap(); |
| 112 | + let got_eq = popcount(&bits, n); |
| 113 | + eprintln!( |
| 114 | + "equals('row 42 payload') expected={} got={}", |
| 115 | + want_eq, got_eq |
| 116 | + ); |
| 117 | + assert_eq!(got_eq, want_eq); |
| 118 | + |
| 119 | + let prefix = b"https://www."; |
| 120 | + let want_prefix = strings |
| 121 | + .iter() |
| 122 | + .filter(|s| s.as_bytes().starts_with(prefix)) |
| 123 | + .count(); |
| 124 | + let bits = column.starts_with_bitmap(prefix).unwrap(); |
| 125 | + let got_prefix = popcount(&bits, n); |
| 126 | + eprintln!( |
| 127 | + "starts_with('https://www.') expected={} got={}", |
| 128 | + want_prefix, got_prefix |
| 129 | + ); |
| 130 | + assert_eq!(got_prefix, want_prefix); |
| 131 | + |
| 132 | + let needle_sub = b"status=500"; |
| 133 | + let want_sub = strings |
| 134 | + .iter() |
| 135 | + .filter(|s| { |
| 136 | + s.as_bytes() |
| 137 | + .windows(needle_sub.len()) |
| 138 | + .any(|w| w == needle_sub) |
| 139 | + }) |
| 140 | + .count(); |
| 141 | + let bits = column.contains_bitmap(needle_sub).unwrap(); |
| 142 | + let got_sub = popcount(&bits, n); |
| 143 | + eprintln!( |
| 144 | + "contains('status=500') expected={} got={}", |
| 145 | + want_sub, got_sub |
| 146 | + ); |
| 147 | + assert_eq!(got_sub, want_sub); |
| 148 | +} |
| 149 | + |
| 150 | +fn popcount(bits: &[u8], n: usize) -> usize { |
| 151 | + let mut c = 0; |
| 152 | + for i in 0..n { |
| 153 | + if (bits[i / 8] >> (i % 8)) & 1 == 1 { |
| 154 | + c += 1; |
| 155 | + } |
| 156 | + } |
| 157 | + c |
| 158 | +} |
0 commit comments