Skip to content

Commit 53c3ea4

Browse files
committed
OnPair: filter shares dict (TPC-H Q22 SF=10 fix) + token-aware predicates + memchr contains
Three connected changes that drop the SF=10 regression and accelerate predicate pushdown. OnPair::filter — share the dictionary (was the SF=10 cause) ----------------------------------------------------------- The previous implementation decoded the whole array, filtered the canonical bytes, and re-trained a brand-new OnPair dictionary on the surviving rows. TPC-H Q22 customer.c_phone goes through two consecutive filters (`SUBSTRING(c_phone,1,2) IN (...)` and `c_acctbal > avg`), each of which paid full `Column::compress` training overhead — a ~50–100 ms constant cost per call that vanishes below noise at SF=1 but dominates at SF=10. The rewrite is FSST-shape: keep `dict_bytes` + `dict_offsets` byte- identical to the input; rebuild only `codes`, `codes_offsets`, `uncompressed_lengths`, and validity by walking the mask. No decode, no retrain, no C++ on the read path. New unit test `test_onpair_filter_shares_dict` asserts the dict is byte-identical post-filter. Bench (UrlLog 1 M, --sample-count 30, release): filter_share_dict 4.8 ms median (vs. ~70 ms estimated for the old recompress path) Token-aware Eq pushdown (no row decode) --------------------------------------- New `lpm.rs` greedy longest-prefix-match tokeniser. OnPair's dictionary is sorted lexicographically, so a 257-entry first-byte index gives O(1) bucket lookup per byte; the inner loop scans the small bucket to pick the longest matching dict entry. Two byte strings have equal LPM token sequences iff they have equal bytes (LPM is deterministic under the same dict), so `compute/compare.rs::compare(Eq)` LPM-tokenises the needle once and then for each row compares `codes[lo..hi]` against the tokenised needle as `&[u16]` — direct slice eq, no decode at all. If the needle contains a byte that has no dict entry, no row can match (every row was compressed against the same dict) — we leave the bitmap zeroed and `NotEq` inverts. Bench (UrlLog 1 M): eq_constant 6.8 ms median (mostly OwnedDecodeInputs::collect; the actual token compare is sub-millisecond) LIKE pushdown ------------- * `'literal'` — same token-aware path as Eq. * `'prefix%'` — byte-streaming via `for_each_dict_slice`. The naive "tokenise the prefix and compare token prefix" trick is **wrong** for LIKE: the LPM of the row's leading bytes may merge tokens past the literal prefix's boundary. Streaming dict slices and comparing prefix-wise is the correct minimum-work option. * `'%substring%'` — `memchr::memmem::Finder` (SSE2/AVX2 on x86_64, NEON on aarch64, Two-Way underneath). Built once per kernel call, reused across every row. Everything else (escapes, `_`, mid-pattern wildcards, case-insensitive) returns `None` so the framework decompresses + runs the scalar `LIKE`. Bench (UrlLog 1 M): like_prefix 14.8 ms median like_contains 36.4 ms median Bench surface ------------- * New corpus shapes: `UrlLog`, `Short`, `Long`, `HighCard` × 2 row counts (100 K, 1 M). * New compute benches: `eq_constant`, `like_prefix`, `like_contains`, `filter_share_dict`. Verified * `cargo test -p vortex-onpair` 19 / 19 * `cargo test -p vortex-btrblocks` 35 / 35 * `cargo test -p vortex-file --features onpair,tokio --test test_onpair_string_roundtrip` — 5 / 5 * `cargo clippy -p vortex-onpair --all-targets` clean Signed-off-by: Claude <noreply@anthropic.com>
1 parent adeda19 commit 53c3ea4

9 files changed

Lines changed: 573 additions & 103 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/onpair/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ version = { workspace = true }
1717
workspace = true
1818

1919
[dependencies]
20+
memchr = { version = "2.8.0" }
2021
parking_lot = { workspace = true }
2122
prost = { workspace = true }
2223
vortex-array = { workspace = true }

encodings/onpair/benches/decode.rs

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,31 @@
2020
clippy::panic,
2121
clippy::tests_outside_test_module,
2222
clippy::redundant_clone,
23-
clippy::missing_safety_doc
23+
clippy::missing_safety_doc,
24+
clippy::unwrap_used,
25+
clippy::expect_used
2426
)]
2527

2628
use std::sync::LazyLock;
2729

2830
use divan::Bencher;
2931
use vortex_array::IntoArray;
3032
use vortex_array::VortexSessionExecute;
33+
use vortex_array::arrays::ConstantArray;
3134
use vortex_array::arrays::VarBinArray;
3235
use vortex_array::arrays::VarBinViewArray;
36+
use vortex_array::arrays::filter::FilterKernel;
3337
use vortex_array::dtype::DType;
3438
use vortex_array::dtype::Nullability;
39+
use vortex_array::scalar_fn::fns::binary::CompareKernel;
40+
use vortex_array::scalar_fn::fns::like::LikeKernel;
41+
use vortex_array::scalar_fn::fns::like::LikeOptions;
42+
use vortex_array::scalar_fn::fns::operators::CompareOperator;
3543
use vortex_array::session::ArraySession;
44+
use vortex_mask::Mask;
3645
use vortex_onpair::DEFAULT_DICT12_CONFIG;
3746
use vortex_onpair::MAX_TOKEN_SIZE;
47+
use vortex_onpair::OnPair;
3848
use vortex_onpair::OnPairArray;
3949
use vortex_onpair::decode::OwnedDecodeInputs;
4050
use vortex_onpair::onpair_compress;
@@ -83,8 +93,7 @@ fn corpus(n: usize, shape: Shape) -> Vec<String> {
8393
}
8494
}
8595
Shape::Short => {
86-
let templates: &[&str] =
87-
&["alpha", "beta", "gamma", "delta", "eps", "zeta", "eta"];
96+
let templates: &[&str] = &["alpha", "beta", "gamma", "delta", "eps", "zeta", "eta"];
8897
for _ in 0..n {
8998
let s = next();
9099
out.push(templates[(s as usize) % templates.len()].to_string());
@@ -179,6 +188,82 @@ fn canonicalize_to_varbinview(bencher: Bencher, case: (Shape, usize)) {
179188
});
180189
}
181190

191+
// ─── Compute kernels ─────────────────────────────────────────────────────
192+
193+
const COMPUTE_CASES: &[(Shape, usize)] = &[(Shape::UrlLog, 100_000), (Shape::UrlLog, 1_000_000)];
194+
195+
/// `Eq` against a literal (token-aware fast path: no row decode, just
196+
/// `&[u16]` comparison).
197+
#[divan::bench(args = COMPUTE_CASES)]
198+
fn eq_constant(bencher: Bencher, case: (Shape, usize)) {
199+
let (shape, n) = case;
200+
let arr = compress(n, shape);
201+
let strings = corpus(n, shape);
202+
// Pick the very first row's value as the needle so we always hit at
203+
// least one match.
204+
let needle = strings[0].clone();
205+
bencher.bench_local(|| {
206+
let mut ctx = SESSION.create_execution_ctx();
207+
let result = <OnPair as CompareKernel>::compare(
208+
arr.as_view(),
209+
&ConstantArray::new(needle.as_str(), n).into_array(),
210+
CompareOperator::Eq,
211+
&mut ctx,
212+
)
213+
.unwrap()
214+
.unwrap();
215+
divan::black_box(result);
216+
});
217+
}
218+
219+
/// `LIKE 'prefix%'` — byte-streaming row prefix check.
220+
#[divan::bench(args = COMPUTE_CASES)]
221+
fn like_prefix(bencher: Bencher, case: (Shape, usize)) {
222+
let (shape, n) = case;
223+
let arr = compress(n, shape);
224+
bencher.bench_local(|| {
225+
let mut ctx = SESSION.create_execution_ctx();
226+
let pattern = ConstantArray::new("https://www.%", n).into_array();
227+
let result =
228+
<OnPair as LikeKernel>::like(arr.as_view(), &pattern, LikeOptions::default(), &mut ctx)
229+
.unwrap()
230+
.unwrap();
231+
divan::black_box(result);
232+
});
233+
}
234+
235+
/// `LIKE '%substring%'` — `memchr::memmem::Finder` over decoded rows.
236+
#[divan::bench(args = COMPUTE_CASES)]
237+
fn like_contains(bencher: Bencher, case: (Shape, usize)) {
238+
let (shape, n) = case;
239+
let arr = compress(n, shape);
240+
bencher.bench_local(|| {
241+
let mut ctx = SESSION.create_execution_ctx();
242+
let pattern = ConstantArray::new("%example.com%", n).into_array();
243+
let result =
244+
<OnPair as LikeKernel>::like(arr.as_view(), &pattern, LikeOptions::default(), &mut ctx)
245+
.unwrap()
246+
.unwrap();
247+
divan::black_box(result);
248+
});
249+
}
250+
251+
/// Filter — share-dict path. Builds a 1-in-7 mask so we keep ~14 % of
252+
/// rows; the cost is dominated by the `codes` segment copy + offsets.
253+
#[divan::bench(args = COMPUTE_CASES)]
254+
fn filter_share_dict(bencher: Bencher, case: (Shape, usize)) {
255+
let (shape, n) = case;
256+
let arr = compress(n, shape);
257+
let mask = Mask::from_iter((0..n).map(|i| i % 7 == 0));
258+
bencher.bench_local(|| {
259+
let mut ctx = SESSION.create_execution_ctx();
260+
let result = <OnPair as FilterKernel>::filter(arr.as_view(), &mask, &mut ctx)
261+
.unwrap()
262+
.unwrap();
263+
divan::black_box(result);
264+
});
265+
}
266+
182267
fn main() {
183268
divan::main();
184269
}
Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33
//
4-
//! `Eq` / `NotEq` against a constant. Each row's decoded bytes are streamed
5-
//! through `DecodeView::for_each_dict_slice`, comparing prefix-wise against
6-
//! the needle, so most non-matches short-circuit before any decode work.
4+
//! `Eq` / `NotEq` against a constant via **token-aware** comparison.
5+
//!
6+
//! OnPair's compressor encodes every byte string deterministically via
7+
//! greedy LPM against the same dictionary, so two byte strings are
8+
//! equal **iff** their LPM token sequences are equal. We tokenise the
9+
//! needle once and then compare the row's `codes[lo..hi]` slice
10+
//! directly against the tokenised needle as `&[u16]` — no row decode.
11+
//!
12+
//! Edge case: if the needle contains a byte that has no dict entry at
13+
//! all (degenerate dict; OnPair training normally guarantees every
14+
//! single-byte token), no row can possibly equal the needle, since
15+
//! every row was compressed against the same dict. We return an
16+
//! all-zeros bitmap (or all-ones for `NotEq`).
717
818
use vortex_array::ArrayRef;
919
use vortex_array::ArrayView;
@@ -19,8 +29,9 @@ use vortex_buffer::ByteBuffer;
1929
use vortex_error::VortexResult;
2030

2131
use crate::OnPair;
22-
use crate::decode::DecodeView;
2332
use crate::decode::OwnedDecodeInputs;
33+
use crate::lpm::DictIndex;
34+
use crate::lpm::tokenize_needle;
2435

2536
impl CompareKernel for OnPair {
2637
fn compare(
@@ -43,11 +54,26 @@ impl CompareKernel for OnPair {
4354
let dv = inputs.view();
4455
let n = lhs.array().len();
4556
let mut bytes = vec![0u8; n.div_ceil(8)];
46-
for row in 0..n {
47-
if row_equals_needle(&dv, row, &needle) {
48-
bytes[row / 8] |= 1u8 << (row % 8);
57+
58+
let index = DictIndex::build(&dv);
59+
if let Some(needle_toks) = tokenize_needle(&dv, &index, &needle) {
60+
let codes = dv.codes;
61+
let codes_offsets = dv.codes_offsets;
62+
for r in 0..n {
63+
let lo = codes_offsets[r] as usize;
64+
let hi = codes_offsets[r + 1] as usize;
65+
// SAFETY: codes_offsets validated at construction time.
66+
let row_toks = unsafe { codes.get_unchecked(lo..hi) };
67+
if row_toks == needle_toks.as_slice() {
68+
bytes[r / 8] |= 1u8 << (r % 8);
69+
}
4970
}
5071
}
72+
// If `tokenize_needle` returned None, no row can equal the
73+
// needle (every row was compressed against the same dict, so
74+
// any byte not in the dict can't appear in any row either).
75+
// Leave the bitmap zeroed.
76+
5177
let mut bool_buf = BitBuffer::new(ByteBuffer::from(bytes), n);
5278
if operator == CompareOperator::NotEq {
5379
bool_buf = !bool_buf;
@@ -67,31 +93,3 @@ fn needle_bytes(scalar: &Scalar) -> Option<Vec<u8>> {
6793
_ => None,
6894
}
6995
}
70-
71-
/// True iff row `r` decodes to exactly `needle`.
72-
fn row_equals_needle(dv: &DecodeView<'_>, r: usize, needle: &[u8]) -> bool {
73-
let mut pos = 0usize;
74-
let n = needle.len();
75-
let needle_ptr = needle.as_ptr();
76-
let ok = dv.for_each_dict_slice(r, |slice| {
77-
let take = slice.len();
78-
// Fast-path: bail on length overflow first so we never compare a
79-
// partial slice that would walk past `needle`.
80-
if pos + take > n {
81-
return false;
82-
}
83-
// SAFETY: `pos + take <= n`, `take == slice.len()`. Compares
84-
// `needle[pos..pos+take]` with `slice` via raw `memcmp`-style
85-
// pointer math. The branch on length above is the only check.
86-
let eq = unsafe {
87-
let lhs = needle_ptr.add(pos);
88-
std::slice::from_raw_parts(lhs, take) == slice
89-
};
90-
if !eq {
91-
return false;
92-
}
93-
pos += take;
94-
true
95-
});
96-
ok && pos == n
97-
}
Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,108 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33
//
4-
//! Filter is implemented as a re-compress through canonical because OnPair's
5-
//! `codes` for surviving rows would also need to be re-laid out (the codes
6-
//! belong to whole rows, not single elements), and re-training keeps the
7-
//! resulting dictionary tight to the surviving data. Slice is cheaper — see
8-
//! `slice.rs` — because we can just sub-slice `codes_offsets` /
9-
//! `uncompressed_lengths`.
4+
//! Filter that **shares the dictionary**. The previous implementation
5+
//! decoded the whole array, filtered the canonical bytes, and re-trained
6+
//! a brand-new OnPair dictionary on the surviving rows — order-of-
7+
//! magnitude regressions on TPC-H Q22 at SF=10 traced back to that cost
8+
//! (the customer table's `c_phone` column gets two consecutive filters,
9+
//! each of which was paying full `Column::compress` training overhead).
10+
//!
11+
//! FSST-shape filter: keep `dict_bytes` + `dict_offsets` **identical**
12+
//! to the input; rebuild only `codes`, `codes_offsets`,
13+
//! `uncompressed_lengths`, and validity by walking the mask. No decode,
14+
//! no retrain, no C++ call on the read path.
1015
1116
use vortex_array::ArrayRef;
1217
use vortex_array::ArrayView;
13-
use vortex_array::Canonical;
1418
use vortex_array::ExecutionCtx;
1519
use vortex_array::IntoArray;
20+
use vortex_array::arrays::PrimitiveArray;
1621
use vortex_array::arrays::filter::FilterKernel;
22+
use vortex_array::match_each_integer_ptype;
23+
use vortex_buffer::BufferMut;
1724
use vortex_error::VortexResult;
25+
use vortex_error::vortex_err;
1826
use vortex_mask::Mask;
1927

2028
use crate::OnPair;
21-
use crate::compress::DEFAULT_DICT12_CONFIG;
22-
use crate::compress::onpair_compress_array;
29+
use crate::OnPairArrayExt;
2330

2431
impl FilterKernel for OnPair {
2532
fn filter(
2633
array: ArrayView<'_, Self>,
2734
mask: &Mask,
2835
ctx: &mut ExecutionCtx,
2936
) -> VortexResult<Option<ArrayRef>> {
30-
let canonical = array
31-
.array()
37+
let n_in = array.array().len();
38+
let n_out = mask.true_count();
39+
40+
// Materialise the per-row offset arrays we walk during filtering.
41+
// The codes themselves we read through whatever ptype the
42+
// cascading compressor narrowed to — match_each_integer_ptype
43+
// dispatches on it below.
44+
let codes_offsets_arr = array
45+
.codes_offsets()
3246
.clone()
33-
.execute::<Canonical>(ctx)?
34-
.into_array();
35-
let filtered = canonical.filter(mask.clone())?;
47+
.execute::<PrimitiveArray>(ctx)?;
48+
let codes_arr = array.codes().clone().execute::<PrimitiveArray>(ctx)?;
49+
let codes_offsets = codes_offsets_arr.as_slice::<u32>();
50+
51+
// First pass: sum the surviving token count so we reserve once.
52+
let mut new_codes_len: usize = 0;
53+
for r in 0..n_in {
54+
if mask.value(r) {
55+
new_codes_len += (codes_offsets[r + 1] - codes_offsets[r]) as usize;
56+
}
57+
}
58+
59+
let mut new_codes_offsets = BufferMut::<u32>::with_capacity(n_out + 1);
60+
// SAFETY: capacity reserved.
61+
unsafe { new_codes_offsets.push_unchecked(0u32) };
62+
63+
let new_codes: ArrayRef = match_each_integer_ptype!(codes_arr.ptype(), |P| {
64+
let codes = codes_arr.as_slice::<P>();
65+
let mut out = BufferMut::<P>::with_capacity(new_codes_len);
66+
let mut cursor: u32 = 0;
67+
for r in 0..n_in {
68+
if mask.value(r) {
69+
let lo = codes_offsets[r] as usize;
70+
let hi = codes_offsets[r + 1] as usize;
71+
// SAFETY: codes_offsets validated at construction.
72+
let segment = unsafe { codes.get_unchecked(lo..hi) };
73+
out.extend_from_slice(segment);
74+
let segment_len = u32::try_from(hi - lo)
75+
.map_err(|_| vortex_err!("token segment overflows u32"))?;
76+
cursor = cursor
77+
.checked_add(segment_len)
78+
.ok_or_else(|| vortex_err!("codes_offsets overflow u32"))?;
79+
// SAFETY: capacity reserved (n_out + 1 entries).
80+
unsafe { new_codes_offsets.push_unchecked(cursor) };
81+
}
82+
}
83+
out.freeze().into_array()
84+
});
85+
86+
// uncompressed_lengths + validity flow through the standard
87+
// primitive filter — these are short integer arrays so the cost
88+
// is negligible compared to the (avoided) recompress.
89+
let uncompressed_lengths = array.uncompressed_lengths().clone().filter(mask.clone())?;
90+
let validity = array.array_validity().filter(mask)?;
91+
3692
Ok(Some(
37-
onpair_compress_array(&filtered, DEFAULT_DICT12_CONFIG, ctx)?.into_array(),
93+
unsafe {
94+
OnPair::new_unchecked(
95+
array.dtype().clone(),
96+
array.dict_bytes_handle().clone(),
97+
array.dict_offsets().clone(),
98+
new_codes,
99+
new_codes_offsets.freeze().into_array(),
100+
uncompressed_lengths,
101+
validity,
102+
array.bits(),
103+
)
104+
}
105+
.into_array(),
38106
))
39107
}
40108
}

0 commit comments

Comments
 (0)